Claude-code-plugins snowflake-core-workflow-b

Install

Source · Clone the upstream repo:

git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills

Claude Code · Install into ~/.claude/skills/:

T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-b" ~/.claude/skills/jeremylongshore-claude-code-plugins-snowflake-core-workflow-b && rm -rf "$T"

Manifest: plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-b/SKILL.md

Source content

Snowflake Core Workflow B — Data Transformation

Overview

Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).

Prerequisites

  • Data loaded into Snowflake (via snowflake-core-workflow-a)
  • Understanding of ELT vs ETL patterns
  • Role with CREATE TASK and CREATE STREAM privileges (see the grant sketch below)
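
A minimal grant sketch for those privileges, assuming a custom role TRANSFORM_ROLE and a schema ANALYTICS.PIPELINE (both placeholder names; substitute your own):

-- TRANSFORM_ROLE, ANALYTICS, and PIPELINE are hypothetical names
GRANT USAGE ON DATABASE ANALYTICS TO ROLE TRANSFORM_ROLE;
GRANT USAGE, CREATE STREAM, CREATE TASK ON SCHEMA ANALYTICS.PIPELINE TO ROLE TRANSFORM_ROLE;
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE TRANSFORM_ROLE;
-- Running tasks also requires the account-level EXECUTE TASK privilege
GRANT EXECUTE TASK ON ACCOUNT TO ROLE TRANSFORM_ROLE;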

Instructions

Step 1: Create a Stream for Change Data Capture

-- Track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders
  APPEND_ONLY = FALSE;

-- Append-only stream (lighter weight, inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw_events
  APPEND_ONLY = TRUE;

-- Check what's changed since last consumption
SELECT * FROM orders_stream;
-- METADATA$ACTION = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID = unique row identifier
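
Note that a plain SELECT does not advance the stream's offset; the offset moves only when the stream is consumed by a DML statement. A minimal sketch, assuming a hypothetical audit table orders_changes_audit:

-- Consuming the stream in DML advances its offset; an explicit
-- transaction ensures the read and the write commit together
BEGIN;
INSERT INTO orders_changes_audit
  SELECT order_id, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()
  FROM orders_stream;
COMMIT;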

Step 2: Create a Task to Process Stream Data

-- Transform task runs when stream has data
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO dim_orders AS target
  USING (
    SELECT
      order_id,
      customer_id,
      amount::DECIMAL(12,2) AS amount,
      order_date::TIMESTAMP_NTZ AS order_date,
      CASE
        WHEN amount >= 1000 THEN 'high_value'
        WHEN amount >= 100 THEN 'medium_value'
        ELSE 'standard'
      END AS order_tier,
      CURRENT_TIMESTAMP() AS processed_at
    FROM orders_stream
    WHERE METADATA$ACTION = 'INSERT'
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.order_tier = source.order_tier,
    target.processed_at = source.processed_at
  WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, amount, order_date, order_tier, processed_at)
  VALUES
    (source.order_id, source.customer_id, source.amount,
     source.order_date, source.order_tier, source.processed_at);

-- Enable the task
ALTER TASK transform_orders RESUME;
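
To test the task without waiting for the schedule, it can be triggered once manually (requires the EXECUTE TASK privilege):

-- Run the task immediately, outside its schedule
EXECUTE TASK transform_orders;

-- Confirm the task definition and current state
DESCRIBE TASK transform_orders;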

Step 3: Build a Task DAG (Directed Acyclic Graph)

-- Root task: aggregate daily metrics
CREATE OR REPLACE TASK daily_metrics_root
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
  INSERT INTO daily_order_metrics
  SELECT
    CURRENT_DATE() - 1 AS metric_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM dim_orders
  WHERE order_date >= CURRENT_DATE() - 1
    AND order_date < CURRENT_DATE();

-- Child task: runs after root completes
CREATE OR REPLACE TASK update_customer_segments
  WAREHOUSE = TRANSFORM_WH
  AFTER daily_metrics_root
AS
  MERGE INTO customer_segments AS target
  USING (
    SELECT customer_id,
      COUNT(*) AS order_count,
      SUM(amount) AS lifetime_value,
      CASE
        WHEN SUM(amount) >= 10000 THEN 'platinum'
        WHEN SUM(amount) >= 5000 THEN 'gold'
        WHEN SUM(amount) >= 1000 THEN 'silver'
        ELSE 'bronze'
      END AS segment
    FROM dim_orders GROUP BY customer_id
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET
    target.order_count = source.order_count,
    target.lifetime_value = source.lifetime_value,
    target.segment = source.segment
  WHEN NOT MATCHED THEN INSERT VALUES
    (source.customer_id, source.order_count, source.lifetime_value, source.segment);

-- Resume tasks (children first, then root)
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;
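
To verify the DAG wiring before (or after) enabling it, you can list a root task's dependents:

-- List tasks downstream of the root; RECURSIVE walks the full DAG
SELECT name, state, predecessors
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
  TASK_NAME => 'daily_metrics_root',
  RECURSIVE => TRUE
));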

Step 4: Dynamic Tables (Declarative Alternative)

-- Auto-refreshes based on target freshness — no streams/tasks needed
CREATE OR REPLACE DYNAMIC TABLE customer_360
  TARGET_LAG = '10 minutes'
  WAREHOUSE = TRANSFORM_WH
AS
  SELECT
    c.customer_id, c.name, c.email,
    COUNT(o.order_id) AS total_orders,
    COALESCE(SUM(o.amount), 0) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
  FROM customers c
  LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
  GROUP BY c.customer_id, c.name, c.email;

-- Monitor refresh status
SELECT name, target_lag, refresh_mode, scheduling_state
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'CUSTOMER_360';
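
Dynamic tables can also be paused, resumed, or refreshed on demand:

-- Pause automatic refreshes (e.g., during upstream maintenance)
ALTER DYNAMIC TABLE customer_360 SUSPEND;

-- Resume automatic refreshes
ALTER DYNAMIC TABLE customer_360 RESUME;

-- Trigger an immediate refresh outside the TARGET_LAG cadence
ALTER DYNAMIC TABLE customer_360 REFRESH;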

Step 5: Monitor Pipelines

-- Task run history
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Find failed runs
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD(hours, -24, CURRENT_TIMESTAMP());

-- Stream lag check — if STALE = TRUE, data may be lost
SHOW STREAMS LIKE 'orders_stream';
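
SHOW output can be filtered with a follow-up query over RESULT_SCAN, run immediately after the SHOW above; column names from SHOW are lowercase and must be double-quoted:

SELECT "name", "stale", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));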

Error Handling

| Error | Cause | Solution |
| --- | --- | --- |
| Task is suspended | Not resumed after creation | ALTER TASK x RESUME |
| Stream is stale | Data retention exceeded | Recreate stream; increase DATA_RETENTION_TIME_IN_DAYS |
| Warehouse does not exist | Wrong warehouse in task | Verify warehouse name |
| MERGE: duplicate rows | Non-unique join key | Add dedup CTE before MERGE (see sketch below) |
| Dynamic table refresh failed | Source schema changed | Check upstream table definitions |
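
A minimal dedup sketch for the duplicate-rows case, keeping the most recent change per key before merging (column names follow the examples above):

-- Deduplicate stream rows so the MERGE sees one row per order_id
MERGE INTO dim_orders AS target
USING (
  SELECT *
  FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT'
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY order_id ORDER BY order_date DESC
  ) = 1
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET target.amount = source.amount
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount, order_date)
  VALUES (source.order_id, source.customer_id, source.amount, source.order_date);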

Next Steps

For common errors and troubleshooting, see snowflake-common-errors.