git clone https://github.com/vibeforge1111/vibeship-spawner-skills
simulation/discrete-event-simulation/skill.yamlid: discrete-event-simulation name: Discrete Event Simulation category: simulation description: | Design and implement discrete event simulations (DES) for systems where state changes occur at discrete time points: queues, manufacturing, logistics, networks. version: 1.0.0
triggers:
- "discrete event"
- "DES simulation"
- "queuing system"
- "queue model"
- "manufacturing simulation"
- "process simulation"
- "SimPy"
- "event-driven simulation"
provides:
- Event scheduling and processing
- Process-oriented simulation design
- Resource and queue modeling
- Statistical output analysis
- Warm-up and termination strategies
- Animation and visualization
patterns: event_scheduling: description: "Core event-driven simulation engine" example: | import heapq from dataclasses import dataclass, field from typing import Callable, Any, List, Dict, Optional from abc import ABC, abstractmethod
@dataclass(order=True) class Event: """Simulation event with scheduled time.""" time: float priority: int = 0 # Lower = higher priority (tiebreaker) event_type: str = field(compare=False, default="") data: Any = field(compare=False, default=None) callback: Callable = field(compare=False, default=None) class SimulationEngine: """ Discrete event simulation engine. Uses priority queue for event scheduling. Supports event creation, cancellation, and conditional events. """ def __init__(self): self.current_time: float = 0.0 self.event_queue: List[Event] = [] self.event_count: int = 0 self.running: bool = False # Statistics self.events_processed: int = 0 self.event_type_counts: Dict[str, int] = {} # Event handlers by type self.handlers: Dict[str, Callable] = {} def schedule( self, delay: float, event_type: str, data: Any = None, priority: int = 0 ) -> Event: """Schedule event to occur after delay.""" event = Event( time=self.current_time + delay, priority=priority, event_type=event_type, data=data ) heapq.heappush(self.event_queue, event) self.event_count += 1 return event def schedule_at( self, time: float, event_type: str, data: Any = None, priority: int = 0 ) -> Event: """Schedule event at absolute time.""" if time < self.current_time: raise ValueError(f"Cannot schedule event in past: {time} < {self.current_time}") event = Event( time=time, priority=priority, event_type=event_type, data=data ) heapq.heappush(self.event_queue, event) self.event_count += 1 return event def cancel(self, event: Event) -> bool: """Cancel a scheduled event.""" try: self.event_queue.remove(event) heapq.heapify(self.event_queue) return True except ValueError: return False def register_handler(self, event_type: str, handler: Callable) -> None: """Register handler for event type.""" self.handlers[event_type] = handler def run(self, until: Optional[float] = None, max_events: Optional[int] = None) -> None: """Run simulation until time limit or max events.""" self.running = True while self.running and self.event_queue: # Check termination conditions if until is not None and self.event_queue[0].time > until: break if max_events is not None and self.events_processed >= max_events: break # Pop next event event = heapq.heappop(self.event_queue) self.current_time = event.time # Process event if event.callback: event.callback(event) elif event.event_type in self.handlers: self.handlers[event.event_type](event) self.events_processed += 1 self.event_type_counts[event.event_type] = \ self.event_type_counts.get(event.event_type, 0) + 1 def stop(self) -> None: """Stop simulation.""" self.running = False @property def now(self) -> float: """Current simulation time.""" return self.current_time # Example: Simple M/M/1 queue class MM1Queue: def __init__(self, engine: SimulationEngine, arrival_rate: float, service_rate: float): self.engine = engine self.arrival_rate = arrival_rate self.service_rate = service_rate self.queue: List[float] = [] # Arrival times of waiting customers self.server_busy = False # Statistics self.customers_served = 0 self.total_wait_time = 0.0 engine.register_handler('arrival', self.handle_arrival) engine.register_handler('departure', self.handle_departure) def start(self): """Schedule first arrival.""" self.engine.schedule( delay=np.random.exponential(1/self.arrival_rate), event_type='arrival' ) def handle_arrival(self, event: Event): # Schedule next arrival self.engine.schedule( delay=np.random.exponential(1/self.arrival_rate), event_type='arrival' ) if not self.server_busy: # Start service immediately self.server_busy = True self.engine.schedule( delay=np.random.exponential(1/self.service_rate), event_type='departure', data=self.engine.now ) else: # Join queue self.queue.append(self.engine.now) def handle_departure(self, event: Event): arrival_time = event.data self.total_wait_time += self.engine.now - arrival_time self.customers_served += 1 if self.queue: # Serve next customer next_arrival = self.queue.pop(0) self.engine.schedule( delay=np.random.exponential(1/self.service_rate), event_type='departure', data=next_arrival ) else: self.server_busy = False
process_oriented: description: "SimPy-style process-oriented simulation" example: | import simpy from typing import Generator, List import numpy as np
class ManufacturingLine: """ Manufacturing simulation using SimPy processes. Entities flow through stations, compete for resources. """ def __init__(self, env: simpy.Environment): self.env = env # Resources self.machines = { 'cutting': simpy.Resource(env, capacity=2), 'welding': simpy.Resource(env, capacity=3), 'assembly': simpy.Resource(env, capacity=2), 'testing': simpy.Resource(env, capacity=1) } # Shared buffer between stations self.buffers = { 'cutting_out': simpy.Store(env, capacity=10), 'welding_out': simpy.Store(env, capacity=10), 'assembly_out': simpy.Store(env, capacity=10) } # Statistics self.completed_jobs = [] self.blocked_time = {k: 0 for k in self.machines} def job_process(self, job_id: int) -> Generator: """Main job process through manufacturing line.""" arrival_time = self.env.now # Stage 1: Cutting yield from self.process_at_station('cutting', job_id, 5, 2) # Stage 2: Welding yield from self.process_at_station('welding', job_id, 8, 3) # Stage 3: Assembly yield from self.process_at_station('assembly', job_id, 10, 4) # Stage 4: Testing yield from self.process_at_station('testing', job_id, 3, 1) # Job complete self.completed_jobs.append({ 'job_id': job_id, 'arrival': arrival_time, 'completion': self.env.now, 'flow_time': self.env.now - arrival_time }) def process_at_station( self, station: str, job_id: int, mean_time: float, std_time: float ) -> Generator: """Process job at a station.""" request_time = self.env.now with self.machines[station].request() as req: yield req wait_time = self.env.now - request_time # Process process_time = max(0, np.random.normal(mean_time, std_time)) yield self.env.timeout(process_time) def job_arrivals(self, mean_interarrival: float) -> Generator: """Generate arriving jobs.""" job_id = 0 while True: yield self.env.timeout(np.random.exponential(mean_interarrival)) self.env.process(self.job_process(job_id)) job_id += 1 # Run simulation def run_manufacturing_sim(duration: float = 1000): env = simpy.Environment() line = ManufacturingLine(env) # Start job arrivals env.process(line.job_arrivals(mean_interarrival=5)) # Run env.run(until=duration) # Analyze results flow_times = [j['flow_time'] for j in line.completed_jobs] print(f"Jobs completed: {len(line.completed_jobs)}") print(f"Avg flow time: {np.mean(flow_times):.2f}") print(f"Throughput: {len(line.completed_jobs)/duration:.2f} jobs/time") return line
resource_modeling: description: "Resource pools, preemption, and priorities" example: | import simpy from typing import Optional, List from dataclasses import dataclass from enum import IntEnum
class Priority(IntEnum): LOW = 3 NORMAL = 2 HIGH = 1 URGENT = 0 @dataclass class Job: id: int priority: Priority preemptable: bool = False processing_time: float = 0 class PreemptiveResourcePool: """ Resource pool with preemption support. Higher priority jobs can preempt lower priority ones. """ def __init__(self, env: simpy.Environment, capacity: int): self.env = env self.resource = simpy.PreemptiveResource(env, capacity=capacity) self.preemption_count = 0 self.utilization_time = 0 self.last_utilization_check = 0 def process(self, job: Job) -> simpy.events.Process: """Process a job, potentially preempting others.""" return self.env.process(self._process(job)) def _process(self, job: Job): remaining_time = job.processing_time start_time = self.env.now while remaining_time > 0: try: with self.resource.request(priority=job.priority) as req: yield req process_start = self.env.now # Try to complete processing yield self.env.timeout(remaining_time) remaining_time = 0 except simpy.Interrupt: # Preempted! Record progress self.preemption_count += 1 elapsed = self.env.now - process_start remaining_time -= elapsed # Re-enter queue with remaining work # (Loop continues) self.utilization_time += job.processing_time class ResourcePool: """Pool of identical resources with failure modeling.""" def __init__( self, env: simpy.Environment, capacity: int, mttf: Optional[float] = None, # Mean time to failure mttr: Optional[float] = None # Mean time to repair ): self.env = env self.capacity = capacity self.available = capacity self.container = simpy.Container(env, capacity=capacity, init=capacity) self.mttf = mttf self.mttr = mttr # Track failures self.failures: List[dict] = [] if mttf and mttr: for i in range(capacity): env.process(self._failure_process(i)) def _failure_process(self, unit_id: int): """Model random failures and repairs.""" while True: # Time until failure yield self.env.timeout(np.random.exponential(self.mttf)) # Fail: remove from pool yield self.container.get(1) failure_time = self.env.now self.failures.append({ 'unit': unit_id, 'fail_time': failure_time }) # Repair duration yield self.env.timeout(np.random.exponential(self.mttr)) # Restore to pool yield self.container.put(1) self.failures[-1]['repair_time'] = self.env.now def request(self, amount: int = 1): """Request resources from pool.""" return self.container.get(amount) def release(self, amount: int = 1): """Return resources to pool.""" return self.container.put(amount)
statistical_analysis: description: "Output analysis for simulation experiments" example: | import numpy as np from typing import List, Tuple, Dict from dataclasses import dataclass from scipy import stats
@dataclass class SimulationOutput: """Output from a single simulation run.""" observations: np.ndarray time_stamps: np.ndarray run_id: int class OutputAnalyzer: """ Statistical analysis of simulation outputs. Handles warm-up detection, batch means, confidence intervals. """ def __init__(self, outputs: List[SimulationOutput]): self.outputs = outputs self.n_runs = len(outputs) def detect_warmup( self, observations: np.ndarray, method: str = 'mser' ) -> int: """ Detect warm-up period to truncate. Methods: - mser: Marginal Standard Error Rule - visual: Return data for visual inspection """ n = len(observations) if method == 'mser': # MSER: find truncation that minimizes std error best_d = 0 min_mser = float('inf') for d in range(n // 2): truncated = observations[d:] if len(truncated) < 10: break mser = np.var(truncated) / len(truncated) if mser < min_mser: min_mser = mser best_d = d return best_d return 0 def batch_means( self, observations: np.ndarray, n_batches: int = 20 ) -> Tuple[float, float]: """ Batch means method for autocorrelated data. Returns (mean, std_error). """ batch_size = len(observations) // n_batches if batch_size < 1: return np.mean(observations), np.std(observations) / np.sqrt(len(observations)) batches = [ np.mean(observations[i*batch_size:(i+1)*batch_size]) for i in range(n_batches) ] mean = np.mean(batches) std_error = np.std(batches) / np.sqrt(n_batches) return mean, std_error def confidence_interval( self, confidence: float = 0.95 ) -> Dict[str, Tuple[float, float, float]]: """ Compute confidence intervals across replications. Returns {metric: (mean, lower, upper)}. """ results = {} # Get final values from each run final_values = [out.observations[-1] for out in self.outputs] mean = np.mean(final_values) std = np.std(final_values, ddof=1) n = len(final_values) # t-distribution for small samples t_crit = stats.t.ppf((1 + confidence) / 2, n - 1) margin = t_crit * std / np.sqrt(n) results['final_value'] = (mean, mean - margin, mean + margin) return results def paired_comparison( self, other_outputs: List[SimulationOutput], confidence: float = 0.95 ) -> Dict[str, any]: """ Paired t-test comparison of two systems. Uses common random numbers for variance reduction. """ assert len(self.outputs) == len(other_outputs), \ "Need same number of replications" differences = [ self.outputs[i].observations[-1] - other_outputs[i].observations[-1] for i in range(self.n_runs) ] mean_diff = np.mean(differences) std_diff = np.std(differences, ddof=1) n = len(differences) # t-test t_stat = mean_diff / (std_diff / np.sqrt(n)) p_value = 2 * (1 - stats.t.cdf(abs(t_stat), n - 1)) # Confidence interval on difference t_crit = stats.t.ppf((1 + confidence) / 2, n - 1) margin = t_crit * std_diff / np.sqrt(n) return { 'mean_difference': mean_diff, 'ci': (mean_diff - margin, mean_diff + margin), 'p_value': p_value, 'significant': p_value < (1 - confidence) } def ranking_and_selection( self, systems: List[List[SimulationOutput]], indifference_zone: float = 0.1, confidence: float = 0.95 ) -> int: """ Select best system with statistical guarantee. Returns index of best system. """ n_systems = len(systems) means = [] for system_outputs in systems: values = [out.observations[-1] for out in system_outputs] means.append(np.mean(values)) # Rinott's procedure for indifference zone best_idx = np.argmin(means) # Assuming lower is better return best_idx
common_random_numbers: description: "Variance reduction via synchronized random streams" example: | import numpy as np from typing import Callable, List, Dict
class CRNManager: """ Common Random Numbers for fair system comparison. Ensures same random events occur in both systems. """ def __init__(self, seed: int = 42): self.base_seed = seed self.stream_seeds: Dict[str, int] = {} self.stream_rngs: Dict[str, np.random.Generator] = {} self.stream_index = 0 def get_stream(self, name: str) -> np.random.Generator: """Get or create a random stream by name.""" if name not in self.stream_rngs: self.stream_seeds[name] = self.base_seed + self.stream_index self.stream_rngs[name] = np.random.default_rng( self.stream_seeds[name] ) self.stream_index += 1 return self.stream_rngs[name] def reset_all_streams(self) -> None: """Reset all streams to initial state for replication.""" for name, seed in self.stream_seeds.items(): self.stream_rngs[name] = np.random.default_rng(seed) def advance_replication(self) -> None: """Advance to next replication (different base seed).""" self.base_seed += 1000 # Large jump for name in self.stream_seeds: self.stream_seeds[name] = self.base_seed + \ list(self.stream_seeds.keys()).index(name) self.stream_rngs[name] = np.random.default_rng( self.stream_seeds[name] ) def compare_with_crn( system_a: Callable[[CRNManager], float], system_b: Callable[[CRNManager], float], n_replications: int = 30 ) -> Dict[str, float]: """ Compare two systems using CRN. Both systems see same random events per replication. """ crn = CRNManager() differences = [] for rep in range(n_replications): # Reset streams so both systems see same random numbers crn.reset_all_streams() result_a = system_a(crn) crn.reset_all_streams() result_b = system_b(crn) differences.append(result_a - result_b) crn.advance_replication() mean_diff = np.mean(differences) std_diff = np.std(differences, ddof=1) se = std_diff / np.sqrt(n_replications) # Variance reduction achieved # Compare with independent sampling variance var_with_crn = np.var(differences) return { 'mean_difference': mean_diff, 'std_error': se, 'variance': var_with_crn }
anti_patterns:
-
pattern: "Floating point time comparison" problem: "Accumulated errors cause missed or duplicate events" solution: "Use integer time units or tolerance-based comparison"
-
pattern: "No warm-up period" problem: "Transient effects bias steady-state estimates" solution: "Detect and truncate warm-up using MSER or other methods"
-
pattern: "Single replication" problem: "Can't quantify output variability" solution: "Run multiple replications, compute confidence intervals"
-
pattern: "Independent streams for compared systems" problem: "High variance obscures differences" solution: "Use Common Random Numbers for paired comparison"
-
pattern: "Infinite queue assumption" problem: "Real systems have limits, blocking, abandonment" solution: "Model finite buffers, abandonment, blocking explicitly"
-
pattern: "Ignoring event priority for simultaneous events" problem: "Tie-breaking affects results" solution: "Define explicit priority rules, document assumptions"
handoffs:
-
to: agent-based-modeling when: "Entities need complex behavior beyond queuing" context: "Hybrid DES-ABM simulation"
-
to: monte-carlo when: "Need uncertainty quantification on inputs" context: "Parameter uncertainty propagation"
-
to: physics-simulation when: "Physical processes between events" context: "Hybrid continuous-discrete simulation"
-
to: backend when: "Building simulation as service" context: "API design for simulation inputs/outputs"
ecosystem: python_libraries: - "SimPy - Process-oriented DES" - "Salabim - DES with animation" - "Ciw - Queueing networks" - "Simulus - High-performance DES"
commercial: - "Arena - General purpose DES" - "AnyLogic - Multi-method simulation" - "FlexSim - Manufacturing focus" - "SIMUL8 - Healthcare, services"
analysis: - "SciPy - Statistical analysis" - "Pandas - Data analysis" - "Matplotlib - Visualization"
references: books: - "Simulation Modeling and Analysis - Law" - "Discrete-Event System Simulation - Banks et al." - "Simulation with Arena - Kelton et al."
papers: - "Output Analysis for Simulations - Law & Kelton" - "Common Random Numbers - Glasserman & Yao" - "Ranking and Selection - Kim & Nelson"