Claude-code-plugins snowflake-core-workflow-b
Install

Source · Clone the upstream repo:

git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills

Claude Code · Install into ~/.claude/skills/:

T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-b" ~/.claude/skills/jeremylongshore-claude-code-plugins-snowflake-core-workflow-b && rm -rf "$T"

Manifest: plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-b/SKILL.md
Snowflake Core Workflow B — Data Transformation
Overview
Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).
Prerequisites
- Data loaded into Snowflake (via snowflake-core-workflow-a)
- Understanding of ELT vs ETL patterns
- Role with CREATE STREAM and CREATE TASK privileges (example grants below)
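If the role is missing these privileges, grants along the following lines are typically enough. This is a minimal sketch: the database analytics, schema elt, and role transform_role are hypothetical placeholders, while TRANSFORM_WH is the warehouse used in the tasks below.

```sql
-- Sketch: example grants for a transformation role.
-- analytics, elt, and transform_role are placeholder names; substitute your own.
GRANT USAGE ON DATABASE analytics TO ROLE transform_role;
GRANT USAGE ON SCHEMA analytics.elt TO ROLE transform_role;
GRANT CREATE STREAM ON SCHEMA analytics.elt TO ROLE transform_role;
GRANT CREATE TASK ON SCHEMA analytics.elt TO ROLE transform_role;
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE transform_role;
-- Required for the role to run tasks at all
GRANT EXECUTE TASK ON ACCOUNT TO ROLE transform_role;
```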
Instructions
Step 1: Create a Stream for Change Data Capture
-- Track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream
  ON TABLE raw_orders
  APPEND_ONLY = FALSE;

-- Append-only stream (lighter weight, inserts only)
CREATE OR REPLACE STREAM events_stream
  ON TABLE raw_events
  APPEND_ONLY = TRUE;

-- Check what's changed since last consumption
SELECT * FROM orders_stream;
-- METADATA$ACTION = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID = unique row identifier
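Note that a stream's offset only advances when its rows are read inside a DML statement that commits; plain SELECTs leave it untouched. A minimal sketch, assuming a staging table orders_changes (hypothetical) with compatible columns:

```sql
-- Sketch: consuming the stream via committed DML advances its offset.
-- orders_changes is a hypothetical staging table; adapt columns to your schema.
BEGIN;
INSERT INTO orders_changes (order_id, customer_id, amount, change_action)
SELECT order_id, customer_id, amount, METADATA$ACTION
FROM orders_stream;
COMMIT;

-- The stream now returns no rows until new changes land on raw_orders
SELECT COUNT(*) FROM orders_stream;
```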
Step 2: Create a Task to Process Stream Data
-- Transform task runs when stream has data
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
MERGE INTO dim_orders AS target
USING (
    SELECT
        order_id,
        customer_id,
        amount::DECIMAL(12,2) AS amount,
        order_date::TIMESTAMP_NTZ AS order_date,
        CASE
            WHEN amount >= 1000 THEN 'high_value'
            WHEN amount >= 100 THEN 'medium_value'
            ELSE 'standard'
        END AS order_tier,
        CURRENT_TIMESTAMP() AS processed_at
    FROM orders_stream
    WHERE METADATA$ACTION = 'INSERT'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.order_tier = source.order_tier,
    target.processed_at = source.processed_at
WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, amount, order_date, order_tier, processed_at)
    VALUES (source.order_id, source.customer_id, source.amount,
            source.order_date, source.order_tier, source.processed_at);

-- Enable the task
ALTER TASK transform_orders RESUME;
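Before relying on the five-minute schedule, it is worth triggering the task once by hand to validate the MERGE logic; EXECUTE TASK runs it immediately (the role needs the EXECUTE TASK privilege):

```sql
-- Run the task once, outside its schedule
EXECUTE TASK transform_orders;

-- Inspect the most recent runs of this task
SELECT name, state, error_message, query_start_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'TRANSFORM_ORDERS'))
ORDER BY scheduled_time DESC
LIMIT 5;
```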
Step 3: Build a Task DAG (Directed Acyclic Graph)
-- Root task: aggregate daily metrics
CREATE OR REPLACE TASK daily_metrics_root
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
INSERT INTO daily_order_metrics
SELECT
    CURRENT_DATE() - 1 AS metric_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM dim_orders
WHERE order_date >= CURRENT_DATE() - 1
  AND order_date < CURRENT_DATE();

-- Child task: runs after root completes
CREATE OR REPLACE TASK update_customer_segments
  WAREHOUSE = TRANSFORM_WH
  AFTER daily_metrics_root
AS
MERGE INTO customer_segments AS target
USING (
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(amount) AS lifetime_value,
        CASE
            WHEN SUM(amount) >= 10000 THEN 'platinum'
            WHEN SUM(amount) >= 5000 THEN 'gold'
            WHEN SUM(amount) >= 1000 THEN 'silver'
            ELSE 'bronze'
        END AS segment
    FROM dim_orders
    GROUP BY customer_id
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
    target.order_count = source.order_count,
    target.lifetime_value = source.lifetime_value,
    target.segment = source.segment
WHEN NOT MATCHED THEN INSERT
    VALUES (source.customer_id, source.order_count, source.lifetime_value, source.segment);

-- Resume tasks (children first, then root)
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;
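One operational detail when evolving the DAG: Snowflake requires the root task to be suspended before child tasks can be added or removed, and the whole graph can be re-enabled in a single call instead of resuming each task individually:

```sql
-- Suspend the root before attaching or detaching child tasks
ALTER TASK daily_metrics_root SUSPEND;

-- ... CREATE OR REPLACE TASK another_child ... AFTER daily_metrics_root ...

-- Resume the root and all of its dependents in one step
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('daily_metrics_root');
```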
Step 4: Dynamic Tables (Declarative Alternative)
-- Auto-refreshes based on target freshness; no streams/tasks needed
CREATE OR REPLACE DYNAMIC TABLE customer_360
  TARGET_LAG = '10 minutes'
  WAREHOUSE = TRANSFORM_WH
AS
SELECT
    c.customer_id,
    c.name,
    c.email,
    COUNT(o.order_id) AS total_orders,
    COALESCE(SUM(o.amount), 0) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
FROM customers c
LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.email;

-- Monitor refresh status
SELECT name, target_lag, refresh_mode, scheduling_state
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'CUSTOMER_360';
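Dynamic tables can also be driven manually, which helps while iterating on the defining query or pausing refresh cost during development:

```sql
-- Force an immediate refresh instead of waiting for TARGET_LAG
ALTER DYNAMIC TABLE customer_360 REFRESH;

-- Pause and later resume automatic refreshes
ALTER DYNAMIC TABLE customer_360 SUSPEND;
ALTER DYNAMIC TABLE customer_360 RESUME;
```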
Step 5: Monitor Pipelines
-- Task run history
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Find failed runs
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD(hours, -24, CURRENT_TIMESTAMP());

-- Stream lag check; if STALE = TRUE, data may be lost
SHOW STREAMS LIKE 'orders_stream';
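Dynamic tables have their own history function, analogous to TASK_HISTORY for tasks; a sketch for checking recent refreshes of the Step 4 table:

```sql
-- Refresh history for the customer_360 dynamic table (most recent first)
SELECT name, state, state_message, refresh_start_time, refresh_end_time
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE name = 'CUSTOMER_360'
ORDER BY refresh_start_time DESC;
```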
Error Handling
| Error | Cause | Solution |
|---|---|---|
| Task never runs | Not resumed after creation | Run ALTER TASK ... RESUME after creating the task |
| Stream has gone stale | Data retention exceeded | Recreate stream; increase DATA_RETENTION_TIME_IN_DAYS on the source table |
| Task fails to start on its warehouse | Wrong warehouse in task | Verify warehouse name in the task definition |
| Duplicate source rows rejected by MERGE | Non-unique join key | Add dedup CTE before MERGE (see sketch below) |
| Task fails with invalid identifier | Source schema changed | Check upstream table definitions |
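For the non-unique join key case, a dedup step in front of the MERGE from Step 2 usually resolves it. A sketch, assuming raw_orders carries an updated_at column (hypothetical) that marks the newest version of each order:

```sql
-- Sketch: keep only the most recent change per order_id before merging.
-- updated_at is a hypothetical column; order by whatever identifies the newest row.
WITH deduped_orders AS (
    SELECT *
    FROM orders_stream
    WHERE METADATA$ACTION = 'INSERT'
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY updated_at DESC
    ) = 1
)
SELECT * FROM deduped_orders;  -- use this as the USING (...) source of the MERGE
```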
Resources
Next Steps
For common errors and troubleshooting, see snowflake-common-errors.