Claude-skill-registry add-datalake-consumer
Adds an event consumer that writes to Azure Data Lake (Parquet) following the BI_SALES_RISK plan. Creates `events/consumers/[Name]DataLakeCollector.ts`, subscribing to RabbitMQ, building Parquet rows, and writing to `/path_prefix/year=YYYY/month=MM/day=DD/`. Use when adding a DataLakeCollector in logging or similar "event to Data Lake" pipelines.
Install

Source · clone the upstream repo:

```shell
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code · install into `~/.claude/skills/`:

```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/add-datalake-consumer" ~/.claude/skills/majiayu000-claude-skill-registry-add-datalake-consumer && rm -rf "$T"
```

Manifest: `skills/data/add-datalake-consumer/SKILL.md`
Add Data Lake Consumer
Event consumer that subscribes to RabbitMQ and writes to Azure Data Lake (Parquet). Pattern: logging's `DataLakeCollector` for `risk.evaluated` (BI_SALES_RISK_IMPLEMENTATION_PLAN §3.5, §9.1). BI Sales Risk: paths and Parquet columns MUST match `documentation/requirements/BI_SALES_RISK_DATA_LAKE_LAYOUT.md` (§2.1 risk.evaluated, §2.2 ml_outcomes, §4 config).
1. Consumer
- Path: `src/events/consumers/[Name]DataLakeCollector.ts`
- `withEventConsumer`: `queue`, `exchange: coder_events`, `bindings`: e.g. `['risk.evaluated','ml.prediction.completed','opportunity.updated','forecast.generated']`
- Handler: map event to row. For `risk.evaluated` use columns in Data Lake Layout §2.1. Build path: `{path_prefix}/year={YYYY}/month={MM}/day={DD}/...` (Layout §1).
- Write via `@azure/storage-blob` (BlockBlob), or `@azure/storage-blob` + `parquetjs` (or Arrow) for Parquet. Buffer/batch by time or count if needed.
- Config: `data_lake.connection_string`, `data_lake.container`, `data_lake.path_prefix` (e.g. `/risk_evaluations`).
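The partition-path rule above can be sketched as a small helper. This is a minimal sketch: `buildBlobPath` and its parameters are illustrative names, not identifiers from the plan, and UTC is assumed for partitioning.

```typescript
// Illustrative helper for the Layout §1 path rule:
// {path_prefix}/year={YYYY}/month={MM}/day={DD}/<file>
// (buildBlobPath is a hypothetical name; adapt to the real consumer.)
function buildBlobPath(pathPrefix: string, eventTime: Date, fileName: string): string {
  const yyyy = eventTime.getUTCFullYear().toString();
  const mm = String(eventTime.getUTCMonth() + 1).padStart(2, "0"); // getUTCMonth is 0-based
  const dd = String(eventTime.getUTCDate()).padStart(2, "0");
  // Trim trailing slashes so "/risk_evaluations" and "/risk_evaluations/" behave the same.
  const prefix = pathPrefix.replace(/\/+$/, "");
  return `${prefix}/year=${yyyy}/month=${mm}/day=${dd}/${fileName}`;
}

// Example: an event from 2024-03-07 (UTC) lands under day=07.
const blobPath = buildBlobPath(
  "/risk_evaluations",
  new Date(Date.UTC(2024, 2, 7, 12, 0, 0)),
  "batch-001.parquet"
);
// → "/risk_evaluations/year=2024/month=03/day=07/batch-001.parquet"
```

Zero-padding `month` and `day` keeps partition directories lexicographically sortable, which most Data Lake query engines expect for Hive-style partitions.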
2. Config
config/default.yaml:

```yaml
data_lake:
  connection_string: ${DATA_LAKE_CONNECTION_STRING}
  container: ${DATA_LAKE_CONTAINER:-risk}
  path_prefix: ${DATA_LAKE_PATH_PREFIX:-/risk_evaluations}

rabbitmq:
  url: ${RABBITMQ_URL}
  exchange: coder_events
  queue: [module]_data_lake
  bindings:
    - risk.evaluated
    - ml.prediction.completed
    # ...
```
config/schema.json: add `data_lake` with `connection_string`, `container`, `path_prefix`.
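A possible shape for that schema entry, assuming the project validates config with standard JSON Schema keywords (the property names come from the config above; the surrounding structure is an assumption):

```json
"data_lake": {
  "type": "object",
  "required": ["connection_string", "container", "path_prefix"],
  "properties": {
    "connection_string": { "type": "string" },
    "container": { "type": "string" },
    "path_prefix": { "type": "string" }
  }
}
```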
3. Server
In `server.ts`: `await dataLakeCollector.start()` after RabbitMQ connect.
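The required ordering can be sketched with stubs. This is a minimal sketch, not the repo's actual `server.ts`: `connectRabbitMq`, `dataLakeCollector`, and `startupLog` are placeholder names, and the stub bodies stand in for real amqplib/queue setup.

```typescript
// Records the order of startup steps so the required sequence is visible.
const startupLog: string[] = [];

// Placeholder for the real RabbitMQ connection (amqplib connect + channel setup).
async function connectRabbitMq(): Promise<void> {
  startupLog.push("rabbitmq:connected");
}

// Placeholder collector; the real one asserts its queue, binds routing keys, and consumes.
const dataLakeCollector = {
  async start(): Promise<void> {
    startupLog.push("collector:started");
  },
};

// In server.ts this would run at process start: connect first,
// then start the collector so its queue bindings have a live channel.
async function bootstrap(): Promise<void> {
  await connectRabbitMq();
  await dataLakeCollector.start();
}
```

Starting the collector before the connection exists would make its queue declaration fail, which is why the plan pins `start()` after the RabbitMQ connect.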
4. Checklist
- Consumer in `events/consumers/`, subscribing to RabbitMQ (no Azure Service Bus)
- Path: `{path_prefix}/year=.../month=.../day=.../`; format Parquet
- Config: `data_lake.*` and schema; `rabbitmq` queue and bindings
- Start collector in server