Claude-skill-registry effect-match-patterns
Effect.Match for discriminated unions, Queue/PubSub for concurrency, Fiber lifecycle, and HashMap for registries. Covers patterns missing from effect-patterns skill.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/effect-match-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-effect-match-patterns && rm -rf "$T"
skills/data/effect-match-patterns/SKILL.mdEffect Match, Queue, PubSub, Fiber, HashMap Patterns
Overview
This skill covers advanced Effect-TS patterns not in the core
effect-patterns skill:
- Match — Type-safe pattern matching with exhaustiveness checking
- Queue — Bounded/unbounded async queues for producer-consumer patterns
- PubSub — One-to-many event broadcasting
- Fiber — Lightweight concurrency (fork, join, interrupt)
- HashMap — Immutable hash maps for registries
Canonical Sources
Effect-TS Core
- Submodule:
../../submodules/effect/packages/effect/src/ - Tests:
../../submodules/effect/packages/effect/test/ - DeepWiki: Query
for verificationEffect-TS/effect
TMNL Implementations
- ChannelService —
(Queue, PubSub, Fiber, HashMap)src/lib/streams/constructs/ChannelService.ts - Feed —
(Fiber lifecycle)src/lib/streams/constructs/Feed.ts - TokenRegistry —
(HashMap)src/lib/primitives/TokenRegistry/TokenRegistry.ts
Pattern 1: Match — TYPE-SAFE PATTERN MATCHING
When: Pattern matching on discriminated unions with exhaustiveness checking.
Basic Match with Tags
import { Match } from 'effect' type Event = | { readonly _tag: 'Fetch' } | { readonly _tag: 'Success'; readonly data: string } | { readonly _tag: 'Error'; readonly error: Error } | { readonly _tag: 'Cancel' } const handleEvent = Match.type<Event>().pipe( Match.tag('Fetch', () => 'Fetching...'), Match.tag('Success', (e) => `Got: ${e.data}`), Match.tag('Error', (e) => `Error: ${e.error.message}`), Match.tag('Cancel', () => 'Cancelled'), Match.exhaustive // Compile error if case missing ) // Usage const result = handleEvent({ _tag: 'Success', data: 'Hello' }) // 'Got: Hello'
Match.tagsExhaustive — OBJECT SYNTAX
When: Prefer object syntax for all cases at once.
import { Match, pipe } from 'effect' type Shape = | { _tag: 'Circle'; radius: number } | { _tag: 'Square'; side: number } | { _tag: 'Rectangle'; width: number; height: number } const area = pipe( Match.type<Shape>(), Match.tagsExhaustive({ Circle: (s) => Math.PI * s.radius ** 2, Square: (s) => s.side ** 2, Rectangle: (s) => s.width * s.height, }) ) area({ _tag: 'Circle', radius: 5 }) // ~78.54
Match with Schema.TaggedStruct
When: Pattern matching on Effect Schema types.
import { Schema } from 'effect' import { Match, pipe } from 'effect' // Schema definitions const PointerDown = Schema.TaggedStruct('PointerDown', { x: Schema.Number, y: Schema.Number, }) const PointerUp = Schema.TaggedStruct('PointerUp', { x: Schema.Number, y: Schema.Number, }) const PointerEvent = Schema.Union(PointerDown, PointerUp) type PointerEvent = typeof PointerEvent.Type // Pattern matching const describe = pipe( Match.type<PointerEvent>(), Match.tagsExhaustive({ PointerDown: (e) => `Down at (${e.x}, ${e.y})`, PointerUp: (e) => `Up at (${e.x}, ${e.y})`, }) )
Match.discriminator — CUSTOM DISCRIMINANT
When: Discriminated union uses field other than
_tag.
type Message = | { kind: 'text'; content: string } | { kind: 'image'; url: string } | { kind: 'video'; url: string; duration: number } const handle = pipe( Match.type<Message>(), Match.discriminator('kind')('text', (m) => m.content), Match.discriminator('kind')('image', (m) => `Image: ${m.url}`), Match.discriminator('kind')('video', (m) => `Video: ${m.url} (${m.duration}s)`), Match.exhaustive )
Pattern 2: Queue — ASYNC PRODUCER-CONSUMER
When: Async communication between producers and consumers.
Bounded Queue
import { Effect, Queue } from 'effect' const program = Effect.gen(function* () { // Create bounded queue (capacity 10) const queue = yield* Queue.bounded<string>(10) // Producer yield* Queue.offer(queue, 'message-1') yield* Queue.offer(queue, 'message-2') // Consumer const msg1 = yield* Queue.take(queue) // 'message-1' const msg2 = yield* Queue.take(queue) // 'message-2' return [msg1, msg2] })
Queue Strategies
| Strategy | Behavior when full |
|---|---|
| suspends until space |
| Drops new elements |
| Drops oldest elements |
| Never full (grows indefinitely) |
Queue in Service Pattern
TMNL Example — ChannelService (
src/lib/streams/constructs/ChannelService.ts:72-73):
export interface ChannelInstance { readonly state: ChannelState readonly fibers: HashMap.HashMap<string, Fiber.RuntimeFiber<void, unknown>> readonly subscriptions: HashMap.HashMap<string, Queue.Dequeue<unknown>> }
Pattern 3: PubSub — ONE-TO-MANY BROADCASTING
When: Single publisher, multiple subscribers each receive all messages.
Basic PubSub
import { Effect, PubSub, Queue } from 'effect' const program = Effect.gen(function* () { // Create PubSub const pubsub = yield* PubSub.bounded<string>(100) // Subscribe (each subscriber gets own Queue) const sub1 = yield* PubSub.subscribe(pubsub) const sub2 = yield* PubSub.subscribe(pubsub) // Publish yield* PubSub.publish(pubsub, 'hello') // Both subscribers receive const msg1 = yield* Queue.take(sub1) // 'hello' const msg2 = yield* Queue.take(sub2) // 'hello' })
PubSub for Event Dispatch
TMNL Pattern — Command/Event channels:
// From ChannelService concept interface ChannelServiceShape { readonly commandPubSub: PubSub.PubSub<ChannelCommand> readonly eventPubSub: PubSub.PubSub<ChannelEvent> // Send command to all listeners readonly dispatch: (cmd: ChannelCommand) => Effect.Effect<void> // Subscribe to events readonly onEvent: <T extends ChannelEvent['_tag']>( tag: T, handler: (event: Extract<ChannelEvent, { _tag: T }>) => Effect.Effect<void> ) => Effect.Effect<void, never, Scope.Scope> }
PubSub Strategies
| Strategy | Behavior |
|---|---|
| Backpressure on publishers |
| Drops new messages if full |
| Drops old messages |
| Never full |
Pattern 4: Fiber — LIGHTWEIGHT CONCURRENCY
When: Run Effects concurrently, manage lifecycle (cancel, await).
Fork and Join
import { Effect, Fiber } from 'effect' const program = Effect.gen(function* () { // Fork runs Effect concurrently, returns immediately const fiber = yield* Effect.fork( Effect.delay(Effect.succeed('result'), '1 second') ) // Do other work... yield* Effect.log('Doing other work') // Wait for fiber to complete const result = yield* Fiber.join(fiber) // 'result' })
Interrupt
const program = Effect.gen(function* () { const fiber = yield* Effect.fork( Effect.forever(Effect.log('Running...')) ) // Let it run briefly yield* Effect.sleep('100 millis') // Interrupt (cancel) yield* Fiber.interrupt(fiber) })
Fiber Registry Pattern
TMNL Example — Feed lifecycle (
src/lib/streams/constructs/Feed.ts):
interface FeedInstance { readonly state: FeedState readonly fiber: Fiber.RuntimeFiber<void, unknown> | null } // Start feed (fork emission fiber) const start = (feedId: FeedId): Effect.Effect<void> => Effect.gen(function* () { const feed = yield* getFeed(feedId) const emissionFiber = yield* Effect.fork(runEmission(feed)) // Store fiber reference for later interrupt yield* updateFeed(feedId, { ...feed, fiber: emissionFiber, }) }) // Stop feed (interrupt fiber) const stop = (feedId: FeedId): Effect.Effect<void> => Effect.gen(function* () { const feed = yield* getFeed(feedId) if (feed.fiber) { yield* Fiber.interrupt(feed.fiber) } })
Scoped Fiber (Auto-Cleanup)
When: Fiber should be interrupted when scope closes.
import { Effect, Scope } from 'effect' const program = Effect.gen(function* () { yield* Effect.scoped( Effect.gen(function* () { // Fiber auto-interrupted when scope closes yield* Effect.forkScoped( Effect.forever(Effect.log('Running...')) ) yield* Effect.sleep('100 millis') }) ) // Fiber automatically interrupted here })
Pattern 5: HashMap — IMMUTABLE REGISTRIES
When: Need immutable map for service state (registries, caches).
Basic HashMap
import { HashMap } from 'effect' // Create let map = HashMap.empty<string, number>() // Set (returns new map) map = HashMap.set(map, 'a', 1) map = HashMap.set(map, 'b', 2) // Get const a = HashMap.get(map, 'a') // Option.some(1) const c = HashMap.get(map, 'c') // Option.none() // Size HashMap.size(map) // 2
HashMap in Ref (Mutable Registry)
TMNL Pattern — ChannelService registry:
import { Effect, Ref, HashMap, Option } from 'effect' interface RegistryService<K, V> { readonly get: (key: K) => Effect.Effect<Option.Option<V>> readonly set: (key: K, value: V) => Effect.Effect<void> readonly remove: (key: K) => Effect.Effect<void> readonly entries: Effect.Effect<ReadonlyArray<readonly [K, V]>> } const makeRegistry = <K, V>(): Effect.Effect<RegistryService<K, V>> => Effect.gen(function* () { const ref = yield* Ref.make(HashMap.empty<K, V>()) return { get: (key) => Ref.get(ref).pipe(Effect.map((m) => HashMap.get(m, key))), set: (key, value) => Ref.update(ref, (m) => HashMap.set(m, key, value)), remove: (key) => Ref.update(ref, (m) => HashMap.remove(m, key)), entries: Ref.get(ref).pipe(Effect.map(HashMap.toEntries)), } })
HashMap with Fiber References
TMNL Example — ChannelInstance (
src/lib/streams/constructs/ChannelService.ts:69-73):
export interface ChannelInstance { readonly state: ChannelState readonly fibers: HashMap.HashMap<string, Fiber.RuntimeFiber<void, unknown>> readonly subscriptions: HashMap.HashMap<string, Queue.Dequeue<unknown>> } const makeInstance = (state: ChannelState): ChannelInstance => ({ state, fibers: HashMap.empty(), subscriptions: HashMap.empty(), })
Anti-Patterns
1. Missing Exhaustiveness Check
// WRONG — No compile error if case missing const handle = Match.type<Event>().pipe( Match.tag('Fetch', () => 'Fetching'), Match.orElse(() => 'Unknown'), // Catches all — hides missing cases ) // CORRECT — Compile error if case missing const handle = Match.type<Event>().pipe( Match.tag('Fetch', () => 'Fetching'), Match.tag('Success', (e) => e.data), Match.tag('Error', (e) => e.error.message), Match.tag('Cancel', () => 'Cancelled'), Match.exhaustive, // Enforces all cases )
2. Unbounded Queue Memory Leak
// WRONG — Queue grows forever if consumer is slow const queue = yield* Queue.unbounded<Event>() // CORRECT — Bounded with backpressure or sliding const queue = yield* Queue.bounded<Event>(1000) // or const queue = yield* Queue.sliding<Event>(1000) // Drop old if full
3. Fiber Leak (No Interrupt)
// WRONG — Fiber runs forever, never cleaned up const fiber = yield* Effect.fork(Effect.forever(...)) // Fiber keeps running even after parent exits // CORRECT — Use forkScoped or manual interrupt yield* Effect.scoped( Effect.forkScoped(Effect.forever(...)) ) // Auto-interrupted on scope close // OR const fiber = yield* Effect.fork(...) yield* Effect.ensuring( doWork, Fiber.interrupt(fiber) // Always interrupt on exit )
4. HashMap Mutation (Wrong Mental Model)
// WRONG — HashMap is immutable, this doesn't work const map = HashMap.empty<string, number>() HashMap.set(map, 'a', 1) // Returns NEW map, original unchanged console.log(HashMap.get(map, 'a')) // None! // CORRECT — Capture returned map let map = HashMap.empty<string, number>() map = HashMap.set(map, 'a', 1) console.log(HashMap.get(map, 'a')) // Some(1)
Decision Tree: When to Use Each
Need async communication? │ ├─ One producer, one consumer? │ └─ Use: Queue.bounded() or Queue.unbounded() │ ├─ One producer, many consumers (each gets ALL messages)? │ └─ Use: PubSub (each subscriber gets own Queue) │ ├─ Pattern matching on discriminated union? │ ├─ Want exhaustiveness check? │ │ └─ Use: Match.exhaustive or Match.tagsExhaustive │ └─ OK with fallback? │ └─ Use: Match.orElse │ ├─ Need concurrent execution? │ ├─ Fire and forget? │ │ └─ Use: Effect.fork (but manage lifecycle!) │ ├─ Need result later? │ │ └─ Use: Effect.fork + Fiber.join │ └─ Auto-cleanup on scope exit? │ └─ Use: Effect.forkScoped │ └─ Need registry/map state? └─ Use: Ref.make(HashMap.empty<K, V>()) with HashMap.set/get/remove
File Locations Summary
| Pattern | File | Lines | Usage |
|---|---|---|---|
| Queue + PubSub | | 16-54 | Event dispatch |
| Fiber registry | | 69-79 | Lifecycle management |
| HashMap registry | | — | Token storage |
| Fiber lifecycle | | — | Emission control |
Integration Points
- effect-patterns — Service/Layer patterns that use these primitives
- effect-stream-patterns — Streams that produce to Queues/PubSub
- effect-atom-integration — Atoms that wrap Queue/PubSub state
- xstate-integration — Machine transitions that dispatch via PubSub