Cc-skills ml-data-pipeline-architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.

install
source · Clone the upstream repo
git clone https://github.com/terrylica/cc-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/terrylica/cc-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/devops-tools/skills/ml-data-pipeline-architecture" ~/.claude/skills/terrylica-cc-skills-ml-data-pipeline-architecture && rm -rf "$T"
manifest: plugins/devops-tools/skills/ml-data-pipeline-architecture/SKILL.md

ML Data Pipeline Architecture

ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)

Note: A PreToolUse hook enforces the Polars preference. To use Pandas, add the comment # polars-exception: <reason> at the top of the file.

Self-Evolving Skill: This skill improves through use. If instructions are wrong, parameters drifted, or a workaround was needed — fix this file immediately, don't defer. Only update for real, reproducible issues.

When to Use This Skill

Use this skill when:

  • Deciding between Polars and Pandas for a data pipeline
  • Optimizing memory usage with zero-copy Arrow patterns
  • Loading data from ClickHouse into PyTorch DataLoaders
  • Implementing lazy evaluation for large datasets
  • Migrating existing Pandas code to Polars

1. Decision Tree: Polars vs Pandas

Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
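
The size and integration branches above can be folded into a small helper. This is only a sketch: the function name recommend_engine, the sklearn_heavy flag, and the rule that scikit-learn-heavy code wins below 10M rows are illustrative assumptions, not part of the skill.

```python
def recommend_engine(n_rows: int, sklearn_heavy: bool = False) -> str:
    """Return 'pandas' or 'polars' following the row-count thresholds above."""
    if n_rows > 10_000_000:
        return "polars"   # > 10M rows: memory efficiency dominates
    if sklearn_heavy:
        return "pandas"   # assumed tie-breaker: interop matters more below 10M rows
    if n_rows >= 1_000_000:
        return "polars"   # 1M-10M rows: Polars is worth considering
    return "pandas"       # < 1M rows: Pandas is fine


print(recommend_engine(500_000))
print(recommend_engine(5_000_000))
print(recommend_engine(50_000_000, sklearn_heavy=True))
```

Treat the output as a starting point; the operation mix (group-bys, joins, streaming) can still tip the choice toward Polars on smaller data.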

2. Zero-Copy Pipeline Architecture

The Problem with Pandas

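# BAD: multiple memory copies
df = pd.read_sql(query, conn)     # Copy 1: DB rows → Python objects → pandas
X = df[features].values           # Copy 2: pandas → numpy (columns consolidated into one array)
tensor = torch.from_numpy(X)      # Shares memory with X; a later .float() cast on float64 data adds Copy 3
# Peak memory: up to ~3x data size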

The Solution with Arrow

# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow table in client memory
df = pl.from_arrow(arrow_table)                          # Zero-copy view
X = df.select(features).to_numpy()                       # Single allocation
tensor = torch.from_numpy(X)                             # View (no copy)
# Peak memory: 1.2x data size
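
To make the peak-memory multipliers concrete, here is a back-of-envelope sketch. The helper name estimate_peak_bytes and the 10M x 20 float64 shape are illustrative assumptions; the 3x and 1.2x factors are the ones quoted in the comments above, not new measurements.

```python
def estimate_peak_bytes(n_rows: int, n_cols: int,
                        bytes_per_value: int = 8,
                        copy_factor: float = 1.0) -> int:
    """Rough peak memory: raw data size times the number of live copies."""
    return round(n_rows * n_cols * bytes_per_value * copy_factor)


# Hypothetical dataset: 10M rows x 20 float64 columns
raw = estimate_peak_bytes(10_000_000, 20)
pandas_peak = estimate_peak_bytes(10_000_000, 20, copy_factor=3.0)
arrow_peak = estimate_peak_bytes(10_000_000, 20, copy_factor=1.2)

print(f"raw data:    {raw / 1e9:.2f} GB")
print(f"pandas path: {pandas_peak / 1e9:.2f} GB")
print(f"arrow path:  {arrow_peak / 1e9:.2f} GB")
```

Real peaks depend on dtypes, string columns, and when intermediate objects are freed, so a memory profiler remains the source of truth.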

3. ClickHouse Integration Patterns

Pattern A: Arrow Stream (Recommended)

def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    # clickhouse_connect's query_arrow() requests Arrow output itself,
    # so the query must not carry its own FORMAT clause.
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)

# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")

Pattern B: Polars Native (Simpler)

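# Polars reads through a connector engine (ConnectorX or ADBC) rather than a
# built-in ClickHouse client; confirm the installed engine accepts this URI
# (see pola.rs for version requirements)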
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db"
)

Pattern C: Parquet Export (Batch Jobs)

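# For reproducible batch processing
# INTO OUTFILE is a clickhouse-client (CLI) feature; over the HTTP interface,
# fetch the Parquet bytes and write the file locally.
parquet_bytes = client.raw_query("SELECT * FROM bars", fmt="Parquet")
with open("data.parquet", "wb") as f:
    f.write(parquet_bytes)
df = pl.scan_parquet("data.parquet")  # Lazy: nothing is read until collect()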

4. PyTorch DataLoader Integration

Minimal Change Pattern

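import pandas as pd
import polars as pl
import torch
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
# (features and target are the column names defined elsewhere in the module)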
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()

    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float()
    )

X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)

Custom PolarsDataset (Large Data)

class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y

5. Lazy Evaluation Patterns

Pipeline Composition

# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        (pl.col("close").pct_change()).alias("returns"),
        (pl.col("volume").log()).alias("log_volume"),
    ])
)

# Execute only when needed. Split on timestamp first, then select the model
# columns: selecting earlier would drop "timestamp" (unless it is a feature)
# and make the split filters fail.
columns = features + [target]
train_df = pipeline.filter(pl.col("timestamp") < split_date).select(columns).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).select(columns).collect()

Streaming for Large Files

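import pyarrow.parquet as pq

# Process file in chunks (never loads full file)
# iter_batches() is a PyArrow ParquetFile method, not a LazyFrame method,
# so the file is read as Arrow record batches and wrapped in Polars per chunk.
def process_large_file(path: str, chunk_size: int = 100_000):
    parquet_file = pq.ParquetFile(path)

    for record_batch in parquet_file.iter_batches(batch_size=chunk_size):
        batch = pl.from_arrow(record_batch)
        # Process each chunk
        features = compute_features(batch)
        yield features.to_numpy()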

6. Schema Validation

Pydantic for Config

from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v

DataFrame Schema Validation

def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )

7. Performance Benchmarks

| Operation | Pandas | Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |

Benchmark: 50M rows, 20 columns, MacBook M2
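
The speedup column is the Pandas time divided by the Polars time. To reproduce such numbers on other hardware, a minimal timing harness could look like this; time_call and speedup are hypothetical helper names, and the workload in the last lines is a placeholder rather than a real benchmark.

```python
import time
from typing import Callable


def time_call(fn: Callable[[], object], repeats: int = 3) -> float:
    """Best wall-clock time in seconds over several runs."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - start)
    return best


def speedup(baseline_s: float, candidate_s: float) -> float:
    """How many times faster the candidate is than the baseline."""
    return baseline_s / candidate_s


# Ratios recomputed from the table rows above
print(round(speedup(45, 4)))      # Read CSV row
print(round(speedup(3.8, 0.3)))   # Group-by row

# Placeholder workload; swap in the pandas and polars calls to compare
elapsed = time_call(lambda: sum(range(100_000)))
```

Taking the best of several runs reduces noise from caching and background load, though it hides variance worth checking separately.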


8. Migration Checklist

Phase 1: Add Arrow Support

  • Add polars = "<version>" to dependencies (see PyPI)
  • Implement query_arrow() in data client
  • Verify zero-copy with memory profiler

Phase 2: Polars at Entry Points

  • Add pl.from_pandas() wrapper at trainer entry
  • Update prepare_sequences() to accept both types
  • Add schema validation after conversion

Phase 3: Full Lazy Evaluation

  • Convert file reads to pl.scan_*
  • Compose transformations lazily
  • Call .collect() only before .to_numpy()

9. Anti-Patterns to Avoid

DON'T: Mix APIs Unnecessarily

# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?

DON'T: Collect Too Early

# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)  # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()

DON'T: Ignore Memory Pressure

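# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in chunks
# (iter_batches() lives on PyArrow's ParquetFile, not on a Polars LazyFrame)
import pyarrow.parquet as pq

for record_batch in pq.ParquetFile("huge_file.parquet").iter_batches():
    process(pl.from_arrow(record_batch))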


Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns, convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls, stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without exception | Add # polars-exception: reason comment at file top |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for 5-10x speedup |
| Schema validation failure | Column names case-sensitive | Verify exact column names from source |
| PyTorch DataLoader OOM | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance | Not using predicate pushdown | Add filters before collect() for lazy evaluation |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with .cast(pl.Float32) before numpy |

Post-Execution Reflection

After this skill completes, reflect before closing the task:

  1. Locate yourself. — Find this SKILL.md's canonical path before editing.
  2. What failed? — Fix the instruction that caused it.
  3. What worked better than expected? — Promote to recommended practice.
  4. What drifted? — Fix any script, reference, or dependency that no longer matches reality.
  5. Log it. — Evolution-log entry with trigger, fix, and evidence.

Do NOT defer. The next invocation inherits whatever you leave behind.