claude-skill-registry · data-cleaning-pipeline-generator
Generates data cleaning pipelines for pandas/polars with handling for missing values, duplicates, outliers, type conversions, and data validation. Use when user asks to "clean data", "generate data pipeline", "handle missing values", or "remove duplicates from dataset".
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-cleaning-pipeline-generator" ~/.claude/skills/majiayu000-claude-skill-registry-data-cleaning-pipeline-generator && rm -rf "$T"
manifest:
skills/data/data-cleaning-pipeline-generator/SKILL.md
Data Cleaning Pipeline Generator
Generates comprehensive data cleaning and preprocessing pipelines using pandas, polars, or PySpark with best practices for handling messy data.
When to Use
- "Clean my dataset"
- "Generate data cleaning pipeline"
- "Handle missing values"
- "Remove duplicates"
- "Fix data types"
- "Detect and remove outliers"
Instructions
1. Analyze Dataset
import pandas as pd

# Load data
df = pd.read_csv('data.csv')

# Basic info (df.info() prints directly; wrapping it in print() would print "None")
df.info()
print(df.describe())
print(df.head())

# Check for issues
print("\nMissing values:")
print(df.isnull().sum())

print("\nDuplicates:")
print(f"Total duplicates: {df.duplicated().sum()}")

print("\nData types:")
print(df.dtypes)
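The inspection above can also be wrapped into a reusable audit step. A minimal sketch; the `audit_dataframe` helper is illustrative, not part of this skill:

import pandas as pd

def audit_dataframe(df: pd.DataFrame) -> dict:
    """Summarize common data issues as a plain dict (illustrative helper)."""
    return {
        'shape': df.shape,
        'missing_per_column': df.isnull().sum().to_dict(),
        'duplicate_rows': int(df.duplicated().sum()),
        'dtypes': df.dtypes.astype(str).to_dict(),
    }

# Example: feed the summary into later cleaning decisions
issues = audit_dataframe(df)
print(issues['duplicate_rows'])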
2. Generate Pandas Cleaning Pipeline
Complete Pipeline:
import pandas as pd
import numpy as np
from datetime import datetime


class DataCleaningPipeline:
    """Data cleaning pipeline for pandas DataFrames."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.original_shape = df.shape
        self.cleaning_log = []

    def log(self, message: str):
        """Log cleaning steps."""
        self.cleaning_log.append(f"[{datetime.now()}] {message}")
        print(message)

    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        self.log(f"Removed {removed} duplicate rows")
        return self

    def handle_missing_values(self, strategy='auto'):
        """Handle missing values based on strategy."""
        missing = self.df.isnull().sum()
        columns_with_missing = missing[missing > 0]

        for col in columns_with_missing.index:
            missing_pct = (missing[col] / len(self.df)) * 100

            if missing_pct > 50:
                self.log(f"Dropping column '{col}' ({missing_pct:.1f}% missing)")
                self.df = self.df.drop(columns=[col])
                continue

            if self.df[col].dtype in ['int64', 'float64']:
                # Numeric columns
                if strategy == 'mean':
                    fill_value = self.df[col].mean()
                elif strategy == 'median':
                    fill_value = self.df[col].median()
                else:  # auto
                    fill_value = self.df[col].median()
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with {strategy}: {fill_value:.2f}")
            else:
                # Categorical columns
                if strategy == 'mode':
                    fill_value = self.df[col].mode()[0]
                else:  # auto
                    fill_value = 'Unknown'
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with: {fill_value}")
        return self

    def fix_data_types(self, type_mapping=None):
        """Convert columns to appropriate data types."""
        if type_mapping is None:
            type_mapping = {}

        for col in self.df.columns:
            if col in type_mapping:
                try:
                    self.df[col] = self.df[col].astype(type_mapping[col])
                    self.log(f"Converted '{col}' to {type_mapping[col]}")
                except Exception as e:
                    self.log(f"Failed to convert '{col}': {e}")
            elif 'date' in col.lower() or 'time' in col.lower():
                # Auto-detect dates from column names
                try:
                    self.df[col] = pd.to_datetime(self.df[col])
                    self.log(f"Converted '{col}' to datetime")
                except (ValueError, TypeError):
                    pass
        return self

    def remove_outliers(self, columns=None, method='iqr', threshold=1.5):
        """Remove outliers using IQR or Z-score method."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        before = len(self.df)
        for col in columns:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                mask = (self.df[col] >= lower) & (self.df[col] <= upper)
            else:  # z-score
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                mask = z_scores < threshold
            self.df = self.df[mask]
        removed = before - len(self.df)
        self.log(f"Removed {removed} outlier rows using {method} method")
        return self

    def normalize_text(self, columns=None):
        """Normalize text columns (lowercase, strip whitespace)."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            self.df[col] = self.df[col].str.strip().str.lower()
            self.log(f"Normalized text in '{col}'")
        return self

    def encode_categorical(self, columns=None, method='label'):
        """Encode categorical variables."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            if method == 'label':
                self.df[col] = pd.Categorical(self.df[col]).codes
                self.log(f"Label encoded '{col}'")
            elif method == 'onehot':
                dummies = pd.get_dummies(self.df[col], prefix=col)
                self.df = pd.concat([self.df.drop(columns=[col]), dummies], axis=1)
                self.log(f"One-hot encoded '{col}'")
        return self

    def validate_ranges(self, range_checks):
        """Validate numeric columns are within expected ranges."""
        for col, (min_val, max_val) in range_checks.items():
            invalid = ((self.df[col] < min_val) | (self.df[col] > max_val)).sum()
            if invalid > 0:
                self.log(f"WARNING: {invalid} values in '{col}' outside range [{min_val}, {max_val}]")
                # Remove invalid rows
                self.df = self.df[(self.df[col] >= min_val) & (self.df[col] <= max_val)]
        return self

    def generate_report(self):
        """Generate cleaning report."""
        report = f"""
Data Cleaning Report
====================
Original Shape: {self.original_shape}
Final Shape: {self.df.shape}
Rows Removed: {self.original_shape[0] - self.df.shape[0]}
Columns Removed: {self.original_shape[1] - self.df.shape[1]}

Cleaning Steps:
"""
        for step in self.cleaning_log:
            report += f"  - {step}\n"
        return report

    def get_cleaned_data(self):
        """Return cleaned DataFrame."""
        return self.df


# Usage
pipeline = DataCleaningPipeline(df)
cleaned_df = (
    pipeline
    .remove_duplicates()
    .handle_missing_values(strategy='auto')
    .fix_data_types()
    .remove_outliers(method='iqr', threshold=1.5)
    .normalize_text()
    .validate_ranges({'age': (0, 120), 'price': (0, 1000000)})
    .get_cleaned_data()
)

print(pipeline.generate_report())
cleaned_df.to_csv('cleaned_data.csv', index=False)
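Before running the pipeline on real data, it helps to exercise it on a tiny synthetic frame. A minimal sketch using the class above; the column names and values are made up for illustration:

import pandas as pd
import numpy as np

# Tiny synthetic frame with a duplicate row, a missing value, and an outlier
raw = pd.DataFrame({
    'age': [25, 25, np.nan, 34, 999],
    'name': [' Alice ', ' Alice ', 'Bob', None, 'Carol'],
})

result = (
    DataCleaningPipeline(raw)
    .remove_duplicates()
    .handle_missing_values()
    .remove_outliers(method='iqr')
    .normalize_text()
    .get_cleaned_data()
)

# The cleaned frame should have no duplicates and no missing values left
assert not result.duplicated().any()
assert result.isnull().sum().sum() == 0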
3. Polars Pipeline (Faster for Large Data)
import polars as pl

# Load data
df = pl.read_csv('data.csv')

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .unique()
    # Handle missing values
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    # Fix data types
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%d'),
        pl.col('amount').cast(pl.Float64),
    ])
    # Remove out-of-range values
    .filter(
        (pl.col('age') >= 0) & (pl.col('age') <= 120)
    )
    # Normalize text (str.strip_chars replaces str.strip in recent polars)
    .with_columns([
        pl.col('name').str.to_lowercase().str.strip_chars(),
        pl.col('email').str.to_lowercase(),
    ])
)

# Save
cleaned_df.write_csv('cleaned_data.csv')
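The filter above only enforces hard limits. If you also want statistical outlier removal in polars, mirroring the pandas pipeline's 1.5×IQR rule, a minimal sketch follows; the 'amount' column is an assumed example:

import polars as pl

def iqr_filter(df: pl.DataFrame, column: str, k: float = 1.5) -> pl.DataFrame:
    """Keep rows whose value lies within [Q1 - k*IQR, Q3 + k*IQR]."""
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    iqr = q3 - q1
    return df.filter(
        (pl.col(column) >= q1 - k * iqr) & (pl.col(column) <= q3 + k * iqr)
    )

cleaned_df = iqr_filter(cleaned_df, 'amount')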
4. PySpark Pipeline (For Big Data)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, trim, lower
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Load data
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .dropDuplicates()
    # Handle missing values
    .fillna({
        'age': df.select(mean('age')).collect()[0][0],
        'name': 'Unknown',
    })
    # Fix data types
    .withColumn('age', col('age').cast(IntegerType()))
    .withColumn('amount', col('amount').cast(DoubleType()))
    # Remove outliers
    .filter((col('age') >= 0) & (col('age') <= 120))
    # Normalize text
    .withColumn('name', trim(lower(col('name'))))
    .withColumn('email', trim(lower(col('email'))))
)

# Save
cleaned_df.write.csv('cleaned_data', header=True, mode='overwrite')
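For IQR-style outlier removal at Spark scale, approxQuantile avoids a full sort. A minimal sketch, assuming the 'amount' column from the pipeline above and a 1% relative error:

# IQR outlier filter using approximate quantiles
q1, q3 = cleaned_df.approxQuantile('amount', [0.25, 0.75], 0.01)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

cleaned_df = cleaned_df.filter(
    (col('amount') >= lower_bound) & (col('amount') <= upper_bound)
)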
5. Data Quality Checks
def data_quality_checks(df):
    """Run comprehensive data quality checks."""
    report = []

    # Check 1: Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report.append(f"⚠️ Missing values found:\n{missing[missing > 0]}")

    # Check 2: Duplicates
    duplicates = df.duplicated().sum()
    if duplicates > 0:
        report.append(f"⚠️ {duplicates} duplicate rows found")

    # Check 3: Data types
    expected_types = {
        'age': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]',
    }
    for col, expected in expected_types.items():
        if col in df.columns and df[col].dtype != expected:
            report.append(f"⚠️ Column '{col}' has type {df[col].dtype}, expected {expected}")

    # Check 4: Value ranges
    if 'age' in df.columns:
        invalid_age = ((df['age'] < 0) | (df['age'] > 120)).sum()
        if invalid_age > 0:
            report.append(f"⚠️ {invalid_age} invalid age values")

    # Check 5: Unique identifiers
    if 'id' in df.columns:
        if df['id'].duplicated().any():
            report.append("⚠️ Duplicate IDs found")

    # Check 6: Consistency
    if 'email' in df.columns:
        invalid_email = ~df['email'].str.contains('@', na=False)
        if invalid_email.sum() > 0:
            report.append(f"⚠️ {invalid_email.sum()} invalid email addresses")

    if report:
        print("Data Quality Issues:")
        for issue in report:
            print(issue)
    else:
        print("✅ All quality checks passed!")

    return len(report) == 0


# Run checks
data_quality_checks(cleaned_df)
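In an automated job you usually want a hard failure instead of a printed report. A small sketch wrapping the function above; the exception name is illustrative:

class DataQualityError(Exception):
    """Raised when a dataset fails its quality checks (illustrative)."""

def require_quality(df):
    """Gate a pipeline stage on the checks above."""
    if not data_quality_checks(df):
        raise DataQualityError("Dataset failed quality checks; see report above")
    return df

# Example: fail fast before writing output
require_quality(cleaned_df).to_csv('cleaned_data.csv', index=False)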
6. Automated Cleaning Function
import numpy as np
import pandas as pd


def auto_clean_dataframe(df, config=None):
    """Automatically clean DataFrame with sensible defaults."""
    defaults = {
        'remove_duplicates': True,
        'handle_missing': True,
        'remove_outliers': True,
        'normalize_text': True,
    }
    config = {**defaults, **(config or {})}  # partial configs fall back to defaults

    df = df.copy()  # never mutate the caller's frame
    print(f"Original shape: {df.shape}")

    if config['remove_duplicates']:
        df = df.drop_duplicates()
        print(f"After removing duplicates: {df.shape}")

    if config['handle_missing']:
        # Numeric: fill with median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())

        # Categorical: fill with mode or 'Unknown'
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].mode().empty:
                df[col] = df[col].fillna('Unknown')
            else:
                df[col] = df[col].fillna(df[col].mode()[0])
        print(f"After handling missing values: {df.shape}")

    if config['remove_outliers']:
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            df = df[(df[col] >= Q1 - 1.5 * IQR) & (df[col] <= Q3 + 1.5 * IQR)]
        print(f"After removing outliers: {df.shape}")

    if config['normalize_text']:
        text_cols = df.select_dtypes(include=['object']).columns
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()

    return df


# Usage
cleaned_df = auto_clean_dataframe(df)
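Because missing config keys fall back to the defaults, a partial config only needs the flags you want to flip. For example:

# Keep outliers (e.g. fraud detection, where extreme values are the signal)
cleaned_df = auto_clean_dataframe(df, config={'remove_outliers': False})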
7. Save Pipeline Configuration
# data_cleaning_config.yaml
cleaning_pipeline:
  remove_duplicates:
    enabled: true
    subset: ['id', 'email']
    keep: 'first'

  missing_values:
    strategy: auto
    drop_threshold: 50  # Drop columns with >50% missing
    numeric_fill: median
    categorical_fill: mode

  outliers:
    method: iqr
    threshold: 1.5
    columns: ['age', 'price', 'quantity']

  data_types:
    age: int64
    price: float64
    date: datetime64
    email: string

  text_normalization:
    lowercase: true
    strip_whitespace: true
    remove_special_chars: false

  validation:
    ranges:
      age: [0, 120]
      price: [0, 1000000]
    required_columns: ['id', 'name', 'email']
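A minimal sketch of loading this file and driving the pandas pipeline from step 2 with it, assuming PyYAML (yaml.safe_load) is available:

import yaml
import pandas as pd

with open('data_cleaning_config.yaml') as f:
    cfg = yaml.safe_load(f)['cleaning_pipeline']

df = pd.read_csv('data.csv')
pipeline = DataCleaningPipeline(df)

if cfg['remove_duplicates']['enabled']:
    pipeline.remove_duplicates(
        subset=cfg['remove_duplicates']['subset'],
        keep=cfg['remove_duplicates']['keep'],
    )

pipeline.handle_missing_values(strategy=cfg['missing_values']['strategy'])
# fix_data_types logs and skips any mapping astype cannot apply
pipeline.fix_data_types(type_mapping=cfg['data_types'])
pipeline.remove_outliers(
    columns=cfg['outliers']['columns'],
    method=cfg['outliers']['method'],
    threshold=cfg['outliers']['threshold'],
)
pipeline.validate_ranges(cfg['validation']['ranges'])

cleaned_df = pipeline.get_cleaned_data()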
Best Practices
DO:
- Always keep original data
- Log all cleaning steps
- Validate data quality
- Handle missing values appropriately
- Remove duplicates early
- Check for outliers
- Validate data types
- Document assumptions
DON'T:
- Delete original data
- Fill all missing with zeros
- Ignore outliers
- Mix data types
- Skip validation
- Compute imputation or outlier statistics on test data (fit them on the training split only; see the sketch after this list)
- Remove too many rows
- Forget to save cleaned data
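On the leakage point above: compute cleaning statistics on the training split and reuse them on the test split. A minimal pandas sketch; train_df and test_df are assumed names:

# Fit the imputation value on the training split only,
# then apply it to both splits to avoid test-set leakage
train_median = train_df['age'].median()
train_df['age'] = train_df['age'].fillna(train_median)
test_df['age'] = test_df['age'].fillna(train_median)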
Checklist
- Loaded and inspected data
- Removed duplicates
- Handled missing values
- Fixed data types
- Removed/handled outliers
- Normalized text fields
- Validated data quality
- Generated cleaning report
- Saved cleaned data