Install
Source · Clone the upstream repo:
git clone https://github.com/Intense-Visions/harness-engineering
Claude Code · Install into ~/.claude/skills/:
T=$(mktemp -d) && git clone --depth=1 https://github.com/Intense-Visions/harness-engineering "$T" && mkdir -p ~/.claude/skills && cp -r "$T/agents/skills/claude-code/node-streams-pattern" ~/.claude/skills/intense-visions-harness-engineering-node-streams-pattern && rm -rf "$T"
Manifest: agents/skills/claude-code/node-streams-pattern/SKILL.md
Node.js Streams Pattern
Process large data efficiently using Node.js Readable, Writable, and Transform streams
When to Use
- Processing files or data too large to fit in memory
- Building data pipelines that transform data in stages
- Streaming HTTP responses or file uploads
- Implementing backpressure-aware data processing
Instructions
- Read a file as a stream:
```js
import { createReadStream } from 'node:fs';

const readable = createReadStream('large-file.csv', { encoding: 'utf-8' });
for await (const chunk of readable) {
  processChunk(chunk);
}
```
- Pipeline for stream composition (handles errors and cleanup):
```js
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

await pipeline(
  createReadStream('input.json'),
  createGzip(),
  createWriteStream('output.json.gz'),
);
```
- Transform stream for data processing:
```js
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

await pipeline(createReadStream('input.txt'), uppercase, createWriteStream('output.txt'));
```
- Object mode streams for JSON processing:
```js
import { Transform } from 'node:stream';

const filterActive = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.isActive) this.push(user);
    callback();
  },
});
```
- Readable from async generator:
```js
import { Readable } from 'node:stream';

async function* generateRecords() {
  for (let i = 0; i < 1_000_000; i++) {
    yield { id: i, value: Math.random() };
  }
}

const readable = Readable.from(generateRecords());
```
- HTTP streaming response (a `pipeline`-based variant is sketched after this list):
```js
import { createReadStream } from 'node:fs';

app.get('/download', (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  const stream = createReadStream('large-file.zip');
  stream.pipe(res);
  stream.on('error', (err) => {
    res.status(500).end('Error reading file');
  });
});
```
- Line-by-line processing:
```js
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

const rl = createInterface({
  input: createReadStream('data.csv'),
  crlfDelay: Infinity,
});

for await (const line of rl) {
  const [name, email] = line.split(',');
  await processUser({ name, email });
}
```
- Always use `pipeline` instead of `.pipe()` — `pipeline` handles error propagation and cleanup:
```js
// Bad: errors not propagated, streams not cleaned up
readable.pipe(transform).pipe(writable);

// Good: errors propagated, streams destroyed on error
await pipeline(readable, transform, writable);
```
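The HTTP download example above still uses `.pipe()`. A sketch of the same handler written with `pipeline`, which also cleans up both streams if the client disconnects mid-download (this assumes the same Express-style `app` as the example above; names are illustrative):

```js
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

app.get('/download', async (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  try {
    // pipeline destroys both streams if either side fails or disconnects
    await pipeline(createReadStream('large-file.zip'), res);
  } catch (err) {
    // headers may already be sent once streaming has started
    if (!res.headersSent) res.status(500).end('Error reading file');
  }
});
```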
Details
Streams process data in chunks without loading the entire dataset into memory. A 10GB file can be processed with a 64KB buffer.
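As a minimal sketch of that idea (file name illustrative), the loop below reads an arbitrarily large file while holding only one chunk in memory at a time:

```js
import { createReadStream } from 'node:fs';

let bytes = 0;
let chunks = 0;
for await (const chunk of createReadStream('10gb-dump.bin')) {
  bytes += chunk.length;  // only one chunk is buffered at a time
  chunks += 1;
}
console.log(`${bytes} bytes read in ${chunks} chunks`);
```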
Stream types:
- Readable — source of data (file reads, HTTP responses, database cursors)
- Writable — destination for data (file writes, HTTP requests, database inserts); a minimal custom Writable is sketched after this list
- Transform — processes data in transit (compression, encryption, parsing)
- Duplex — both readable and writable (network sockets, WebSocket connections)
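Of the four types, only Readable and Transform appear in the instructions above. A minimal custom Writable, purely for illustration, might be a byte counter:

```js
import { Writable } from 'node:stream';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

let total = 0;
const byteCounter = new Writable({
  write(chunk, encoding, callback) {
    total += chunk.length; // consume the chunk
    callback();            // signal readiness for the next one
  },
});

await pipeline(createReadStream('large-file.csv'), byteCounter);
console.log(`${total} bytes`);
```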
Backpressure: When a writable stream cannot consume data as fast as the readable produces it, Node.js automatically pauses the readable stream. This prevents memory exhaustion.
`pipeline` handles backpressure automatically, and so does `.pipe()`; manually consuming `'data'` events and calling `write()` does not, unless you honor `write()`'s return value and wait for `'drain'` (see the sketch below).
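A sketch of handling backpressure by hand when writing to a stream directly (output file name illustrative): `write()` returns false once the internal buffer is full, and `'drain'` signals that it is safe to continue producing data.

```js
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

const out = createWriteStream('output.log');

for (let i = 0; i < 1_000_000; i++) {
  // write() returns false when the internal buffer exceeds highWaterMark
  if (!out.write(`line ${i}\n`)) {
    await once(out, 'drain'); // pause production until the buffer empties
  }
}
out.end();
```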
`highWaterMark`: Controls the internal buffer size (default 16KB for byte streams, 16 objects for object mode). Lower values reduce memory but increase I/O operations.
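For illustration, `highWaterMark` is set per stream; the values below are arbitrary:

```js
import { createReadStream } from 'node:fs';
import { Transform } from 'node:stream';

// Byte stream: buffer up to 1 MiB per chunk instead of the default
const readable = createReadStream('large-file.csv', { highWaterMark: 1024 * 1024 });

// Object mode: buffer up to 64 objects instead of the default 16
const parseLine = new Transform({
  objectMode: true,
  highWaterMark: 64,
  transform(line, encoding, callback) {
    callback(null, JSON.parse(line)); // assumes one JSON document per chunk
  },
});
```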
Trade-offs:
- Streams handle unlimited data sizes — but add complexity compared to a simple `readFile` (see the sketch after this list)
- `pipeline` handles errors and cleanup — but requires all streams to implement `destroy` correctly
- Object mode is convenient for JSON processing — but disables backpressure optimization (each object is one "chunk")
- Async generators create readable streams easily — but lose backpressure if the generator is compute-bound
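A small sketch of the first trade-off above, with illustrative file names: the one-liner is simpler but holds the whole file in memory, while the streaming version keeps memory flat regardless of file size.

```js
import { readFile } from 'node:fs/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// Simple: fine for small files, but loads everything into memory
const text = await readFile('small.json', 'utf-8');

// Streaming: more moving parts, but constant memory for any file size
await pipeline(createReadStream('huge.json'), createWriteStream('huge-copy.json'));
```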
Source
https://nodejs.org/api/stream.html
Process
- Read the instructions and examples in this document.
- Apply the patterns to your implementation, adapting to your specific context.
- Verify your implementation against the details and edge cases listed above.
Harness Integration
- Type: knowledge — this skill is a reference document, not a procedural workflow.
- No tools or state — consumed as context by other skills and agents.
Success Criteria
- The patterns described in this document are applied correctly in the implementation.
- Edge cases and anti-patterns listed in this document are avoided.