go-concurrency
Production Go concurrency patterns — goroutines, channels, sync primitives, context, worker pools, pipelines, and graceful shutdown. Use when building concurrent Go applications or debugging race conditions.
install
source · Clone the upstream repo
git clone https://github.com/wpank/ai
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/wpank/ai "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/backend/go-concurrency" ~/.claude/skills/wpank-ai-go-concurrency && rm -rf "$T"
manifest:
skills/backend/go-concurrency/SKILL.md
Go Concurrency Patterns
Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.
Installation
OpenClaw / Moltbot / Clawbot
npx clawhub@latest install go-concurrency
When to Use
- Building concurrent Go applications
- Implementing worker pools and pipelines
- Managing goroutine lifecycles and cancellation
- Debugging race conditions
- Implementing graceful shutdown
Concurrency Primitives
| Primitive | Purpose | When to Use |
|---|---|---|
| goroutine | Lightweight concurrent execution | Any concurrent work |
| channel | Communication between goroutines | Passing data, signaling |
| select | Multiplex channel operations | Waiting on multiple channels |
| sync.Mutex | Mutual exclusion | Protecting shared state |
| sync.WaitGroup | Wait for goroutines to complete | Coordinating goroutine completion |
| context.Context | Cancellation and deadlines | Request-scoped lifecycle management |
| errgroup | Concurrent tasks with errors | Parallel work that can fail |
Go Concurrency Mantra: Don't communicate by sharing memory; share memory by communicating.
Quick Start
```go
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	results := make(chan string, 10)
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				return
			case results <- fmt.Sprintf("Worker %d done", id):
			}
		}(i)
	}

	go func() { wg.Wait(); close(results) }()

	for result := range results {
		fmt.Println(result)
	}
}
```
Pattern 1: Worker Pool
```go
type Job struct {
	ID   int
	Data string
}

type Result struct {
	JobID  int
	Output string
	Err    error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
	results := make(chan Result)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				// Select on the send itself so a cancelled context also
				// unblocks a worker stuck sending to a full results channel.
				select {
				case <-ctx.Done():
					return
				case results <- Result{
					JobID:  job.ID,
					Output: fmt.Sprintf("Processed: %s", job.Data),
				}:
				}
			}
		}()
	}
	go func() { wg.Wait(); close(results) }()
	return results
}

// Usage
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan Job, 100)
	go func() {
		for i := 0; i < 50; i++ {
			jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
		}
		close(jobs)
	}()

	for result := range WorkerPool(ctx, 5, jobs) {
		fmt.Printf("Result: %+v\n", result)
	}
}
```
Pattern 2: Fan-Out / Fan-In Pipeline
```go
// Stage 1: Generate values
func generate(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case <-ctx.Done():
				return
			case out <- n:
			}
		}
	}()
	return out
}

// Stage 2: Transform (run multiple instances for fan-out)
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				select {
				case <-ctx.Done():
					return
				case out <- n:
				}
			}
		}(ch)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// Usage: fan out to 3 squarers, fan in results
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	c1 := square(ctx, in)
	c2 := square(ctx, in)
	c3 := square(ctx, in)

	for result := range merge(ctx, c1, c2, c3) {
		fmt.Println(result)
	}
}
```
Pattern 3: errgroup with Cancellation
```go
import "golang.org/x/sync/errgroup"

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([]string, len(urls))
	for i, url := range urls {
		i, url := i, url // per-iteration copies (unneeded in Go 1.22+)
		g.Go(func() error {
			req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
			if err != nil {
				return fmt.Errorf("creating request for %s: %w", url, err)
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return fmt.Errorf("fetching %s: %w", url, err)
			}
			defer resp.Body.Close()
			results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err // first error cancels all others via ctx
	}
	return results, nil
}

// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // max concurrent goroutines
	results := make([]string, len(urls))
	for i, url := range urls {
		i, url := i, url
		g.Go(func() error {
			result, err := fetchURL(ctx, url)
			if err != nil {
				return err
			}
			results[i] = result
			return nil
		})
	}
	return results, g.Wait()
}
```
Pattern 4: Bounded Concurrency (Semaphore)
```go
import "golang.org/x/sync/semaphore"

type RateLimitedWorker struct {
	sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
	return &RateLimitedWorker{sem: semaphore.NewWeighted(maxConcurrent)}
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, task := range tasks {
		if err := w.sem.Acquire(ctx, 1); err != nil {
			return []error{err}
		}
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			defer w.sem.Release(1)
			if err := t(); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(task)
	}
	wg.Wait()
	return errs
}

// Simpler alternative: channel-based semaphore
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore { return make(chan struct{}, n) }
func (s Semaphore) Acquire()       { s <- struct{}{} }
func (s Semaphore) Release()       { <-s }
```
Pattern 5: Graceful Shutdown
```go
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	server := NewServer()
	server.Start(ctx)

	sig := <-sigCh
	fmt.Printf("Received signal: %v\n", sig)
	cancel() // cancel context to stop all workers
	server.Shutdown(5 * time.Second)
}

type Server struct {
	wg sync.WaitGroup
}

func (s *Server) Start(ctx context.Context) {
	for i := 0; i < 5; i++ {
		s.wg.Add(1)
		go s.worker(ctx, i)
	}
}

func (s *Server) worker(ctx context.Context, id int) {
	defer s.wg.Done()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d cleaning up...\n", id)
			return
		case <-ticker.C:
			fmt.Printf("Worker %d working...\n", id)
		}
	}
}

func (s *Server) Shutdown(timeout time.Duration) {
	done := make(chan struct{})
	go func() { s.wg.Wait(); close(done) }()
	select {
	case <-done:
		fmt.Println("Clean shutdown completed")
	case <-time.After(timeout):
		fmt.Println("Shutdown timed out, forcing exit")
	}
}
```
Pattern 6: Concurrent Map
```go
// sync.Map: optimized for read-heavy workloads with stable keys
type Cache struct {
	m sync.Map
}

func (c *Cache) Get(key string) (any, bool) { return c.m.Load(key) }
func (c *Cache) Set(key string, value any)  { c.m.Store(key, value) }
func (c *Cache) GetOrSet(key string, val any) (any, bool) {
	return c.m.LoadOrStore(key, val)
}

// ShardedMap: better for write-heavy workloads
type ShardedMap struct {
	shards    []*shard
	numShards int
}

type shard struct {
	sync.RWMutex
	data map[string]any
}

func NewShardedMap(n int) *ShardedMap {
	m := &ShardedMap{shards: make([]*shard, n), numShards: n}
	for i := range m.shards {
		m.shards[i] = &shard{data: make(map[string]any)}
	}
	return m
}

func (m *ShardedMap) getShard(key string) *shard {
	var h uint // unsigned: integer overflow cannot produce a negative index
	for _, c := range key {
		h = 31*h + uint(c)
	}
	return m.shards[h%uint(m.numShards)]
}

func (m *ShardedMap) Get(key string) (any, bool) {
	s := m.getShard(key)
	s.RLock()
	defer s.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

func (m *ShardedMap) Set(key string, value any) {
	s := m.getShard(key)
	s.Lock()
	defer s.Unlock()
	s.data[key] = value
}
```
When to use which:
- sync.Map — few keys, many reads, keys added once and rarely deleted
- ShardedMap — many keys, frequent writes, need predictable performance
Select Patterns
```go
// Timeout
select {
case v := <-ch:
	fmt.Println("Received:", v)
case <-time.After(time.Second):
	fmt.Println("Timeout!")
}

// Non-blocking send/receive
select {
case ch <- 42:
	fmt.Println("Sent")
default:
	fmt.Println("Channel full, skipping")
}

// Priority select: check high-priority first
for {
	select {
	case msg := <-highPriority:
		handle(msg)
	default:
		select {
		case msg := <-highPriority:
			handle(msg)
		case msg := <-lowPriority:
			handle(msg)
		}
	}
}
```
Race Detection
```bash
go test -race ./...    # tests with race detector
go build -race .       # build with race detector
go run -race main.go   # run with race detector
```
Best Practices
Do:
- Use context.Context for cancellation and deadlines on every goroutine
- Close channels from the sender side only
- Use errgroup for concurrent operations that return errors
- Buffer channels when the count is known upfront
- Prefer channels over mutexes for coordination
- Always run tests with -race
Don't:
- Leak goroutines — every goroutine must have an exit path
- Close a channel from the receiver — the sender's next send will panic
- Use time.Sleep for synchronization — use proper primitives
- Ignore ctx.Done() in long-running goroutines
- Share memory without synchronization — use channels or mutexes
NEVER Do
- NEVER close a channel from the receiver — only the sender should close; sending on a closed channel panics
- NEVER send on a closed channel — causes panic; design so the sender controls close
- NEVER use unbounded goroutine spawning — use worker pools or semaphores for bounded concurrency
- NEVER ignore the -race flag in testing — data races are silent bugs that corrupt state
- NEVER pass pointers to loop variables into goroutines — capture the value or pass it as an argument
- NEVER use time.Sleep as synchronization — use channels, WaitGroups, or context