Claude-code-plugins-plus-skills snowflake-core-workflow-a

install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-a" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-snowflake-core-workflow-a && rm -rf "$T"
manifest: plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-a/SKILL.md
source content

Snowflake Core Workflow A — Data Loading

Overview

Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.

Prerequisites

  • Completed
    snowflake-install-auth
    setup
  • Target table created in Snowflake
  • Source data in S3, GCS, Azure Blob, or local files
  • Role with
    CREATE STAGE
    and
    USAGE
    on warehouse

Instructions

Step 1: Create a File Format

-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  NULL_IF = ('NULL', 'null', '')
  EMPTY_FIELD_AS_NULL = TRUE
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;

-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  IGNORE_UTF8_ERRORS = TRUE;

-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
  TYPE = 'PARQUET'
  SNAPPY_COMPRESSION = TRUE;

Step 2: Create a Stage

-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
  STORAGE_INTEGRATION = my_s3_integration
  URL = 's3://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
  STORAGE_INTEGRATION = my_gcs_integration
  URL = 'gcs://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
  FILE_FORMAT = my_csv_format;

-- List files in stage
LIST @my_s3_stage;

Step 3: Upload Files to Internal Stage

# Using SnowSQL PUT command
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
# Using Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")

Step 4: Load Data with COPY INTO

-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
  FROM @my_s3_stage/users/
  FILE_FORMAT = my_csv_format
  ON_ERROR = 'CONTINUE'          -- Skip bad rows
  PURGE = TRUE;                  -- Delete files after load

-- COPY with column mapping
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
  FROM (
    SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
    FROM @my_s3_stage/orders/
  )
  FILE_FORMAT = my_csv_format;

-- Load JSON into VARIANT column
COPY INTO my_db.my_schema.raw_events
  FROM @my_s3_stage/events/
  FILE_FORMAT = my_json_format
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check COPY history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));

Step 5: Programmatic Load (Node.js)

import { pool } from './snowflake/pool';
import { query } from './snowflake/query';

async function loadDataFromStage(
  tableName: string,
  stagePath: string,
  fileFormat: string = 'my_csv_format'
) {
  return pool.withConnection(async (conn) => {
    const result = await query(conn, `
      COPY INTO ${tableName}
        FROM @${stagePath}
        FILE_FORMAT = ${fileFormat}
        ON_ERROR = 'CONTINUE'
        FORCE = FALSE
    `);

    // COPY INTO returns load status per file
    for (const row of result.rows) {
      console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
      if (row.errors_seen > 0) {
        console.warn(`  Errors: ${row.errors_seen}, First error: ${row.first_error}`);
      }
    }
    return result.rows;
  });
}

Step 6: Set Up Snowpipe for Continuous Loading

-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO my_db.my_schema.users
    FROM @my_s3_stage/users/
    FILE_FORMAT = my_csv_format;

-- Get the SQS queue ARN for S3 event notifications
SHOW PIPES LIKE 'user_pipe';
-- Use the notification_channel value to configure S3 bucket events

-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');

-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;

Error Handling

ErrorCauseSolution
Insufficient privileges on stage
Role lacks USAGE
GRANT USAGE ON STAGE x TO ROLE y
File not found
Wrong stage pathRun
LIST @stage
to verify files
Number of columns in file does not match
Schema mismatchUse
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
or fix file
Files already loaded
COPY deduplicationUse
FORCE = TRUE
to reload (careful)
Pipe not receiving files
Missing S3 event notificationConfigure SQS notification from
SHOW PIPES
output

Resources

Next Steps

For data transformation workflows, see

snowflake-core-workflow-b
.