Claude-code-plugins-plus-skills snowflake-core-workflow-a
Install
Source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-a" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-snowflake-core-workflow-a && rm -rf "$T"
manifest: plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-a/SKILL.md
Snowflake Core Workflow A — Data Loading
Overview
Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.
Prerequisites
- Completed `snowflake-install-auth` setup
- Target table created in Snowflake
- Source data in S3, GCS, Azure Blob, or local files
- Role with `CREATE STAGE` and `USAGE` on warehouse (grant sketch below)
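If the role is missing these privileges, an admin can grant them up front. A minimal sketch, assuming hypothetical names `loader_role`, `my_db.my_schema`, `my_wh`, and the target table `users`:

```sql
-- Hypothetical names: loader_role, my_db.my_schema, my_wh, users
GRANT USAGE ON DATABASE my_db TO ROLE loader_role;
GRANT USAGE ON SCHEMA my_db.my_schema TO ROLE loader_role;
GRANT CREATE STAGE ON SCHEMA my_db.my_schema TO ROLE loader_role;
GRANT USAGE ON WAREHOUSE my_wh TO ROLE loader_role;
-- Loading also needs DML rights on the target table
GRANT INSERT, SELECT ON TABLE my_db.my_schema.users TO ROLE loader_role;
```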
Instructions
Step 1: Create a File Format
```sql
-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  NULL_IF = ('NULL', 'null', '')
  EMPTY_FIELD_AS_NULL = TRUE
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;

-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  IGNORE_UTF8_ERRORS = TRUE;

-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
  TYPE = 'PARQUET'
  SNAPPY_COMPRESSION = TRUE;
```
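Before wiring a format into a stage, it can be worth confirming the options were stored as intended. A quick sanity check, not part of the original workflow:

```sql
-- Inspect the stored options for one format
DESC FILE FORMAT my_csv_format;

-- List the file formats your role can see
SHOW FILE FORMATS;
```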
Step 2: Create a Stage
```sql
-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
  STORAGE_INTEGRATION = my_s3_integration
  URL = 's3://my-bucket/data/'
  FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
  STORAGE_INTEGRATION = my_gcs_integration
  URL = 'gcs://my-bucket/data/'
  FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
  FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- List files in stage
LIST @my_s3_stage;
```
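The S3 stage above assumes `my_s3_integration` already exists. If it does not, a sketch of creating one follows; the role ARN and bucket are placeholders, and the statement requires a role with the `CREATE INTEGRATION` privilege (typically ACCOUNTADMIN):

```sql
-- Placeholder ARN and bucket; replace with your own values
CREATE OR REPLACE STORAGE INTEGRATION my_s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-access'
  STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/data/');

-- Returns the IAM user ARN and external ID to add to the AWS role's trust policy
DESC INTEGRATION my_s3_integration;
```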
Step 3: Upload Files to Internal Stage
```bash
# Using SnowSQL PUT command
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
```

```python
# Using Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")
```
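After the PUT, it helps to confirm the files landed (with `AUTO_COMPRESS=TRUE` they show a `.gz` suffix) and to preview rows straight from the stage. A quick check, assuming the CSV has at least two columns:

```sql
-- Uploaded files appear with a .gz suffix when auto-compressed
LIST @my_internal_stage;

-- Preview the first rows directly from the staged files
SELECT $1, $2
FROM @my_internal_stage (FILE_FORMAT => 'my_csv_format')
LIMIT 10;
```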
Step 4: Load Data with COPY INTO
```sql
-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format')
ON_ERROR = 'CONTINUE'  -- Skip bad rows
PURGE = TRUE;          -- Delete files after load

-- COPY with column mapping
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
FROM (
  SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
  FROM @my_s3_stage/orders/
)
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- Load JSON into VARIANT column
COPY INTO my_db.my_schema.raw_events
FROM @my_s3_stage/events/
FILE_FORMAT = (FORMAT_NAME = 'my_json_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check COPY history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
```
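Before loading for real, COPY can dry-run the staged files and report parse errors without inserting any rows. A sketch against the same stage and format (validation mode is not supported for COPY statements with transformations):

```sql
-- Dry run: return errors without loading anything
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format')
VALIDATION_MODE = RETURN_ERRORS;
```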
Step 5: Programmatic Load (Node.js)
```typescript
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';

async function loadDataFromStage(
  tableName: string,
  stagePath: string,
  fileFormat: string = 'my_csv_format'
) {
  return pool.withConnection(async (conn) => {
    const result = await query(conn, `
      COPY INTO ${tableName}
      FROM @${stagePath}
      FILE_FORMAT = (FORMAT_NAME = '${fileFormat}')
      ON_ERROR = 'CONTINUE'
      FORCE = FALSE
    `);

    // COPY INTO returns load status per file
    for (const row of result.rows) {
      console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
      if (row.errors_seen > 0) {
        console.warn(`  Errors: ${row.errors_seen}, First error: ${row.first_error}`);
      }
    }
    return result.rows;
  });
}
```
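When `errors_seen` is non-zero, the rejected records can be pulled with the VALIDATE table function, run from the same connection (shown in SQL; `'_last'` refers to the most recent COPY into the table in the current session):

```sql
-- Show the rows rejected by the most recent COPY in this session
SELECT * FROM TABLE(VALIDATE(my_db.my_schema.users, JOB_ID => '_last'));
```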
Step 6: Set Up Snowpipe for Continuous Loading
```sql
-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- Get the SQS queue ARN for S3 event notifications
SHOW PIPES LIKE 'user_pipe';
-- Use the notification_channel value to configure S3 bucket events

-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');

-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;
```
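Auto-ingest only fires for files that arrive after the pipe and the S3 event notification exist. To backfill files already sitting in the stage, or to pause a misbehaving pipe:

```sql
-- Queue files already staged (REFRESH only picks up files staged in the last 7 days)
ALTER PIPE my_db.my_schema.user_pipe REFRESH;

-- Pause and resume the pipe
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```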
Error Handling
| Error | Cause | Solution |
|---|---|---|
| Access denied on stage or warehouse | Role lacks `CREATE STAGE` or `USAGE` | Grant the privileges listed in Prerequisites |
| No files loaded | Wrong stage path | Run `LIST @<stage>` to verify files |
| Column count mismatch | Schema mismatch | Use `ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE` or fix file |
| Files silently skipped | COPY deduplication | Use `FORCE = TRUE` to reload (careful) |
| Snowpipe not loading new files | Missing S3 event notification | Configure SQS notification from `SHOW PIPES` output |
Next Steps
For data transformation workflows, see `snowflake-core-workflow-b`.