Claude-skill-registry add-kafka-consumer
Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages (project)
Install

Source · Clone the upstream repo:

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code · Install into ~/.claude/skills/:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/add-kafka-consumer" ~/.claude/skills/majiayu000-claude-skill-registry-add-kafka-consumer && rm -rf "$T"
```

Manifest: `skills/data/add-kafka-consumer/SKILL.md`
Add Kafka Consumer Skill
Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages in NovaTune.
Project Context
- Handlers location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/`
- Message types: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/`
- Topic naming: `{prefix}-{topic-name}` (e.g., `dev-track-deletions`)
Steps
1. Create Message Type (if needed)
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/{EventName}.cs`
```csharp
namespace NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;

/// <summary>
/// Event published when a track is soft-deleted.
/// </summary>
public record TrackDeletedEvent
{
    public int SchemaVersion { get; init; } = 2;
    public required string TrackId { get; init; }
    public required string UserId { get; init; }
    public required string ObjectKey { get; init; }
    public string? WaveformObjectKey { get; init; }
    public required long FileSizeBytes { get; init; }
    public required DateTimeOffset DeletedAt { get; init; }
    public required DateTimeOffset ScheduledDeletionAt { get; init; }
    public required string CorrelationId { get; init; }
    public required DateTimeOffset Timestamp { get; init; }
}
```
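For context on where this event originates: per the Topic Naming Convention table below, the API service produces track deletion events. A producer-side sketch using KafkaFlow's `IProducerAccessor`; the producer name, its registration, and the publisher class are illustrative assumptions, not existing NovaTune code:

```csharp
using KafkaFlow;
using KafkaFlow.Producers;

// Registration, inside the API service's AddKafka cluster setup (name is an assumption):
//     cluster.AddProducer("track-deletions-producer", producer => producer
//         .DefaultTopic($"{topicPrefix}-track-deletions")
//         .AddMiddlewares(m => m.AddSerializer<JsonCoreSerializer>()));

// Hypothetical publisher: resolves the named producer and keys by TrackId,
// so all events for one track land on the same partition (preserving order).
public class TrackDeletionPublisher
{
    private readonly IMessageProducer _producer;

    public TrackDeletionPublisher(IProducerAccessor accessor)
    {
        _producer = accessor.GetProducer("track-deletions-producer");
    }

    public Task PublishAsync(TrackDeletedEvent evt) =>
        _producer.ProduceAsync(evt.TrackId, evt);
}
```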
2. Create Handler Class
Location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/{EventName}Handler.cs`
```csharp
using KafkaFlow;
using NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;

namespace NovaTuneApp.Workers.Lifecycle.Handlers;

/// <summary>
/// Handles TrackDeletedEvent messages for immediate cache invalidation.
/// </summary>
public class TrackDeletedHandler : IMessageHandler<TrackDeletedEvent>
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<TrackDeletedHandler> _logger;

    public TrackDeletedHandler(
        IServiceProvider serviceProvider,
        ILogger<TrackDeletedHandler> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    public async Task Handle(IMessageContext context, TrackDeletedEvent message)
    {
        using var scope = _serviceProvider.CreateScope();

        _logger.LogInformation(
            "Processing TrackDeletedEvent for track {TrackId}, user {UserId}",
            message.TrackId, message.UserId);

        try
        {
            // Get services from scoped container
            var cacheService = scope.ServiceProvider.GetRequiredService<ICacheService>();

            // Perform idempotent operations
            await cacheService.InvalidateTrackCacheAsync(
                message.TrackId,
                message.UserId,
                context.ConsumerContext.WorkerStopped);

            _logger.LogDebug(
                "Successfully processed TrackDeletedEvent for {TrackId}, scheduled deletion at {ScheduledAt}",
                message.TrackId, message.ScheduledDeletionAt);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process TrackDeletedEvent for track {TrackId}",
                message.TrackId);

            // Re-throw to trigger retry/DLQ behavior
            throw;
        }
    }
}
```
3. Register Consumer in Program.cs
```csharp
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Serializer;

var topicPrefix = builder.Configuration["NovaTune:TopicPrefix"] ?? "dev";
var bootstrapServers = builder.Configuration.GetConnectionString("messaging")
    ?? "localhost:9092";

builder.Services.AddKafka(kafka => kafka
    .UseMicrosoftLog()
    .AddCluster(cluster =>
    {
        cluster.WithBrokers([bootstrapServers]);

        // Register consumer for track deletions
        cluster.AddConsumer(consumer => consumer
            .Topic($"{topicPrefix}-track-deletions")
            .WithGroupId($"{topicPrefix}-lifecycle-worker")
            .WithBufferSize(100)
            .WithWorkersCount(2)
            .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest)
            .WithConsumerConfig(new ConsumerConfig
            {
                SessionTimeoutMs = 45000,
                SocketTimeoutMs = 30000,
                ReconnectBackoffMs = 1000
            })
            .AddMiddlewares(m => m
                .AddDeserializer<JsonCoreDeserializer>()
                .AddTypedHandlers(h => h.AddHandler<TrackDeletedHandler>())
            )
        );
    })
);

// Register handler in DI
builder.Services.AddTransient<TrackDeletedHandler>();
```
4. Add KafkaFlow Hosted Service
```csharp
builder.Services.AddHostedService<KafkaFlowHostedService>();
```
The hosted service manages the KafkaFlow bus lifecycle:
```csharp
using KafkaFlow;

internal class KafkaFlowHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaFlowHostedService> _logger;
    private IKafkaBus? _kafkaBus;

    private const int MaxRetries = 30;
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    public KafkaFlowHostedService(
        IServiceProvider serviceProvider,
        ILogger<KafkaFlowHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting KafkaFlow bus...");
        await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                _kafkaBus = _serviceProvider.CreateKafkaBus();
                await _kafkaBus.StartAsync(stoppingToken);
                _logger.LogInformation("KafkaFlow bus started on attempt {Attempt}", attempt);

                await Task.Delay(Timeout.Infinite, stoppingToken);
                return;
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("KafkaFlow bus stopping due to cancellation");
                return;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to start KafkaFlow bus (attempt {Attempt}/{Max})",
                    attempt, MaxRetries);

                if (attempt < MaxRetries)
                    await Task.Delay(RetryDelay, stoppingToken);
                else
                    _logger.LogError(ex, "Failed after {Max} attempts", MaxRetries);
            }
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_kafkaBus is not null)
        {
            _logger.LogInformation("Stopping KafkaFlow bus...");
            await _kafkaBus.StopAsync();
        }

        await base.StopAsync(cancellationToken);
    }
}
```
Consumer Configuration Options
| Option | Description | Default |
|---|---|---|
| `WithBufferSize` | Internal message buffer size | 100 |
| `WithWorkersCount` | Parallel message processors | 1-4 |
| `WithAutoOffsetReset` | Starting position for new consumers | |
| `SessionTimeoutMs` | Consumer session timeout (ms) | 45000 |
| `SocketTimeoutMs` | Socket timeout (ms) | 30000 |
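The step-3 sample hard-codes these values. If they need to vary per environment, here is a small sketch of pulling them from configuration instead (the `NovaTune:Kafka:*` keys are assumptions, not existing NovaTune settings):

```csharp
// Read tuning knobs from configuration, falling back to the step-3 defaults.
var bufferSize = builder.Configuration.GetValue("NovaTune:Kafka:BufferSize", 100);
var workersCount = builder.Configuration.GetValue("NovaTune:Kafka:WorkersCount", 2);

// ...then inside cluster.AddConsumer(...):
//     .WithBufferSize(bufferSize)
//     .WithWorkersCount(workersCount)
```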
Handler Patterns
Simple Handler
```csharp
public class SimpleHandler : IMessageHandler<MyEvent>
{
    public Task Handle(IMessageContext context, MyEvent message)
    {
        // Process message
        return Task.CompletedTask;
    }
}
```
Handler with Scoped Services
```csharp
public class ScopedHandler : IMessageHandler<MyEvent>
{
    private readonly IServiceProvider _serviceProvider;

    public ScopedHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task Handle(IMessageContext context, MyEvent message)
    {
        using var scope = _serviceProvider.CreateScope();
        var dbSession = scope.ServiceProvider.GetRequiredService<IAsyncDocumentSession>();

        // Use scoped services
        await dbSession.SaveChangesAsync(context.ConsumerContext.WorkerStopped);
    }
}
```
Handler with Retry/DLQ
```csharp
public async Task Handle(IMessageContext context, MyEvent message)
{
    try
    {
        // Process message
    }
    catch (TransientException)
    {
        // Will be retried based on consumer config
        throw;
    }
    catch (PermanentException ex)
    {
        // Log and swallow - don't retry
        _logger.LogError(ex, "Permanent failure for message");
    }
}
```
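KafkaFlow does not route failed messages to a dead-letter topic on its own, so when a real DLQ is needed, one option is to forward permanent failures to a companion topic before swallowing the exception. A sketch; the `dlq-producer` name and its registration are assumptions, not NovaTune code:

```csharp
using KafkaFlow;
using KafkaFlow.Producers;

public class DlqForwardingHandler : IMessageHandler<MyEvent>
{
    private readonly IProducerAccessor _producers;
    private readonly ILogger<DlqForwardingHandler> _logger;

    public DlqForwardingHandler(IProducerAccessor producers, ILogger<DlqForwardingHandler> logger)
    {
        _producers = producers;
        _logger = logger;
    }

    public async Task Handle(IMessageContext context, MyEvent message)
    {
        try
        {
            // Process message (throws PermanentException on unrecoverable input)
        }
        catch (PermanentException ex)
        {
            _logger.LogError(ex, "Permanent failure; forwarding to DLQ");

            // "dlq-producer" is hypothetical: a producer registered with
            // DefaultTopic($"{topicPrefix}-track-deletions-dlq")
            await _producers.GetProducer("dlq-producer")
                .ProduceAsync(context.Message.Key, message);
        }
    }
}
```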
Best Practices
- Make handlers idempotent - Messages may be delivered more than once (see the sketch after this list)
- Use scoped services - Create scope for each message
- Handle cancellation - Use `context.ConsumerContext.WorkerStopped`
- Log appropriately - Info for processing, Debug for success, Error for failures
- Re-throw for retries - Only swallow permanent failures
- Keep handlers focused - One handler per message type
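A minimal sketch of the idempotency practice above, assuming the message carries a `CorrelationId` like `TrackDeletedEvent` does. `IProcessedMessageStore` is a hypothetical service, not part of NovaTune; any store with an atomic "insert if new" check (a unique-key write, for example) fills the same role:

```csharp
public class IdempotentHandler : IMessageHandler<MyEvent>
{
    private readonly IServiceProvider _serviceProvider;

    public IdempotentHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task Handle(IMessageContext context, MyEvent message)
    {
        using var scope = _serviceProvider.CreateScope();
        var store = scope.ServiceProvider.GetRequiredService<IProcessedMessageStore>();

        // Hypothetical: returns false when this CorrelationId was already
        // handled, making redelivered messages no-ops.
        if (!await store.TryMarkProcessedAsync(
                message.CorrelationId, context.ConsumerContext.WorkerStopped))
        {
            return;
        }

        // ...actual processing goes here...
    }
}
```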
Testing
```csharp
[Fact]
public async Task Handler_Should_InvalidateCache_OnTrackDeleted()
{
    // Arrange
    var cacheService = Substitute.For<ICacheService>();
    var serviceProvider = BuildServiceProvider(cacheService);
    var handler = new TrackDeletedHandler(serviceProvider, _logger);
    var message = new TrackDeletedEvent
    {
        TrackId = "01HXK...",
        UserId = "user123",
        ObjectKey = "tracks/01HXK...",
        FileSizeBytes = 1024,
        DeletedAt = DateTimeOffset.UtcNow,
        ScheduledDeletionAt = DateTimeOffset.UtcNow.AddDays(30),
        CorrelationId = Guid.NewGuid().ToString(),
        Timestamp = DateTimeOffset.UtcNow
    };

    // Act
    await handler.Handle(_mockContext, message);

    // Assert
    await cacheService.Received(1)
        .InvalidateTrackCacheAsync(message.TrackId, message.UserId, Arg.Any<CancellationToken>());
}
```
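The test references a `BuildServiceProvider` helper and a `_mockContext` field that it leaves undefined. One plausible shape for the helper (a sketch, not project code) backs the handler's scope resolution with a real `ServiceCollection`:

```csharp
private static IServiceProvider BuildServiceProvider(ICacheService cacheService)
{
    var services = new ServiceCollection();
    services.AddSingleton(cacheService);
    return services.BuildServiceProvider();
}
```

Note that `_mockContext` must stub `ConsumerContext.WorkerStopped`, since the handler passes it as the cancellation token.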
Topic Naming Convention
| Topic | Purpose | Producer | Consumer |
|---|---|---|---|
| | Audio upload notifications | Upload flow | Audio processor |
| `{prefix}-track-deletions` | Track deletion events | API service | Lifecycle worker |
| | MinIO bucket events | MinIO | Upload ingestor |