Claude-skill-registry effect-streams-pipelines
Stream creation, transformation, sinks, batching, and resilience. Use when building data pipelines with concurrency and backpressure.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/effect-streams-pipelines" ~/.claude/skills/majiayu000-claude-skill-registry-effect-streams-pipelines-f6f616 && rm -rf "$T"
manifest: skills/data/effect-streams-pipelines/SKILL.md
Streams & Pipelines
When to use
- You’re building data pipelines with batching/backpressure
- You need controlled concurrency per element
- You must process large inputs with constant memory
Create
const s = Stream.fromIterable(items)
Transform
const out = s.pipe(
  Stream.mapEffect(processItem, { concurrency: 4 }),
  Stream.filter((a) => a.valid),
  Stream.grouped(100)
)
Consume
yield* Stream.runDrain(out)
// or
const all = yield* Stream.runCollect(out)
Resource-Safe
const fileLines = Stream.acquireRelease(open(), close).pipe(
  Stream.flatMap(readLines)
)
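To make the acquire/release ordering concrete, here is a minimal sketch that swaps the file handle for an in-memory stand-in. The `open`, `close`, and `events` names are hypothetical and exist only for illustration; a real pipeline would open an actual file or connection.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical resource: an in-memory "file" with three lines.
interface Handle {
  readonly lines: ReadonlyArray<string>
}

// Records what happened and in which order, for inspection only.
const events: Array<string> = []

const open = Effect.sync((): Handle => {
  events.push("open")
  return { lines: ["a", "b", "c"] }
})

const close = (_handle: Handle) =>
  Effect.sync(() => {
    events.push("close")
  })

const fileLines = Stream.acquireRelease(open, close).pipe(
  // Read from the resource while it is still open.
  Stream.flatMap((handle) => Stream.fromIterable(handle.lines)),
  Stream.tap((line) =>
    Effect.sync(() => {
      events.push(line)
    })
  )
)

// Draining the stream runs the finalizer before the effect completes.
const program = Stream.runDrain(fileLines)
```

Running `Effect.runPromise(program)` and then reading `events` should show `"open"` first and `"close"` last, with the lines in between.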
Resilience
const resilient = s.pipe(
  Stream.mapEffect((x) => op(x).pipe(Effect.retry(retry)))
)
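The snippet above leaves `op` and `retry` undefined. The following sketch fills them in with assumptions: `op` is a hypothetical operation that fails twice per input before succeeding, and the retry policy (exponential backoff from 10 ms, at most 5 retries) is one plausible choice, not a recommendation from the original.

```typescript
import { Effect, Schedule, Stream } from "effect"

// Hypothetical flaky operation: fails the first two times it sees each input.
const attempts = new Map<number, number>()
const op = (x: number) =>
  Effect.suspend(() => {
    const n = (attempts.get(x) ?? 0) + 1
    attempts.set(x, n)
    return n < 3
      ? Effect.fail(new Error(`transient failure for ${x}`))
      : Effect.succeed(x * 10)
  })

// Assumed policy: exponential backoff starting at 10 ms, at most 5 retries.
const retry = Schedule.exponential("10 millis").pipe(
  Schedule.intersect(Schedule.recurs(5))
)

const resilient = Stream.fromIterable([1, 2, 3]).pipe(
  // Retry is applied per element, so one bad item does not restart the stream.
  Stream.mapEffect((x) => op(x).pipe(Effect.retry(retry)))
)

// Collected here only because the example is tiny.
const program = Stream.runCollect(resilient).pipe(
  Effect.map((chunk) => Array.from(chunk))
)
```

Placing `Effect.retry` inside `mapEffect` scopes the retry to the failing element. If the retries are exhausted, the error still fails the whole stream.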
Real-world snippet: Stream to S3 with progress and scoped background ticker
let downloadedBytes = 0

yield* Effect.gen(function* () {
  // background progress ticker
  yield* Effect.repeat(
    Effect.gen(function* () {
      const bytes = yield* Effect.succeed(downloadedBytes)
      yield* Effect.log(`Downloaded ${bytes}/${contentLength} bytes`)
    }),
    Schedule.forever.pipe(Schedule.delayed(() => "2 seconds"))
  ).pipe(Effect.delay("100 millis"), Effect.forkScoped)

  yield* s3.putObject(
    key,
    resp.stream.pipe(
      Stream.tap((chunk) => {
        downloadedBytes += chunk.length
        return Effect.void
      })
    ),
    { contentLength }
  )
}).pipe(Effect.scoped)
Guidance
- Prefer Stream.mapEffect with concurrency to control parallel work
- Use grouped(n) for batching network/DB operations
- Always model resource acquisition with acquireRelease
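The first two guidelines can be combined in one pipeline. This is a minimal sketch, assuming a hypothetical per-item step `enrich` (standing in for a network call) and a hypothetical batch sink `writeBatch` (standing in for a bulk insert); the concurrency and batch size of 4 are arbitrary.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical per-item work: a short sleep simulates network latency.
const enrich = (n: number) =>
  Effect.sleep("5 millis").pipe(Effect.as(n * 2))

// Hypothetical batch sink: records batch sizes instead of writing to a DB.
const batchSizes: Array<number> = []
const writeBatch = (batch: Chunk.Chunk<number>) =>
  Effect.sync(() => {
    batchSizes.push(Chunk.size(batch))
  })

const items = Array.from({ length: 10 }, (_, i) => i + 1)

const pipeline = Stream.fromIterable(items).pipe(
  // At most 4 enrich calls are in flight at any time.
  Stream.mapEffect(enrich, { concurrency: 4 }),
  // Emit chunks of up to 4 elements; the final chunk may be smaller.
  Stream.grouped(4),
  // One sink call per batch rather than one per element.
  Stream.runForEach(writeBatch)
)
```

With ten inputs and a batch size of 4, `writeBatch` runs three times. Consuming with `Stream.runForEach` keeps only the current batch in memory.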
Pitfalls
- Collecting massive streams into memory → prefer runDrain or chunked writes
- Doing blocking IO in transformations → keep operations effectful and non-blocking
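As an illustration of the first pitfall, the sketch below aggregates a long stream with `Stream.runFold` instead of collecting it. The stream of 100,000 integers is a hypothetical stand-in for a large input.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical large input: the integers 1..100000, produced lazily.
const numbers = Stream.iterate(1, (n) => n + 1).pipe(Stream.take(100_000))

// Fold as elements arrive; only the running sum is retained.
const total = numbers.pipe(Stream.runFold(0, (sum, n) => sum + n))

// By contrast, Stream.runCollect(numbers) would hold every element
// in memory before returning.
```

`Effect.runPromise(total)` resolves to the sum of the inputs. The same shape applies to any incremental consumer, such as `Stream.runForEach` writing each batch out as it arrives.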
Cross-links
- Concurrency: pools and timeouts for per-item work
- Resources: scope/finalizers for pipeline resources
- EffectPatterns inspiration: https://github.com/PaulJPhilp/EffectPatterns
Local Source Reference
CRITICAL: Search local Effect source before implementing
The full Effect source code is available at docs/effect-source/. Always search the actual implementation before writing Effect code.
Key Source Files
- Stream: docs/effect-source/effect/src/Stream.ts
- Sink: docs/effect-source/effect/src/Sink.ts
- Channel: docs/effect-source/effect/src/Channel.ts
Example Searches
# Find Stream creation patterns
grep -F "fromIterable" docs/effect-source/effect/src/Stream.ts
grep -F "make" docs/effect-source/effect/src/Stream.ts
grep -F "fromEffect" docs/effect-source/effect/src/Stream.ts

# Study Stream transformations
grep -F "mapEffect" docs/effect-source/effect/src/Stream.ts
grep -F "filter" docs/effect-source/effect/src/Stream.ts
grep -F "grouped" docs/effect-source/effect/src/Stream.ts

# Find Stream consumption
grep -F "runDrain" docs/effect-source/effect/src/Stream.ts
grep -F "runCollect" docs/effect-source/effect/src/Stream.ts

# Look at Stream test examples
grep -F "Stream." docs/effect-source/effect/test/Stream.test.ts
Workflow
- Identify the Stream API you need (e.g., mapEffect, grouped)
- Search docs/effect-source/effect/src/Stream.ts for the implementation
- Study the types and pipeline patterns
- Look at test files for usage examples
- Write your code based on real implementations
Real source code > documentation > assumptions
References
- Agent Skills overview: https://www.anthropic.com/news/skills
- Skills guide: https://docs.claude.com/en/docs/claude-code/skills