Claude-skill-registry data-lake-management
Data lake architecture and management, including medallion architecture (bronze/silver/gold zones), data cataloging with AWS Glue, partitioning strategies, schema evolution, data quality, governance, cost optimization, S3 lifecycle policies, data retention, compliance, query optimization with Athena, data formats (Parquet, ORC, Avro), incremental processing, CDC patterns, and production best practices for scalable data lakes.
Install
Source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-lake-management" ~/.claude/skills/majiayu000-claude-skill-registry-data-lake-management && rm -rf "$T"
Manifest
skills/data/data-lake-management/SKILL.md
Data Lake Management Skill
Purpose
Design, build, and maintain production-grade data lakes on AWS using medallion architecture, ensuring data quality, governance, and cost optimization.
When to Use This Skill
Auto-activates when working with:
- Data lake architecture design
- Medallion architecture (Bronze/Silver/Gold)
- AWS S3 data organization
- AWS Glue Data Catalog
- Data partitioning strategies
- Schema evolution
- Data quality and governance
- Cost optimization
- Query performance tuning
Core Concepts
Medallion Architecture
```
Bronze Zone (Raw)
├── Immutable source data
├── Original format
├── Minimal validation
└── Append-only

Silver Zone (Cleaned)
├── Validated and cleaned
├── Deduplicated
├── Standardized schema
└── Business rules applied

Gold Zone (Curated)
├── Aggregated
├── Business-ready
├── Optimized for queries
└── Subject-oriented
```
S3 Data Organization
```
s3://data-lake-{env}/
├── bronze/                       # Raw zone
│   ├── source_system_1/
│   │   ├── table_name/
│   │   │   ├── year=2025/
│   │   │   │   ├── month=11/
│   │   │   │   │   ├── day=04/
│   │   │   │   │   │   └── data.parquet
│   ├── _metadata/                # Metadata and manifests
│   └── _dlq/                     # Dead letter queue
├── silver/                       # Processed zone
│   ├── domain_1/
│   │   ├── entity_name/
│   │   │   ├── date=2025-11-04/
│   │   │   │   └── part-*.parquet
├── gold/                         # Curated zone
│   ├── analytics/
│   │   ├── daily_aggregates/
│   │   ├── monthly_reports/
└── temp/                         # Temporary processing
```
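Keeping every job on this layout is easier when the prefixes are generated by small helpers rather than hand-built strings. A minimal sketch, assuming this layout; the function names, bucket, and system names are illustrative and not part of the skill:

```python
from datetime import date

def bronze_key(source_system: str, table: str, d: date) -> str:
    """Build a bronze-zone key following the year/month/day layout above."""
    return (f"bronze/{source_system}/{table}/"
            f"year={d.year}/month={d.month:02d}/day={d.day:02d}/data.parquet")

def silver_prefix(domain: str, entity: str, d: date) -> str:
    """Build a silver-zone prefix partitioned by a single date column."""
    return f"silver/{domain}/{entity}/date={d.isoformat()}/"

# Example: bronze_key("crm", "orders", date(2025, 11, 4))
# -> "bronze/crm/orders/year=2025/month=11/day=04/data.parquet"
```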
Quick Start Examples
1. Bronze Zone Ingestion
```python
import boto3
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
from datetime import datetime


class BronzeIngestion:
    """Ingest raw data into the bronze zone."""

    def __init__(self, bucket: str, source_system: str):
        self.bucket = bucket
        self.source_system = source_system

    def ingest(self, data: list, table_name: str):
        """Ingest data, partitioned by ingestion date."""
        now = datetime.now()
        partition_path = f"year={now.year}/month={now.month:02d}/day={now.day:02d}"

        # Convert to an Arrow table
        table = pa.Table.from_pylist(data)

        # Attach ingestion metadata to the file schema
        metadata = {
            b'ingestion_timestamp': now.isoformat().encode(),
            b'source_system': self.source_system.encode(),
            b'record_count': str(len(data)).encode()
        }
        table = table.replace_schema_metadata(metadata)

        # Write a single Parquet file under the partitioned prefix
        key = f"bronze/{self.source_system}/{table_name}/{partition_path}/data.parquet"
        pq.write_table(
            table,
            f"{self.bucket}/{key}",
            filesystem=pyarrow.fs.S3FileSystem(),
            compression='snappy',
            use_dictionary=True,
            write_statistics=True
        )

        # Register the new partition in the Glue Data Catalog
        self.update_catalog(table_name, partition_path)

    def update_catalog(self, table_name: str, partition: str):
        """Add the new partition to the Glue Data Catalog."""
        glue = boto3.client('glue')
        # Glue expects only the partition values, e.g. ["2025", "11", "04"]
        partition_values = [kv.split('=')[1] for kv in partition.split('/')]
        try:
            glue.create_partition(
                DatabaseName='bronze',
                TableName=f"{self.source_system}_{table_name}",
                PartitionInput={
                    'Values': partition_values,
                    'StorageDescriptor': {
                        'Location': f"s3://{self.bucket}/bronze/{self.source_system}/{table_name}/{partition}/",
                        'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                        'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
                        'SerdeInfo': {
                            'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
                        }
                    }
                }
            )
        except glue.exceptions.AlreadyExistsException:
            pass  # Partition already registered
```
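A hedged usage sketch of the class above; the bucket, source system, and table names are placeholders:

```python
# Ingest a small batch of records into the bronze zone of a dev data lake
ingestion = BronzeIngestion(bucket="data-lake-dev", source_system="crm")
ingestion.ingest(
    data=[{"id": "1", "value": 42.0}, {"id": "2", "value": 17.5}],
    table_name="orders",
)
```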
2. Silver Zone Processing (PySpark)
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from delta.tables import DeltaTable


class SilverZoneProcessor:
    """Process bronze to silver with Delta Lake."""

    def __init__(self):
        self.spark = (SparkSession.builder
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate())

    def process_incremental(self, bronze_table: str, silver_path: str, primary_key: str):
        """Incremental processing with deduplication and SCD Type 2."""
        # Read bronze (new records only)
        df_bronze = (self.spark.read
            .format("delta")
            .load(bronze_table)
            .filter(col("processed") == False))

        # Clean and transform
        df_cleaned = (df_bronze
            # Remove corrupt records
            .filter(col("_corrupt_record").isNull())
            # Standardize schema
            .select(
                col("id").cast("string").alias("id"),
                col("timestamp").cast("timestamp"),
                col("value").cast("double"),
                to_json(col("metadata")).alias("metadata_json")
            )
            # Add processing metadata
            .withColumn("processed_at", current_timestamp())
            .withColumn("is_active", lit(True))
            .withColumn("valid_from", col("timestamp"))
            .withColumn("valid_to", lit(None).cast("timestamp"))
            # Deduplicate (keep latest per key)
            .withColumn("row_num", row_number().over(
                Window.partitionBy(primary_key).orderBy(col("timestamp").desc())
            ))
            .filter(col("row_num") == 1)
            .drop("row_num")
        )

        # Merge into silver (SCD Type 2)
        if DeltaTable.isDeltaTable(self.spark, silver_path):
            delta_table = DeltaTable.forPath(self.spark, silver_path)

            # Close out old records
            delta_table.alias("target").merge(
                df_cleaned.alias("source"),
                f"target.{primary_key} = source.{primary_key} AND target.is_active = true"
            ).whenMatchedUpdate(
                condition="source.timestamp > target.valid_from",
                set={
                    "is_active": "false",
                    "valid_to": "source.timestamp"
                }
            ).execute()

            # Insert new records
            delta_table.alias("target").merge(
                df_cleaned.alias("source"),
                f"target.{primary_key} = source.{primary_key} AND target.is_active = true"
            ).whenNotMatchedInsert(
                values={c: f"source.{c}" for c in df_cleaned.columns}
            ).execute()

            # Compact small files in the silver table
            delta_table.optimize().executeCompaction()
        else:
            # Initial load
            df_cleaned.write.format("delta").save(silver_path)

        # Mark the consumed bronze records as processed (in place, without
        # overwriting the rest of the bronze table)
        (DeltaTable.forPath(self.spark, bronze_table)
            .update(condition="processed = false", set={"processed": "true"}))
```
3. Gold Zone Aggregation
```python
class GoldZoneAggregator:
    """Create business-ready aggregated tables."""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    def create_daily_summary(self, silver_path: str, gold_path: str, date: str):
        """Daily aggregation for analytics."""
        df_silver = self.spark.read.format("delta").load(silver_path)

        df_gold = (df_silver
            .filter(col("date") == date)
            .filter(col("is_active") == True)
            # Business aggregations
            .groupBy("date", "category", "region")
            .agg(
                count("*").alias("total_count"),
                sum("value").alias("total_value"),
                avg("value").alias("avg_value"),
                min("value").alias("min_value"),
                max("value").alias("max_value"),
                stddev("value").alias("stddev_value"),
                approx_count_distinct("user_id").alias("unique_users")
            )
            # Add derived metrics
            .withColumn("value_per_user", col("total_value") / col("unique_users"))
            # Add metadata
            .withColumn("created_at", current_timestamp())
            .withColumn("data_version", lit("1.0"))
        )

        # Overwrite only the target date partition
        (df_gold.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"date = '{date}'")
            .partitionBy("date")
            .save(gold_path))

        # Create/update the Athena table
        self.create_athena_table(gold_path, "daily_summary")
```
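A hedged end-to-end sketch wiring the silver and gold steps together; all paths and the primary key are placeholders:

```python
# Promote new bronze records into silver, then rebuild one day of gold aggregates
processor = SilverZoneProcessor()
processor.process_incremental(
    bronze_table="s3://data-lake-dev/bronze/crm/orders",
    silver_path="s3://data-lake-dev/silver/crm/orders",
    primary_key="id",
)

aggregator = GoldZoneAggregator(spark=processor.spark)
aggregator.create_daily_summary(
    silver_path="s3://data-lake-dev/silver/crm/orders",
    gold_path="s3://data-lake-dev/gold/analytics/daily_aggregates",
    date="2025-11-04",
)
```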
Data Quality Framework
```python
from great_expectations.dataset import SparkDFDataset


class DataQualityError(Exception):
    """Raised when data quality validation fails."""


class DataQualityValidator:
    """Data quality validation with Great Expectations."""

    def validate_silver_data(self, df):
        """Validate silver zone data quality."""
        ge_df = SparkDFDataset(df)

        # Schema validation
        ge_df.expect_column_to_exist("id")
        ge_df.expect_column_to_exist("timestamp")
        ge_df.expect_column_to_exist("value")

        # Data validation
        ge_df.expect_column_values_to_not_be_null("id")
        ge_df.expect_column_values_to_be_unique("id")
        ge_df.expect_column_values_to_be_between("value", min_value=0, max_value=1000000)

        # Business rules
        ge_df.expect_column_values_to_match_regex(
            "email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        )

        # Collect validation results
        results = ge_df.validate()

        if not results.success:
            failed_expectations = [exp for exp in results.results if not exp.success]
            raise DataQualityError(f"Validation failed: {failed_expectations}")

        return results
```
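A hedged sketch of running the validator against a silver table before it is published to gold, assuming an active SparkSession named `spark`; the path is a placeholder:

```python
validator = DataQualityValidator()

# Validate the silver table; raises DataQualityError if any expectation fails
df_silver = spark.read.format("delta").load("s3://data-lake-dev/silver/crm/orders")
validator.validate_silver_data(df_silver)
```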
Resource Files
resources/partitioning-strategies.md
- Partition key selection
- Multi-level partitioning
- Partition pruning optimization
- Partition size management
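As a brief illustration of multi-level partitioning, a sketch assuming the PySpark setup used elsewhere in this skill; the DataFrame, columns, and path are placeholders:

```python
# Writing with partitionBy creates year=/month=/day= directories automatically.
# Queries that filter on these columns can then prune non-matching prefixes.
(df.write
    .format("parquet")
    .partitionBy("year", "month", "day")
    .mode("append")
    .save("s3://data-lake-dev/silver/crm/orders/"))
```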
resources/schema-evolution.md
- Schema versioning
- Backward compatibility
- Column addition/removal
- Data type changes
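A minimal sketch of additive schema evolution with Delta Lake's mergeSchema option; the DataFrame and path are placeholders:

```python
# Appending a DataFrame that carries a new column: mergeSchema lets Delta
# evolve the table schema on write while older files remain readable.
(df_with_new_column.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://data-lake-dev/silver/crm/orders"))
```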
resources/cost-optimization.md
- S3 storage classes
- Lifecycle policies
- Compression formats
- Query optimization
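A hedged sketch of an S3 lifecycle policy applied with boto3; the bucket name, tiering days, and retention window are placeholder values to adapt:

```python
import boto3

s3 = boto3.client("s3")

# Transition bronze objects to cheaper storage classes as they age,
# then expire them once the retention window has passed.
s3.put_bucket_lifecycle_configuration(
    Bucket="data-lake-dev",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "bronze-tiering",
            "Filter": {"Prefix": "bronze/"},
            "Status": "Enabled",
            "Transitions": [
                {"Days": 90, "StorageClass": "STANDARD_IA"},
                {"Days": 365, "StorageClass": "GLACIER"},
            ],
            "Expiration": {"Days": 2555},  # ~7 years; align with your retention policy
        }]
    },
)
```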
resources/data-governance.md
- Access control (Lake Formation)
- Data lineage tracking
- Compliance (GDPR, HIPAA)
- Data retention policies
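A minimal sketch of table-level access control with Lake Formation via boto3; the account ID, role, and table names are placeholders:

```python
import boto3

lakeformation = boto3.client("lakeformation")

# Grant a role SELECT on a silver table through Lake Formation
# instead of managing raw S3 object permissions.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/analyst"},
    Resource={"Table": {"DatabaseName": "silver", "Name": "events"}},
    Permissions=["SELECT"],
)
```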
resources/query-optimization.md
- Athena query tuning
- Partition pruning
- Columnar format benefits
- Caching strategies
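A hedged sketch of an Athena query that benefits from partition pruning and Parquet's columnar layout; the database, table, and output location are placeholders:

```python
import boto3

athena = boto3.client("athena")

# Filtering on the partition column prunes S3 prefixes, and selecting only
# the needed columns limits the bytes scanned in the columnar files.
athena.start_query_execution(
    QueryString="SELECT category, total_value FROM daily_summary WHERE date = '2025-11-04'",
    QueryExecutionContext={"Database": "gold"},
    ResultConfiguration={"OutputLocation": "s3://data-lake-dev/temp/athena-results/"},
)
```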
Best Practices
- Partition by date for time-series data
- Use Parquet with Snappy compression
- Implement schema evolution carefully
- Monitor data quality continuously
- Use Delta Lake for ACID transactions
- Implement data lineage tracking
- Set up lifecycle policies for cost control
- Use Glue Data Catalog for discoverability
- Implement incremental processing
- Monitor query costs and optimize
- Use proper access controls
- Document data schemas
- Implement data retention policies
Common Patterns
Incremental Processing with Bookmarks
```python
# Read with bookmark
bookmark = get_bookmark("my_job")
df_new = spark.read.parquet(source).filter(col("timestamp") > bookmark)

# Process
df_processed = transform(df_new)

# Write and update bookmark
df_processed.write.parquet(target)
update_bookmark("my_job", df_new.agg(max("timestamp")).collect()[0][0])
```
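The `get_bookmark`/`update_bookmark` helpers are not defined in this skill; a minimal S3-backed sketch of what they could look like, with the bucket and prefix as placeholders:

```python
import json
import boto3

BOOKMARK_BUCKET = "data-lake-dev"       # placeholder bucket
BOOKMARK_PREFIX = "temp/bookmarks"      # placeholder prefix

def get_bookmark(job_name: str, default: str = "1970-01-01T00:00:00") -> str:
    """Read the last processed watermark for a job, or a default on first run."""
    s3 = boto3.client("s3")
    try:
        obj = s3.get_object(Bucket=BOOKMARK_BUCKET, Key=f"{BOOKMARK_PREFIX}/{job_name}.json")
        return json.loads(obj["Body"].read())["watermark"]
    except s3.exceptions.NoSuchKey:
        return default

def update_bookmark(job_name: str, watermark) -> None:
    """Persist the new watermark after a successful write."""
    boto3.client("s3").put_object(
        Bucket=BOOKMARK_BUCKET,
        Key=f"{BOOKMARK_PREFIX}/{job_name}.json",
        Body=json.dumps({"watermark": str(watermark)}),
    )
```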
Data Catalog Registration
```python
glue.create_table(
    DatabaseName='silver',
    TableInput={
        'Name': 'events',
        'StorageDescriptor': {
            'Columns': schema_columns,
            'Location': 's3://data-lake/silver/events/',
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            }
        },
        'PartitionKeys': [
            {'Name': 'date', 'Type': 'string'}
        ]
    }
)
```
Status: Production-Ready
Last Updated: 2025-11-04
Architecture: Medallion (Bronze → Silver → Gold)