Claude-code-plugins-plus-skills snowflake-sdk-patterns
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/snowflake-pack/skills/snowflake-sdk-patterns" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-snowflake-sdk-patterns && rm -rf "$T"
manifest:
plugins/saas-packs/snowflake-pack/skills/snowflake-sdk-patterns/SKILL.md — source content
Snowflake SDK Patterns
Overview
Production-ready patterns for
snowflake-sdk (Node.js) and snowflake-connector-python using real driver APIs.
Prerequisites
- Completed setup from snowflake-install-auth
- Understanding of callback-to-promise conversion patterns
- Familiarity with Snowflake's callback-based Node.js API
Instructions
Step 1: Connection Pool (Node.js)
// src/snowflake/pool.ts import snowflake from 'snowflake-sdk'; interface PoolConfig { max: number; idleTimeoutMs: number; } class SnowflakePool { private pool: snowflake.Connection[] = []; private available: snowflake.Connection[] = []; private waiting: ((conn: snowflake.Connection) => void)[] = []; private config: PoolConfig; constructor( private connConfig: snowflake.ConnectionOptions, config: Partial<PoolConfig> = {} ) { this.config = { max: 10, idleTimeoutMs: 60000, ...config }; } async acquire(): Promise<snowflake.Connection> { // Return available connection if (this.available.length > 0) { return this.available.pop()!; } // Create new if under limit if (this.pool.length < this.config.max) { const conn = snowflake.createConnection(this.connConfig); await new Promise<void>((resolve, reject) => { conn.connect((err) => (err ? reject(err) : resolve())); }); this.pool.push(conn); return conn; } // Wait for one to become available return new Promise((resolve) => { this.waiting.push(resolve); }); } release(conn: snowflake.Connection): void { if (this.waiting.length > 0) { const next = this.waiting.shift()!; next(conn); } else { this.available.push(conn); } } async withConnection<T>(fn: (conn: snowflake.Connection) => Promise<T>): Promise<T> { const conn = await this.acquire(); try { return await fn(conn); } finally { this.release(conn); } } } // Singleton pool export const pool = new SnowflakePool({ account: process.env.SNOWFLAKE_ACCOUNT!, username: process.env.SNOWFLAKE_USER!, password: process.env.SNOWFLAKE_PASSWORD!, warehouse: process.env.SNOWFLAKE_WAREHOUSE || 'COMPUTE_WH', database: process.env.SNOWFLAKE_DATABASE!, schema: process.env.SNOWFLAKE_SCHEMA || 'PUBLIC', });
Step 2: Promise-Based Query Helper
// src/snowflake/query.ts import snowflake from 'snowflake-sdk'; interface QueryResult<T = Record<string, any>> { rows: T[]; statement: snowflake.Statement; sqlText: string; } export function query<T = Record<string, any>>( conn: snowflake.Connection, sqlText: string, binds?: snowflake.Binds ): Promise<QueryResult<T>> { return new Promise((resolve, reject) => { conn.execute({ sqlText, binds, complete: (err, stmt, rows) => { if (err) { reject(Object.assign(err, { sqlText })); } else { resolve({ rows: (rows || []) as T[], statement: stmt, sqlText }); } }, }); }); } // Multi-statement execution export async function multiQuery( conn: snowflake.Connection, statements: string[] ): Promise<QueryResult[]> { const results: QueryResult[] = []; for (const sql of statements) { results.push(await query(conn, sql)); } return results; }
Step 3: Streaming for Large Result Sets
// src/snowflake/stream.ts export async function* streamQuery<T = Record<string, any>>( conn: snowflake.Connection, sqlText: string, binds?: snowflake.Binds ): AsyncGenerator<T> { const stmt = await new Promise<snowflake.Statement>((resolve, reject) => { conn.execute({ sqlText, binds, streamResult: true, complete: (err, stmt) => { if (err) reject(err); else resolve(stmt); }, }); }); const stream = stmt.streamRows(); for await (const row of stream) { yield row as T; } } // Usage: process millions of rows without memory pressure // for await (const row of streamQuery(conn, 'SELECT * FROM big_table')) { // await processRow(row); // }
Step 4: Python Context Manager Pattern
# src/snowflake_pool.py import snowflake.connector from contextlib import contextmanager from typing import Generator, Any class SnowflakePool: def __init__(self, **conn_params): self._params = conn_params @contextmanager def connection(self) -> Generator[snowflake.connector.SnowflakeConnection, None, None]: conn = snowflake.connector.connect(**self._params) try: yield conn finally: conn.close() @contextmanager def cursor(self) -> Generator[snowflake.connector.cursor.SnowflakeCursor, None, None]: with self.connection() as conn: cur = conn.cursor() try: yield cur finally: cur.close() def execute(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]: with self.cursor() as cur: cur.execute(sql, params) columns = [desc[0] for desc in cur.description] if cur.description else [] return [dict(zip(columns, row)) for row in cur.fetchall()] def execute_many(self, sql: str, params_list: list[tuple]) -> int: """Batch insert — much faster than individual inserts.""" with self.cursor() as cur: cur.executemany(sql, params_list) return cur.rowcount # Usage pool = SnowflakePool( account=os.environ['SNOWFLAKE_ACCOUNT'], user=os.environ['SNOWFLAKE_USER'], password=os.environ['SNOWFLAKE_PASSWORD'], warehouse='COMPUTE_WH', database='MY_DB', schema='PUBLIC', ) users = pool.execute("SELECT * FROM users WHERE status = %s", ('active',))
Step 5: Error Handling Wrapper
// src/snowflake/errors.ts export class SnowflakeQueryError extends Error { constructor( message: string, public readonly sqlState: string, public readonly code: number, public readonly sqlText: string, public readonly retryable: boolean ) { super(message); this.name = 'SnowflakeQueryError'; } } const RETRYABLE_CODES = new Set([ 390114, // Connection token expired — reconnect 390503, // Service unavailable ]); export async function safeQuery<T>( conn: snowflake.Connection, sqlText: string, binds?: snowflake.Binds, maxRetries = 3 ): Promise<T[]> { for (let attempt = 1; attempt <= maxRetries; attempt++) { try { const { rows } = await query<T>(conn, sqlText, binds); return rows; } catch (err: any) { const retryable = RETRYABLE_CODES.has(err.code); if (!retryable || attempt === maxRetries) { throw new SnowflakeQueryError( err.message, err.sqlState, err.code, sqlText, retryable ); } const delay = 1000 * Math.pow(2, attempt - 1); await new Promise((r) => setTimeout(r, delay)); } } throw new Error('Unreachable'); }
Pattern Summary
| Pattern | Use Case | Benefit |
|---|---|---|
| Connection pool | High concurrency apps | Reuses connections, prevents exhaustion |
| Promise wrapper | All Node.js code | Clean async/await instead of callbacks |
| Streaming | Large result sets (>100K rows) | Constant memory usage |
| Context manager | All Python code | Guarantees connection cleanup |
| Retry with backoff | Transient failures | Handles token expiry, service blips |
Resources
Next Steps
Apply patterns in
snowflake-core-workflow-a for real-world data pipeline usage.