SimPy — Discrete-Event Simulation
Overview
SimPy is a process-based discrete-event simulation framework using standard Python generators. Model systems where entities (customers, vehicles, packets) interact with shared resources (servers, machines, bandwidth) over time, with event-driven scheduling and optional real-time synchronization.
When to Use
- Modeling queue-based systems with resource contention (servers, machines, staff)
- Manufacturing process simulation (production lines, scheduling, bottleneck analysis)
- Network simulation (packet routing, bandwidth allocation, latency analysis)
- Capacity planning (determining optimal resource levels for target throughput)
- Healthcare operations (ER patient flow, staff allocation, bed management)
- Logistics and transportation (warehouse operations, vehicle routing)
- For continuous-time ODE systems → use SciPy `solve_ivp`
- For agent-based modeling → use Mesa
Prerequisites
# pip install simpy
import simpy
import random
Quick Start
import simpy
import random

def customer(env, name, server):
    """Customer arrives, waits for server, gets served, departs."""
    arrival = env.now
    with server.request() as req:
        yield req  # Wait in queue
        wait = env.now - arrival
        yield env.timeout(random.expovariate(1/3))  # Service time (mean 3)
        print(f'{name}: waited {wait:.1f}, served at {env.now:.1f}')

def arrivals(env, server):
    for i in range(20):
        yield env.timeout(random.expovariate(1/2))  # Inter-arrival time (mean 2)
        env.process(customer(env, f'C{i}', server))

env = simpy.Environment()
server = simpy.Resource(env, capacity=2)
env.process(arrivals(env, server))
env.run(until=50)
Core API
1. Environment & Processes
import simpy
import random

# Standard environment
env = simpy.Environment(initial_time=0)

# Processes are Python generators that yield events
def machine(env, name, repair_time):
    while True:
        yield env.timeout(random.expovariate(1/10))  # Time to failure (mean 10)
        print(f'{name} broke at {env.now:.1f}')
        yield env.timeout(repair_time)
        print(f'{name} repaired at {env.now:.1f}')

# Start processes: env.process() returns a Process event
proc = env.process(machine(env, 'Machine-1', repair_time=2))

# Run until time limit or no events remain
env.run(until=100)
# env.run()  # Run until no more events

# Current simulation time
print(f'Final time: {env.now}')

# Processes can return values and be awaited
def subtask(env, duration):
    yield env.timeout(duration)
    return f'completed in {duration}'

def main_task(env):
    # Sequential: wait for one process
    result = yield env.process(subtask(env, 5))
    print(f'Subtask {result} at {env.now}')

    # Parallel: wait for ALL (AllOf)
    t1 = env.process(subtask(env, 3))
    t2 = env.process(subtask(env, 4))
    results = yield t1 & t2  # AllOf: resumes when both are done
    print(f'Both done at {env.now}')

    # Race: wait for ANY (AnyOf)
    t3 = env.process(subtask(env, 2))
    t4 = env.process(subtask(env, 6))
    result = yield t3 | t4  # AnyOf: resumes when the first completes
    print(f'First done at {env.now}')

env = simpy.Environment()
env.process(main_task(env))
env.run()
2. Resources
import simpy

env = simpy.Environment()

# Basic resource: capacity-limited (e.g., 2 servers)
server = simpy.Resource(env, capacity=2)
print(f'Capacity: {server.capacity}, In use: {server.count}, Queue: {len(server.queue)}')

# Priority resource: lower number = higher priority
priority_server = simpy.PriorityResource(env, capacity=1)

def vip_customer(env, res):
    with res.request(priority=1) as req:  # Higher priority
        yield req
        yield env.timeout(3)

def regular_customer(env, res):
    with res.request(priority=10) as req:  # Lower priority
        yield req
        yield env.timeout(3)

# Preemptive resource: high priority interrupts low priority
preemptive = simpy.PreemptiveResource(env, capacity=1)

def urgent_job(env, res):
    with res.request(priority=0, preempt=True) as req:
        yield req  # May interrupt current user
        yield env.timeout(1)

# Container: bulk material (fuel, water, inventory)
tank = simpy.Container(env, capacity=100, init=50)

def refuel(env, tank):
    yield tank.put(30)  # Add 30 units
    print(f'Tank level: {tank.level}/{tank.capacity}')

def consume(env, tank):
    yield tank.get(20)  # Remove 20 units
    print(f'Tank level: {tank.level}/{tank.capacity}')

# Store: FIFO object storage
warehouse = simpy.Store(env, capacity=10)

def producer(env, store):
    for i in range(5):
        yield env.timeout(2)
        yield store.put(f'Item-{i}')

def consumer(env, store):
    while True:
        item = yield store.get()
        print(f'Got {item} at {env.now}')
        yield env.timeout(3)

# FilterStore: selective retrieval
parts = simpy.FilterStore(env, capacity=20)

def picker(env, store):
    # Get specific item matching condition
    item = yield store.get(lambda x: x['color'] == 'red')
    print(f'Found red item: {item}')
3. Events & Synchronization
import simpy
import random

env = simpy.Environment()

# Basic event: manual trigger for signaling between processes
signal = env.event()

def waiter(env, event):
    print(f'Waiting at {env.now}')
    value = yield event  # Blocks until triggered
    print(f'Got signal "{value}" at {env.now}')

def sender(env, event):
    yield env.timeout(5)
    event.succeed(value='go')  # Trigger with value

env.process(waiter(env, signal))
env.process(sender(env, signal))
env.run()
# Output: Waiting at 0, Got signal "go" at 5

# Timeout: the most common event (valid only inside a process generator)
# yield env.timeout(delay=5)

# Process interruption
def interruptible(env, name):
    try:
        yield env.timeout(10)
    except simpy.Interrupt as interrupt:
        print(f'{name} interrupted: {interrupt.cause} at {env.now}')

def interruptor(env, proc):
    yield env.timeout(3)
    proc.interrupt('maintenance')

proc = env.process(interruptible(env, 'Worker'))
env.process(interruptor(env, proc))
env.run()  # Run the interruption example

# Barrier synchronization: wait for N processes
class Barrier:
    def __init__(self, env, n):
        self.env = env
        self.n = n
        self.count = 0
        self.event = env.event()

    def wait(self):
        self.count += 1
        if self.count >= self.n:
            self.event.succeed()  # Last arrival releases everyone
        return self.event

def phase_worker(env, name, barrier):
    yield env.timeout(random.uniform(1, 5))  # Phase work
    print(f'{name} reached barrier at {env.now:.1f}')
    yield barrier.wait()  # Wait for all workers
    print(f'{name} passed barrier at {env.now:.1f}')

env = simpy.Environment()
barrier = Barrier(env, n=3)
for i in range(3):
    env.process(phase_worker(env, f'W{i}', barrier))
env.run()
4. Monitoring & Statistics
import simpy
import random

# Inline statistics collection
class Stats:
    def __init__(self):
        self.wait_times = []
        self.queue_lengths = []

    def report(self):
        if self.wait_times:
            avg_wait = sum(self.wait_times) / len(self.wait_times)
            max_wait = max(self.wait_times)
            print(f'Avg wait: {avg_wait:.2f}, Max wait: {max_wait:.2f}')
            print(f'Customers served: {len(self.wait_times)}')

def customer(env, name, server, stats):
    arrival = env.now
    with server.request() as req:
        yield req
        wait = env.now - arrival
        stats.wait_times.append(wait)
        stats.queue_lengths.append(len(server.queue))
        yield env.timeout(random.expovariate(1/3))

env = simpy.Environment()
server = simpy.Resource(env, capacity=2)
stats = Stats()

def gen(env, server, stats):
    for i in range(100):
        yield env.timeout(random.expovariate(1/2))
        env.process(customer(env, f'C{i}', server, stats))

env.process(gen(env, server, stats))
env.run(until=200)
stats.report()

# Resource monitoring via monkey-patching
def patch_resource(resource, data):
    """Patch resource to log request/release events."""
    original_request = resource.request
    original_release = resource.release

    def monitored_request(*args, **kwargs):
        req = original_request(*args, **kwargs)
        data.append((resource._env.now, 'request', resource.count, len(resource.queue)))
        return req

    def monitored_release(*args, **kwargs):
        result = original_release(*args, **kwargs)
        data.append((resource._env.now, 'release', resource.count, len(resource.queue)))
        return result

    resource.request = monitored_request
    resource.release = monitored_release

log = []
patch_resource(server, log)  # Patch before env.run() so that events are recorded
# After simulation: analyze log for utilization, queue dynamics
5. Real-Time Simulation
import simpy.rt

# Real-time environment: synchronized with the wall clock
env = simpy.rt.RealtimeEnvironment(factor=1.0)  # 1 sim unit = 1 second
# factor=0.1 → 10x faster (1 sim unit = 0.1 seconds)
# factor=60 → 1 sim unit = 1 minute

# Strict mode (the default) raises RuntimeError if the simulation can't keep up
env_strict = simpy.rt.RealtimeEnvironment(factor=1.0, strict=True)
# Non-strict mode allows slower-than-real-time execution
env_relaxed = simpy.rt.RealtimeEnvironment(factor=1.0, strict=False)

def periodic_task(env, interval):
    while True:
        print(f'Tick at sim time {env.now:.1f}')
        yield env.timeout(interval)

env = simpy.rt.RealtimeEnvironment(factor=1.0)
env.process(periodic_task(env, 2.0))
env.run(until=10)
# Prints "Tick" every ~2 real seconds
Key Concepts
Resource Selection Guide
| Need | Resource Type | Key Feature |
|---|---|---|
| Limited servers/machines | `Resource` | FIFO queue, capacity limit |
| Priority queuing | `PriorityResource` | Lower number = higher priority |
| Preemptive scheduling | `PreemptiveResource` | High priority interrupts current user |
| Bulk material (fuel, water) | `Container` | `put(amount)` / `get(amount)`, continuous level |
| Object queue (FIFO) | `Store` | `put(item)` / `get()`, ordered retrieval |
| Conditional retrieval | `FilterStore` | `get(lambda x: condition)` |
| Priority-ordered items | `PriorityStore` | Items sorted by priority |
Process Interaction Mechanisms
| Mechanism | Use When | Code Pattern |
|---|---|---|
| Event signaling | Broadcast to multiple waiters | `event = env.event()` → `yield event` / `event.succeed()` |
| Process yield | Sequential or parallel execution | `yield env.process(func())` or `yield p1 & p2` |
| Interruption | Preemption, maintenance, cancellation | `proc.interrupt(cause)` + `try/except simpy.Interrupt` |
| Timeout racing | Timeout with cancellation | `yield event \| env.timeout(delay)` |
Common Workflows
1. Manufacturing Line Simulation
import simpy
import random

def part(env, name, machines, buffer, stats):
    """Part flows through sequential machines, then enters the output buffer."""
    for machine in machines:
        with machine.request() as req:
            yield req
            process_time = random.triangular(1, 3, 2)
            yield env.timeout(process_time)
    if buffer.level < buffer.capacity:
        yield buffer.put(1)
    stats['produced'] += 1

def part_generator(env, machines, buffer, stats):
    i = 0
    while True:
        yield env.timeout(random.expovariate(1/2))
        env.process(part(env, f'Part-{i}', machines, buffer, stats))
        i += 1

random.seed(42)
env = simpy.Environment()
machines = [simpy.Resource(env, capacity=1) for _ in range(3)]
output_buffer = simpy.Container(env, capacity=100, init=0)
stats = {'produced': 0}
env.process(part_generator(env, machines, output_buffer, stats))
env.run(until=480)  # 8-hour shift
print(f'Parts produced: {stats["produced"]}')
print(f'Buffer level: {output_buffer.level}')
2. Multi-Server Queue with Priority
import simpy
import random

def patient(env, name, priority, er, stats):
    arrival = env.now
    with er.request(priority=priority) as req:
        yield req
        wait = env.now - arrival
        stats['waits'].append((name, priority, wait))
        service = random.expovariate(1/15)  # ~15 min avg
        yield env.timeout(service)

def patient_arrivals(env, er, stats):
    i = 0
    while True:
        yield env.timeout(random.expovariate(1/5))  # ~5 min between arrivals
        pri = random.choices([1, 2, 3], weights=[0.1, 0.3, 0.6])[0]
        env.process(patient(env, f'P{i}', pri, er, stats))
        i += 1

random.seed(42)
env = simpy.Environment()
er = simpy.PriorityResource(env, capacity=3)
stats = {'waits': []}
env.process(patient_arrivals(env, er, stats))
env.run(until=480)

# Analyze by priority
for pri in [1, 2, 3]:
    waits = [w for _, p, w in stats['waits'] if p == pri]
    if waits:
        print(f'Priority {pri}: avg wait {sum(waits)/len(waits):.1f}, n={len(waits)}')
3. Producer-Consumer with Monitoring
Text-only workflow (combines Core API modules 2, 3, 4):
- Create a `simpy.Store` with bounded capacity (Module 2: Resources)
- Implement a producer process that calls `yield store.put(item)` after a production delay (Module 2)
- Implement a consumer process that calls `yield store.get()` followed by a processing delay (Module 2)
- Add event signaling for backpressure when the store is full (Module 3: Events)
- Collect throughput, queue length, and idle time statistics (Module 4: Monitoring)
- Run the simulation and generate a report
Key Parameters
| Parameter | Module | Default | Range | Effect |
|---|---|---|---|---|
| `capacity` | Resource | 1 | int ≥ 1 | Number of concurrent users |
| `priority` | PriorityResource.request | 0 | int | Lower = higher priority |
| `preempt` | PreemptiveResource.request | True | bool | Whether to interrupt lower-priority users |
| `capacity` | Container | float('inf') | > 0 | Maximum level |
| `init` | Container | 0 | 0–capacity | Initial level |
| `capacity` | Store | float('inf') | > 0 | Maximum items |
| `factor` | RealtimeEnvironment | 1.0 | > 0 | Wall-clock seconds per simulation time unit |
| `strict` | RealtimeEnvironment | True | bool | Raise RuntimeError if the simulation falls behind schedule |
| `initial_time` | Environment | 0 | any number | Simulation start time |
Best Practices
- Always use context managers for resources: `with resource.request() as req: yield req` ensures automatic release even on exceptions
- Set random seeds for reproducibility: call `random.seed(42)` before creating processes; use `numpy.random` for more distributions
- Collect statistics inline: append to lists during the simulation and compute aggregates after `env.run()` rather than querying mid-simulation
- Use a triangular distribution for process times: `random.triangular(low, high, mode)` is often more realistic than uniform for service times
- Anti-pattern, forgetting `yield`: `env.timeout(5)` without `yield` creates the event but doesn't pause the process. Always write `yield env.timeout(5)`
- Anti-pattern, reusing events: an event can only be triggered once. Create a new `env.event()` for each signal cycle; for repeatable signals, create fresh events in a loop
Common Recipes
Recipe: Simulation with Multiple Replications
import simpy
import random
import statistics

def run_single(seed, sim_time=480, n_servers=2):
    random.seed(seed)
    env = simpy.Environment()
    server = simpy.Resource(env, capacity=n_servers)
    waits = []

    def customer(env, server):
        arrival = env.now
        with server.request() as req:
            yield req
            waits.append(env.now - arrival)
            yield env.timeout(random.expovariate(1/3))

    def gen(env, server):
        while True:
            yield env.timeout(random.expovariate(1/2))
            env.process(customer(env, server))

    env.process(gen(env, server))
    env.run(until=sim_time)
    return sum(waits) / len(waits) if waits else 0

# Run 30 replications
results = [run_single(seed=i) for i in range(30)]
print(f'Mean avg wait: {statistics.mean(results):.2f}')
# Normal approximation (z = 1.96); a t-value of about 2.05 is slightly wider for n = 30
print(f'95% CI: ±{1.96 * statistics.stdev(results) / len(results)**0.5:.2f}')
Recipe: Interrupt-Based Maintenance
import simpy
import random

def machine(env, name, repair_crew, running):
    while True:
        running.add(name)  # Operating: safe to interrupt
        try:
            # Operate until failure
            ttf = random.expovariate(1/50)  # Mean 50 time units to failure
            yield env.timeout(ttf)
            print(f'{name} failed at {env.now:.1f}')
        except simpy.Interrupt:
            print(f'{name} interrupted for maintenance at {env.now:.1f}')
        running.discard(name)
        # Repair (needs repair crew)
        with repair_crew.request() as req:
            yield req
            repair = random.uniform(2, 5)
            yield env.timeout(repair)
        print(f'{name} repaired at {env.now:.1f}')

def maintenance_scheduler(env, machine_procs, running):
    """Periodic preventive maintenance every 40 time units."""
    while True:
        yield env.timeout(40)
        for name, proc in machine_procs.items():
            # Interrupt only operating machines: an interrupt during repair
            # would be raised outside the try block and crash the process
            if proc.is_alive and name in running:
                proc.interrupt('scheduled maintenance')

env = simpy.Environment()
repair_crew = simpy.Resource(env, capacity=1)
running = set()  # Names of machines currently operating
procs = {f'M{i}': env.process(machine(env, f'M{i}', repair_crew, running)) for i in range(3)}
env.process(maintenance_scheduler(env, procs, running))
env.run(until=200)
Recipe: Container-Based Supply Chain
import simpy
import random

def supplier(env, warehouse):
    """Deliver batch when level drops below reorder point."""
    while True:
        if warehouse.level < 20:  # Reorder point
            yield env.timeout(random.uniform(5, 10))  # Lead time
            amount = min(50, warehouse.capacity - warehouse.level)
            yield warehouse.put(amount)
            print(f'Delivered {amount} units at {env.now:.1f}, level={warehouse.level}')
        yield env.timeout(1)  # Check interval

def demand(env, warehouse, stats):
    while True:
        yield env.timeout(random.expovariate(1/2))
        qty = random.randint(1, 5)
        if warehouse.level >= qty:
            yield warehouse.get(qty)
            stats['fulfilled'] += qty
        else:
            stats['stockouts'] += 1

env = simpy.Environment()
warehouse = simpy.Container(env, capacity=100, init=80)
stats = {'fulfilled': 0, 'stockouts': 0}
env.process(supplier(env, warehouse))
env.process(demand(env, warehouse, stats))
env.run(until=500)
print(f'Fulfilled: {stats["fulfilled"]}, Stockouts: {stats["stockouts"]}')
Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Process doesn't pause | Missing `yield` before event | Always `yield env.timeout(x)`, not just `env.timeout(x)` |
| `RuntimeError: <Event> has already been triggered` | Reusing a triggered event | Create a new `env.event()` for each signal cycle |
| Resource never released | Not using context manager | Use the `with resource.request() as req:` pattern |
| Simulation runs forever | No `until` parameter and an infinite process | Add `env.run(until=time)` or ensure processes terminate |
| `simpy.Interrupt` not caught | Missing try/except in interruptible process | Wrap every `yield` that may be interrupted in `try: ... except simpy.Interrupt:` |
| Wrong queue order | Using `Resource` instead of `PriorityResource` | Switch to `simpy.PriorityResource` for priority queuing |
| Real-time too slow | Computation exceeds wall-clock budget | Set `strict=False` or increase `factor` |
| Container `put` blocks | Container at capacity | Check `container.level < container.capacity` before `put` |
| FilterStore `get` blocks forever | No matching items | Ensure producers create items matching the filter criteria |
| Statistics are empty | Reporting before `env.run()` | Call `stats.report()` after `env.run()` completes |
Bundled Resources
- references/process_events_guide.md: Detailed event lifecycle (triggered→processed), composite events (AllOf/AnyOf), process interaction patterns (signaling, barriers, interruption, handshake), and advanced synchronization. Consolidated from original events.md (375 lines) + process-interaction.md (425 lines)
- references/resources_monitoring_guide.md: Complete resource type reference (Resource, Priority, Preemptive, Container, Store, FilterStore, PriorityStore), monitoring via monkey-patching (ResourceMonitor, ContainerMonitor classes), statistical collection patterns, CSV/matplotlib export, and real-time simulation (RealtimeEnvironment, time scaling, strict mode, HIL patterns). Consolidated from original resources.md (276 lines) + monitoring.md (476 lines) + real-time.md (396 lines). Scripts functionality (basic_simulation_template.py, resource_monitor.py) incorporated into Core API monitoring examples and Common Recipes
Related Skills
- matplotlib-scientific-plotting — Visualize simulation results (queue lengths, utilization over time)
- polars-dataframes — Analyze large simulation output datasets
References