Claude Code Plugins · databricks-core-workflow-a

Install

Source · Clone the upstream repo

```bash
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
```

Claude Code · Install into ~/.claude/skills/

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-a" ~/.claude/skills/jeremylongshore-claude-code-plugins-databricks-core-workflow-a && rm -rf "$T"
```

Manifest: plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-a/SKILL.md
Databricks Core Workflow A: Delta Lake ETL
Overview
Build production Delta Lake ETL pipelines using the medallion architecture (Bronze > Silver > Gold). Uses Auto Loader (`cloudFiles`) for incremental ingestion, MERGE INTO for upserts, and Delta Live Tables for declarative pipelines.
Prerequisites
- Completed setup via databricks-install-auth
- Unity Catalog enabled with catalogs/schemas created (a creation sketch follows this list)
- Access to cloud storage for raw data (S3, ADLS, GCS)
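If the catalog and schemas do not exist yet, a minimal creation sketch follows. It assumes the `prod_catalog` name and the bronze/silver/gold schemas used throughout this workflow, and that your identity has the Unity Catalog privileges to create catalogs.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed names: prod_catalog plus the bronze/silver/gold schemas used below.
spark.sql("CREATE CATALOG IF NOT EXISTS prod_catalog")
for schema in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS prod_catalog.{schema}")
```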
Architecture
```
Raw Sources (S3/ADLS/GCS)
    │ Auto Loader (cloudFiles)
    ▼
Bronze (raw + metadata)
    │ Cleanse, deduplicate, type-cast
    ▼
Silver (conformed)
    │ Aggregate, join, feature engineer
    ▼
Gold (analytics-ready)
```
Instructions
Step 1: Bronze Layer — Raw Ingestion with Auto Loader
Auto Loader (the `cloudFiles` format) incrementally processes new files as they arrive. It handles schema inference and evolution, and scales to millions of files.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit

spark = SparkSession.builder.getOrCreate()

# Streaming ingestion with Auto Loader
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoints/bronze/orders/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://data-lake/raw/orders/")
)

# Add ingestion metadata
bronze_with_meta = (
    bronze_stream
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
    .withColumn("_source_system", lit("orders-api"))
)

# Write to bronze Delta table
(bronze_with_meta.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze/orders/data")
    .option("mergeSchema", "true")
    .toTable("prod_catalog.bronze.raw_orders"))
```
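If you prefer scheduled incremental batches over an always-on stream, the same Auto Loader source can be drained on each job run with an `availableNow` trigger. A sketch, reusing `bronze_with_meta` and the checkpoint path from the block above:

```python
# Process every file that arrived since the last run, then stop.
(bronze_with_meta.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze/orders/data")
    .trigger(availableNow=True)
    .toTable("prod_catalog.bronze.raw_orders"))
```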
Step 2: Silver Layer — Cleansing and Deduplication
Read from Bronze, apply business logic, and MERGE INTO Silver with upsert semantics.
```python
from pyspark.sql.functions import col, trim, lower, to_timestamp, sha2
from delta.tables import DeltaTable

# Read new records from bronze (batch mode for scheduled jobs)
bronze_df = spark.table("prod_catalog.bronze.raw_orders")

# Apply transformations
silver_df = (
    bronze_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("customer_email", lower(trim(col("customer_email"))))
    .withColumn("order_date", to_timestamp(col("order_date"), "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("amount", col("amount").cast("decimal(12,2)"))
    .withColumn("email_hash", sha2(col("customer_email"), 256))
    .filter(col("order_id").isNotNull())
    .dropDuplicates(["order_id"])
)

# Upsert into silver with MERGE
if spark.catalog.tableExists("prod_catalog.silver.orders"):
    target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
    (target.alias("t")
        .merge(silver_df.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
else:
    silver_df.write.format("delta").saveAsTable("prod_catalog.silver.orders")
```
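The batch read above rescans the full bronze table on every run. A common way to make the silver step incremental is to stream from bronze and apply the MERGE inside `foreachBatch`. This is a sketch under the same table names, with the cleansing transformations omitted for brevity and an assumed checkpoint path:

```python
from delta.tables import DeltaTable

def upsert_to_silver(microbatch_df, batch_id):
    # Only dedup + MERGE shown; apply the same cleansing as above first.
    target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
    (target.alias("t")
        .merge(microbatch_df.dropDuplicates(["order_id"]).alias("s"),
               "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream.table("prod_catalog.bronze.raw_orders")
    .writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "/checkpoints/silver/orders")
    .trigger(availableNow=True)
    .start())
```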
Step 3: Gold Layer — Business Aggregations
Aggregate Silver data into analytics-ready tables. Use partition-level overwrites for efficient updates.
```python
from pyspark.sql.functions import col, sum as _sum, count, avg, date_trunc

# Date (inclusive) from which gold partitions are rebuilt,
# e.g. passed in as a job parameter
target_date = "2024-01-01"

# Daily order metrics
gold_metrics = (
    spark.table("prod_catalog.silver.orders")
    .withColumn("order_day", date_trunc("day", col("order_date")))
    .groupBy("order_day", "region")
    .agg(
        count("order_id").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )
)

# Overwrite only changed partitions
(gold_metrics.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"order_day >= '{target_date}'")
    .saveAsTable("prod_catalog.gold.daily_order_metrics"))
```
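`replaceWhere` overwrites are cheapest when the gold table is laid out on the filter column, so partitions outside the window are never touched. A one-time creation sketch, assuming the table does not exist yet:

```python
# Initial creation, partitioned by order_day so replaceWhere prunes partitions.
(gold_metrics.write
    .format("delta")
    .partitionBy("order_day")
    .saveAsTable("prod_catalog.gold.daily_order_metrics"))
```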
Step 4: Delta Table Maintenance
```sql
-- Compact small files (bin-packing)
OPTIMIZE prod_catalog.silver.orders;

-- Z-order for query performance on frequently filtered columns
OPTIMIZE prod_catalog.silver.orders ZORDER BY (order_date, region);

-- Or use Liquid Clustering (DBR 13.3+), which replaces partitioning + Z-order
ALTER TABLE prod_catalog.silver.orders CLUSTER BY (order_date, region);
OPTIMIZE prod_catalog.silver.orders;

-- Clean up old file versions (default retention: 7 days)
VACUUM prod_catalog.silver.orders RETAIN 168 HOURS;

-- Compute statistics for the query optimizer
ANALYZE TABLE prod_catalog.silver.orders COMPUTE STATISTICS;
```
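The same maintenance can be scripted from Python with the `DeltaTable` API (delta-spark 2.0+), which is convenient when maintenance runs as one task in the nightly job. A sketch:

```python
from delta.tables import DeltaTable

tbl = DeltaTable.forName(spark, "prod_catalog.silver.orders")

# Compact small files and Z-order on the common filter columns
tbl.optimize().executeZOrderBy("order_date", "region")

# Remove files older than the 7-day default retention window (hours)
tbl.vacuum(168)
```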
Step 5: Delta Live Tables (Declarative Pipeline)
DLT manages orchestration, data quality, lineage, and error handling automatically.
```python
import dlt
from pyspark.sql.functions import col, current_timestamp

@dlt.table(
    comment="Raw orders from Auto Loader",
    table_properties={"quality": "bronze"},
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("s3://data-lake/raw/orders/")
        .withColumn("_ingested_at", current_timestamp())
    )

@dlt.table(comment="Cleansed orders")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
        .withColumn("amount", col("amount").cast("decimal(12,2)"))
        .dropDuplicates(["order_id"])
    )

@dlt.table(comment="Daily revenue metrics")
def gold_daily_revenue():
    return (
        dlt.read("silver_orders")
        .groupBy("region", "order_date")
        .agg({"amount": "sum", "order_id": "count"})
    )
```
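DLT source files are not run directly; they are attached to a pipeline. One way to create the pipeline programmatically is the workspace SDK. In this sketch the notebook path and the target catalog/schema are assumptions:

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import PipelineLibrary, NotebookLibrary

w = WorkspaceClient()

# Assumed notebook path and catalog/schema; adjust to your workspace.
pipeline = w.pipelines.create(
    name="orders-dlt",
    catalog="prod_catalog",
    target="silver",
    libraries=[PipelineLibrary(notebook=NotebookLibrary(path="/Repos/team/pipelines/dlt_orders"))],
    continuous=False,
)
print(f"Created DLT pipeline: {pipeline.pipeline_id}")
```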
Step 6: Schedule the Pipeline
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task,
    NotebookTask,
    JobCluster,
    CronSchedule,
    TaskDependency,
)
from databricks.sdk.service.compute import ClusterSpec, AutoScale

w = WorkspaceClient()

job = w.jobs.create(
    name="daily-orders-etl",
    tasks=[
        Task(
            task_key="bronze",
            job_cluster_key="etl",
            notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/bronze"),
        ),
        Task(
            task_key="silver",
            job_cluster_key="etl",
            notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/silver"),
            depends_on=[TaskDependency(task_key="bronze")],
        ),
        Task(
            task_key="gold",
            job_cluster_key="etl",
            notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/gold"),
            depends_on=[TaskDependency(task_key="silver")],
        ),
    ],
    job_clusters=[JobCluster(
        job_cluster_key="etl",
        new_cluster=ClusterSpec(
            spark_version="14.3.x-scala2.12",
            node_type_id="i3.xlarge",
            autoscale=AutoScale(min_workers=1, max_workers=4),
        ),
    )],
    schedule=CronSchedule(quartz_cron_expression="0 0 6 * * ?", timezone_id="UTC"),
    max_concurrent_runs=1,
)
print(f"Created job: {job.job_id}")
```
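Besides the cron schedule, the same job can be triggered on demand and monitored from the SDK. A brief usage sketch, assuming a recent databricks-sdk version where `run_now` returns a waiter:

```python
# Trigger an immediate run and block until it reaches a terminal state.
run = w.jobs.run_now(job_id=job.job_id).result()
print(f"Run state: {run.state.result_state}")
```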
Output
- Bronze layer with raw data, Auto Loader schema evolution, and ingestion metadata
- Silver layer with cleansed, deduplicated, type-cast data via MERGE upserts
- Gold layer with business-ready aggregations
- Table maintenance schedule (OPTIMIZE, VACUUM, ANALYZE)
- Optional DLT pipeline with built-in data quality expectations
Error Handling
| Error | Solution |
|---|---|
| Source schema changed | Auto Loader evolves the schema via `schemaEvolutionMode`; for batch writes add `.option("mergeSchema", "true")` |
| Multiple jobs writing same table | Use MERGE with retry logic, or serialize writes via job task dependencies |
| Bad source data | Add DLT expectations (`expect_or_drop`) or route bad records to a quarantine table |
| Driver collecting large results | Never call `collect()` on large data; write results to a table to keep processing distributed |
| Retention < 7 days | Set a minimum retention of 168 hours; lowering it requires disabling `spark.databricks.delta.retentionDurationCheck.enabled` |
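For the concurrent-writer case, a simple retry wrapper around the MERGE is usually enough, since Delta raises a concurrent-modification error rather than corrupting data. A sketch (the exception class names vary by runtime, so this matches on the name):

```python
import time

def merge_with_retry(run_merge, max_attempts=3):
    """Retry a Delta MERGE when a concurrent-write conflict is detected."""
    for attempt in range(1, max_attempts + 1):
        try:
            run_merge()
            return
        except Exception as e:  # e.g. ConcurrentAppendException surfaced via Py4J
            if "Concurrent" not in type(e).__name__ and "Concurrent" not in str(e):
                raise
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff

# Usage: wrap the MERGE from Step 2
# merge_with_retry(lambda: target.alias("t").merge(...).execute())
```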
Examples
Quick Pipeline Validation
```sql
-- Verify row counts flow through medallion layers
SELECT 'bronze' AS layer, COUNT(*) AS row_count FROM prod_catalog.bronze.raw_orders
UNION ALL
SELECT 'silver', COUNT(*) FROM prod_catalog.silver.orders
UNION ALL
SELECT 'gold', COUNT(*) FROM prod_catalog.gold.daily_order_metrics;
```
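A matching programmatic check can fail the job when the silver layer loses key uniqueness. This is a minimal sketch using the table names above:

```python
# Fail fast if order_id uniqueness is violated in silver.
silver = spark.table("prod_catalog.silver.orders")
dupes = silver.groupBy("order_id").count().filter("count > 1").count()
assert dupes == 0, f"Found {dupes} duplicate order_id values in silver"
```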
Resources
Next Steps
For ML workflows, see databricks-core-workflow-b.