claude-skill-registry-data · medallion-architecture
Bronze/Silver/Gold layer design patterns and templates for building scalable data lakehouse architectures. Includes incremental processing, data quality checks, and optimization strategies.
Install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry-data
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/medallion-architecture" ~/.claude/skills/majiayu000-claude-skill-registry-data-medallion-architecture && rm -rf "$T"
manifest:
data/medallion-architecture/SKILL.md · source content
Medallion Architecture Skill
Overview
The medallion architecture (also called multi-hop architecture) is a design pattern for organizing data in a lakehouse using three progressive layers:
- Bronze (Raw): Ingested data in its original format
- Silver (Refined): Cleansed and conformed data
- Gold (Curated): Business-level aggregates and features
When to Use This Skill
Use this skill when you need to:
- Design a new data pipeline with proper layering
- Migrate from traditional ETL to lakehouse architecture
- Implement incremental processing patterns
- Build a scalable data platform
- Ensure data quality at each layer
Architecture Principles
1. Bronze Layer (Raw)
Purpose: Store raw data exactly as received from source systems
Characteristics:
- Immutable historical record
- Schema-on-read approach
- Metadata enrichment (_ingested_at, _source_file)
- Minimal transformations
- Full audit trail
Use Cases:
- Data recovery
- Reprocessing requirements (see the sketch after this list)
- Audit compliance
- Debugging data issues
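Because Bronze is immutable and stamped with ingestion metadata, reprocessing is typically just a filtered re-read of history. A minimal sketch; `transform_to_silver` is the same hypothetical helper referenced in the incremental pattern later in this skill:

```python
from pyspark.sql.functions import col

# Reprocessing sketch: replay a slice of immutable Bronze history through
# the Silver transformation. Column and helper names are illustrative.
def reprocess_from_bronze(bronze_table: str, since: str, until: str):
    replay_df = (spark.read.table(bronze_table)
        .filter(col("_ingested_at").between(since, until))
    )
    return transform_to_silver(replay_df)
```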
2. Silver Layer (Refined)
Purpose: Cleansed, validated, and standardized data
Characteristics:
- Schema enforcement
- Data quality checks
- Deduplication
- Standardization
- Type conversions
- Business rules applied
Use Cases:
- Downstream analytics
- Feature engineering
- Data science modeling
- Operational reporting
3. Gold Layer (Curated)
Purpose: Business-level aggregates optimized for consumption
Characteristics:
- Highly aggregated
- Optimized for queries
- Business KPIs
- Feature tables
- Production-ready datasets
Use Cases:
- Dashboards and BI
- ML model serving
- Real-time applications
- Executive reporting
Implementation Patterns
Pattern 1: Batch Processing
Bronze Layer:
```python
from pyspark.sql.functions import current_timestamp, input_file_name

def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw data to Bronze layer."""
    # Plain batch read: the Auto Loader "cloudFiles" source is streaming-only,
    # so batch ingestion reads the source files directly.
    df = (spark.read
        .format("parquet")
        .load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", input_file_name())
    )

    (df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )
```
Silver Layer:
```python
from pyspark.sql.functions import col, lower, trim, to_date, when

def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    bronze_df = spark.read.table(bronze_table)

    silver_df = (bronze_df
        .dropDuplicates(["id"])
        .filter(col("id").isNotNull())
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("created_date", to_date(col("created_at")))
        .withColumn("quality_score",
            when(col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"), 1.0)
            .otherwise(0.5)
        )
    )

    (silver_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )
```
Gold Layer:
```python
from pyspark.sql.functions import avg, count, current_timestamp
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the builtin

def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    silver_df = spark.read.table(silver_table)

    gold_df = (silver_df
        .groupBy("customer_segment", "region")
        .agg(
            count("*").alias("customer_count"),
            spark_sum("lifetime_value").alias("total_ltv"),
            avg("quality_score").alias("avg_quality")
        )
        .withColumn("updated_at", current_timestamp())
    )

    (gold_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
```
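Chained together, the three functions make one multi-hop batch run. The source path and table names below are illustrative placeholders:

```python
# Example end-to-end run of Pattern 1.
ingest_to_bronze("s3://landing/customers/", "bronze.customers")
process_to_silver("bronze.customers", "silver.customers")
aggregate_to_gold("silver.customers", "gold.customer_metrics")
```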
Pattern 2: Incremental Processing
Bronze (Streaming):
```python
from pyspark.sql.functions import current_timestamp

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load(source_path)
    .withColumn("_ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(bronze_table)
)
```
Silver (Incremental Merge):
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col

def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""
    # Get new records since last watermark
    new_records = (spark.read.table(bronze_table)
        .filter(col("_ingested_at") > watermark)
    )

    # Transform
    transformed = transform_to_silver(new_records)

    # Merge into Silver
    silver = DeltaTable.forName(spark, silver_table)
    (silver.alias("target")
        .merge(
            transformed.alias("source"),
            "target.id = source.id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```
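The merge assumes the caller tracks the watermark between runs. One minimal approach, sketched below, persists it in a small Delta table; the `pipeline_state` table and its schema are assumptions, not part of this skill's templates:

```python
from pyspark.sql.functions import col

# Watermark bookkeeping sketch. The pipeline_state table (pipeline STRING,
# watermark STRING) is an illustrative assumption; timestamps are stored
# as strings for simplicity.
def read_watermark(pipeline: str, default: str = "1970-01-01 00:00:00") -> str:
    rows = (spark.read.table("pipeline_state")
        .filter(col("pipeline") == pipeline)
        .collect())
    return rows[0]["watermark"] if rows else default

def write_watermark(pipeline: str, watermark: str):
    (spark.createDataFrame([(pipeline, watermark)], ["pipeline", "watermark"])
        .write.format("delta")
        .mode("overwrite")  # simplified: one pipeline per state table
        .saveAsTable("pipeline_state"))
```

A run would read the watermark, call `incremental_silver_merge`, then write back the maximum `_ingested_at` it processed.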
Data Quality Patterns
Quality Checks at Each Layer
Bronze:
- File completeness check
- Row count validation
- Schema drift detection (sketched below)
Silver:
- Null value checks
- Data type validation
- Business rule validation
- Referential integrity
- Duplicate detection
Gold:
- Aggregate accuracy
- KPI threshold checks
- Trend anomaly detection
- Completeness validation
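The Silver implementation that follows covers row-level rules; the Bronze checks above are mostly structural. A minimal schema drift sketch, assuming an expected column set maintained alongside the pipeline (`EXPECTED_COLUMNS` is an illustrative assumption):

```python
# Schema drift sketch: compare the Bronze table's columns against an
# expected set. EXPECTED_COLUMNS is illustrative; derive it from your
# data contract or a previously recorded schema.
EXPECTED_COLUMNS = {"id", "email", "created_at", "_ingested_at", "_source_file"}

def detect_schema_drift(table_name: str) -> dict:
    actual = set(spark.read.table(table_name).columns)
    return {
        "missing_columns": sorted(EXPECTED_COLUMNS - actual),
        "unexpected_columns": sorted(actual - EXPECTED_COLUMNS),
    }
```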
Quality Check Implementation
```python
from typing import Dict
from pyspark.sql.functions import col, current_date

def validate_silver_quality(table_name: str) -> Dict[str, bool]:
    """Run quality checks on Silver table."""
    df = spark.read.table(table_name)

    checks = {
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        "valid_emails": df.filter(
            ~col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
        ).count() == 0,
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0
    }

    return checks
```
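To make these checks a gate rather than a report, a thin wrapper can fail the run before Gold is built; a minimal sketch:

```python
# Quality gate sketch: raise before promotion so bad data never
# progresses past Silver.
def enforce_quality_gate(table_name: str):
    checks = validate_silver_quality(table_name)
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise ValueError(f"Quality gate failed for {table_name}: {failed}")
```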
Optimization Strategies
Bronze Layer Optimization
```sql
-- Partition by ingestion date
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() AS ingestion_date FROM source;

-- Enable auto-optimize
ALTER TABLE bronze.raw_events SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
```
Silver Layer Optimization
```sql
-- Z-ORDER for common filters
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed
ALTER TABLE silver.customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```
Gold Layer Optimization
```sql
-- Liquid clustering for query performance
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Optimize and vacuum
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;
```
Complete Example
See /templates/bronze-silver-gold/ for a complete implementation including:
- Project structure
- Bronze ingestion scripts
- Silver transformation logic
- Gold aggregation queries
- Data quality tests
- Deployment configuration
Best Practices
- Idempotency: Ensure pipelines can be re-run safely (see the sketch after this list)
- Incrementality: Process only new/changed data
- Quality Gates: Block bad data from progressing
- Schema Evolution: Handle schema changes gracefully
- Monitoring: Track pipeline health and data quality
- Documentation: Document data lineage and transformations
- Testing: Unit test transformations, integration test pipelines
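As a concrete instance of the idempotency practice, a partition-scoped overwrite lets a failed run be re-executed safely. A sketch assuming a date-partitioned Gold table (table and column names are illustrative):

```python
# Idempotent re-run sketch: replaceWhere rewrites only the partition being
# processed, so re-running the same date cannot duplicate rows.
def write_gold_for_date(gold_df, run_date: str):
    (gold_df.write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", f"date = '{run_date}'")
        .saveAsTable("gold.customer_metrics_daily"))
```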
Common Pitfalls to Avoid
❌ Don't:
- Mix transformation logic across layers
- Skip Bronze layer to "save storage"
- Over-aggregate too early
- Ignore data quality in Silver
- Hard-code business logic in Bronze
✅ Do:
- Keep Bronze immutable
- Enforce quality in Silver
- Optimize Gold for consumption
- Use incremental processing
- Implement proper monitoring
Related Skills
- delta-live-tables: Declarative pipeline orchestration
- data-quality: Great Expectations integration
- testing-patterns: Pipeline testing strategies
- cicd-workflows: Deployment automation