Claude-scaffold predictive-analytics

Predictive Analytics

install
source · Clone the upstream repo
git clone https://github.com/pyramidheadshark/claude-scaffold
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/pyramidheadshark/claude-scaffold "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/predictive-analytics" ~/.claude/skills/pyramidheadshark-claude-scaffold-predictive-analytics && rm -rf "$T"
manifest: .claude/skills/predictive-analytics/SKILL.md
source content

Predictive Analytics

When to Load This Skill

Load when working with: scikit-learn pipelines, feature engineering, tabular ML, time series, model training/evaluation, MLflow experiment tracking, model registry, cross-validation.

Project Structure for ML Projects

src/{project_name}/
├── core/
│   └── domain.py
├── ml/
│   ├── __init__.py
│   ├── features/
│   │   ├── __init__.py
│   │   ├── builder.py       # FeatureBuilder — assembles feature matrix
│   │   └── transformers.py  # custom sklearn transformers
│   ├── models/
│   │   ├── __init__.py
│   │   ├── trainer.py       # training pipeline
│   │   └── evaluator.py     # metrics computation
│   └── registry/
│       └── mlflow_adapter.py

Sklearn Pipeline Standard

Always use Pipeline; never apply transformations outside of it. This ensures train/test consistency and prevents data leakage.

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def build_pipeline(
    numeric_features: list[str],
    categorical_features: list[str],
) -> Pipeline:
    numeric_transformer = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ])

    categorical_transformer = Pipeline([
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ])

    preprocessor = ColumnTransformer([
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ])

    return Pipeline([
        ("preprocessor", preprocessor),
        ("classifier", GradientBoostingClassifier(n_estimators=200, random_state=42)),
    ])
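
A minimal usage sketch with toy data; the column names and values here are illustrative placeholders, not part of the skill:

import pandas as pd
from sklearn.model_selection import train_test_split

# Toy frame for illustration; real feature lists are project-specific.
df = pd.DataFrame({
    "age": [25, 32, 47, 51, 38, 29, 45, 36],
    "income": [40_000, 52_000, None, 88_000, 61_000, 43_000, 75_000, 58_000],
    "segment": ["a", "b", "a", "c", "b", "a", "c", "b"],
    "churned": [0, 0, 1, 1, 0, 0, 1, 0],
})

X_train, X_test, y_train, y_test = train_test_split(
    df.drop(columns=["churned"]), df["churned"], test_size=0.25, random_state=42
)

pipeline = build_pipeline(["age", "income"], ["segment"])
pipeline.fit(X_train, y_train)         # imputers, scaler, and encoder are fitted on train only
print(pipeline.score(X_test, y_test))  # accuracy on the held-out rows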

Custom Transformer Pattern

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, date_column: str) -> None:
        self.date_column = date_column

    def fit(self, X: pd.DataFrame, y=None) -> "DateFeatureExtractor":
        # Stateless transformer: nothing is learned during fit.
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X = X.copy()
        dt = pd.to_datetime(X[self.date_column])
        X[f"{self.date_column}_year"] = dt.dt.year
        X[f"{self.date_column}_month"] = dt.dt.month
        X[f"{self.date_column}_dayofweek"] = dt.dt.dayofweek
        X[f"{self.date_column}_quarter"] = dt.dt.quarter
        return X.drop(columns=[self.date_column])
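
A sketch of wiring the extractor in front of the preprocessing pipeline from above; signup_date is a hypothetical raw column, and the columns it derives are then listed as ordinary numeric features:

from sklearn.pipeline import Pipeline

date_col = "signup_date"  # hypothetical raw date column
derived = [f"{date_col}_{part}" for part in ("year", "month", "dayofweek", "quarter")]

full_pipeline = Pipeline([
    ("dates", DateFeatureExtractor(date_column=date_col)),  # adds derived cols, drops the raw one
    ("model", build_pipeline(
        numeric_features=["age", "income", *derived],
        categorical_features=["segment"],
    )),
])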

MLflow Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_val_score
import numpy as np
import pandas as pd


class ExperimentTracker:
    def __init__(self, experiment_name: str, tracking_uri: str = "mlruns") -> None:
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)

    def run(
        self,
        pipeline,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        X_test: pd.DataFrame,
        y_test: pd.Series,
        params: dict,
        run_name: str = "",
    ) -> str:
        with mlflow.start_run(run_name=run_name) as run:
            mlflow.log_params(params)

            cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring="f1_weighted")
            mlflow.log_metric("cv_f1_mean", float(np.mean(cv_scores)))
            mlflow.log_metric("cv_f1_std", float(np.std(cv_scores)))

            pipeline.fit(X_train, y_train)
            y_pred = pipeline.predict(X_test)
            report = classification_report(y_test, y_pred, output_dict=True)

            mlflow.log_metric("test_f1", report["weighted avg"]["f1-score"])
            mlflow.log_metric("test_precision", report["weighted avg"]["precision"])
            mlflow.log_metric("test_recall", report["weighted avg"]["recall"])

            mlflow.sklearn.log_model(pipeline, "model")

            return run.info.run_id
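
A usage sketch reusing the pipeline and splits from the sections above; the experiment and run names are placeholders:

tracker = ExperimentTracker(experiment_name="churn-prediction")
run_id = tracker.run(
    pipeline=build_pipeline(["age", "income"], ["segment"]),
    X_train=X_train, y_train=y_train,
    X_test=X_test, y_test=y_test,
    params={"n_estimators": 200, "random_state": 42},
    run_name="gbm-baseline",
)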

Model Registry and Promotion

import mlflow
from mlflow.tracking import MlflowClient


class ModelRegistryAdapter:
    def __init__(self, model_name: str) -> None:
        self._client = MlflowClient()
        self._model_name = model_name

    def register(self, run_id: str) -> int:
        result = mlflow.register_model(
            model_uri=f"runs:/{run_id}/model",
            name=self._model_name,
        )
        return int(result.version)  # MLflow returns the version as a string

    def promote_to_production(self, version: int) -> None:
        # Registry stages are deprecated in recent MLflow releases in favor of
        # model version aliases, but still work on the 2.x line pinned below.
        self._client.transition_model_version_stage(
            name=self._model_name,
            version=str(version),
            stage="Production",
            archive_existing_versions=True,
        )

    def load_production(self):
        return mlflow.sklearn.load_model(
            model_uri=f"models:/{self._model_name}/Production"
        )
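
A promotion sketch that chains the two adapters; the model name is a placeholder:

registry = ModelRegistryAdapter(model_name="churn-classifier")
version = registry.register(run_id)      # run_id returned by ExperimentTracker.run
registry.promote_to_production(version)  # archives any existing Production version
model = registry.load_production()       # fitted pipeline, ready for inference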

Evaluation Standard

Always report these metrics for classification. Never report accuracy alone.

from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    average_precision_score,
)


def evaluate_classifier(y_true, y_pred, y_prob=None) -> dict:
    report = classification_report(y_true, y_pred, output_dict=True)
    metrics = {
        "f1_weighted": report["weighted avg"]["f1-score"],
        "precision_weighted": report["weighted avg"]["precision"],
        "recall_weighted": report["weighted avg"]["recall"],
        "confusion_matrix": confusion_matrix(y_true, y_pred).tolist(),
    }
    if y_prob is not None:
        # For binary problems pass the positive-class probabilities; note that
        # average_precision_score supports binary and multilabel targets only.
        metrics["roc_auc"] = roc_auc_score(y_true, y_prob, multi_class="ovr")
        metrics["avg_precision"] = average_precision_score(y_true, y_prob)
    return metrics
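
A binary-classification usage sketch, assuming a fitted pipeline and a held-out split; column 1 of predict_proba holds the positive-class probabilities:

y_pred = pipeline.predict(X_test)
y_prob = pipeline.predict_proba(X_test)[:, 1]
metrics = evaluate_classifier(y_test, y_pred, y_prob=y_prob)
print(metrics["f1_weighted"], metrics["roc_auc"])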

Time Series Specifics

For time series data, use TimeSeriesSplit; never use train_test_split with shuffling, which lets training folds see the future.

from sklearn.model_selection import TimeSeriesSplit, cross_val_score

tscv = TimeSeriesSplit(n_splits=5, gap=7)  # gap=7 leaves a 7-row buffer between each train fold and its test fold
scores = cross_val_score(pipeline, X, y, cv=tscv, scoring="neg_mean_absolute_error")
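
If a single chronological holdout is needed instead of cross-validation, split without shuffling so the test rows stay strictly after the training rows:

from sklearn.model_selection import train_test_split

# shuffle=False preserves row order, so the most recent 20% becomes the test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)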

Feature engineering for time series:

import pandas as pd


def add_lag_features(df: pd.DataFrame, target_col: str, lags: list[int]) -> pd.DataFrame:
    df = df.copy()
    for lag in lags:
        df[f"{target_col}_lag_{lag}"] = df[target_col].shift(lag)
    return df


def add_rolling_features(df: pd.DataFrame, target_col: str, windows: list[int]) -> pd.DataFrame:
    df = df.copy()
    for w in windows:
        df[f"{target_col}_roll_mean_{w}"] = df[target_col].rolling(w).mean()
        df[f"{target_col}_roll_std_{w}"] = df[target_col].rolling(w).std()
    return df
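
A usage sketch with a hypothetical sales column; shift and rolling leave NaNs in the earliest rows, which should be dropped (or imputed) before training:

df = add_lag_features(df, target_col="sales", lags=[1, 7, 28])
df = add_rolling_features(df, target_col="sales", windows=[7, 28])
df = df.dropna()  # rows without a full lag/window history carry NaNs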

Required Dependencies

dependencies = [
    "scikit-learn>=1.4.0",
    "pandas>=2.2.0",
    "numpy>=1.26.0",
    "mlflow>=2.12.0",
    "optuna>=3.6.0",     # hyperparameter optimization
]

Further Resources

  • resources/hyperparameter-tuning.md · Optuna integration for HPO
  • resources/feature-importance.md · SHAP values and feature selection