Vibeship-spawner-skills agent-based-modeling

id: agent-based-modeling

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: simulation/agent-based-modeling/skill.yaml
source content

id: agent-based-modeling name: Agent-Based Modeling category: simulation description: | Design and implement agent-based models (ABM) for simulating complex systems with emergent behavior from individual agent interactions. version: 1.0.0

triggers:

  • "agent-based"
  • "multi-agent"
  • "emergent behavior"
  • "swarm simulation"
  • "social simulation"
  • "crowd modeling"
  • "population dynamics"
  • "individual-based"

provides:

  • Agent architecture design
  • Environment and spatial modeling
  • Interaction rule specification
  • Emergence detection and analysis
  • Calibration and validation methods
  • Scalable ABM implementation

patterns: agent_architecture: description: "Core agent design with state, perception, and action" example: | from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Dict, List, Any, Optional, Tuple from enum import Enum import numpy as np

  @dataclass
  class AgentState:
      """Internal state of an agent."""
      position: np.ndarray
      velocity: np.ndarray
      energy: float = 100.0
      age: int = 0
      alive: bool = True
      memory: Dict[str, Any] = field(default_factory=dict)

  class Agent(ABC):
      """
      Base agent class with perception-decision-action loop.
      """

      def __init__(self, agent_id: int, initial_state: AgentState):
          self.id = agent_id
          self.state = initial_state
          self.action_history: List[str] = []

      @abstractmethod
      def perceive(self, environment: 'Environment') -> Dict[str, Any]:
          """
          Gather information from environment.

          Returns perception dictionary with:
          - Nearby agents and their states
          - Environmental features
          - Resources in range
          """
          pass

      @abstractmethod
      def decide(self, perception: Dict[str, Any]) -> str:
          """
          Choose action based on perception and internal state.

          Returns action name to execute.
          """
          pass

      @abstractmethod
      def act(self, action: str, environment: 'Environment') -> None:
          """
          Execute action, modifying self and/or environment.
          """
          pass

      def step(self, environment: 'Environment') -> None:
          """Execute one simulation step."""
          if not self.state.alive:
              return

          perception = self.perceive(environment)
          action = self.decide(perception)
          self.act(action, environment)

          self.action_history.append(action)
          self.state.age += 1

  class SimpleAgent(Agent):
      """Example: simple foraging agent."""

      def __init__(self, agent_id: int, initial_state: AgentState):
          super().__init__(agent_id, initial_state)
          self.vision_range = 5.0
          self.speed = 1.0

      def perceive(self, environment: 'Environment') -> Dict[str, Any]:
          # Find nearby agents
          neighbors = environment.get_neighbors(
              self.state.position,
              self.vision_range
          )

          # Find nearby resources
          resources = environment.get_resources_in_range(
              self.state.position,
              self.vision_range
          )

          return {
              'neighbors': neighbors,
              'resources': resources,
              'my_energy': self.state.energy
          }

      def decide(self, perception: Dict[str, Any]) -> str:
          # Low energy: seek food
          if perception['my_energy'] < 30:
              if perception['resources']:
                  return 'seek_resource'
              return 'random_walk'

          # Avoid crowding
          if len(perception['neighbors']) > 5:
              return 'flee'

          return 'random_walk'

      def act(self, action: str, environment: 'Environment') -> None:
          if action == 'seek_resource':
              nearest = min(
                  environment.get_resources_in_range(
                      self.state.position, self.vision_range
                  ),
                  key=lambda r: np.linalg.norm(r.position - self.state.position),
                  default=None
              )
              if nearest:
                  direction = nearest.position - self.state.position
                  direction = direction / (np.linalg.norm(direction) + 1e-6)
                  self.state.velocity = direction * self.speed

          elif action == 'flee':
              neighbors = environment.get_neighbors(
                  self.state.position, self.vision_range
              )
              if neighbors:
                  center = np.mean([n.state.position for n in neighbors], axis=0)
                  direction = self.state.position - center
                  direction = direction / (np.linalg.norm(direction) + 1e-6)
                  self.state.velocity = direction * self.speed

          elif action == 'random_walk':
              angle = np.random.uniform(0, 2 * np.pi)
              self.state.velocity = np.array([np.cos(angle), np.sin(angle)]) * self.speed

          # Apply movement
          self.state.position += self.state.velocity
          self.state.energy -= 0.1  # Movement cost

          # Check for resource consumption
          for resource in environment.get_resources_at(self.state.position):
              self.state.energy += resource.value
              environment.remove_resource(resource)

          # Death check
          if self.state.energy <= 0:
              self.state.alive = False

environment_design: description: "Spatial environment with resources and boundaries" example: | from dataclasses import dataclass, field from typing import List, Dict, Optional, Callable import numpy as np from scipy.spatial import cKDTree

  @dataclass
  class Resource:
      """Resource in the environment."""
      id: int
      position: np.ndarray
      value: float
      respawn_time: Optional[int] = None

  @dataclass
  class EnvironmentConfig:
      """Configuration for environment."""
      width: float = 100.0
      height: float = 100.0
      periodic_boundary: bool = True  # Wrap around edges
      initial_resources: int = 100
      resource_respawn: bool = True

  class Environment:
      """
      Spatial environment with agents and resources.

      Uses KD-tree for efficient neighbor queries.
      """

      def __init__(self, config: EnvironmentConfig):
          self.config = config
          self.agents: Dict[int, Agent] = {}
          self.resources: Dict[int, Resource] = {}
          self.time_step = 0
          self._agent_tree: Optional[cKDTree] = None
          self._tree_dirty = True

      def add_agent(self, agent: Agent) -> None:
          """Add agent to environment."""
          # Ensure position is within bounds
          agent.state.position = self._wrap_position(agent.state.position)
          self.agents[agent.id] = agent
          self._tree_dirty = True

      def remove_agent(self, agent_id: int) -> None:
          """Remove agent from environment."""
          if agent_id in self.agents:
              del self.agents[agent_id]
              self._tree_dirty = True

      def add_resource(self, resource: Resource) -> None:
          """Add resource to environment."""
          resource.position = self._wrap_position(resource.position)
          self.resources[resource.id] = resource

      def remove_resource(self, resource: Resource) -> None:
          """Remove resource (consumed or expired)."""
          if resource.id in self.resources:
              del self.resources[resource.id]

      def get_neighbors(
          self,
          position: np.ndarray,
          radius: float,
          exclude_id: Optional[int] = None
      ) -> List[Agent]:
          """Get agents within radius of position."""
          self._rebuild_tree_if_needed()

          if self._agent_tree is None:
              return []

          # Query KD-tree
          indices = self._agent_tree.query_ball_point(position, radius)

          neighbors = []
          agent_list = list(self.agents.values())
          for i in indices:
              agent = agent_list[i]
              if exclude_id is None or agent.id != exclude_id:
                  neighbors.append(agent)

          return neighbors

      def get_resources_in_range(
          self,
          position: np.ndarray,
          radius: float
      ) -> List[Resource]:
          """Get resources within radius."""
          in_range = []
          for resource in self.resources.values():
              dist = self._distance(position, resource.position)
              if dist <= radius:
                  in_range.append(resource)
          return in_range

      def get_resources_at(self, position: np.ndarray, tolerance: float = 0.5) -> List[Resource]:
          """Get resources at exact position (within tolerance)."""
          return [r for r in self.resources.values()
                  if self._distance(position, r.position) < tolerance]

      def step(self) -> None:
          """Advance simulation by one time step."""
          # Shuffle agent order for fairness
          agent_order = list(self.agents.values())
          np.random.shuffle(agent_order)

          # Each agent takes a step
          for agent in agent_order:
              agent.step(self)

          # Remove dead agents
          dead_ids = [a.id for a in self.agents.values() if not a.state.alive]
          for agent_id in dead_ids:
              self.remove_agent(agent_id)

          # Resource respawn
          if self.config.resource_respawn:
              self._respawn_resources()

          self.time_step += 1
          self._tree_dirty = True

      def _rebuild_tree_if_needed(self) -> None:
          """Rebuild KD-tree if agent positions changed."""
          if self._tree_dirty and self.agents:
              positions = np.array([a.state.position for a in self.agents.values()])
              self._agent_tree = cKDTree(positions)
              self._tree_dirty = False

      def _wrap_position(self, position: np.ndarray) -> np.ndarray:
          """Apply periodic boundary conditions."""
          if self.config.periodic_boundary:
              return np.array([
                  position[0] % self.config.width,
                  position[1] % self.config.height
              ])
          else:
              return np.clip(
                  position,
                  [0, 0],
                  [self.config.width, self.config.height]
              )

      def _distance(self, p1: np.ndarray, p2: np.ndarray) -> float:
          """Compute distance respecting periodic boundaries."""
          diff = p1 - p2
          if self.config.periodic_boundary:
              diff[0] = min(abs(diff[0]), self.config.width - abs(diff[0]))
              diff[1] = min(abs(diff[1]), self.config.height - abs(diff[1]))
          return np.linalg.norm(diff)

      def _respawn_resources(self) -> None:
          """Respawn resources to maintain density."""
          target = self.config.initial_resources
          current = len(self.resources)
          if current < target * 0.8:
              for _ in range(target - current):
                  self.add_resource(Resource(
                      id=np.random.randint(1000000),
                      position=np.random.uniform(
                          [0, 0],
                          [self.config.width, self.config.height]
                      ),
                      value=np.random.uniform(5, 20)
                  ))

behavioral_rules: description: "Common behavioral patterns for agents" example: | import numpy as np from typing import List, Callable

  class BehaviorRules:
      """Collection of common agent behavioral rules."""

      @staticmethod
      def flocking_boids(
          agent: Agent,
          neighbors: List[Agent],
          separation_weight: float = 1.5,
          alignment_weight: float = 1.0,
          cohesion_weight: float = 1.0,
          separation_dist: float = 2.0
      ) -> np.ndarray:
          """
          Reynolds flocking rules (Boids).

          Returns steering vector combining:
          - Separation: avoid crowding
          - Alignment: match neighbors' direction
          - Cohesion: move toward center of group
          """
          if not neighbors:
              return np.zeros(2)

          pos = agent.state.position
          vel = agent.state.velocity

          # Separation
          separation = np.zeros(2)
          for neighbor in neighbors:
              diff = pos - neighbor.state.position
              dist = np.linalg.norm(diff)
              if dist < separation_dist and dist > 0:
                  separation += diff / (dist ** 2)

          # Alignment
          if neighbors:
              avg_velocity = np.mean([n.state.velocity for n in neighbors], axis=0)
              alignment = avg_velocity - vel
          else:
              alignment = np.zeros(2)

          # Cohesion
          center = np.mean([n.state.position for n in neighbors], axis=0)
          cohesion = center - pos

          # Combine with weights
          steering = (
              separation_weight * separation +
              alignment_weight * alignment +
              cohesion_weight * cohesion
          )

          return steering

      @staticmethod
      def follow_gradient(
          agent: Agent,
          environment: 'Environment',
          gradient_func: Callable[[np.ndarray], float],
          step_size: float = 1.0
      ) -> np.ndarray:
          """
          Move uphill on a gradient field (chemotaxis, etc).
          """
          pos = agent.state.position
          current_value = gradient_func(pos)

          # Sample nearby directions
          best_direction = np.zeros(2)
          best_value = current_value

          for angle in np.linspace(0, 2 * np.pi, 8, endpoint=False):
              direction = np.array([np.cos(angle), np.sin(angle)])
              test_pos = pos + direction * step_size
              value = gradient_func(test_pos)
              if value > best_value:
                  best_value = value
                  best_direction = direction

          return best_direction

      @staticmethod
      def social_force(
          agent: Agent,
          neighbors: List[Agent],
          target: np.ndarray,
          desired_speed: float = 1.0,
          relaxation_time: float = 0.5,
          repulsion_strength: float = 2.0,
          repulsion_range: float = 2.0
      ) -> np.ndarray:
          """
          Social force model for pedestrian dynamics (Helbing).

          Balances goal-seeking with collision avoidance.
          """
          pos = agent.state.position
          vel = agent.state.velocity

          # Driving force toward target
          to_target = target - pos
          desired_direction = to_target / (np.linalg.norm(to_target) + 1e-6)
          desired_velocity = desired_direction * desired_speed
          driving_force = (desired_velocity - vel) / relaxation_time

          # Repulsive forces from neighbors
          repulsion = np.zeros(2)
          for neighbor in neighbors:
              diff = pos - neighbor.state.position
              dist = np.linalg.norm(diff)
              if dist < repulsion_range and dist > 0:
                  direction = diff / dist
                  force = repulsion_strength * np.exp(-dist / repulsion_range)
                  repulsion += force * direction

          return driving_force + repulsion

      @staticmethod
      def utility_based_decision(
          agent: Agent,
          actions: List[str],
          utility_func: Callable[[Agent, str], float],
          temperature: float = 1.0
      ) -> str:
          """
          Choose action based on utility with softmax exploration.
          """
          utilities = np.array([utility_func(agent, a) for a in actions])

          # Softmax for probabilistic selection
          exp_utilities = np.exp((utilities - np.max(utilities)) / temperature)
          probabilities = exp_utilities / np.sum(exp_utilities)

          return np.random.choice(actions, p=probabilities)

emergence_detection: description: "Detect and measure emergent patterns" example: | import numpy as np from typing import List, Dict, Tuple from scipy.spatial import distance from scipy.cluster.hierarchy import fclusterdata from dataclasses import dataclass

  @dataclass
  class EmergenceMetrics:
      """Metrics for detecting emergence."""
      clustering_coefficient: float
      polarization: float  # Alignment of velocities
      segregation: float   # Separation of types
      entropy: float       # Behavioral diversity

  class EmergenceAnalyzer:
      """Detect and quantify emergent behavior in ABM."""

      def __init__(self, environment: 'Environment'):
          self.env = environment
          self.history: List[EmergenceMetrics] = []

      def analyze(self) -> EmergenceMetrics:
          """Compute emergence metrics for current state."""
          agents = list(self.env.agents.values())
          if len(agents) < 2:
              return EmergenceMetrics(0, 0, 0, 0)

          metrics = EmergenceMetrics(
              clustering_coefficient=self._compute_clustering(agents),
              polarization=self._compute_polarization(agents),
              segregation=self._compute_segregation(agents),
              entropy=self._compute_entropy(agents)
          )

          self.history.append(metrics)
          return metrics

      def _compute_clustering(self, agents: List[Agent]) -> float:
          """Measure spatial clustering of agents."""
          if len(agents) < 3:
              return 0

          positions = np.array([a.state.position for a in agents])

          # Use hierarchical clustering
          try:
              clusters = fclusterdata(positions, t=5.0, criterion='distance')
              n_clusters = len(set(clusters))
              # Normalize by max possible clusters
              return 1 - (n_clusters / len(agents))
          except:
              return 0

      def _compute_polarization(self, agents: List[Agent]) -> float:
          """Measure alignment of agent velocities (0 = random, 1 = aligned)."""
          velocities = np.array([a.state.velocity for a in agents])
          speeds = np.linalg.norm(velocities, axis=1, keepdims=True)

          # Normalize velocities
          moving_mask = speeds.flatten() > 0.01
          if not any(moving_mask):
              return 0

          normalized = velocities[moving_mask] / speeds[moving_mask]
          avg_direction = np.mean(normalized, axis=0)

          return np.linalg.norm(avg_direction)

      def _compute_segregation(self, agents: List[Agent]) -> float:
          """Measure segregation between agent types."""
          # Assuming agents have a 'type' attribute
          if not hasattr(agents[0], 'agent_type'):
              return 0

          types = set(a.agent_type for a in agents)
          if len(types) < 2:
              return 0

          # Compute average distance to same vs different type
          same_type_dist = []
          diff_type_dist = []

          for a in agents:
              for b in agents:
                  if a.id == b.id:
                      continue
                  dist = np.linalg.norm(a.state.position - b.state.position)
                  if a.agent_type == b.agent_type:
                      same_type_dist.append(dist)
                  else:
                      diff_type_dist.append(dist)

          if not same_type_dist or not diff_type_dist:
              return 0

          # Segregation = ratio of cross-type to same-type distance
          return np.mean(diff_type_dist) / (np.mean(same_type_dist) + 1e-6) - 1

      def _compute_entropy(self, agents: List[Agent]) -> float:
          """Measure behavioral diversity from action distributions."""
          all_actions = []
          for agent in agents:
              if agent.action_history:
                  all_actions.append(agent.action_history[-1])

          if not all_actions:
              return 0

          # Compute action frequency distribution
          action_counts = {}
          for action in all_actions:
              action_counts[action] = action_counts.get(action, 0) + 1

          probs = np.array(list(action_counts.values())) / len(all_actions)
          entropy = -np.sum(probs * np.log(probs + 1e-10))

          # Normalize by max entropy
          max_entropy = np.log(len(action_counts))
          return entropy / max_entropy if max_entropy > 0 else 0

      def detect_phase_transition(
          self,
          metric: str = 'polarization',
          window: int = 100
      ) -> bool:
          """Detect sudden change in emergence metric."""
          if len(self.history) < window * 2:
              return False

          values = [getattr(m, metric) for m in self.history]
          recent = values[-window:]
          previous = values[-2*window:-window]

          # Compare distributions
          from scipy.stats import ks_2samp
          stat, p_value = ks_2samp(recent, previous)

          return p_value < 0.01  # Significant change

calibration_validation: description: "Calibrate and validate ABM against data" example: | import numpy as np from typing import Dict, List, Callable, Tuple from scipy.optimize import minimize, differential_evolution from dataclasses import dataclass

  @dataclass
  class ValidationResult:
      """Result of ABM validation."""
      metrics_match: Dict[str, float]  # Metric name -> error
      overall_score: float
      passed: bool

  class ABMCalibrator:
      """
      Calibrate ABM parameters to match empirical data.
      """

      def __init__(
          self,
          model_factory: Callable[[Dict], 'Environment'],
          param_bounds: Dict[str, Tuple[float, float]],
          target_metrics: Dict[str, float]
      ):
          self.model_factory = model_factory
          self.param_bounds = param_bounds
          self.target_metrics = target_metrics

      def calibrate(
          self,
          n_iterations: int = 100,
          n_replications: int = 10
      ) -> Dict[str, float]:
          """
          Find parameters that minimize distance to target metrics.
          """
          param_names = list(self.param_bounds.keys())
          bounds = [self.param_bounds[p] for p in param_names]

          def objective(params_array):
              params = dict(zip(param_names, params_array))

              # Run multiple replications
              all_metrics = []
              for _ in range(n_replications):
                  env = self.model_factory(params)
                  for _ in range(100):  # Run simulation
                      env.step()
                  metrics = self.compute_metrics(env)
                  all_metrics.append(metrics)

              # Average metrics across replications
              avg_metrics = {
                  k: np.mean([m[k] for m in all_metrics])
                  for k in all_metrics[0].keys()
              }

              # Compute distance to targets
              error = sum(
                  ((avg_metrics[k] - self.target_metrics[k]) / self.target_metrics[k]) ** 2
                  for k in self.target_metrics.keys()
              )

              return error

          # Global optimization
          result = differential_evolution(
              objective,
              bounds,
              maxiter=n_iterations,
              seed=42
          )

          return dict(zip(param_names, result.x))

      def compute_metrics(self, env: 'Environment') -> Dict[str, float]:
          """Compute metrics to compare with targets."""
          agents = list(env.agents.values())
          return {
              'population': len(agents),
              'avg_energy': np.mean([a.state.energy for a in agents]) if agents else 0,
              'clustering': self._compute_clustering(agents),
              # Add more metrics as needed
          }

      def _compute_clustering(self, agents: List[Agent]) -> float:
          """Compute clustering metric."""
          if len(agents) < 2:
              return 0
          positions = np.array([a.state.position for a in agents])
          from scipy.spatial import distance
          dists = distance.pdist(positions)
          return 1.0 / (np.mean(dists) + 1e-6)

  class ABMValidator:
      """Validate ABM against empirical patterns."""

      def __init__(self, model: 'Environment'):
          self.model = model

      def validate(
          self,
          empirical_data: Dict[str, any],
          tolerance: float = 0.1
      ) -> ValidationResult:
          """
          Validate model outputs against empirical data.
          """
          # Run model and collect outputs
          model_metrics = self.run_and_collect()

          # Compare each metric
          metrics_match = {}
          for metric, target in empirical_data.items():
              if metric in model_metrics:
                  model_value = model_metrics[metric]
                  if isinstance(target, (int, float)):
                      error = abs(model_value - target) / (abs(target) + 1e-6)
                  else:
                      # Distribution comparison
                      from scipy.stats import ks_2samp
                      stat, _ = ks_2samp(model_value, target)
                      error = stat
                  metrics_match[metric] = error

          overall_score = np.mean(list(metrics_match.values()))
          passed = all(v < tolerance for v in metrics_match.values())

          return ValidationResult(
              metrics_match=metrics_match,
              overall_score=overall_score,
              passed=passed
          )

      def run_and_collect(self, n_steps: int = 1000) -> Dict[str, any]:
          """Run model and collect time series."""
          populations = []
          for _ in range(n_steps):
              self.model.step()
              populations.append(len(self.model.agents))

          return {
              'final_population': populations[-1],
              'population_trajectory': populations,
              'avg_population': np.mean(populations)
          }

anti_patterns:

  • pattern: "Sequential agent updates in fixed order" problem: "Creates artificial advantage for agents earlier in order" solution: "Shuffle agent order each step or use simultaneous update"

  • pattern: "O(N^2) neighbor search" problem: "Doesn't scale beyond hundreds of agents" solution: "Use spatial data structures: KD-tree, grid, quadtree"

  • pattern: "Perfect information for agents" problem: "Unrealistic, agents should have limited perception" solution: "Implement vision range, occlusion, information delay"

  • pattern: "No stochasticity in agent behavior" problem: "Deterministic behavior unrealistic, no exploration" solution: "Add randomness: probabilistic actions, noise in perception"

  • pattern: "Validating only average outcomes" problem: "Misses variance and distributional properties" solution: "Validate distributions, extreme events, temporal patterns"

  • pattern: "Ignoring boundary effects" problem: "Edge agents behave differently, biases results" solution: "Use periodic boundaries or buffer zones"

handoffs:

  • to: monte-carlo when: "Need uncertainty quantification in ABM outputs" context: "Run Monte Carlo over ABM parameter space"

  • to: discrete-event-simulation when: "Agents interact through discrete events" context: "Hybrid ABM-DES for efficiency"

  • to: physics-simulation when: "Agents have physical dynamics" context: "Realistic agent movement and collisions"

  • to: neural-networks when: "Learning agent behaviors" context: "Reinforcement learning for agent policies"

ecosystem: python_frameworks: - "Mesa - Python ABM framework" - "NetLogo - Classic ABM platform" - "MASON - Java high-performance ABM" - "Agents.jl - Julia ABM"

visualization: - "Mesa visualization - Built-in web viz" - "matplotlib animation - Python plots" - "pygame - Real-time 2D" - "Unity - 3D agent viz"

analysis: - "SALib - Sensitivity analysis" - "EMA Workbench - Exploratory modeling" - "pyABC - Approximate Bayesian computation"

references: books: - "Agent-Based and Individual-Based Modeling - Railsback & Grimm" - "Simulation for the Social Scientist - Gilbert & Troitzsch" - "Complex Adaptive Systems - Miller & Page"

papers: - "Flocks, Herds, and Schools - Reynolds" - "Social Force Model - Helbing & Molnar" - "ODD Protocol for ABM Description - Grimm et al."