Claude-skill-registry implementing-io-pipelines
Implements high-performance streaming using System.IO.Pipelines in .NET. Use when building network protocols, parsing binary data, or processing large streams efficiently.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/implementing-io-pipelines" ~/.claude/skills/majiayu000-claude-skill-registry-implementing-io-pipelines && rm -rf "$T"
manifest:
skills/data/implementing-io-pipelines/SKILL.md · source content
.NET Streaming (System.IO.Pipelines)
A guide to the System.IO.Pipelines API for building high-performance I/O pipelines.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
1. Core Concepts
| Concept | Description |
|---|---|
| `Pipe` | Memory buffer-based read/write pipe |
| `PipeReader` | Read data from pipe |
| `PipeWriter` | Write data to pipe |
| `ReadOnlySequence<T>` | Non-contiguous memory sequence |
2. Advantages
- Zero-copy: Minimizes unnecessary memory copying
- Backpressure control: Speed regulation between producer and consumer
- Memory pooling: Automatic buffer reuse
- Async I/O: Efficient asynchronous processing
3. Basic Usage
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

/// <summary>
/// Pumps a <see cref="Stream"/> through a <see cref="Pipe"/>: one task copies bytes
/// from the stream into the pipe while a second task drains and processes them.
/// </summary>
public sealed class PipelineProcessor
{
    /// <summary>
    /// Reads <paramref name="stream"/> until it reports end-of-stream, processing the
    /// data as it becomes available.
    /// </summary>
    /// <param name="stream">The source stream to read from.</param>
    /// <returns>A task that completes when both the fill and the read loop have finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public async Task ProcessAsync(Stream stream)
    {
        // Fail fast: a null stream would otherwise fault the writer task only,
        // leaving the reader task waiting on a pipe that is never completed.
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        var pipe = new Pipe();

        // Run Writer and Reader concurrently
        Task writing = FillPipeAsync(stream, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader);

        await Task.WhenAll(writing, reading);
    }

    /// <summary>Copies <paramref name="stream"/> into <paramref name="writer"/> until end-of-stream.</summary>
    private async Task FillPipeAsync(Stream stream, PipeWriter writer)
    {
        const int minimumBufferSize = 512;

        try
        {
            while (true)
            {
                // Acquire buffer from memory pool
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);

                int bytesRead = await stream.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    break;
                }

                // Notify bytes written
                writer.Advance(bytesRead);

                // Flush data and notify Reader
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                {
                    // The reader has completed; nobody will consume further data.
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete with the failure so the reader is released instead of
            // waiting forever for data that will never arrive.
            await writer.CompleteAsync(ex);
            throw;
        }

        // Signal write completion
        await writer.CompleteAsync();
    }

    /// <summary>Drains <paramref name="reader"/>, handing every buffer to ProcessBuffer.</summary>
    private async Task ReadPipeAsync(PipeReader reader)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                // Process buffer (ProcessBuffer is shown in section 5)
                ProcessBuffer(buffer);

                // Notify consumption up to processed position
                reader.AdvanceTo(buffer.End);

                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete with the failure so a writer paused by backpressure
            // is released instead of waiting forever.
            await reader.CompleteAsync(ex);
            throw;
        }

        // Signal read completion
        await reader.CompleteAsync();
    }
}
4. Line-by-Line Parsing
/// <summary>
/// Reads <paramref name="reader"/> to completion and passes every '\n'-terminated
/// line (without the terminator) to ProcessLine. Data left after the last newline
/// when the pipe completes is passed on as a final line.
/// </summary>
/// <param name="reader">The pipe reader to consume.</param>
private async Task ReadLinesAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }

            // No more data will arrive: whatever remains is the last line, which
            // simply lacks a trailing newline. Without this it would be dropped.
            if (result.IsCompleted && !buffer.IsEmpty)
            {
                ProcessLine(buffer);
                buffer = buffer.Slice(buffer.End);
            }

            // Consumed: everything before buffer.Start. Examined: everything up to
            // buffer.End, so the next read only returns once new data has arrived.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Complete with the failure so the writing side is not left blocked.
        await reader.CompleteAsync(ex);
        throw;
    }

    await reader.CompleteAsync();
}

/// <summary>
/// Tries to cut one line off the front of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">
/// Unprocessed data; on success it is advanced past the newline that ended the line.
/// </param>
/// <param name="line">The bytes before the newline, or default when none was found.</param>
/// <returns>true if a complete line was found; otherwise false.</returns>
private static bool TryReadLine(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> line)
{
    // Find newline
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position is null)
    {
        line = default;
        return false;
    }

    // Slice up to newline
    line = buffer.Slice(0, position.Value);

    // Move buffer past newline
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}
5. Processing ReadOnlySequence<T>
/// <summary>
/// Hands the contents of <paramref name="buffer"/> to ProcessSpan, one contiguous
/// span at a time.
/// </summary>
/// <param name="buffer">The sequence to process; it may span several segments.</param>
private void ProcessBuffer(ReadOnlySequence<byte> buffer)
{
    if (!buffer.IsSingleSegment)
    {
        // Several segments: visit each one in order.
        foreach (var chunk in buffer)
        {
            ProcessSpan(chunk.Span);
        }

        return;
    }

    // One segment only: the first span already covers the whole buffer.
    ProcessSpan(buffer.FirstSpan);
}
6. Network I/O Integration
/// <summary>
/// Receives from <paramref name="socket"/> into a pipe while ProcessAsync consumes
/// the pipe's reader concurrently.
/// </summary>
/// <param name="socket">The socket to receive from.</param>
/// <returns>A task that completes when both the receive loop and the processing have finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
public async Task ProcessSocketAsync(Socket socket)
{
    // Fail fast: a null socket would otherwise fault the receive task only,
    // leaving the processing task waiting on a pipe that is never completed.
    if (socket is null)
    {
        throw new ArgumentNullException(nameof(socket));
    }

    var pipe = new Pipe();

    var writing = ReceiveAsync(socket, pipe.Writer);
    var reading = ProcessAsync(pipe.Reader);

    await Task.WhenAll(writing, reading);
}

/// <summary>
/// Copies received bytes into <paramref name="writer"/> until the socket reports
/// zero bytes received or the reading side has completed.
/// </summary>
private async Task ReceiveAsync(Socket socket, PipeWriter writer)
{
    try
    {
        while (true)
        {
            Memory<byte> memory = writer.GetMemory(4096);

            int bytesReceived = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesReceived == 0)
            {
                break;
            }

            writer.Advance(bytesReceived);

            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                // The reader has completed; nobody will consume further data.
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Complete with the failure (e.g. a socket error) so the reader is
        // released instead of waiting forever for more data.
        await writer.CompleteAsync(ex);
        throw;
    }

    await writer.CompleteAsync();
}
7. PipeOptions Configuration
// Backpressure window: the writer pauses once this many bytes are buffered...
long pauseAtBytes = 64 * 1024;
// ...and resumes once the reader has drained the backlog down to this many.
long resumeAtBytes = 32 * 1024;
// Minimum size of each segment requested from the pool.
int segmentBytes = 4096;

var pipeOptions = new PipeOptions(
    pool: MemoryPool<byte>.Shared,               // Memory pool
    readerScheduler: PipeScheduler.ThreadPool,   // Reader scheduler
    writerScheduler: PipeScheduler.ThreadPool,   // Writer scheduler
    pauseWriterThreshold: pauseAtBytes,          // Writer pause threshold
    resumeWriterThreshold: resumeAtBytes,        // Writer resume threshold
    minimumSegmentSize: segmentBytes,            // Minimum segment size
    useSynchronizationContext: false
);

var pipe = new Pipe(pipeOptions);
8. Required NuGet Package
<ItemGroup>
  <!-- Included in BCL for .NET Core 3.0+ -->
  <!-- NOTE(review): confirm whether the target framework already ships this assembly before adding the reference. -->
  <!-- Version="9.0.*" is a floating version: restore picks the newest available 9.0.x package. -->
  <PackageReference Include="System.IO.Pipelines" Version="9.0.*" />
</ItemGroup>
9. Important Notes
AdvanceTo Call Required
// Must call AdvanceTo after ReadAsync
ReadResult result = await reader.ReadAsync();

// ... processing ...

// consumed and examined are positions inside result.Buffer; the line-parsing
// example in section 4 passes buffer.Start and buffer.End here.
reader.AdvanceTo(consumed, examined);
Buffer Lifetime
// ❌ Bad example: Saving buffer after ReadAsync
ReadOnlySequence<byte> saved;
var result = await reader.ReadAsync();
saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo

// ✅ Good example: Copy needed data
// The copy is taken before AdvanceTo is called, while result.Buffer is still valid.
var copy = result.Buffer.ToArray();
reader.AdvanceTo(result.Buffer.End);
Completion Calls
// Must call CompleteAsync for both Writer and Reader
// In the examples in this guide, each side completes its own end once its loop exits.
await writer.CompleteAsync();
await reader.CompleteAsync();