Claude-skill-registry crystal-concurrency
Use when implementing concurrent programming in Crystal using fibers, channels, and parallel execution patterns for high-performance, non-blocking applications.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/crystal-concurrency" ~/.claude/skills/majiayu000-claude-skill-registry-crystal-concurrency && rm -rf "$T"
skills/data/crystal-concurrency/SKILL.mdCrystal Concurrency
You are Claude Code, an expert in Crystal's concurrency model. You specialize in building high-performance, concurrent applications using fibers, channels, and Crystal's lightweight concurrency primitives.
Your core responsibilities:
- Implement fiber-based concurrent operations for non-blocking execution
- Design channel-based communication patterns for inter-fiber coordination
- Build parallel processing pipelines with proper synchronization
- Implement worker pools and task distribution systems
- Handle concurrent resource access with mutexes and atomic operations
- Design fault-tolerant concurrent systems with proper error handling
- Optimize fiber scheduling and resource utilization
- Implement backpressure and flow control mechanisms
- Build real-time data processing systems
- Design concurrent I/O operations for network and file systems
Fibers: Lightweight Concurrency
Crystal uses fibers (also known as green threads or coroutines) for concurrency. Fibers are cooperatively scheduled by the Crystal runtime and are much lighter weight than OS threads.
Basic Fiber Spawning
# Simple fiber spawning spawn do puts "Running in a fiber" sleep 1 puts "Fiber completed" end # Fiber with arguments def process_data(id : Int32, data : String) puts "Processing #{data} with id #{id}" sleep 0.5 puts "Completed #{id}" end spawn process_data(1, "task A") spawn process_data(2, "task B") # Wait for fibers to complete sleep 1
Fiber with Return Values via Channels
# Fibers don't return values directly, use channels instead result_channel = Channel(Int32).new spawn do result = expensive_computation(42) result_channel.send(result) end # Do other work... puts "Doing other work" # Wait for result result = result_channel.receive puts "Got result: #{result}" def expensive_computation(n : Int32) : Int32 sleep 1 n * 2 end
Named Fibers for Debugging
# Give fibers descriptive names for debugging spawn(name: "data-processor") do process_large_dataset end spawn(name: "cache-updater") do update_cache_periodically end # Fiber names appear in exception backtraces spawn(name: "failing-worker") do raise "Something went wrong" end
Channels: Inter-Fiber Communication
Channels are the primary mechanism for communication between fibers. They provide thread-safe message passing with optional buffering.
Unbuffered Channels
# Unbuffered channel - blocks until both sender and receiver are ready channel = Channel(String).new spawn do puts "Sending message" channel.send("Hello") puts "Message sent" end spawn do sleep 0.1 # Small delay puts "Receiving message" msg = channel.receive puts "Received: #{msg}" end sleep 1
Buffered Channels
# Buffered channel - allows sending without blocking up to buffer size channel = Channel(Int32).new(capacity: 3) # These sends won't block channel.send(1) channel.send(2) channel.send(3) # This would block until someone receives # channel.send(4) # Receive values puts channel.receive # 1 puts channel.receive # 2 puts channel.receive # 3
Channel Closing and Iteration
# Producer-consumer with channel closing channel = Channel(Int32).new # Producer spawn do 5.times do |i| channel.send(i) sleep 0.1 end channel.close # Signal no more values end # Consumer - iterate until channel is closed spawn do channel.each do |value| puts "Received: #{value}" end puts "Channel closed, consumer exiting" end sleep 1
Checking if Channel is Closed
channel = Channel(String).new spawn do channel.send("message 1") channel.send("message 2") channel.close end sleep 0.1 # Check before receiving unless channel.closed? puts channel.receive end # Or handle the exception begin puts channel.receive puts channel.receive puts channel.receive # Will raise Channel::ClosedError rescue Channel::ClosedError puts "Channel is closed" end
Select: Multiplexing Channels
The
select statement allows waiting on multiple channel operations simultaneously,
similar to Go's select statement.
Basic Select with Multiple Channels
ch1 = Channel(String).new ch2 = Channel(Int32).new spawn do sleep 0.2 ch1.send("from channel 1") end spawn do sleep 0.1 ch2.send(42) end # Wait for whichever channel is ready first select when msg = ch1.receive puts "Got string: #{msg}" when num = ch2.receive puts "Got number: #{num}" end sleep 1
Select with Timeout
channel = Channel(String).new spawn do sleep 2 # Takes too long channel.send("delayed message") end # Wait with timeout select when msg = channel.receive puts "Received: #{msg}" when timeout(1.second) puts "Timed out waiting for message" end
Select with Default Case (Non-blocking)
channel = Channel(Int32).new # Non-blocking receive select when value = channel.receive puts "Got value: #{value}" else puts "No value available, continuing immediately" end
Select in a Loop
results = Channel(String).new done = Channel(Nil).new output = [] of String # Multiple workers sending results 3.times do |i| spawn do sleep rand(0.5..1.5) results.send("Worker #{i} done") end end # Collector fiber spawn do 3.times do output << results.receive end done.send(nil) end # Wait for completion with timeout select when done.receive puts "All workers completed" output.each { |msg| puts msg } when timeout(5.seconds) puts "Timeout - not all workers completed" end
Worker Pools
Worker pools distribute tasks across a fixed number of concurrent workers.
Basic Worker Pool
class WorkerPool(T, R) def initialize(@size : Int32) @tasks = Channel(T).new @results = Channel(R).new @workers = [] of Fiber @size.times do |i| @workers << spawn(name: "worker-#{i}") do worker_loop end end end private def worker_loop @tasks.each do |task| result = process(task) @results.send(result) end end def process(task : T) : R # Override in subclass or pass block raise "Not implemented" end def submit(task : T) @tasks.send(task) end def get_result : R @results.receive end def shutdown @tasks.close end end # Usage example class IntSquarePool < WorkerPool(Int32, Int32) def process(task : Int32) : Int32 sleep 0.1 # Simulate work task * task end end pool = IntSquarePool.new(size: 3) # Submit tasks 10.times { |i| pool.submit(i) } # Collect results results = [] of Int32 10.times { results << pool.get_result } pool.shutdown puts results.sort
Worker Pool with Error Handling
struct Task property id : Int32 property data : String def initialize(@id, @data) end end struct Result property task_id : Int32 property success : Bool property value : String? property error : String? def initialize(@task_id, @success, @value = nil, @error = nil) end end class RobustWorkerPool def initialize(@worker_count : Int32) @tasks = Channel(Task).new(capacity: 100) @results = Channel(Result).new(capacity: 100) @worker_count.times do |i| spawn(name: "worker-#{i}") do process_tasks end end end private def process_tasks @tasks.each do |task| begin result_value = process_task(task) @results.send(Result.new( task_id: task.id, success: true, value: result_value )) rescue ex @results.send(Result.new( task_id: task.id, success: false, error: ex.message )) end end end private def process_task(task : Task) : String # Simulate processing that might fail raise "Invalid data" if task.data.empty? sleep 0.1 "Processed: #{task.data}" end def submit(task : Task) @tasks.send(task) end def get_result : Result @results.receive end def shutdown @tasks.close end end
Parallel Map and Reduce
Implement parallel processing of collections.
Parallel Map
def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R tasks = Channel(Tuple(Int32, T)).new results = Channel(Tuple(Int32, R)).new # Spawn workers workers.times do spawn do tasks.each do |index, item| result = yield item results.send({index, result}) end end end # Send tasks spawn do collection.each_with_index do |item, index| tasks.send({index, item}) end tasks.close end # Collect results in order result_map = {} of Int32 => R collection.size.times do index, result = results.receive result_map[index] = result end collection.indices.map { |i| result_map[i] } end # Usage numbers = (1..100).to_a squares = parallel_map(numbers, workers: 8) do |n| sleep 0.01 # Simulate work n * n end puts squares.first(10)
Parallel Reduce with Pipeline
def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R chunk_size = (collection.size / workers.to_f).ceil.to_i chunks = collection.each_slice(chunk_size).to_a results = Channel(R).new chunks.each do |chunk| spawn do chunk_result = chunk.reduce(initial) { |acc, item| yield acc, item } results.send(chunk_result) end end # Reduce the partial results final_result = initial chunks.size.times do final_result = yield final_result, results.receive end final_result end # Usage - sum of squares numbers = (1..1000).to_a sum = parallel_reduce(numbers, initial: 0) do |acc, n| acc + n * n end puts "Sum of squares: #{sum}"
Mutex: Protecting Shared State
When fibers need to share mutable state, use mutexes to prevent race conditions.
Basic Mutex Usage
require "mutex" class Counter def initialize @count = 0 @mutex = Mutex.new end def increment @mutex.synchronize do current = @count sleep 0.001 # Simulate some work @count = current + 1 end end def value : Int32 @mutex.synchronize { @count } end end counter = Counter.new # Spawn 100 fibers that each increment 10 times 100.times do spawn do 10.times { counter.increment } end end sleep 2 puts "Final count: #{counter.value}" # Should be 1000
Read-Write Lock Pattern
require "mutex" class CachedData def initialize @data = {} of String => String @mutex = Mutex.new @version = 0 end def read(key : String) : String? @mutex.synchronize do @data[key]? end end def write(key : String, value : String) @mutex.synchronize do @data[key] = value @version += 1 end end def batch_update(updates : Hash(String, String)) @mutex.synchronize do updates.each do |key, value| @data[key] = value end @version += 1 end end def snapshot : Hash(String, String) @mutex.synchronize do @data.dup end end end
Atomic Operations
For simple counters and flags, atomic operations are more efficient than mutexes.
Atomic Counter
require "atomic" class AtomicCounter def initialize(initial : Int32 = 0) @count = Atomic(Int32).new(initial) end def increment : Int32 @count.add(1) end def decrement : Int32 @count.sub(1) end def value : Int32 @count.get end def compare_and_set(expected : Int32, new_value : Int32) : Bool @count.compare_and_set(expected, new_value) end end counter = AtomicCounter.new # Safe concurrent increments without mutex 1000.times do spawn { counter.increment } end sleep 1 puts "Count: #{counter.value}"
Atomic Flag for Coordination
require "atomic" class ShutdownCoordinator def initialize @shutdown_flag = Atomic(Int32).new(0) end def shutdown! @shutdown_flag.set(1) end def shutdown? : Bool @shutdown_flag.get == 1 end def run_until_shutdown(&block) until shutdown? yield sleep 0.1 end end end coordinator = ShutdownCoordinator.new # Worker that checks shutdown flag spawn(name: "worker") do coordinator.run_until_shutdown do puts "Working..." end puts "Worker shutdown gracefully" end sleep 1 coordinator.shutdown! sleep 0.5
When to Use This Skill
Use the crystal-concurrency skill when you need to:
- Process multiple I/O operations concurrently (network requests, file operations)
- Implement real-time data processing pipelines
- Build worker pools for parallel task processing
- Handle multiple client connections simultaneously (web servers, chat systems)
- Perform background processing without blocking main execution
- Aggregate results from multiple concurrent operations
- Implement producer-consumer patterns
- Build rate limiters and backpressure mechanisms
- Process large datasets in parallel
- Coordinate multiple asynchronous operations
- Implement timeout and cancellation patterns
- Build concurrent caches with synchronized access
- Stream data processing with multiple stages
- Implement fan-out/fan-in patterns
Best Practices
- Always Close Channels: Close channels when done sending to signal completion to receivers
- Use Buffered Channels for Performance: Buffer channels when producers/consumers run at different speeds
- Limit Fiber Count: Don't spawn unlimited fibers; use worker pools for bounded concurrency
- Handle Channel Closure: Always handle
or checkChannel::ClosedError
before operationsclosed? - Use Select for Timeouts: Implement timeouts with
andselect
to prevent infinite blockingtimeout() - Prefer Channels Over Shared State: Use message passing (channels) instead of shared memory when possible
- Synchronize Shared State: Always use
or atomics when sharing mutable state between fibersMutex - Clean Up Resources: Use
blocks to guarantee resource cleanup even on errorsensure - Name Your Fibers: Give fibers descriptive names for easier debugging and profiling
- Avoid Blocking Operations in Fibers: Use non-blocking I/O; blocking operations prevent other fibers from running
- Use Atomic Operations for Counters: Atomics are more efficient than mutexes for simple counters and flags
- Implement Graceful Shutdown: Design systems to shut down cleanly, draining channels and waiting for fibers
- Handle Fiber Panics: Wrap fiber code in exception handlers to prevent silent failures
- Size Channel Buffers Appropriately: Too small causes blocking; too large wastes memory
- Use Select Default for Polling: Non-blocking checks with
for polling patternsselect ... else
Common Pitfalls
- Forgetting to Close Channels: Receivers will wait forever if channels aren't closed after sending completes
- Deadlocks from Unbuffered Channels: Sending to unbuffered channel blocks until receiver is ready
- Race Conditions on Shared State: Not using mutexes/atomics when multiple fibers access same data
- Channel Buffer Overflow: Sending more items than buffer capacity without receivers causes blocking
- Not Handling Closed Channels: Receiving from closed channel raises exception; always handle it
- Spawning Too Many Fibers: Unlimited fiber spawning exhausts memory; use worker pools instead
- Blocking the Scheduler: CPU-intensive work in fibers prevents other fibers from running
- Resource Leaks: Not closing channels, files, or connections in all code paths including errors
- Order Assumptions: Fibers execute in non-deterministic order; don't assume execution sequence
- Timeout Too Short: Aggressive timeouts cause false failures; balance responsiveness with reliability
- Mutex Held Too Long: Long critical sections reduce concurrency; minimize mutex hold time
- Send/Receive Mismatch: Imbalanced producers/consumers leads to memory buildup or starvation
- Ignoring Fiber Exceptions: Exceptions in fibers don't propagate to spawner; handle explicitly
- Nested Mutex Locks: Can cause deadlocks; avoid acquiring multiple mutexes or use consistent order
- Not Using
: Forgetting to wrap mutex usage insynchronize
block causes race conditionssynchronize