git clone https://github.com/vibeforge1111/vibeship-spawner-skills
simulation/agent-based-modeling/skill.yamlid: agent-based-modeling name: Agent-Based Modeling category: simulation description: | Design and implement agent-based models (ABM) for simulating complex systems with emergent behavior from individual agent interactions. version: 1.0.0
triggers:
- "agent-based"
- "multi-agent"
- "emergent behavior"
- "swarm simulation"
- "social simulation"
- "crowd modeling"
- "population dynamics"
- "individual-based"
provides:
- Agent architecture design
- Environment and spatial modeling
- Interaction rule specification
- Emergence detection and analysis
- Calibration and validation methods
- Scalable ABM implementation
patterns: agent_architecture: description: "Core agent design with state, perception, and action" example: | from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Dict, List, Any, Optional, Tuple from enum import Enum import numpy as np
@dataclass class AgentState: """Internal state of an agent.""" position: np.ndarray velocity: np.ndarray energy: float = 100.0 age: int = 0 alive: bool = True memory: Dict[str, Any] = field(default_factory=dict) class Agent(ABC): """ Base agent class with perception-decision-action loop. """ def __init__(self, agent_id: int, initial_state: AgentState): self.id = agent_id self.state = initial_state self.action_history: List[str] = [] @abstractmethod def perceive(self, environment: 'Environment') -> Dict[str, Any]: """ Gather information from environment. Returns perception dictionary with: - Nearby agents and their states - Environmental features - Resources in range """ pass @abstractmethod def decide(self, perception: Dict[str, Any]) -> str: """ Choose action based on perception and internal state. Returns action name to execute. """ pass @abstractmethod def act(self, action: str, environment: 'Environment') -> None: """ Execute action, modifying self and/or environment. """ pass def step(self, environment: 'Environment') -> None: """Execute one simulation step.""" if not self.state.alive: return perception = self.perceive(environment) action = self.decide(perception) self.act(action, environment) self.action_history.append(action) self.state.age += 1 class SimpleAgent(Agent): """Example: simple foraging agent.""" def __init__(self, agent_id: int, initial_state: AgentState): super().__init__(agent_id, initial_state) self.vision_range = 5.0 self.speed = 1.0 def perceive(self, environment: 'Environment') -> Dict[str, Any]: # Find nearby agents neighbors = environment.get_neighbors( self.state.position, self.vision_range ) # Find nearby resources resources = environment.get_resources_in_range( self.state.position, self.vision_range ) return { 'neighbors': neighbors, 'resources': resources, 'my_energy': self.state.energy } def decide(self, perception: Dict[str, Any]) -> str: # Low energy: seek food if perception['my_energy'] < 30: if perception['resources']: return 'seek_resource' return 'random_walk' # Avoid crowding if len(perception['neighbors']) > 5: return 'flee' return 'random_walk' def act(self, action: str, environment: 'Environment') -> None: if action == 'seek_resource': nearest = min( environment.get_resources_in_range( self.state.position, self.vision_range ), key=lambda r: np.linalg.norm(r.position - self.state.position), default=None ) if nearest: direction = nearest.position - self.state.position direction = direction / (np.linalg.norm(direction) + 1e-6) self.state.velocity = direction * self.speed elif action == 'flee': neighbors = environment.get_neighbors( self.state.position, self.vision_range ) if neighbors: center = np.mean([n.state.position for n in neighbors], axis=0) direction = self.state.position - center direction = direction / (np.linalg.norm(direction) + 1e-6) self.state.velocity = direction * self.speed elif action == 'random_walk': angle = np.random.uniform(0, 2 * np.pi) self.state.velocity = np.array([np.cos(angle), np.sin(angle)]) * self.speed # Apply movement self.state.position += self.state.velocity self.state.energy -= 0.1 # Movement cost # Check for resource consumption for resource in environment.get_resources_at(self.state.position): self.state.energy += resource.value environment.remove_resource(resource) # Death check if self.state.energy <= 0: self.state.alive = False
environment_design: description: "Spatial environment with resources and boundaries" example: | from dataclasses import dataclass, field from typing import List, Dict, Optional, Callable import numpy as np from scipy.spatial import cKDTree
@dataclass class Resource: """Resource in the environment.""" id: int position: np.ndarray value: float respawn_time: Optional[int] = None @dataclass class EnvironmentConfig: """Configuration for environment.""" width: float = 100.0 height: float = 100.0 periodic_boundary: bool = True # Wrap around edges initial_resources: int = 100 resource_respawn: bool = True class Environment: """ Spatial environment with agents and resources. Uses KD-tree for efficient neighbor queries. """ def __init__(self, config: EnvironmentConfig): self.config = config self.agents: Dict[int, Agent] = {} self.resources: Dict[int, Resource] = {} self.time_step = 0 self._agent_tree: Optional[cKDTree] = None self._tree_dirty = True def add_agent(self, agent: Agent) -> None: """Add agent to environment.""" # Ensure position is within bounds agent.state.position = self._wrap_position(agent.state.position) self.agents[agent.id] = agent self._tree_dirty = True def remove_agent(self, agent_id: int) -> None: """Remove agent from environment.""" if agent_id in self.agents: del self.agents[agent_id] self._tree_dirty = True def add_resource(self, resource: Resource) -> None: """Add resource to environment.""" resource.position = self._wrap_position(resource.position) self.resources[resource.id] = resource def remove_resource(self, resource: Resource) -> None: """Remove resource (consumed or expired).""" if resource.id in self.resources: del self.resources[resource.id] def get_neighbors( self, position: np.ndarray, radius: float, exclude_id: Optional[int] = None ) -> List[Agent]: """Get agents within radius of position.""" self._rebuild_tree_if_needed() if self._agent_tree is None: return [] # Query KD-tree indices = self._agent_tree.query_ball_point(position, radius) neighbors = [] agent_list = list(self.agents.values()) for i in indices: agent = agent_list[i] if exclude_id is None or agent.id != exclude_id: neighbors.append(agent) return neighbors def get_resources_in_range( self, position: np.ndarray, radius: float ) -> List[Resource]: """Get resources within radius.""" in_range = [] for resource in self.resources.values(): dist = self._distance(position, resource.position) if dist <= radius: in_range.append(resource) return in_range def get_resources_at(self, position: np.ndarray, tolerance: float = 0.5) -> List[Resource]: """Get resources at exact position (within tolerance).""" return [r for r in self.resources.values() if self._distance(position, r.position) < tolerance] def step(self) -> None: """Advance simulation by one time step.""" # Shuffle agent order for fairness agent_order = list(self.agents.values()) np.random.shuffle(agent_order) # Each agent takes a step for agent in agent_order: agent.step(self) # Remove dead agents dead_ids = [a.id for a in self.agents.values() if not a.state.alive] for agent_id in dead_ids: self.remove_agent(agent_id) # Resource respawn if self.config.resource_respawn: self._respawn_resources() self.time_step += 1 self._tree_dirty = True def _rebuild_tree_if_needed(self) -> None: """Rebuild KD-tree if agent positions changed.""" if self._tree_dirty and self.agents: positions = np.array([a.state.position for a in self.agents.values()]) self._agent_tree = cKDTree(positions) self._tree_dirty = False def _wrap_position(self, position: np.ndarray) -> np.ndarray: """Apply periodic boundary conditions.""" if self.config.periodic_boundary: return np.array([ position[0] % self.config.width, position[1] % self.config.height ]) else: return np.clip( position, [0, 0], [self.config.width, self.config.height] ) def _distance(self, p1: np.ndarray, p2: np.ndarray) -> float: """Compute distance respecting periodic boundaries.""" diff = p1 - p2 if self.config.periodic_boundary: diff[0] = min(abs(diff[0]), self.config.width - abs(diff[0])) diff[1] = min(abs(diff[1]), self.config.height - abs(diff[1])) return np.linalg.norm(diff) def _respawn_resources(self) -> None: """Respawn resources to maintain density.""" target = self.config.initial_resources current = len(self.resources) if current < target * 0.8: for _ in range(target - current): self.add_resource(Resource( id=np.random.randint(1000000), position=np.random.uniform( [0, 0], [self.config.width, self.config.height] ), value=np.random.uniform(5, 20) ))
behavioral_rules: description: "Common behavioral patterns for agents" example: | import numpy as np from typing import List, Callable
class BehaviorRules: """Collection of common agent behavioral rules.""" @staticmethod def flocking_boids( agent: Agent, neighbors: List[Agent], separation_weight: float = 1.5, alignment_weight: float = 1.0, cohesion_weight: float = 1.0, separation_dist: float = 2.0 ) -> np.ndarray: """ Reynolds flocking rules (Boids). Returns steering vector combining: - Separation: avoid crowding - Alignment: match neighbors' direction - Cohesion: move toward center of group """ if not neighbors: return np.zeros(2) pos = agent.state.position vel = agent.state.velocity # Separation separation = np.zeros(2) for neighbor in neighbors: diff = pos - neighbor.state.position dist = np.linalg.norm(diff) if dist < separation_dist and dist > 0: separation += diff / (dist ** 2) # Alignment if neighbors: avg_velocity = np.mean([n.state.velocity for n in neighbors], axis=0) alignment = avg_velocity - vel else: alignment = np.zeros(2) # Cohesion center = np.mean([n.state.position for n in neighbors], axis=0) cohesion = center - pos # Combine with weights steering = ( separation_weight * separation + alignment_weight * alignment + cohesion_weight * cohesion ) return steering @staticmethod def follow_gradient( agent: Agent, environment: 'Environment', gradient_func: Callable[[np.ndarray], float], step_size: float = 1.0 ) -> np.ndarray: """ Move uphill on a gradient field (chemotaxis, etc). """ pos = agent.state.position current_value = gradient_func(pos) # Sample nearby directions best_direction = np.zeros(2) best_value = current_value for angle in np.linspace(0, 2 * np.pi, 8, endpoint=False): direction = np.array([np.cos(angle), np.sin(angle)]) test_pos = pos + direction * step_size value = gradient_func(test_pos) if value > best_value: best_value = value best_direction = direction return best_direction @staticmethod def social_force( agent: Agent, neighbors: List[Agent], target: np.ndarray, desired_speed: float = 1.0, relaxation_time: float = 0.5, repulsion_strength: float = 2.0, repulsion_range: float = 2.0 ) -> np.ndarray: """ Social force model for pedestrian dynamics (Helbing). Balances goal-seeking with collision avoidance. """ pos = agent.state.position vel = agent.state.velocity # Driving force toward target to_target = target - pos desired_direction = to_target / (np.linalg.norm(to_target) + 1e-6) desired_velocity = desired_direction * desired_speed driving_force = (desired_velocity - vel) / relaxation_time # Repulsive forces from neighbors repulsion = np.zeros(2) for neighbor in neighbors: diff = pos - neighbor.state.position dist = np.linalg.norm(diff) if dist < repulsion_range and dist > 0: direction = diff / dist force = repulsion_strength * np.exp(-dist / repulsion_range) repulsion += force * direction return driving_force + repulsion @staticmethod def utility_based_decision( agent: Agent, actions: List[str], utility_func: Callable[[Agent, str], float], temperature: float = 1.0 ) -> str: """ Choose action based on utility with softmax exploration. """ utilities = np.array([utility_func(agent, a) for a in actions]) # Softmax for probabilistic selection exp_utilities = np.exp((utilities - np.max(utilities)) / temperature) probabilities = exp_utilities / np.sum(exp_utilities) return np.random.choice(actions, p=probabilities)
emergence_detection: description: "Detect and measure emergent patterns" example: | import numpy as np from typing import List, Dict, Tuple from scipy.spatial import distance from scipy.cluster.hierarchy import fclusterdata from dataclasses import dataclass
@dataclass class EmergenceMetrics: """Metrics for detecting emergence.""" clustering_coefficient: float polarization: float # Alignment of velocities segregation: float # Separation of types entropy: float # Behavioral diversity class EmergenceAnalyzer: """Detect and quantify emergent behavior in ABM.""" def __init__(self, environment: 'Environment'): self.env = environment self.history: List[EmergenceMetrics] = [] def analyze(self) -> EmergenceMetrics: """Compute emergence metrics for current state.""" agents = list(self.env.agents.values()) if len(agents) < 2: return EmergenceMetrics(0, 0, 0, 0) metrics = EmergenceMetrics( clustering_coefficient=self._compute_clustering(agents), polarization=self._compute_polarization(agents), segregation=self._compute_segregation(agents), entropy=self._compute_entropy(agents) ) self.history.append(metrics) return metrics def _compute_clustering(self, agents: List[Agent]) -> float: """Measure spatial clustering of agents.""" if len(agents) < 3: return 0 positions = np.array([a.state.position for a in agents]) # Use hierarchical clustering try: clusters = fclusterdata(positions, t=5.0, criterion='distance') n_clusters = len(set(clusters)) # Normalize by max possible clusters return 1 - (n_clusters / len(agents)) except: return 0 def _compute_polarization(self, agents: List[Agent]) -> float: """Measure alignment of agent velocities (0 = random, 1 = aligned).""" velocities = np.array([a.state.velocity for a in agents]) speeds = np.linalg.norm(velocities, axis=1, keepdims=True) # Normalize velocities moving_mask = speeds.flatten() > 0.01 if not any(moving_mask): return 0 normalized = velocities[moving_mask] / speeds[moving_mask] avg_direction = np.mean(normalized, axis=0) return np.linalg.norm(avg_direction) def _compute_segregation(self, agents: List[Agent]) -> float: """Measure segregation between agent types.""" # Assuming agents have a 'type' attribute if not hasattr(agents[0], 'agent_type'): return 0 types = set(a.agent_type for a in agents) if len(types) < 2: return 0 # Compute average distance to same vs different type same_type_dist = [] diff_type_dist = [] for a in agents: for b in agents: if a.id == b.id: continue dist = np.linalg.norm(a.state.position - b.state.position) if a.agent_type == b.agent_type: same_type_dist.append(dist) else: diff_type_dist.append(dist) if not same_type_dist or not diff_type_dist: return 0 # Segregation = ratio of cross-type to same-type distance return np.mean(diff_type_dist) / (np.mean(same_type_dist) + 1e-6) - 1 def _compute_entropy(self, agents: List[Agent]) -> float: """Measure behavioral diversity from action distributions.""" all_actions = [] for agent in agents: if agent.action_history: all_actions.append(agent.action_history[-1]) if not all_actions: return 0 # Compute action frequency distribution action_counts = {} for action in all_actions: action_counts[action] = action_counts.get(action, 0) + 1 probs = np.array(list(action_counts.values())) / len(all_actions) entropy = -np.sum(probs * np.log(probs + 1e-10)) # Normalize by max entropy max_entropy = np.log(len(action_counts)) return entropy / max_entropy if max_entropy > 0 else 0 def detect_phase_transition( self, metric: str = 'polarization', window: int = 100 ) -> bool: """Detect sudden change in emergence metric.""" if len(self.history) < window * 2: return False values = [getattr(m, metric) for m in self.history] recent = values[-window:] previous = values[-2*window:-window] # Compare distributions from scipy.stats import ks_2samp stat, p_value = ks_2samp(recent, previous) return p_value < 0.01 # Significant change
calibration_validation: description: "Calibrate and validate ABM against data" example: | import numpy as np from typing import Dict, List, Callable, Tuple from scipy.optimize import minimize, differential_evolution from dataclasses import dataclass
@dataclass class ValidationResult: """Result of ABM validation.""" metrics_match: Dict[str, float] # Metric name -> error overall_score: float passed: bool class ABMCalibrator: """ Calibrate ABM parameters to match empirical data. """ def __init__( self, model_factory: Callable[[Dict], 'Environment'], param_bounds: Dict[str, Tuple[float, float]], target_metrics: Dict[str, float] ): self.model_factory = model_factory self.param_bounds = param_bounds self.target_metrics = target_metrics def calibrate( self, n_iterations: int = 100, n_replications: int = 10 ) -> Dict[str, float]: """ Find parameters that minimize distance to target metrics. """ param_names = list(self.param_bounds.keys()) bounds = [self.param_bounds[p] for p in param_names] def objective(params_array): params = dict(zip(param_names, params_array)) # Run multiple replications all_metrics = [] for _ in range(n_replications): env = self.model_factory(params) for _ in range(100): # Run simulation env.step() metrics = self.compute_metrics(env) all_metrics.append(metrics) # Average metrics across replications avg_metrics = { k: np.mean([m[k] for m in all_metrics]) for k in all_metrics[0].keys() } # Compute distance to targets error = sum( ((avg_metrics[k] - self.target_metrics[k]) / self.target_metrics[k]) ** 2 for k in self.target_metrics.keys() ) return error # Global optimization result = differential_evolution( objective, bounds, maxiter=n_iterations, seed=42 ) return dict(zip(param_names, result.x)) def compute_metrics(self, env: 'Environment') -> Dict[str, float]: """Compute metrics to compare with targets.""" agents = list(env.agents.values()) return { 'population': len(agents), 'avg_energy': np.mean([a.state.energy for a in agents]) if agents else 0, 'clustering': self._compute_clustering(agents), # Add more metrics as needed } def _compute_clustering(self, agents: List[Agent]) -> float: """Compute clustering metric.""" if len(agents) < 2: return 0 positions = np.array([a.state.position for a in agents]) from scipy.spatial import distance dists = distance.pdist(positions) return 1.0 / (np.mean(dists) + 1e-6) class ABMValidator: """Validate ABM against empirical patterns.""" def __init__(self, model: 'Environment'): self.model = model def validate( self, empirical_data: Dict[str, any], tolerance: float = 0.1 ) -> ValidationResult: """ Validate model outputs against empirical data. """ # Run model and collect outputs model_metrics = self.run_and_collect() # Compare each metric metrics_match = {} for metric, target in empirical_data.items(): if metric in model_metrics: model_value = model_metrics[metric] if isinstance(target, (int, float)): error = abs(model_value - target) / (abs(target) + 1e-6) else: # Distribution comparison from scipy.stats import ks_2samp stat, _ = ks_2samp(model_value, target) error = stat metrics_match[metric] = error overall_score = np.mean(list(metrics_match.values())) passed = all(v < tolerance for v in metrics_match.values()) return ValidationResult( metrics_match=metrics_match, overall_score=overall_score, passed=passed ) def run_and_collect(self, n_steps: int = 1000) -> Dict[str, any]: """Run model and collect time series.""" populations = [] for _ in range(n_steps): self.model.step() populations.append(len(self.model.agents)) return { 'final_population': populations[-1], 'population_trajectory': populations, 'avg_population': np.mean(populations) }
anti_patterns:
-
pattern: "Sequential agent updates in fixed order" problem: "Creates artificial advantage for agents earlier in order" solution: "Shuffle agent order each step or use simultaneous update"
-
pattern: "O(N^2) neighbor search" problem: "Doesn't scale beyond hundreds of agents" solution: "Use spatial data structures: KD-tree, grid, quadtree"
-
pattern: "Perfect information for agents" problem: "Unrealistic, agents should have limited perception" solution: "Implement vision range, occlusion, information delay"
-
pattern: "No stochasticity in agent behavior" problem: "Deterministic behavior unrealistic, no exploration" solution: "Add randomness: probabilistic actions, noise in perception"
-
pattern: "Validating only average outcomes" problem: "Misses variance and distributional properties" solution: "Validate distributions, extreme events, temporal patterns"
-
pattern: "Ignoring boundary effects" problem: "Edge agents behave differently, biases results" solution: "Use periodic boundaries or buffer zones"
handoffs:
-
to: monte-carlo when: "Need uncertainty quantification in ABM outputs" context: "Run Monte Carlo over ABM parameter space"
-
to: discrete-event-simulation when: "Agents interact through discrete events" context: "Hybrid ABM-DES for efficiency"
-
to: physics-simulation when: "Agents have physical dynamics" context: "Realistic agent movement and collisions"
-
to: neural-networks when: "Learning agent behaviors" context: "Reinforcement learning for agent policies"
ecosystem: python_frameworks: - "Mesa - Python ABM framework" - "NetLogo - Classic ABM platform" - "MASON - Java high-performance ABM" - "Agents.jl - Julia ABM"
visualization: - "Mesa visualization - Built-in web viz" - "matplotlib animation - Python plots" - "pygame - Real-time 2D" - "Unity - 3D agent viz"
analysis: - "SALib - Sensitivity analysis" - "EMA Workbench - Exploratory modeling" - "pyABC - Approximate Bayesian computation"
references: books: - "Agent-Based and Individual-Based Modeling - Railsback & Grimm" - "Simulation for the Social Scientist - Gilbert & Troitzsch" - "Complex Adaptive Systems - Miller & Page"
papers: - "Flocks, Herds, and Schools - Reynolds" - "Social Force Model - Helbing & Molnar" - "ODD Protocol for ABM Description - Grimm et al."