Claude-skill-registry effect-stream-patterns
Stream creation, consumption, transformation. Stream.async, Stream.fromSchedule, Stream.runForEach. Progressive data patterns for Effect-TS streams.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/effect-stream-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-effect-stream-patterns && rm -rf "$T"
skills/data/effect-stream-patterns/SKILL.mdEffect Stream Patterns
Overview
Effect Streams are pull-based, lazy sequences of values. Unlike Observables (push-based), Streams are consumed on-demand and provide powerful composition primitives.
Key characteristics:
- Pull-based — Consumer drives execution
- Lazy — Only computes what's needed
- Chunked — Emits
for efficiencyChunk<A> - Effectful — Each element can be an
Effect - Composable — Rich operators for transformation
Use streams for:
- Progressive data loading — Display results as they arrive
- Infinite sequences — Tickers, pollers, event sources
- Resource-efficient processing — Process large datasets without loading all into memory
- Async iteration — Cleaner than manual async loops
Canonical Sources
Effect Stream Core
- Submodule:
../../submodules/effect/packages/effect/src/
— Stream constructorsStream.ts:52-70
— Stream.asyncStream.ts:316-362
(various) — Operators and consumersStream.ts
Effect Website Documentation
- Submodule:
../../submodules/website/content/src/content/docs/docs/stream/
— Stream constructorscreating.mdx
— Running streamsconsuming-streams.mdx
— Transformationsoperations.mdx
— Failure managementerror-handling.mdx
— Resource-managed streamsresourceful-streams.mdx
TMNL Battle-tested Implementations
- Progressive search —
(Stream with atom updates)src/lib/data-manager/v1/atoms/index.ts:206 - Stream-to-Atom —
(Reactive integration)src/lib/streams/atoms/streamToAtom.ts - Mock streams —
(Test utilities)src/lib/data-grid/mocking/stream.ts
Patterns
Decision Tree: Which Stream Pattern?
Need a stream? │ ├─ From existing data (array, iterable)? │ └─ Use: Stream.fromIterable(data) │ ├─ From async callback (WebSocket, EventSource)? │ └─ Use: Stream.async((emit) => { ... }) │ ├─ Ticking/polling at intervals? │ └─ Use: Stream.fromSchedule(Schedule.spaced(...)) │ ├─ Single effectful value? │ └─ Use: Stream.fromEffect(effect) │ ├─ Range of numbers? │ └─ Use: Stream.range(start, end) │ ├─ Infinite sequence? │ └─ Use: Stream.iterate(initial, fn) │ └─ From another stream with transformation? └─ Use: stream.pipe(Stream.map(...))
Pattern 1: Stream.fromIterable — FROM ARRAYS
When to use:
- Have existing array/iterable data
- Want to process data lazily
- Need to chunk large datasets
Signature:
Stream.fromIterable<A>(iterable: Iterable<A>): Stream.Stream<A>
Full Example:
import { Stream, Effect } from 'effect' // From array const numbers = Stream.fromIterable([1, 2, 3, 4, 5]) // Lazy processing (only computes when consumed) const doubled = numbers.pipe( Stream.map((n) => n * 2) ) // Consume const result = await Stream.runCollect(doubled).pipe(Effect.runPromise) console.log(result) // Chunk([2, 4, 6, 8, 10]) // From generator function* fibonacci() { let a = 0, b = 1 while (true) { yield a ;[a, b] = [b, a + b] } } const fibs = Stream.fromIterable(fibonacci()).pipe( Stream.take(10) )
Key Features:
- Lazy evaluation — Doesn't process until consumed
- Automatic chunking — Emits
for efficiencyChunk<A> - Supports generators — Works with any
Iterable
Pattern 2: Stream.async — FROM ASYNC CALLBACKS
When to use:
- Integrate push-based sources (WebSocket, EventSource, DOM events)
- Wrap callbacks into pull-based stream
- Need cleanup logic
Signature:
Stream.async<A, E>( register: (emit: Emit<A, E>) => Effect.Effect<void> | void, bufferSize?: number ): Stream.Stream<A, E>
Full Example:
import { Stream, Effect } from 'effect' // WebSocket stream const wsStream = Stream.async<string, Error>((emit) => { const ws = new WebSocket('wss://example.com/stream') ws.onmessage = (event) => { emit.single(event.data) // Emit single value } ws.onerror = (error) => { emit.fail(new Error('WebSocket error')) // Emit error } ws.onclose = () => { emit.end() // End stream } // Cleanup function return Effect.sync(() => { ws.close() }) }) // EventSource stream const sseStream = Stream.async<MessageEvent>((emit) => { const source = new EventSource('/api/events') source.onmessage = (event) => { emit.single(event) } source.onerror = () => { emit.fail(new Error('SSE error')) } return Effect.sync(() => { source.close() }) }) // DOM event stream const clickStream = Stream.async<MouseEvent>((emit) => { const handler = (event: MouseEvent) => { emit.single(event) } document.addEventListener('click', handler) return Effect.sync(() => { document.removeEventListener('click', handler) }) })
Emit API:
— Emit one valueemit.single(value)
— Emit multiple values at onceemit.chunk(chunk)
— Emit error and end streamemit.fail(error)
— End stream successfullyemit.end()
— Emit from Effectemit.fromEffect(effect)
— Emit chunk from Effectemit.fromEffectChunk(effect)
Key Features:
- Cleanup support — Return Effect for cleanup logic
- Buffer size — Control backpressure (default: 16)
- Error handling — Emit failures via
emit.fail
Pattern 3: Stream.fromSchedule — TICKING STREAMS
When to use:
- Poll at regular intervals
- Emit values on a schedule
- Implement retry logic with backoff
Signature:
Stream.fromSchedule<A>(schedule: Schedule.Schedule<A>): Stream.Stream<A>
Full Example:
import { Stream, Schedule, Effect } from 'effect' // Tick every second const ticker = Stream.fromSchedule(Schedule.spaced('1 second')) // Emit current time every second const clock = ticker.pipe( Stream.map(() => new Date()) ) // Exponential backoff ticker const backoff = Stream.fromSchedule( Schedule.exponential('100 millis').pipe( Schedule.compose(Schedule.recurs(5)) // Max 5 retries ) ) // Poll API every 5 seconds const pollApi = Stream.fromSchedule(Schedule.spaced('5 seconds')).pipe( Stream.mapEffect(() => Effect.tryPromise(() => fetch('/api/status').then(r => r.json())) ) ) // Consume with counter const counted = ticker.pipe( Stream.scan(0, (count) => count + 1), Stream.take(10) )
Common Schedules:
— Fixed intervalSchedule.spaced('1 second')
— Exponential backoffSchedule.exponential('100 millis')
— Fibonacci backoffSchedule.fibonacci('1 second')
— Limit number of emissionsSchedule.recurs(n)
Key Features:
- Schedule composition — Combine schedules with
.pipe - Automatic timing — No manual
setTimeout - Configurable — Durations, retry logic, jitter
Pattern 4: Stream.fromEffect — SINGLE EFFECTFUL VALUE
When to use:
- Stream from a single Effect
- Lift async operation into stream
- Compose with other streams
Signature:
Stream.fromEffect<A, E>(effect: Effect.Effect<A, E>): Stream.Stream<A, E>
Full Example:
import { Stream, Effect } from 'effect' // From Effect const userStream = Stream.fromEffect( Effect.tryPromise(() => fetch('/api/user').then(r => r.json()) ) ) // Compose multiple const combined = Stream.mergeAll( Stream.fromEffect(fetchUsers), Stream.fromEffect(fetchPosts), Stream.fromEffect(fetchComments) ) // Chain effectful streams const users = Stream.fromEffect(fetchUserIds).pipe( Stream.flatMap((id) => Stream.fromEffect(fetchUser(id)) ) )
Pattern 5: Stream Transformations
map — Transform each element:
const doubled = stream.pipe( Stream.map((n) => n * 2) )
mapEffect — Transform with Effect:
const validated = stream.pipe( Stream.mapEffect((item) => Effect.tryPromise(() => validateItem(item)) ) )
filter — Keep matching elements:
const evens = stream.pipe( Stream.filter((n) => n % 2 === 0) )
flatMap — Transform to stream and flatten:
const expanded = stream.pipe( Stream.flatMap((item) => Stream.fromIterable(item.children) ) )
take — Limit number of elements:
const first10 = stream.pipe( Stream.take(10) )
drop — Skip first N elements:
const afterFirst5 = stream.pipe( Stream.drop(5) )
scan — Accumulate state:
const cumulative = stream.pipe( Stream.scan(0, (sum, n) => sum + n) )
rechunk — Change chunk size:
const batched = stream.pipe( Stream.rechunk(50) // Emit in chunks of 50 )
debounce — Emit only after quiet period:
const debounced = stream.pipe( Stream.debounce('500 millis') )
throttle — Limit emission rate:
const throttled = stream.pipe( Stream.throttle({ cost: () => 1, units: 10, duration: '1 second' }) )
Pattern 6: Stream Consumers
Stream.runCollect — Collect all elements:
const result = await Stream.runCollect(stream).pipe(Effect.runPromise) console.log(result) // Chunk([...])
Stream.runForEach — Side effect per element:
await Stream.runForEach(stream, (item) => Effect.sync(() => console.log(item)) ).pipe(Effect.runPromise)
Stream.runFold — Reduce to single value:
const sum = await Stream.runFold(stream, 0, (acc, n) => acc + n) .pipe(Effect.runPromise)
Stream.runDrain — Run and discard results:
await Stream.runDrain(stream).pipe(Effect.runPromise)
Stream.runIntoQueue — Push to Queue:
import { Queue } from 'effect' const queue = await Queue.unbounded<number>().pipe(Effect.runPromise) await Stream.runIntoQueue(stream, queue).pipe(Effect.runPromise)
Pattern 7: Progressive Accumulation (TMNL Pattern)
When to use:
- Display search results as they arrive
- Progressive UI updates
- Combine Stream with effect-atom
Full Example:
import { Atom } from '@effect-atom/atom-react' import { Stream, Effect } from 'effect' // State atoms const resultsAtom = Atom.make<SearchResult[]>([]) const statusAtom = Atom.make<'idle' | 'streaming' | 'complete'>('idle') // Operation atom with progressive stream const searchOp = runtimeAtom.fn<string>()((query, ctx) => Effect.gen(function* () { // Create stream const stream = yield* SearchKernel.pipe( Effect.flatMap((k) => k.searchStream(query)) ) // Initialize state ctx.set(statusAtom, 'streaming') ctx.set(resultsAtom, []) // Consume stream progressively yield* Stream.runForEach(stream, (item) => Effect.sync(() => { const prev = ctx.get(resultsAtom) ctx.set(resultsAtom, [...prev, item]) }) ) // Finalize ctx.set(statusAtom, 'complete') }) ) // React component function SearchResults() { const results = useAtomValue(resultsAtom) const status = useAtomValue(statusAtom) return ( <div> {status === 'streaming' && <Spinner />} <List items={results} /> </div> ) }
Key Pattern:
- Create state atoms at module level
- Stream emits chunks progressively
updates atoms viaStream.runForEachctx.set- React re-renders on each atom update
- UI shows progressive results
TMNL Example (
src/lib/data-manager/v1/atoms/index.ts:206):
export const doSearch = runtimeAtom.fn<{ query: string; limit: number }>()( ({ query, limit }, ctx) => Effect.gen(function* () { const dm = yield* DataManager const stream = yield* dm.searchStream(query, limit) ctx.set(statusAtom, 'streaming') ctx.set(resultsAtom, []) yield* Stream.runForEach(stream, (result) => Effect.sync(() => { const prev = ctx.get(resultsAtom) ctx.set(resultsAtom, [...prev, result]) }) ) ctx.set(statusAtom, 'complete') }) )
Pattern 8: Error Handling
catchAll — Recover from errors:
const recovered = stream.pipe( Stream.catchAll((error) => Stream.succeed({ error: error.message }) ) )
retry — Retry on failure:
const retried = stream.pipe( Stream.retry(Schedule.exponential('100 millis').pipe( Schedule.compose(Schedule.recurs(3)) )) )
orElse — Fallback stream:
const withFallback = primaryStream.pipe( Stream.orElse(() => fallbackStream) )
Pattern 9: Stream Merging & Combining
mergeAll — Merge multiple streams:
const merged = Stream.mergeAll( stream1, stream2, stream3 )
concat — Concatenate streams:
const concatenated = stream1.pipe( Stream.concat(stream2) )
zip — Combine elements pairwise:
const zipped = stream1.pipe( Stream.zip(stream2) ) // Emits: [a1, b1], [a2, b2], ...
interleave — Alternate between streams:
const interleaved = stream1.pipe( Stream.interleave(stream2) ) // Emits: a1, b1, a2, b2, a3, b3, ...
Pattern 10: Resource-Managed Streams
Stream.acquireRelease — Managed resources:
const fileStream = Stream.acquireRelease( Effect.tryPromise(() => fs.open('file.txt')), (handle) => Effect.sync(() => handle.close()) ).pipe( Stream.flatMap((handle) => Stream.fromIterable(handle.readLines()) ) )
Stream.ensuring — Run effect on completion:
const logged = stream.pipe( Stream.ensuring( Effect.sync(() => console.log('Stream completed')) ) )
Examples
Example 1: Infinite Ticker with Scan
import { Stream, Schedule, Effect } from 'effect' const counter = Stream.fromSchedule(Schedule.spaced('1 second')).pipe( Stream.scan(0, (count) => count + 1), Stream.take(10) ) await Stream.runForEach(counter, (n) => Effect.sync(() => console.log(`Tick ${n}`)) ).pipe(Effect.runPromise)
Example 2: WebSocket with Error Handling
import { Stream, Effect } from 'effect' const wsStream = Stream.async<string, Error>((emit) => { const ws = new WebSocket('wss://example.com') ws.onmessage = (event) => emit.single(event.data) ws.onerror = () => emit.fail(new Error('Connection failed')) ws.onclose = () => emit.end() return Effect.sync(() => ws.close()) }).pipe( Stream.retry(Schedule.exponential('1 second').pipe( Schedule.compose(Schedule.recurs(3)) )), Stream.catchAll((error) => Stream.succeed(`Error: ${error.message}`) ) )
Example 3: Batched API Polling
import { Stream, Schedule, Effect } from 'effect' const pollUsers = Stream.fromSchedule(Schedule.spaced('5 seconds')).pipe( Stream.mapEffect(() => Effect.tryPromise(() => fetch('/api/users').then(r => r.json()) ) ), Stream.take(10), Stream.rechunk(3) // Batch 3 responses together ) await Stream.runForEach(pollUsers, (batch) => Effect.sync(() => console.log(`Batch:`, batch)) ).pipe(Effect.runPromise)
Example 4: Stream-to-Atom (TMNL Testbed)
import { Atom } from '@effect-atom/atom-react' import { Stream, Schedule, Effect } from 'effect' // Create stream atom const tickerAtom = Atom.make( Stream.fromSchedule(Schedule.spaced('1 second')).pipe( Stream.scan(0, (n) => n + 1), Stream.take(10) ) ) // React component function Ticker() { const result = useAtomValue(tickerAtom) if (Result.isInitial(result)) return <div>Starting...</div> if (Result.isSuccess(result)) return <div>Count: {result.value}</div> return <div>Error: {result.error.message}</div> }
Anti-Patterns
1. Not Consuming Streams
// WRONG — Stream is lazy, nothing happens const stream = Stream.fromIterable([1, 2, 3]).pipe( Stream.map((n) => n * 2) ) // CORRECT — Must consume const result = await Stream.runCollect(stream).pipe(Effect.runPromise)
2. Ignoring Errors
// WRONG — Errors crash stream const stream = Stream.fromIterable(urls).pipe( Stream.mapEffect((url) => Effect.tryPromise(() => fetch(url))) ) // CORRECT — Handle errors const stream = Stream.fromIterable(urls).pipe( Stream.mapEffect((url) => Effect.tryPromise(() => fetch(url)) ), Stream.catchAll((error) => Stream.succeed({ error: error.message }) ) )
3. Blocking Operations in map
// WRONG — Blocking sync operation const stream = Stream.fromIterable(items).pipe( Stream.map((item) => { const result = await fetchData(item) // ❌ Can't await in map return result }) ) // CORRECT — Use mapEffect const stream = Stream.fromIterable(items).pipe( Stream.mapEffect((item) => Effect.tryPromise(() => fetchData(item)) ) )
4. Not Cleaning Up Resources
// WRONG — No cleanup Stream.async((emit) => { const ws = new WebSocket('wss://example.com') ws.onmessage = (e) => emit.single(e.data) // No cleanup! }) // CORRECT — Return cleanup Effect Stream.async((emit) => { const ws = new WebSocket('wss://example.com') ws.onmessage = (e) => emit.single(e.data) return Effect.sync(() => ws.close()) })
Quick Reference
| Need | Constructor | Example |
|---|---|---|
| From array | | |
| From async callback | | |
| Ticker | | |
| Single Effect | | |
| Transform | | |
| Filter | | |
| Limit | | |
| Accumulate | | |
| Collect all | | |
| Side effects | | |
| Fold | | |
Related Skills
- effect-atom-integration — Integrate streams with React atoms
- effect-service-authoring — Use streams in service methods
- effect-testing-patterns — Test stream-based code