Awesome-omni-skill data-pipeline
Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics
Install

Clone the upstream repo:

```shell
git clone https://github.com/diegosouzapw/awesome-omni-skill
```

Claude Code: install into `~/.claude/skills/`:

```shell
T=$(mktemp -d) \
  && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" \
  && mkdir -p ~/.claude/skills \
  && cp -r "$T/skills/data-ai/data-pipeline" ~/.claude/skills/diegosouzapw-awesome-omni-skill-data-pipeline \
  && rm -rf "$T"
```

Manifest: `skills/data-ai/data-pipeline/SKILL.md`
Data Pipeline
Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.
Overview
This skill covers:
- Data extraction from multiple sources
- Transformation and cleaning
- Loading to destinations
- Scheduling and monitoring
- Error handling and alerts
ETL Patterns
Basic ETL Flow
```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
```
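The three stages above can be sketched as plain functions over an array of records. This is a minimal illustration, not n8n's API: the function names, sample rows, and in-memory "warehouse" are all assumptions made for the sketch.

```javascript
// Minimal ETL sketch: each stage is a function over an array of records.

// EXTRACT: pull raw rows from a source (stubbed here as constant data;
// a real extractor would call an API or query a database).
function extract() {
  return [
    { id: 1, total_price: "19.90", created_at: "2024-05-01T10:00:00Z" },
    { id: 2, total_price: "5.00", created_at: "2024-05-01T12:30:00Z" },
  ];
}

// TRANSFORM: clean and reshape each row.
function transform(rows) {
  return rows.map((r) => ({
    order_id: r.id,
    date: r.created_at.split("T")[0], // keep the date part only
    total: parseFloat(r.total_price), // string -> number
  }));
}

// LOAD: write rows to a destination (here, just an in-memory array).
function load(rows, destination) {
  rows.forEach((r) => destination.push(r));
  return destination;
}

const warehouse = [];
load(transform(extract()), warehouse);
```

Keeping each stage a separate function makes the pipeline easy to test and to re-run from any stage.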
n8n ETL Workflow
```yaml
workflow: "Daily Sales ETL"
schedule: "2am daily"
nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday

  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday

  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id

  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));

  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily

  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
```
Data Sources
Common Extractors
```yaml
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}

  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query

  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"

  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
```
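The `pagination: handle_automatically` option above usually means looping over pages until the API stops returning a cursor. A minimal sketch of that loop, assuming a hypothetical API whose responses have the shape `{ items, next }` (`fetchPage` and `fakeFetch` are stand-ins for a real HTTP call, not part of any listed connector):

```javascript
// Cursor-based pagination for a REST extractor: keep requesting pages
// until the API returns no "next" cursor.
async function extractAll(fetchPage) {
  const items = [];
  let cursor = null;
  do {
    const page = await fetchPage(cursor); // assumed shape: { items: [...], next: cursor | null }
    items.push(...page.items);
    cursor = page.next;
  } while (cursor);
  return items;
}

// Stand-in for an HTTP call, simulating two pages of results.
async function fakeFetch(cursor) {
  if (cursor === null) return { items: ["a1", "a2"], next: "page2" };
  return { items: ["a3"], next: null };
}
```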
Transformations
Common Transformations
```yaml
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema

  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}

  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value

  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields

  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
```
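As a concrete example of the `deduplicate: by_key` cleaning step, here is one way to keep only the first row seen for each key. The function name and "keep first occurrence" policy are choices made for this sketch:

```javascript
// Deduplicate rows by a key field, keeping the first occurrence of each key.
function deduplicate(rows, key) {
  const seen = new Set();
  return rows.filter((row) => {
    const k = row[key];
    if (seen.has(k)) return false; // drop repeats
    seen.add(k);
    return true;
  });
}
```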
Code Transform Examples
```javascript
// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculate fields
    total: item.quantity * item.unit_price,
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  return Object.values(grouped);
}
```
Data Destinations
Common Loaders
```yaml
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics

  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows

  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket

  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name

  apis:
    - webhook:
        url: destination_url
        batch_size: 100
```
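Batched loading (as in `batch_insert: 1000_rows` or the webhook `batch_size: 100` above) comes down to slicing rows into fixed-size chunks and writing one chunk at a time. A minimal sketch, where `insertBatch` is a stand-in for the real database or API call:

```javascript
// Split rows into fixed-size batches.
function chunk(rows, size) {
  const batches = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

// Load rows one batch at a time; `insertBatch` would be the real
// INSERT / insert_rows / webhook call in practice.
async function batchLoad(rows, insertBatch, size = 1000) {
  for (const batch of chunk(rows, size)) {
    await insertBatch(batch);
  }
}
```

Writing batches sequentially keeps memory bounded and makes partial-failure recovery simpler (you know exactly which batch failed).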
Scheduling & Monitoring
Pipeline Scheduling
```yaml
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis

  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete

  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
```
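The retry policy above (3 attempts, exponential backoff, alert only on final failure) can be sketched as a small wrapper around any pipeline stage. The function name and base delay are illustrative choices, not part of the config schema:

```javascript
// Retry a stage with exponential backoff: 1s, 2s, 4s, ... between attempts.
async function withRetry(fn, maxAttempts = 3, baseDelayMs = 1000) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // Re-throw on the last attempt: this is where alert_on: final_failure fires.
      if (attempt === maxAttempts) throw err;
      const delay = baseDelayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```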
Monitoring & Alerts
```yaml
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness

  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        [View Logs]({logs_url})

    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly

    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
```
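The `row_count: differs_by > 50%` condition above is a simple relative-change check against the previous run. A minimal sketch (the function name and the zero-baseline handling are assumptions for this example):

```javascript
// Flag a run whose row count deviates from the previous run by more than
// the threshold (default 50%, matching the alert condition above).
function rowCountAnomaly(current, previous, threshold = 0.5) {
  // No baseline: any nonzero count on a previously empty run is suspicious.
  if (previous === 0) return current !== 0;
  return Math.abs(current - previous) / previous > threshold;
}
```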
Data Quality
Quality Checks
```yaml
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]

  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]

  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid

  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
```
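A per-row version of the `required_fields` and `field_types` checks above can be a small validator that collects error messages instead of failing fast. This sketch simplifies type checking to JavaScript's `typeof` (the real `date`/`integer` checks would need more than that), and the schema shape is an assumption for the example:

```javascript
// Validate one row against a simple schema:
//   { required: [field, ...], types: { field: typeofName, ... } }
// Returns a list of human-readable errors (empty = row is valid).
function validateRow(row, schema) {
  const errors = [];
  for (const field of schema.required) {
    if (row[field] === undefined || row[field] === null) {
      errors.push(`missing required field: ${field}`);
    }
  }
  for (const [field, type] of Object.entries(schema.types)) {
    // Only type-check fields that are present; missing ones are caught above.
    if (row[field] !== undefined && typeof row[field] !== type) {
      errors.push(`${field} should be ${type}`);
    }
  }
  return errors;
}
```

Collecting all errors per row (rather than stopping at the first) makes quality reports far more useful when a pipeline run goes bad.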
Output Example
Request: "Create a daily sales data pipeline"
Output:
# Daily Sales Data Pipeline

## Pipeline Overview
Shopify + Stripe → Transform → BigQuery + Sheets

## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform
```javascript
// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}
```

## Load

### BigQuery
- Table: analytics.sales_daily
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- Row count > 0
- No null order_ids
- Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team
---
*Data Pipeline Skill - Part of Claude Office Skills*