Claude-skill-registry implementing-pubsub-pattern
Implements Pub-Sub patterns using System.Reactive and Channels for event-based communication in .NET. Use when building reactive applications or decoupled event-driven architectures.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/implementing-pubsub-pattern" ~/.claude/skills/majiayu000-claude-skill-registry-implementing-pubsub-pattern && rm -rf "$T"
manifest:
skills/data/implementing-pubsub-pattern/SKILL.mdsource content
.NET Pub-Sub Pattern
A guide for Pub-Sub patterns for event-based asynchronous communication.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
1. Core APIs
| API | Purpose | NuGet |
|---|---|---|
(Rx.NET) | Reactive event streams | System.Reactive |
| Async Producer-Consumer | BCL |
| Observable sequence | BCL |
2. System.Threading.Channels
2.1 Basic Usage
using System.Threading.Channels; public sealed class MessageProcessor { private readonly Channel<Message> _channel = Channel.CreateUnbounded<Message>(); // Producer - Send message public async Task SendAsync(Message message) { await _channel.Writer.WriteAsync(message); } // Consumer - Process message public async Task ProcessAsync(CancellationToken ct) { await foreach (var message in _channel.Reader.ReadAllAsync(ct)) { await HandleMessage(message); } } // Channel completion signal public void Complete() => _channel.Writer.Complete(); }
2.2 Bounded Channel (Backpressure Control)
// Backpressure control with buffer size limit var options = new BoundedChannelOptions(capacity: 100) { FullMode = BoundedChannelFullMode.Wait, // Wait when full SingleReader = true, SingleWriter = false }; var channel = Channel.CreateBounded<Message>(options); // Writer waits until space is available await channel.Writer.WriteAsync(message);
2.3 Multiple Consumer Pattern
public sealed class WorkerPool { private readonly Channel<WorkItem> _channel; private readonly int _workerCount; public WorkerPool(int workerCount = 4) { _workerCount = workerCount; _channel = Channel.CreateUnbounded<WorkItem>(); } public async Task StartAsync(CancellationToken ct) { var workers = Enumerable.Range(0, _workerCount) .Select(_ => ProcessAsync(ct)); await Task.WhenAll(workers); } private async Task ProcessAsync(CancellationToken ct) { await foreach (var item in _channel.Reader.ReadAllAsync(ct)) { await ProcessItem(item); } } public ValueTask EnqueueAsync(WorkItem item) => _channel.Writer.WriteAsync(item); }
3. System.Reactive (Rx.NET)
3.1 EventAggregator Pattern
using System.Reactive.Linq; using System.Reactive.Subjects; public sealed class EventAggregator : IDisposable { private readonly Subject<object> _subject = new(); // Subscribe to specific event type public IObservable<T> GetEvent<T>() => _subject.OfType<T>().AsObservable(); // Publish event public void Publish<T>(T @event) => _subject.OnNext(@event!); public void Dispose() => _subject.Dispose(); }
3.2 Usage Example
// Event definitions public record UserLoggedIn(string UserId); public record OrderPlaced(int OrderId); // Subscription var aggregator = new EventAggregator(); aggregator.GetEvent<UserLoggedIn>() .Subscribe(e => Console.WriteLine($"User logged in: {e.UserId}")); aggregator.GetEvent<OrderPlaced>() .Where(e => e.OrderId > 100) .Subscribe(e => Console.WriteLine($"Large order: {e.OrderId}")); // Publish aggregator.Publish(new UserLoggedIn("user123")); aggregator.Publish(new OrderPlaced(150));
3.3 Rx Operators
// Debounce - Process only the last event in a sequence searchInput .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() .Subscribe(query => Search(query)); // Buffer - Collect events for a period and process as batch events .Buffer(TimeSpan.FromSeconds(5)) .Subscribe(batch => ProcessBatch(batch)); // Retry - Retry on failure observable .Retry(3) .Subscribe( onNext: data => Process(data), onError: ex => LogError(ex) );
4. Comparison: Channels vs Rx
| Feature | Channels | Rx.NET |
|---|---|---|
| Purpose | Producer-Consumer | Event streams |
| Backpressure | Built-in (Bounded) | Separate implementation |
| Operators | Basic | Rich |
| Learning curve | Low | High |
| Dependency | BCL | NuGet |
5. DI Integration
// Program.cs services.AddSingleton(Channel.CreateUnbounded<Message>()); services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Reader); services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Writer); // Producer public sealed class Producer(ChannelWriter<Message> writer) { public ValueTask SendAsync(Message msg) => writer.WriteAsync(msg); } // Consumer public sealed class Consumer(ChannelReader<Message> reader) { public async Task ProcessAsync(CancellationToken ct) { await foreach (var msg in reader.ReadAllAsync(ct)) { await Handle(msg); } } }
6. Required NuGet Package
<ItemGroup> <PackageReference Include="System.Reactive" Version="6.0.*" /> </ItemGroup>
7. Important Notes
Memory Leaks
// Subscription disposal is required var subscription = observable.Subscribe(handler); // After use subscription.Dispose();
Thread Safety
- Channels are thread-safe by default
- Subject is not thread-safe (use Synchronize() if needed)
Backpressure Handling
// Prevent memory explosion with Bounded Channel var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest // Drop old messages });