Claude-skill-registry big-data
Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/big-data" ~/.claude/skills/majiayu000-claude-skill-registry-big-data && rm -rf "$T"
manifest:
skills/data/big-data/SKILL.md
Big Data & Distributed Computing
Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.
Quick Start
```python
# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, TimestampType
)

# Initialize Spark with optimal settings
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True),
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Sessionization with window functions: a new session starts whenever
# the gap between consecutive events for a user exceeds 30 minutes
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec)
            > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end"),
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")
```
Core Concepts
1. Spark Architecture Deep Dive
```
┌─────────────────────────────────────────────────────────┐
│                     Driver Program                      │
│  ┌─────────────────────────────────────────────────┐    │
│  │  SparkContext/SparkSession                      │    │
│  │  - Creates execution plan (DAG)                 │    │
│  │  - Coordinates with Cluster Manager             │    │
│  │  - Schedules tasks                              │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────┐
│         Cluster Manager (YARN/K8s/Standalone)           │
└─────────────────────────────────────────────────────────┘
                          │
         ┌────────────────┼────────────────┐
         ▼                ▼                ▼
   ┌──────────┐     ┌──────────┐     ┌──────────┐
   │ Executor │     │ Executor │     │ Executor │
   │ ┌──────┐ │     │ ┌──────┐ │     │ ┌──────┐ │
   │ │Task 1│ │     │ │Task 2│ │     │ │Task 3│ │
   │ │Task 4│ │     │ │Task 5│ │     │ │Task 6│ │
   │ └──────┘ │     │ └──────┘ │     │ └──────┘ │
   │  Cache   │     │  Cache   │     │  Cache   │
   └──────────┘     └──────────┘     └──────────┘
```
2. Partition Optimization
```python
from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: ~128MB per partition, 2-4 partitions per core
# For 100GB data on 10 executors with 4 cores each:
#   100GB / 128MB ≈ 800 partitions, or 40 cores * 4 = 160 partitions
#   Use: 200-400 partitions

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without a shuffle)
df_coalesced = df.coalesce(100)

# Optimal write partitioning: materialize the partition columns first,
# since partitionBy() requires them to exist on the DataFrame
(df.withColumn("year", F.year("date"))
   .withColumn("month", F.month("date"))
   .repartition("year", "month")
   .write
   .partitionBy("year", "month")
   .mode("overwrite")
   .parquet("s3://bucket/output/"))

# Bucketing for repeated joins
(df.write
   .bucketBy(256, "user_id")
   .sortBy("user_id")
   .saveAsTable("bucketed_events"))
```
3. Join Optimization Strategies
```python
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Broadcast join (small table; default threshold 10MB, configurable)
small_df = spark.read.parquet("s3://bucket/dim_product/")  # ~5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")   # ~500GB

# Explicit broadcast hint
result = large_df.join(broadcast(small_df), "product_id")

# Increase the broadcast threshold to 100MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Sort-Merge Join (for two large tables):
# both sides repartitioned and sorted by the join key
users = (spark.read.parquet("users/")
         .repartition(200, "user_id")
         .sortWithinPartitions("user_id"))
orders = (spark.read.parquet("orders/")
          .repartition(200, "user_id")
          .sortWithinPartitions("user_id"))
result = users.join(orders, "user_id")

# Skewed join handling (salting technique)
# If user_id is skewed (some users have millions of rows), replicate one
# side across all salt values and randomly salt the other side
salt_range = 10
salted_users = (users
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_range)])))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))
salted_orders = (orders
    .withColumn("salt", (F.rand() * salt_range).cast("int"))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))
result = (salted_users
    .join(salted_orders, "salted_key")
    .drop("salt", "salted_key"))
```
4. Caching & Persistence
```python
from pyspark import StorageLevel

# Caching strategies
df.cache()                                    # MEMORY_AND_DISK by default in Spark 3.x
df.persist(StorageLevel.MEMORY_ONLY)          # Fastest; may recompute if evicted
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # Compressed; slower but less memory
df.persist(StorageLevel.DISK_ONLY)            # For very large intermediate datasets

# When to cache:
# - Reused DataFrames (used in multiple actions)
# - After expensive transformations (joins, aggregations)
# - Before iterative algorithms

# Cache usage pattern
expensive_df = (spark.read.parquet("s3://bucket/large/")
    .filter(F.col("status") == "active")
    .join(broadcast(dim_df), "dim_key")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

expensive_df.cache()
expensive_df.count()  # Materialize the cache

# Use the cached DataFrame multiple times
top_categories = expensive_df.orderBy(F.desc("total")).limit(10)
summary = expensive_df.agg(F.avg("total"), F.max("total"))

# Release the cache when done
expensive_df.unpersist()
```
5. Structured Streaming
```python
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, TimestampType
)

# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 100000)
    .load())

# Parse the JSON payload
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
])

parsed_df = (kafka_df
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes"))

# Windowed aggregation
windowed_counts = (parsed_df
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .count())

# Write to Delta Lake with checkpointing
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start("s3://bucket/streaming_output/"))

# Block until the stream terminates
query.awaitTermination()
```
Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| Apache Spark | Distributed processing | 3.5+ |
| Delta Lake | ACID transactions | 3.0+ |
| Apache Iceberg | Table format | 1.4+ |
| Apache Flink | Stream processing | 1.18+ |
| Databricks | Managed Spark platform | Latest |
| AWS EMR | Managed Hadoop/Spark | 7.0+ |
| Trino | Interactive queries | 400+ |
| dbt | Transform layer | 1.7+ |
Learning Path
Phase 1: Foundations (Weeks 1-3)
- Week 1: Distributed computing concepts, MapReduce
- Week 2: Spark architecture, RDDs, DataFrames
- Week 3: Spark SQL, basic transformations
Phase 2: Intermediate (Weeks 4-6)
- Week 4: Joins, aggregations, window functions
- Week 5: Partitioning, bucketing, caching
- Week 6: Performance tuning, Spark UI analysis
Phase 3: Advanced (Weeks 7-10)
- Week 7: Structured Streaming
- Week 8: Delta Lake / Iceberg table formats
- Week 9: Cluster sizing, cost optimization
- Week 10: Advanced optimizations (AQE, skew handling)
Phase 4: Production (Weeks 11-14)
- Week 11: Deployment on EMR/Databricks
- Week 12: Monitoring, alerting, debugging
- Week 13: CI/CD for Spark jobs
- Week 14: Multi-cluster architectures
Production Patterns
Delta Lake UPSERT (Merge)
```python
from delta.tables import DeltaTable

# Incremental UPSERT pattern
delta_table = DeltaTable.forPath(spark, "s3://bucket/users/")
updates_df = spark.read.parquet("s3://bucket/updates/")

(delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll()
 .execute())

# Optimize after the merge
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=168)  # 7 days
```
Cost-Effective Cluster Configuration
```python
# spark-submit configuration for a 1TB processing job
"""
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=100 \
  --conf spark.speculation=true \
  job.py
"""

# Sizing guidelines:
# - Executor memory: 16-32GB (avoid GC overhead)
# - Executor cores: 4-5 (parallelism per executor)
# - Total cores: 2-4x data size in GB
# - Partitions: 2-4x total cores
```
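The sizing guidelines above can be turned into a quick back-of-envelope calculator. This is a hypothetical helper (the function name and the midpoint choices are mine, not part of the skill); treat its output as a starting point to tune against the Spark UI, not a definitive answer:

```python
def suggest_cluster_config(data_gb: int, executor_cores: int = 4,
                           executor_mem_gb: int = 16) -> dict:
    """Rough sizing per the rules of thumb above:
    total cores ≈ 2-4x data size in GB (midpoint 3x used here),
    partitions ≈ 2-4x total cores, with ~128MB/partition as a floor."""
    total_cores = 3 * data_gb                      # midpoint of the 2-4x rule
    num_executors = max(1, total_cores // executor_cores)
    partitions = max(total_cores * 3,              # 2-4x total cores
                     (data_gb * 1024) // 128)      # or ~128MB per partition
    return {
        "num_executors": num_executors,
        "executor_cores": executor_cores,
        "executor_memory": f"{executor_mem_gb}g",
        "spark.sql.shuffle.partitions": partitions,
    }

# Example: a 100GB job
cfg = suggest_cluster_config(100)
print(cfg)
```

Note that these heuristics can produce far larger clusters than budget allows; in practice the result is usually capped by `spark.dynamicAllocation.maxExecutors` as in the spark-submit example above.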
Troubleshooting Guide
Common Failure Modes
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| OOM Error | "Container killed by YARN" | Too much data per partition | Increase partitions, reduce broadcast |
| Shuffle Spill | Slow stage, disk I/O | Insufficient memory | Increase executor memory or shuffle partitions |
| Skewed Tasks | One task much slower | Data skew on key | Use salting, AQE skew handling |
| GC Overhead | "GC overhead limit exceeded" | Too many small objects | Use Kryo serialization, reduce UDFs |
| Driver OOM | Driver crash | collect(), large broadcast | Avoid collect, stream results |
Debug Checklist
```python
# 1. Check the Spark UI (port 4040 live, 18080 history server)
#    - Stages: look for skewed tasks (max >> median)
#    - Storage: check cached data size
#    - Environment: verify configuration

# 2. Analyze the execution plan
df.explain(mode="extended")

# 3. Check partition distribution
df.groupBy(F.spark_partition_id()).count().show()

# 4. Profile data skew
df.groupBy("key_column").count().orderBy(F.desc("count")).show(20)

# 5. Reduce log noise while debugging
spark.sparkContext.setLogLevel("WARN")

# 6. Enable detailed metrics
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
```
Reading Spark UI
```
Stage Analysis:
├── Duration: Total time for stage
├── Tasks: Number of parallel tasks
│   ├── Median: Typical task duration
│   ├── Max: Slowest task (check for skew)
│   └── Failed: Retry count
├── Input: Data read
├── Shuffle Read: Data from other stages
├── Shuffle Write: Data for downstream stages
└── Spill: Disk spill (indicates memory pressure)

Key Metrics:
├── GC Time > 10%: Memory issue
├── Shuffle Write > Input: Exploding join
├── Max/Median > 2x: Data skew
└── Spill > 0: Increase partitions or memory
```
Unit Test Template
```python
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from chispa.dataframe_comparer import assert_df_equality


@pytest.fixture(scope="session")
def spark():
    """Session-scoped Spark for tests."""
    return (SparkSession.builder
        .master("local[2]")
        .appName("UnitTests")
        .config("spark.sql.shuffle.partitions", 2)
        .getOrCreate())


@pytest.fixture
def sample_data(spark):
    return spark.createDataFrame([
        (1, "user1", 100.0),
        (2, "user2", 200.0),
        (3, "user1", 150.0),
    ], ["id", "user_id", "amount"])


class TestAggregations:
    def test_user_totals(self, spark, sample_data):
        # Arrange
        expected = spark.createDataFrame([
            ("user1", 250.0),
            ("user2", 200.0),
        ], ["user_id", "total"])

        # Act
        result = sample_data.groupBy("user_id").agg(
            F.sum("amount").alias("total")
        )

        # Assert
        assert_df_equality(result, expected, ignore_row_order=True)

    def test_handles_empty_dataframe(self, spark):
        # Arrange
        empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")

        # Act
        result = empty_df.agg(F.sum("amount").alias("total")).collect()

        # Assert
        assert result[0]["total"] is None

    def test_window_functions(self, spark, sample_data):
        # Arrange
        window = Window.partitionBy("user_id").orderBy("id")

        # Act (orderBy before collect: row order is otherwise unspecified)
        result = (sample_data
            .withColumn("running_total", F.sum("amount").over(window))
            .filter(F.col("user_id") == "user1")
            .orderBy("id"))

        # Assert
        totals = [row["running_total"] for row in result.collect()]
        assert totals == [100.0, 250.0]
```
Best Practices
Performance
```python
# ✅ DO: Use the DataFrame API over RDDs
df.filter(F.col("status") == "active")         # Catalyst-optimized

# ❌ DON'T: Use RDD transformations
rdd.filter(lambda x: x["status"] == "active")  # No optimization

# ✅ DO: Use built-in functions
df.withColumn("upper_name", F.upper("name"))

# ❌ DON'T: Use Python UDFs (slow serialization)
@udf
def upper_name(name):
    return name.upper()

# ✅ DO: Broadcast small lookups
df.join(broadcast(small_df), "key")

# ✅ DO: Persist wisely
intermediate.cache()
intermediate.count()  # Force materialization
# ... use intermediate multiple times ...
intermediate.unpersist()
```
Code Organization
```python
# ✅ DO: Chain transformations fluently
result = (df
    .filter(condition)
    .withColumn("new_col", F.expr("..."))
    .groupBy("key")
    .agg(F.sum("value")))

# ✅ DO: Use descriptive column aliases
.agg(
    F.count("*").alias("event_count"),
    F.avg("amount").alias("avg_amount")
)

# ✅ DO: Parameterize for reusability
def add_date_features(df, date_col):
    return (df
        .withColumn("year", F.year(date_col))
        .withColumn("month", F.month(date_col))
        .withColumn("day_of_week", F.dayofweek(date_col)))
```
Resources
Books
- "Learning Spark 2nd Edition" by Damji et al.
- "Spark: The Definitive Guide" by Chambers & Zaharia
- "High Performance Spark" by Karau & Warren
Next Skills
After mastering Big Data:
- data-warehousing → Design dimensional models
- mlops → Deploy ML at scale
- streaming → Real-time with Flink/Kafka
- cloud-platforms → AWS EMR, Databricks
Skill Certification Checklist:
- Can optimize Spark jobs using EXPLAIN and Spark UI
- Can implement efficient joins with broadcast and bucketing
- Can handle data skew with salting techniques
- Can build streaming pipelines with Structured Streaming
- Can use Delta Lake for ACID operations