Vibeship-spawner-skills discrete-event-simulation

id: discrete-event-simulation

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: simulation/discrete-event-simulation/skill.yaml
source content

id: discrete-event-simulation name: Discrete Event Simulation category: simulation description: | Design and implement discrete event simulations (DES) for systems where state changes occur at discrete time points: queues, manufacturing, logistics, networks. version: 1.0.0

triggers:

  • "discrete event"
  • "DES simulation"
  • "queuing system"
  • "queue model"
  • "manufacturing simulation"
  • "process simulation"
  • "SimPy"
  • "event-driven simulation"

provides:

  • Event scheduling and processing
  • Process-oriented simulation design
  • Resource and queue modeling
  • Statistical output analysis
  • Warm-up and termination strategies
  • Animation and visualization

patterns: event_scheduling: description: "Core event-driven simulation engine" example: | import heapq from dataclasses import dataclass, field from typing import Callable, Any, List, Dict, Optional from abc import ABC, abstractmethod

  @dataclass(order=True)
  class Event:
      """Simulation event with scheduled time."""
      time: float
      priority: int = 0  # Lower = higher priority (tiebreaker)
      event_type: str = field(compare=False, default="")
      data: Any = field(compare=False, default=None)
      callback: Callable = field(compare=False, default=None)

  class SimulationEngine:
      """
      Discrete event simulation engine.

      Uses priority queue for event scheduling.
      Supports event creation, cancellation, and conditional events.
      """

      def __init__(self):
          self.current_time: float = 0.0
          self.event_queue: List[Event] = []
          self.event_count: int = 0
          self.running: bool = False

          # Statistics
          self.events_processed: int = 0
          self.event_type_counts: Dict[str, int] = {}

          # Event handlers by type
          self.handlers: Dict[str, Callable] = {}

      def schedule(
          self,
          delay: float,
          event_type: str,
          data: Any = None,
          priority: int = 0
      ) -> Event:
          """Schedule event to occur after delay."""
          event = Event(
              time=self.current_time + delay,
              priority=priority,
              event_type=event_type,
              data=data
          )
          heapq.heappush(self.event_queue, event)
          self.event_count += 1
          return event

      def schedule_at(
          self,
          time: float,
          event_type: str,
          data: Any = None,
          priority: int = 0
      ) -> Event:
          """Schedule event at absolute time."""
          if time < self.current_time:
              raise ValueError(f"Cannot schedule event in past: {time} < {self.current_time}")

          event = Event(
              time=time,
              priority=priority,
              event_type=event_type,
              data=data
          )
          heapq.heappush(self.event_queue, event)
          self.event_count += 1
          return event

      def cancel(self, event: Event) -> bool:
          """Cancel a scheduled event."""
          try:
              self.event_queue.remove(event)
              heapq.heapify(self.event_queue)
              return True
          except ValueError:
              return False

      def register_handler(self, event_type: str, handler: Callable) -> None:
          """Register handler for event type."""
          self.handlers[event_type] = handler

      def run(self, until: Optional[float] = None, max_events: Optional[int] = None) -> None:
          """Run simulation until time limit or max events."""
          self.running = True

          while self.running and self.event_queue:
              # Check termination conditions
              if until is not None and self.event_queue[0].time > until:
                  break
              if max_events is not None and self.events_processed >= max_events:
                  break

              # Pop next event
              event = heapq.heappop(self.event_queue)
              self.current_time = event.time

              # Process event
              if event.callback:
                  event.callback(event)
              elif event.event_type in self.handlers:
                  self.handlers[event.event_type](event)

              self.events_processed += 1
              self.event_type_counts[event.event_type] = \
                  self.event_type_counts.get(event.event_type, 0) + 1

      def stop(self) -> None:
          """Stop simulation."""
          self.running = False

      @property
      def now(self) -> float:
          """Current simulation time."""
          return self.current_time

  # Example: Simple M/M/1 queue
  class MM1Queue:
      def __init__(self, engine: SimulationEngine, arrival_rate: float, service_rate: float):
          self.engine = engine
          self.arrival_rate = arrival_rate
          self.service_rate = service_rate
          self.queue: List[float] = []  # Arrival times of waiting customers
          self.server_busy = False

          # Statistics
          self.customers_served = 0
          self.total_wait_time = 0.0

          engine.register_handler('arrival', self.handle_arrival)
          engine.register_handler('departure', self.handle_departure)

      def start(self):
          """Schedule first arrival."""
          self.engine.schedule(
              delay=np.random.exponential(1/self.arrival_rate),
              event_type='arrival'
          )

      def handle_arrival(self, event: Event):
          # Schedule next arrival
          self.engine.schedule(
              delay=np.random.exponential(1/self.arrival_rate),
              event_type='arrival'
          )

          if not self.server_busy:
              # Start service immediately
              self.server_busy = True
              self.engine.schedule(
                  delay=np.random.exponential(1/self.service_rate),
                  event_type='departure',
                  data=self.engine.now
              )
          else:
              # Join queue
              self.queue.append(self.engine.now)

      def handle_departure(self, event: Event):
          arrival_time = event.data
          self.total_wait_time += self.engine.now - arrival_time
          self.customers_served += 1

          if self.queue:
              # Serve next customer
              next_arrival = self.queue.pop(0)
              self.engine.schedule(
                  delay=np.random.exponential(1/self.service_rate),
                  event_type='departure',
                  data=next_arrival
              )
          else:
              self.server_busy = False

process_oriented: description: "SimPy-style process-oriented simulation" example: | import simpy from typing import Generator, List import numpy as np

  class ManufacturingLine:
      """
      Manufacturing simulation using SimPy processes.

      Entities flow through stations, compete for resources.
      """

      def __init__(self, env: simpy.Environment):
          self.env = env

          # Resources
          self.machines = {
              'cutting': simpy.Resource(env, capacity=2),
              'welding': simpy.Resource(env, capacity=3),
              'assembly': simpy.Resource(env, capacity=2),
              'testing': simpy.Resource(env, capacity=1)
          }

          # Shared buffer between stations
          self.buffers = {
              'cutting_out': simpy.Store(env, capacity=10),
              'welding_out': simpy.Store(env, capacity=10),
              'assembly_out': simpy.Store(env, capacity=10)
          }

          # Statistics
          self.completed_jobs = []
          self.blocked_time = {k: 0 for k in self.machines}

      def job_process(self, job_id: int) -> Generator:
          """Main job process through manufacturing line."""
          arrival_time = self.env.now

          # Stage 1: Cutting
          yield from self.process_at_station('cutting', job_id, 5, 2)

          # Stage 2: Welding
          yield from self.process_at_station('welding', job_id, 8, 3)

          # Stage 3: Assembly
          yield from self.process_at_station('assembly', job_id, 10, 4)

          # Stage 4: Testing
          yield from self.process_at_station('testing', job_id, 3, 1)

          # Job complete
          self.completed_jobs.append({
              'job_id': job_id,
              'arrival': arrival_time,
              'completion': self.env.now,
              'flow_time': self.env.now - arrival_time
          })

      def process_at_station(
          self,
          station: str,
          job_id: int,
          mean_time: float,
          std_time: float
      ) -> Generator:
          """Process job at a station."""
          request_time = self.env.now

          with self.machines[station].request() as req:
              yield req
              wait_time = self.env.now - request_time

              # Process
              process_time = max(0, np.random.normal(mean_time, std_time))
              yield self.env.timeout(process_time)

      def job_arrivals(self, mean_interarrival: float) -> Generator:
          """Generate arriving jobs."""
          job_id = 0
          while True:
              yield self.env.timeout(np.random.exponential(mean_interarrival))
              self.env.process(self.job_process(job_id))
              job_id += 1

  # Run simulation
  def run_manufacturing_sim(duration: float = 1000):
      env = simpy.Environment()
      line = ManufacturingLine(env)

      # Start job arrivals
      env.process(line.job_arrivals(mean_interarrival=5))

      # Run
      env.run(until=duration)

      # Analyze results
      flow_times = [j['flow_time'] for j in line.completed_jobs]
      print(f"Jobs completed: {len(line.completed_jobs)}")
      print(f"Avg flow time: {np.mean(flow_times):.2f}")
      print(f"Throughput: {len(line.completed_jobs)/duration:.2f} jobs/time")

      return line

resource_modeling: description: "Resource pools, preemption, and priorities" example: | import simpy from typing import Optional, List from dataclasses import dataclass from enum import IntEnum

  class Priority(IntEnum):
      LOW = 3
      NORMAL = 2
      HIGH = 1
      URGENT = 0

  @dataclass
  class Job:
      id: int
      priority: Priority
      preemptable: bool = False
      processing_time: float = 0

  class PreemptiveResourcePool:
      """
      Resource pool with preemption support.

      Higher priority jobs can preempt lower priority ones.
      """

      def __init__(self, env: simpy.Environment, capacity: int):
          self.env = env
          self.resource = simpy.PreemptiveResource(env, capacity=capacity)
          self.preemption_count = 0
          self.utilization_time = 0
          self.last_utilization_check = 0

      def process(self, job: Job) -> simpy.events.Process:
          """Process a job, potentially preempting others."""
          return self.env.process(self._process(job))

      def _process(self, job: Job):
          remaining_time = job.processing_time
          start_time = self.env.now

          while remaining_time > 0:
              try:
                  with self.resource.request(priority=job.priority) as req:
                      yield req
                      process_start = self.env.now

                      # Try to complete processing
                      yield self.env.timeout(remaining_time)
                      remaining_time = 0

              except simpy.Interrupt:
                  # Preempted! Record progress
                  self.preemption_count += 1
                  elapsed = self.env.now - process_start
                  remaining_time -= elapsed

                  # Re-enter queue with remaining work
                  # (Loop continues)

          self.utilization_time += job.processing_time

  class ResourcePool:
      """Pool of identical resources with failure modeling."""

      def __init__(
          self,
          env: simpy.Environment,
          capacity: int,
          mttf: Optional[float] = None,  # Mean time to failure
          mttr: Optional[float] = None   # Mean time to repair
      ):
          self.env = env
          self.capacity = capacity
          self.available = capacity
          self.container = simpy.Container(env, capacity=capacity, init=capacity)

          self.mttf = mttf
          self.mttr = mttr

          # Track failures
          self.failures: List[dict] = []

          if mttf and mttr:
              for i in range(capacity):
                  env.process(self._failure_process(i))

      def _failure_process(self, unit_id: int):
          """Model random failures and repairs."""
          while True:
              # Time until failure
              yield self.env.timeout(np.random.exponential(self.mttf))

              # Fail: remove from pool
              yield self.container.get(1)
              failure_time = self.env.now
              self.failures.append({
                  'unit': unit_id,
                  'fail_time': failure_time
              })

              # Repair duration
              yield self.env.timeout(np.random.exponential(self.mttr))

              # Restore to pool
              yield self.container.put(1)
              self.failures[-1]['repair_time'] = self.env.now

      def request(self, amount: int = 1):
          """Request resources from pool."""
          return self.container.get(amount)

      def release(self, amount: int = 1):
          """Return resources to pool."""
          return self.container.put(amount)

statistical_analysis: description: "Output analysis for simulation experiments" example: | import numpy as np from typing import List, Tuple, Dict from dataclasses import dataclass from scipy import stats

  @dataclass
  class SimulationOutput:
      """Output from a single simulation run."""
      observations: np.ndarray
      time_stamps: np.ndarray
      run_id: int

  class OutputAnalyzer:
      """
      Statistical analysis of simulation outputs.

      Handles warm-up detection, batch means, confidence intervals.
      """

      def __init__(self, outputs: List[SimulationOutput]):
          self.outputs = outputs
          self.n_runs = len(outputs)

      def detect_warmup(
          self,
          observations: np.ndarray,
          method: str = 'mser'
      ) -> int:
          """
          Detect warm-up period to truncate.

          Methods:
          - mser: Marginal Standard Error Rule
          - visual: Return data for visual inspection
          """
          n = len(observations)

          if method == 'mser':
              # MSER: find truncation that minimizes std error
              best_d = 0
              min_mser = float('inf')

              for d in range(n // 2):
                  truncated = observations[d:]
                  if len(truncated) < 10:
                      break
                  mser = np.var(truncated) / len(truncated)
                  if mser < min_mser:
                      min_mser = mser
                      best_d = d

              return best_d

          return 0

      def batch_means(
          self,
          observations: np.ndarray,
          n_batches: int = 20
      ) -> Tuple[float, float]:
          """
          Batch means method for autocorrelated data.

          Returns (mean, std_error).
          """
          batch_size = len(observations) // n_batches
          if batch_size < 1:
              return np.mean(observations), np.std(observations) / np.sqrt(len(observations))

          batches = [
              np.mean(observations[i*batch_size:(i+1)*batch_size])
              for i in range(n_batches)
          ]

          mean = np.mean(batches)
          std_error = np.std(batches) / np.sqrt(n_batches)

          return mean, std_error

      def confidence_interval(
          self,
          confidence: float = 0.95
      ) -> Dict[str, Tuple[float, float, float]]:
          """
          Compute confidence intervals across replications.

          Returns {metric: (mean, lower, upper)}.
          """
          results = {}

          # Get final values from each run
          final_values = [out.observations[-1] for out in self.outputs]

          mean = np.mean(final_values)
          std = np.std(final_values, ddof=1)
          n = len(final_values)

          # t-distribution for small samples
          t_crit = stats.t.ppf((1 + confidence) / 2, n - 1)
          margin = t_crit * std / np.sqrt(n)

          results['final_value'] = (mean, mean - margin, mean + margin)

          return results

      def paired_comparison(
          self,
          other_outputs: List[SimulationOutput],
          confidence: float = 0.95
      ) -> Dict[str, any]:
          """
          Paired t-test comparison of two systems.

          Uses common random numbers for variance reduction.
          """
          assert len(self.outputs) == len(other_outputs), \
              "Need same number of replications"

          differences = [
              self.outputs[i].observations[-1] - other_outputs[i].observations[-1]
              for i in range(self.n_runs)
          ]

          mean_diff = np.mean(differences)
          std_diff = np.std(differences, ddof=1)
          n = len(differences)

          # t-test
          t_stat = mean_diff / (std_diff / np.sqrt(n))
          p_value = 2 * (1 - stats.t.cdf(abs(t_stat), n - 1))

          # Confidence interval on difference
          t_crit = stats.t.ppf((1 + confidence) / 2, n - 1)
          margin = t_crit * std_diff / np.sqrt(n)

          return {
              'mean_difference': mean_diff,
              'ci': (mean_diff - margin, mean_diff + margin),
              'p_value': p_value,
              'significant': p_value < (1 - confidence)
          }

      def ranking_and_selection(
          self,
          systems: List[List[SimulationOutput]],
          indifference_zone: float = 0.1,
          confidence: float = 0.95
      ) -> int:
          """
          Select best system with statistical guarantee.

          Returns index of best system.
          """
          n_systems = len(systems)
          means = []

          for system_outputs in systems:
              values = [out.observations[-1] for out in system_outputs]
              means.append(np.mean(values))

          # Rinott's procedure for indifference zone
          best_idx = np.argmin(means)  # Assuming lower is better

          return best_idx

common_random_numbers: description: "Variance reduction via synchronized random streams" example: | import numpy as np from typing import Callable, List, Dict

  class CRNManager:
      """
      Common Random Numbers for fair system comparison.

      Ensures same random events occur in both systems.
      """

      def __init__(self, seed: int = 42):
          self.base_seed = seed
          self.stream_seeds: Dict[str, int] = {}
          self.stream_rngs: Dict[str, np.random.Generator] = {}
          self.stream_index = 0

      def get_stream(self, name: str) -> np.random.Generator:
          """Get or create a random stream by name."""
          if name not in self.stream_rngs:
              self.stream_seeds[name] = self.base_seed + self.stream_index
              self.stream_rngs[name] = np.random.default_rng(
                  self.stream_seeds[name]
              )
              self.stream_index += 1

          return self.stream_rngs[name]

      def reset_all_streams(self) -> None:
          """Reset all streams to initial state for replication."""
          for name, seed in self.stream_seeds.items():
              self.stream_rngs[name] = np.random.default_rng(seed)

      def advance_replication(self) -> None:
          """Advance to next replication (different base seed)."""
          self.base_seed += 1000  # Large jump
          for name in self.stream_seeds:
              self.stream_seeds[name] = self.base_seed + \
                  list(self.stream_seeds.keys()).index(name)
              self.stream_rngs[name] = np.random.default_rng(
                  self.stream_seeds[name]
              )

  def compare_with_crn(
      system_a: Callable[[CRNManager], float],
      system_b: Callable[[CRNManager], float],
      n_replications: int = 30
  ) -> Dict[str, float]:
      """
      Compare two systems using CRN.

      Both systems see same random events per replication.
      """
      crn = CRNManager()
      differences = []

      for rep in range(n_replications):
          # Reset streams so both systems see same random numbers
          crn.reset_all_streams()
          result_a = system_a(crn)

          crn.reset_all_streams()
          result_b = system_b(crn)

          differences.append(result_a - result_b)
          crn.advance_replication()

      mean_diff = np.mean(differences)
      std_diff = np.std(differences, ddof=1)
      se = std_diff / np.sqrt(n_replications)

      # Variance reduction achieved
      # Compare with independent sampling variance
      var_with_crn = np.var(differences)

      return {
          'mean_difference': mean_diff,
          'std_error': se,
          'variance': var_with_crn
      }

anti_patterns:

  • pattern: "Floating point time comparison" problem: "Accumulated errors cause missed or duplicate events" solution: "Use integer time units or tolerance-based comparison"

  • pattern: "No warm-up period" problem: "Transient effects bias steady-state estimates" solution: "Detect and truncate warm-up using MSER or other methods"

  • pattern: "Single replication" problem: "Can't quantify output variability" solution: "Run multiple replications, compute confidence intervals"

  • pattern: "Independent streams for compared systems" problem: "High variance obscures differences" solution: "Use Common Random Numbers for paired comparison"

  • pattern: "Infinite queue assumption" problem: "Real systems have limits, blocking, abandonment" solution: "Model finite buffers, abandonment, blocking explicitly"

  • pattern: "Ignoring event priority for simultaneous events" problem: "Tie-breaking affects results" solution: "Define explicit priority rules, document assumptions"

handoffs:

  • to: agent-based-modeling when: "Entities need complex behavior beyond queuing" context: "Hybrid DES-ABM simulation"

  • to: monte-carlo when: "Need uncertainty quantification on inputs" context: "Parameter uncertainty propagation"

  • to: physics-simulation when: "Physical processes between events" context: "Hybrid continuous-discrete simulation"

  • to: backend when: "Building simulation as service" context: "API design for simulation inputs/outputs"

ecosystem: python_libraries: - "SimPy - Process-oriented DES" - "Salabim - DES with animation" - "Ciw - Queueing networks" - "Simulus - High-performance DES"

commercial: - "Arena - General purpose DES" - "AnyLogic - Multi-method simulation" - "FlexSim - Manufacturing focus" - "SIMUL8 - Healthcare, services"

analysis: - "SciPy - Statistical analysis" - "Pandas - Data analysis" - "Matplotlib - Visualization"

references: books: - "Simulation Modeling and Analysis - Law" - "Discrete-Event System Simulation - Banks et al." - "Simulation with Arena - Kelton et al."

papers: - "Output Analysis for Simulations - Law & Kelton" - "Common Random Numbers - Glasserman & Yao" - "Ranking and Selection - Kim & Nelson"