Claude-skill-registry effect-concurrency
Use when Effect concurrency patterns including fibers, fork, join, parallel execution, and race conditions. Use for concurrent operations in Effect applications.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/effect-concurrency" ~/.claude/skills/majiayu000-claude-skill-registry-effect-concurrency && rm -rf "$T"
skills/data/effect-concurrency/SKILL.mdEffect Concurrency
Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.
Fibers Fundamentals
What are Fibers?
Fibers are lightweight virtual threads that execute effects concurrently:
import { Effect, Fiber } from "effect" // Every effect runs on a fiber const effect = Effect.succeed(42) // When run, this executes on a fiber // Effects are descriptions - fibers are executions // Effect: lazy, immutable description // Fiber: running execution with state
Forking Effects
Create independent concurrent fibers:
import { Effect, Fiber } from "effect" const task = Effect.gen(function* () { yield* Effect.sleep("1 second") yield* Effect.log("Task completed") return 42 }) const program = Effect.gen(function* () { // Fork creates a new fiber const fiber = yield* Effect.fork(task) // fiber: RuntimeFiber<number, never> yield* Effect.log("Main fiber continues") // Join waits for fiber to complete const result = yield* Fiber.join(fiber) yield* Effect.log(`Result: ${result}`) return result })
Fiber Operations
import { Effect, Fiber } from "effect" const program = Effect.gen(function* () { const fiber = yield* Effect.fork(longRunningTask) // Join - wait for result const result = yield* Fiber.join(fiber) // Await - get Exit value (success/failure/interruption) const exit = yield* Fiber.await(fiber) // Interrupt - cancel execution yield* Fiber.interrupt(fiber) // Poll - check if complete (non-blocking) const status = yield* Fiber.poll(fiber) })
Parallel Execution
Effect.all - Run Multiple Effects
import { Effect } from "effect" // Parallel execution (default) const program = Effect.gen(function* () { const results = yield* Effect.all([ fetchUser("1"), fetchUser("2"), fetchUser("3") ]) // All requests run concurrently return results }) // Sequential execution const sequential = Effect.gen(function* () { const results = yield* Effect.all([ fetchUser("1"), fetchUser("2"), fetchUser("3") ], { concurrency: 1 }) return results }) // Limited concurrency const limited = Effect.gen(function* () { const results = yield* Effect.all( Array.from({ length: 100 }, (_, i) => fetchUser(`${i}`)), { concurrency: 10 } // Max 10 concurrent ) return results })
Effect.all with Batching
import { Effect } from "effect" // Batching for efficiency const batchFetch = Effect.gen(function* () { const userIds = Array.from({ length: 1000 }, (_, i) => `${i}`) const results = yield* Effect.all( userIds.map(id => fetchUser(id)), { concurrency: 50, // 50 concurrent requests batching: true // Enable batching optimization } ) return results })
Effect.forEach - Concurrent Iteration
import { Effect } from "effect" const processUsers = (userIds: string[]) => Effect.forEach( userIds, (id) => Effect.gen(function* () { const user = yield* fetchUser(id) const processed = yield* processUser(user) return processed }), { concurrency: "unbounded" } // No limit ) // With concurrency limit const processUsersLimited = (userIds: string[]) => Effect.forEach( userIds, (id) => processUser(id), { concurrency: 10 } )
Racing Effects
Effect.race - First to Complete
import { Effect } from "effect" const fetchWithFallback = (id: string) => Effect.race( fetchFromPrimaryDb(id), fetchFromSecondaryDb(id) ) // Returns whichever completes first // Racing multiple effects const fastestSource = Effect.race( fetchFromSource1(), fetchFromSource2(), fetchFromSource3() )
Effect.raceAll - Race Multiple Effects
import { Effect } from "effect" const sources = [ fetchFromSource1(), fetchFromSource2(), fetchFromSource3() ] // First to succeed wins const fastest = Effect.raceAll(sources)
Timeout Racing
import { Effect } from "effect" const withTimeout = <A, E, R>( effect: Effect.Effect<A, E, R>, duration: Duration.Duration ) => Effect.race( effect, Effect.sleep(duration).pipe( Effect.andThen(Effect.fail({ _tag: "Timeout" })) ) ) const program = Effect.gen(function* () { const result = yield* withTimeout( slowOperation(), Duration.seconds(5) ) return result })
Interruption
Fiber Interruption
import { Effect, Fiber } from "effect" const program = Effect.gen(function* () { const fiber = yield* Effect.fork(longRunningTask) // Cancel after 1 second yield* Effect.sleep("1 second") yield* Fiber.interrupt(fiber) yield* Effect.log("Task cancelled") }) // Automatic interruption on parent exit const autoInterrupt = Effect.gen(function* () { const fiber = yield* Effect.fork(infiniteLoop) // fiber will be interrupted when this effect completes })
Uninterruptible Regions
import { Effect } from "effect" const criticalSection = Effect.gen(function* () { // This region cannot be interrupted yield* Effect.uninterruptible( Effect.gen(function* () { yield* beginTransaction() yield* updateDatabase() yield* commitTransaction() }) ) }) // Interruptible regions within uninterruptible const mixed = Effect.uninterruptible( Effect.gen(function* () { yield* criticalOperation1() // Allow interruption here yield* Effect.interruptible( nonCriticalOperation() ) yield* criticalOperation2() }) )
Daemon Fibers
Fork Daemon - Independent Fibers
import { Effect } from "effect" const program = Effect.gen(function* () { // Regular fork - interrupted when parent exits const regularFiber = yield* Effect.fork(task) // Daemon fork - survives parent exit const daemonFiber = yield* Effect.forkDaemon(backgroundTask) // Parent exits, regularFiber interrupted, daemonFiber continues }) // Background worker example const startBackgroundWorker = Effect.gen(function* () { yield* Effect.forkDaemon( Effect.gen(function* () { while (true) { yield* processQueue() yield* Effect.sleep("1 second") } }) ) })
Scoped Concurrency
Effect.forkScoped - Fiber Cleanup
import { Effect, Scope } from "effect" const program = Effect.gen(function* () { yield* Effect.scoped( Effect.gen(function* () { // Fibers are tied to scope const fiber1 = yield* Effect.forkScoped(task1) const fiber2 = yield* Effect.forkScoped(task2) // Do work yield* doWork() // Scope exit automatically interrupts fibers }) ) // fiber1 and fiber2 are interrupted here })
Fork In Scope
import { Effect } from "effect" const managedConcurrency = Effect.gen(function* () { const scope = yield* Scope.make() // Fork in specific scope const fiber = yield* Effect.forkIn(task, scope) // Work continues yield* doWork() // Close scope, interrupt fiber yield* Scope.close(scope, Exit.succeed(undefined)) })
Advanced Patterns
Worker Pool
import { Effect, Queue } from "effect" interface Task { id: string data: unknown } const createWorkerPool = (workers: number) => Effect.gen(function* () { const queue = yield* Queue.bounded<Task>(100) // Start workers const workerFibers = yield* Effect.all( Array.from({ length: workers }, () => Effect.fork( Effect.forever( Effect.gen(function* () { const task = yield* Queue.take(queue) yield* processTask(task) }) ) ) ) ) return { submit: (task: Task) => Queue.offer(queue, task), shutdown: () => Effect.all( workerFibers.map(fiber => Fiber.interrupt(fiber)) ) } })
Parallel Map-Reduce
import { Effect, Chunk } from "effect" const parallelMapReduce = <A, B, E, R>( items: A[], map: (item: A) => Effect.Effect<B, E, R>, reduce: (acc: B, item: B) => B, initial: B, concurrency: number ) => Effect.gen(function* () { const mapped = yield* Effect.forEach( items, map, { concurrency } ) return mapped.reduce(reduce, initial) })
Request Deduplication
import { Effect, Request, RequestResolver } from "effect" interface GetUser extends Request.Request<User, UserNotFound> { readonly _tag: "GetUser" readonly id: string } const GetUserResolver = RequestResolver.makeBatched( (requests: GetUser[]) => Effect.gen(function* () { const ids = requests.map(r => r.id) const users = yield* fetchUsersBatch(ids) // Resolve all requests return Effect.forEach(requests, (request) => { const user = users.find(u => u.id === request.id) return user ? Request.complete(request, user) : Request.fail(request, { _tag: "UserNotFound", id: request.id }) }) }) ) // Multiple concurrent requests for same ID deduplicated const program = Effect.gen(function* () { const results = yield* Effect.all([ Effect.request(GetUser({ id: "1" }), GetUserResolver), Effect.request(GetUser({ id: "1" }), GetUserResolver), Effect.request(GetUser({ id: "1" }), GetUserResolver) ]) // Only one actual fetch for ID "1" })
Best Practices
-
Use Effect.all for Parallel Work: Don't fork manually when Effect.all suffices.
-
Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.
-
Handle Interruption: Ensure cleanup code runs in uninterruptible regions.
-
Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.
-
Avoid Infinite Loops: Use Effect.forever with sleep for background tasks.
-
Batch Requests: Use request resolvers to batch and deduplicate.
-
Timeout Long Operations: Add timeouts to prevent hanging.
-
Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.
-
Use Daemon Sparingly: Only fork daemons when truly independent.
-
Test Concurrent Code: Write tests for race conditions and interruption.
Common Pitfalls
-
Forgetting to Join: Forking without joining loses results.
-
No Concurrency Limits: Unbounded concurrency can exhaust resources.
-
Not Handling Interruption: Missing cleanup in interruptible regions.
-
Race Conditions: Sharing mutable state between fibers.
-
Deadlocks: Circular dependencies between fibers.
-
Ignoring Failures: Not checking fiber exit status.
-
Memory Leaks: Daemon fibers that never terminate.
-
Over-Forking: Creating too many fibers unnecessarily.
-
Missing Timeouts: Long-running operations without limits.
-
Wrong Execution Mode: Using sequential when parallel is intended.
When to Use This Skill
Use effect-concurrency when you need to:
- Execute multiple operations in parallel
- Build high-performance data pipelines
- Handle concurrent user requests
- Implement background workers
- Race multiple data sources
- Add timeouts to operations
- Build concurrent job processors
- Manage fiber lifecycles
- Implement request deduplication
- Optimize throughput with batching
Resources
Official Documentation
Related Skills
- effect-core-patterns - Basic Effect operations
- effect-resource-management - Resource cleanup with scopes