Agents patterns-concurrency-dev
Cross-cutting patterns for concurrency and async programming across languages. Use when translating async/await between languages, converting goroutines to tokio tasks, mapping channel patterns, or designing concurrent code for language conversions.
git clone https://github.com/aRustyDev/agents
T=$(mktemp -d) && git clone --depth=1 https://github.com/aRustyDev/agents "$T" && mkdir -p ~/.claude/skills && cp -r "$T/content/skills/patterns-concurrency-dev" ~/.claude/skills/arustydev-agents-patterns-concurrency-dev && rm -rf "$T"
Concurrency Patterns
Cross-language reference for concurrency mechanisms including async/await, goroutines, channels, threads, and synchronization primitives. This skill helps translate concurrent code between languages during code conversion.
Overview
This skill covers:
- Async/await pattern comparison
- Goroutines, tasks, and green threads
- Channel and message passing patterns
- Synchronization primitives
- Cancellation and timeout patterns
This skill does NOT cover:
- Building applications with async frameworks (see `lang-*-dev` skills)
- Distributed systems patterns (see dedicated skills)
- Database connection pooling (see database skills)
Concurrency Model Comparison
| Language | Primary Model | Runtime | Threading | Channels |
|---|---|---|---|---|
| TypeScript | async/await | Event loop (Node.js/browser) | Workers (limited) | N/A |
| Python | async/await | asyncio | threading/multiprocessing | Queue |
| Rust | async/await | tokio/async-std | std::thread | mpsc, crossbeam |
| Go | Goroutines | Go scheduler | Built-in | `chan` (first-class) |
| Java | Virtual Threads | JVM | Thread, ExecutorService | BlockingQueue |
| Elixir | Processes | BEAM | N/A (processes) | Built-in messaging |
Model Characteristics

    Event Loop (JS/TS, Python asyncio)
    ├── Single-threaded by default
    ├── Non-blocking I/O
    ├── Cooperative scheduling
    └── Cannot utilize multiple cores directly

    Goroutines (Go)
    ├── Multiplexed onto OS threads
    ├── Preemptive scheduling
    ├── Built-in channel communication
    └── Automatic multi-core utilization

    Tokio/async-std (Rust)
    ├── Multi-threaded runtime
    ├── Work-stealing scheduler
    ├── Zero-cost futures
    └── Explicit spawning for parallelism

    BEAM Processes (Elixir/Erlang)
    ├── Lightweight isolated processes
    ├── Message passing only
    ├── Preemptive scheduling
    └── Fault tolerance built-in

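To make the event-loop characteristics above concrete, here is a minimal sketch in Python asyncio. The names `worker` and `demo` are illustrative and not part of the skill. It shows two coroutines interleaving on a single thread and handing control back only at an `await`.

```python
import asyncio
import threading


async def worker(name: str, steps: int, log: list) -> None:
    """Record each step with the thread it ran on, yielding between steps."""
    for step in range(steps):
        log.append((name, step, threading.get_ident()))
        # Control returns to the event loop only at this await. Without it,
        # this coroutine would run to completion before the other one starts.
        await asyncio.sleep(0)


async def demo() -> list:
    log: list = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log
```

Running `asyncio.run(demo())` should show the two workers alternating, with every entry carrying the same thread id. That is the cooperative, single-threaded behaviour that goroutines and multi-threaded Tokio runtimes do not share.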
Async/Await Translation
Basic Async Function
TypeScript:

    async function fetchUser(id: string): Promise<User> {
      const response = await fetch(`/users/${id}`);
      return response.json();
    }

Python:

    import httpx

    async def fetch_user(id: str) -> User:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"/users/{id}")
            return User(**response.json())

Rust:

    async fn fetch_user(id: &str) -> Result<User, Error> {
        let response = reqwest::get(format!("/users/{}", id)).await?;
        let user: User = response.json().await?;
        Ok(user)
    }

Go:
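
    // Go has no async/await: write ordinary blocking code and run it
    // in a goroutine when it needs to execute concurrently.
    func fetchUser(id string) (*User, error) {
        resp, err := http.Get(fmt.Sprintf("/users/%s", id))
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()

        var user User
        // Return nil on a decode failure instead of a half-filled struct
        if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
            return nil, err
        }
        return &user, nil
    }
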
Parallel Execution
Promise.all / join!
TypeScript:

    const [users, orders] = await Promise.all([
      fetchUsers(),
      fetchOrders()
    ]);

Python:

    import asyncio

    users, orders = await asyncio.gather(
        fetch_users(),
        fetch_orders()
    )

Rust:

    let (users, orders) = tokio::join!(
        fetch_users(),
        fetch_orders()
    );

    // Or with try_join! for Result types (returns at the first error)
    let (users, orders) = tokio::try_join!(
        fetch_users(),
        fetch_orders()
    )?;

Go:

    var wg sync.WaitGroup
    var users []User
    var orders []Order
    var usersErr, ordersErr error

    wg.Add(2)
    go func() {
        defer wg.Done()
        users, usersErr = fetchUsers()
    }()
    go func() {
        defer wg.Done()
        orders, ordersErr = fetchOrders()
    }()
    wg.Wait()
    // Check usersErr and ordersErr before using the results

Race / select
TypeScript:

    const result = await Promise.race([
      fetchFromPrimary(),
      fetchFromBackup()
    ]);

Python:
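
    import asyncio

    # asyncio.wait() requires Tasks; bare coroutines are rejected since Python 3.11
    tasks = [
        asyncio.create_task(fetch_from_primary()),
        asyncio.create_task(fetch_from_backup()),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    result = done.pop().result()
    for task in pending:
        task.cancel()
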
Rust:

    // The branch that finishes first wins; the other future is dropped
    let result = tokio::select! {
        result = fetch_from_primary() => result,
        result = fetch_from_backup() => result,
    };

Go:

    select {
    case result := <-primaryCh:
        return result
    case result := <-backupCh:
        return result
    }

Channel Patterns
Basic Channel Usage
Go (native channels):

    // Unbuffered channel
    ch := make(chan int)

    // Send
    go func() {
        ch <- 42
    }()

    // Receive
    value := <-ch

    // Buffered channel
    buffered := make(chan int, 10)

Rust (mpsc):

    use tokio::sync::mpsc;

    // Create channel
    let (tx, mut rx) = mpsc::channel(32);

    // Send
    tokio::spawn(async move {
        tx.send(42).await.unwrap();
    });

    // Receive (the loop ends once every sender has been dropped)
    while let Some(value) = rx.recv().await {
        println!("Received: {}", value);
    }

Python (asyncio.Queue):

    import asyncio

    queue = asyncio.Queue()

    # Send
    await queue.put(42)

    # Receive
    value = await queue.get()

TypeScript (no native channels):

    // Use a library or implement with EventEmitter/streams
    import { Channel } from './channel';

    const ch = new Channel<number>();
    await ch.send(42);
    const value = await ch.receive();

Fan-out / Fan-in
Go:

    // worker is assumed to start a goroutine that reads from input
    // and returns that goroutine's output channel.
    func fanOut(input <-chan int, workers int) []<-chan int {
        outputs := make([]<-chan int, workers)
        for i := 0; i < workers; i++ {
            outputs[i] = worker(input)
        }
        return outputs
    }

    func fanIn(inputs ...<-chan int) <-chan int {
        output := make(chan int)
        var wg sync.WaitGroup

        for _, input := range inputs {
            wg.Add(1)
            go func(ch <-chan int) {
                defer wg.Done()
                for v := range ch {
                    output <- v
                }
            }(input)
        }

        // Close the merged channel once every input has been drained
        go func() {
            wg.Wait()
            close(output)
        }()

        return output
    }

Rust:
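
    use tokio::sync::mpsc;

    // One possible implementation: round-robin distribution over
    // per-worker channels. Assumes workers >= 1.
    async fn fan_out<T: Send + 'static>(
        mut input: mpsc::Receiver<T>,
        workers: usize,
    ) -> Vec<mpsc::Receiver<T>> {
        let (txs, rxs): (Vec<_>, Vec<_>) =
            (0..workers).map(|_| mpsc::channel::<T>(32)).unzip();

        tokio::spawn(async move {
            let mut next = 0;
            while let Some(item) = input.recv().await {
                if txs[next].send(item).await.is_err() {
                    break; // a worker dropped its receiver
                }
                next = (next + 1) % txs.len();
            }
            // txs is dropped here, which closes every worker channel
        });

        rxs
    }
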
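The fan-out / fan-in pattern can also be expressed with `asyncio.Queue`. The following is a minimal sketch under stated assumptions, not a reference implementation: `fan_out_fan_in`, `_worker`, and the `_DONE` sentinel are illustrative names, and squaring the value stands in for real work. Several workers pull from one shared input queue (fan-out) and push onto one shared output queue (fan-in).

```python
import asyncio

_DONE = object()  # sentinel marking end of input (stands in for closing a channel)


async def _worker(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Fan-out side: every worker pulls from the same input queue."""
    while True:
        item = await inbox.get()
        if item is _DONE:
            break
        await outbox.put(item * item)  # placeholder work: square the value


async def fan_out_fan_in(items: list, workers: int = 3) -> list:
    inbox: asyncio.Queue = asyncio.Queue()
    outbox: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.create_task(_worker(inbox, outbox)) for _ in range(workers)]

    for item in items:
        await inbox.put(item)
    for _ in range(workers):  # one sentinel per worker so that all of them stop
        await inbox.put(_DONE)

    await asyncio.gather(*tasks)
    # Fan-in side: all results have arrived on the single output queue.
    return [outbox.get_nowait() for _ in range(outbox.qsize())]
```

Result order is not guaranteed in general, so callers that need ordering should carry an index alongside each item.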
Cancellation Patterns
Timeout
TypeScript:

    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 5000);

    try {
      return await fetch(url, { signal: controller.signal });
    } catch (err) {
      if (err instanceof Error && err.name === 'AbortError') {
        throw new Error('Request timed out');
      }
      throw err;
    } finally {
      clearTimeout(timeout); // clear the timer on success and on failure
    }

Python:

    import asyncio

    try:
        result = await asyncio.wait_for(fetch_data(), timeout=5.0)
    except asyncio.TimeoutError:
        raise Exception("Request timed out")

Rust:

    use tokio::time::{timeout, Duration};

    let data = match timeout(Duration::from_secs(5), fetch_data()).await {
        Ok(result) => result?,                // finished in time; propagate its own error
        Err(_) => return Err(Error::Timeout), // deadline elapsed
    };

Go:

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    result, err := fetchData(ctx)
    // errors.Is also matches when the deadline error has been wrapped
    if errors.Is(err, context.DeadlineExceeded) {
        return nil, errors.New("request timed out")
    }

Cancellation Token / Context
Go (Context):

    func worker(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                // Do work
            }
        }
    }

    // Usage
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)
    // Later...
    cancel()

Rust (CancellationToken):

    use tokio_util::sync::CancellationToken;

    async fn worker(token: CancellationToken) {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    return;
                }
                _ = do_work() => {}
            }
        }
    }

    // Usage
    let token = CancellationToken::new();
    tokio::spawn(worker(token.clone()));
    // Later...
    token.cancel();

TypeScript (AbortController):

    async function worker(signal: AbortSignal): Promise<void> {
      // The flag is checked only between calls to doWork()
      while (!signal.aborted) {
        await doWork();
      }
    }

    // Usage
    const controller = new AbortController();
    worker(controller.signal);
    // Later...
    controller.abort();

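Python has no separate token object for this: the closest equivalent is cancelling the `asyncio.Task` itself, which raises `asyncio.CancelledError` inside the coroutine at its next `await`. A minimal sketch, with `worker` and `run_and_cancel` as illustrative names and the sleep durations chosen arbitrarily:

```python
import asyncio


async def worker(progress: list) -> None:
    try:
        while True:
            progress.append("tick")
            await asyncio.sleep(0.01)  # cancellation is delivered at an await
    except asyncio.CancelledError:
        progress.append("cleanup")  # release resources here
        raise  # re-raise so the task is recorded as cancelled


async def run_and_cancel() -> list:
    progress: list = []
    task = asyncio.create_task(worker(progress))
    await asyncio.sleep(0.05)
    task.cancel()  # plays the role of cancel() / token.cancel() / controller.abort()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the worker was cancelled
    return progress
```

Re-raising `CancelledError` after cleanup matters when translating from Go or Rust: swallowing it makes the task look as if it completed normally.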
Synchronization Primitives
Mutex
| Language | Type | Usage |
|---|---|---|
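| TypeScript | N/A (single-threaded) | Use a promise-based lock for async coordination |
| Python | `asyncio.Lock`, `threading.Lock` | `async with lock:` / `with lock:` |
| Rust | `tokio::sync::Mutex`, `std::sync::Mutex` | `lock().await` / `lock().unwrap()` |
| Go | `sync.Mutex` | `mu.Lock()` / `mu.Unlock()` |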
Rust (async mutex):

    use tokio::sync::Mutex;
    use std::sync::Arc;

    let data = Arc::new(Mutex::new(0));
    let data_clone = data.clone();

    tokio::spawn(async move {
        let mut guard = data_clone.lock().await;
        *guard += 1;
    });

Go:

    var mu sync.Mutex
    var count int

    go func() {
        mu.Lock()
        defer mu.Unlock()
        count++
    }()

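Even on a single-threaded event loop, a lock is needed when a critical section contains an `await`, because other coroutines can run during the suspension. A minimal sketch using `asyncio.Lock`, where `increment` and `run_increments` are illustrative names:

```python
import asyncio


async def increment(counter: dict, lock: asyncio.Lock) -> None:
    async with lock:
        current = counter["value"]
        await asyncio.sleep(0)  # suspension point inside the critical section
        counter["value"] = current + 1


async def run_increments(n: int = 50) -> int:
    counter = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(increment(counter, lock) for _ in range(n)))
    return counter["value"]
```

Without the lock, every coroutine would read the same stale value before any of them wrote it back, and increments would be lost.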
Semaphore
Rust:

    use tokio::sync::Semaphore;
    use std::sync::Arc;

    let semaphore = Arc::new(Semaphore::new(10)); // Max 10 concurrent

    async fn limited_task(sem: Arc<Semaphore>) {
        let _permit = sem.acquire().await.unwrap();
        // Do work - permit released on drop
    }

Go:

    // Using a buffered channel as a semaphore
    var sem = make(chan struct{}, 10)

    func limitedTask() {
        sem <- struct{}{}        // Acquire
        defer func() { <-sem }() // Release
        // Do work
    }

Python:

    import asyncio

    semaphore = asyncio.Semaphore(10)

    async def limited_task():
        async with semaphore:
            # Do work
            pass

Translation Patterns
Goroutine → Tokio Task

    // Go
    go func() {
        result := doWork()
        resultCh <- result
    }()

    // Rust
    tokio::spawn(async move {
        let result = do_work().await;
        tx.send(result).await.unwrap();
    });

Promise → Future

    // TypeScript
    function fetchData(): Promise<Data> {
      return new Promise((resolve, reject) => {
        // ...
      });
    }

    // Rust
    use std::future::Future;

    async fn fetch_data() -> Result<Data, Error> {
        // async fn returns impl Future automatically
    }

    // Or explicitly
    fn fetch_data() -> impl Future<Output = Result<Data, Error>> {
        async {
            // ...
        }
    }

Callback → Async/Await

    // JavaScript callback
    function fetchData(callback) {
      http.get(url, (res) => {
        callback(null, res);
      }).on('error', callback);
    }

    // TypeScript async: wrap the callback API in a Promise.
    // http.get yields an http.IncomingMessage, not a fetch Response.
    function fetchData(): Promise<http.IncomingMessage> {
      return new Promise((resolve, reject) => {
        http.get(url, resolve).on('error', reject);
      });
    }

Common Pitfalls
1. Blocking in Async Context

    // ❌ Blocks the async runtime
    async fn bad() {
        std::thread::sleep(Duration::from_secs(1)); // Blocks!
    }

    // ✓ Use async sleep
    async fn good() {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    // ✓ Or spawn_blocking for CPU-bound work
    async fn cpu_bound() {
        tokio::task::spawn_blocking(|| {
            heavy_computation()
        }).await.unwrap();
    }

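The asyncio counterpart of `spawn_blocking` is `asyncio.to_thread` (Python 3.9+), which runs a blocking callable on a worker thread so the event loop stays responsive. A minimal sketch, with `blocking_io`, `heartbeat`, and `run_without_blocking` as illustrative names:

```python
import asyncio
import time


def blocking_io() -> str:
    time.sleep(0.1)  # stands in for any blocking call (file I/O, legacy client)
    return "done"


async def heartbeat(beats: list) -> None:
    """Keeps ticking only if the event loop is not blocked."""
    for _ in range(5):
        beats.append(time.monotonic())
        await asyncio.sleep(0.01)


async def run_without_blocking() -> tuple:
    beats: list = []
    # Calling blocking_io() directly here would freeze heartbeat() for 0.1 s.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        heartbeat(beats),
    )
    return result, len(beats)
```

Because of the GIL, `to_thread` helps with blocking I/O rather than CPU-bound Python code. CPU-heavy work is usually moved to a process pool instead.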
2. Deadlock with Channels

    // ❌ Deadlock - unbuffered channel, same goroutine
    ch := make(chan int)
    ch <- 42 // Blocks forever - no receiver
    val := <-ch

    // ✓ Use goroutine
    ch := make(chan int)
    go func() { ch <- 42 }()
    val := <-ch

3. Forgetting to Close Channels

    // ❌ Receiver blocks forever
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        // Forgot to close!
    }()
    for v := range ch { // Blocks after 10 values
        fmt.Println(v)
    }

    // ✓ Close when done
    go func() {
        defer close(ch)
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()

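When translating this pattern to Python, note that `asyncio.Queue` has traditionally had no `close()`, so end-of-stream is commonly signalled with a sentinel value. A minimal sketch of that approach, where `_CLOSED`, `producer`, `consumer`, and `run_pipeline` are illustrative names:

```python
import asyncio

_CLOSED = object()  # sentinel that plays the role of close(ch)


async def producer(queue: asyncio.Queue) -> None:
    try:
        for i in range(10):
            await queue.put(i)
    finally:
        # Like `defer close(ch)`: always signal the end, even on error.
        await queue.put(_CLOSED)


async def consumer(queue: asyncio.Queue) -> list:
    received = []
    # Equivalent of `for v := range ch`: stop when the sentinel arrives.
    while (item := await queue.get()) is not _CLOSED:
        received.append(item)
    return received


async def run_pipeline() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    _, received = await asyncio.gather(producer(queue), consumer(queue))
    return received
```

With several consumers, the producer would need to enqueue one sentinel per consumer. Otherwise the remaining consumers block forever, which is the same failure mode as the unclosed Go channel.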
4. Shared State Without Synchronization

    // ❌ Does not compile: Rust rejects the unsynchronized sharing that
    // would be a data race in other languages
    let mut data = vec![];
    for i in 0..10 {
        tokio::spawn(async move {
            data.push(i); // error: `data` was moved into the first task
        });
    }

    // ✓ Use Arc<Mutex<T>>
    let data = Arc::new(Mutex::new(vec![]));
    for i in 0..10 {
        let data = data.clone();
        tokio::spawn(async move {
            data.lock().await.push(i);
        });
    }

Best Practices
- Prefer message passing over shared state when possible
- Use structured concurrency - parent tasks own child tasks
- Always handle cancellation - provide clean shutdown paths
- Avoid blocking in async contexts
- Limit concurrency with semaphores for resource-intensive operations
- Close channels when done sending
- Use timeouts for all external operations
- Test concurrent code with race detectors (`go test -race`, ThreadSanitizer)
Related Skills
- `meta-convert-dev` - Code conversion patterns
- `patterns-metaprogramming-dev` - Async decorators/macros
- `lang-*-dev` skills - Language-specific concurrency details