Claude-skill-registry data-pipeline-processor
Process data files through transformation pipelines with validation, cleaning, and export. Use for CSV/Excel/JSON data processing, encoding handling, batch operations, and data transformation workflows.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-pipeline-processor" ~/.claude/skills/majiayu000-claude-skill-registry-data-pipeline-processor && rm -rf "$T"
manifest: skills/data/data-pipeline-processor/SKILL.md
Data Pipeline Processor
Version: 1.1.0
Category: Development
Last Updated: 2026-01-02
Process data files through transformation pipelines with validation, encoding detection, and multi-format export capabilities.
Quick Start
import pandas as pd
from pathlib import Path

# Simple pipeline: Load -> Transform -> Export
df = pd.read_csv("data/raw/source.csv")

# Transform
df = df[df['value'] > 0].copy()  # Filter (copy avoids SettingWithCopyWarning on the next line)
df['date'] = pd.to_datetime(df['date'])  # Convert types
df = df.sort_values('date')  # Sort

# Export
Path("data/processed").mkdir(parents=True, exist_ok=True)
df.to_csv("data/processed/cleaned.csv", index=False)
print(f"Processed {len(df)} rows")
When to Use
- Processing CSV/Excel/JSON files with validation
- Data cleaning and transformation workflows
- Batch file processing with aggregation
- Handling encoding issues (UTF-8, Latin-1 fallback)
- ETL (Extract, Transform, Load) operations
- Data quality checks and reporting
Core Pattern
Input (CSV/Excel/JSON) -> Validate -> Transform -> Analyze -> Export
Implementation
Data Reader with Encoding Detection
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import chardet
import pandas as pd

logger = logging.getLogger(__name__)


class DataReader:
    """Read data files with automatic encoding detection."""

    SUPPORTED_FORMATS = ['csv', 'xlsx', 'xls', 'json', 'parquet']

    def __init__(self, encoding_fallback: Optional[List[str]] = None):
        """
        Initialize data reader.

        Args:
            encoding_fallback: List of encodings to try in order
        """
        self.encoding_fallback = encoding_fallback or ['utf-8', 'latin-1', 'cp1252']

    def read(self, file_path: str, **kwargs) -> pd.DataFrame:
        """
        Read data file with automatic format and encoding detection.

        Args:
            file_path: Path to data file
            **kwargs: Additional arguments for pandas readers

        Returns:
            DataFrame with loaded data
        """
        path = Path(file_path)
        suffix = path.suffix.lower().lstrip('.')

        if suffix == 'csv':
            return self._read_csv(path, **kwargs)
        elif suffix in ['xlsx', 'xls']:
            return self._read_excel(path, **kwargs)
        elif suffix == 'json':
            return pd.read_json(path, **kwargs)
        elif suffix == 'parquet':
            return pd.read_parquet(path, **kwargs)
        else:
            raise ValueError(f"Unsupported format: {suffix}")

    def _read_csv(self, path: Path, **kwargs) -> pd.DataFrame:
        """Read CSV with encoding fallback."""
        # Detect the encoding from the first 10,000 bytes
        with open(path, 'rb') as f:
            raw = f.read(10000)
        # chardet reports None when it cannot guess, so fall back to UTF-8
        detected_encoding = chardet.detect(raw).get('encoding') or 'utf-8'

        # Try detected encoding first, then fallbacks
        encodings_to_try = [detected_encoding] + self.encoding_fallback
        for encoding in encodings_to_try:
            try:
                df = pd.read_csv(path, encoding=encoding, **kwargs)
                logger.info(f"Successfully read {path} with encoding: {encoding}")
                return df
            except (UnicodeDecodeError, LookupError):
                # Wrong encoding, or a codec name Python does not recognize
                continue

        raise ValueError(f"Could not decode {path} with any encoding")

    def _read_excel(self, path: Path, **kwargs) -> pd.DataFrame:
        """Read Excel file."""
        return pd.read_excel(path, **kwargs)
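The order of the fallback list matters more than it may appear. Latin-1 maps every byte value to a character, so decoding with it never fails, and any encoding listed after it is never reached. The sketch below illustrates this with plain `bytes.decode`, independent of pandas; `decode_with_fallback` is a hypothetical helper used only for illustration, not part of the skill.

```python
from typing import Iterable, Tuple


def decode_with_fallback(raw: bytes, encodings: Iterable[str]) -> Tuple[str, str]:
    """Return (text, encoding) for the first encoding that decodes `raw`."""
    for encoding in encodings:
        try:
            return raw.decode(encoding), encoding
        except (UnicodeDecodeError, LookupError):
            continue
    raise ValueError("Could not decode bytes with any of the given encodings")


# 0x80 is the euro sign in cp1252, an invalid start byte in UTF-8,
# and a C1 control character in Latin-1.
raw = b"price: \x80 5"

# Same candidates, different order
text_a, used_a = decode_with_fallback(raw, ["utf-8", "latin-1", "cp1252"])
text_b, used_b = decode_with_fallback(raw, ["utf-8", "cp1252", "latin-1"])

print(used_a)  # latin-1
print(used_b)  # cp1252
```

If Windows-exported files are common, passing `encoding_fallback=['utf-8', 'cp1252', 'latin-1']` to `DataReader` keeps Latin-1 as the last resort.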
Data Validator
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

import pandas as pd


@dataclass
class ValidationResult:
    """Result of data validation."""
    is_valid: bool
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    stats: Dict[str, Any] = field(default_factory=dict)


class DataValidator:
    """Validate data against configurable rules."""

    def __init__(self):
        self.rules: List[Callable] = []

    def add_rule(self, rule: Callable[[pd.DataFrame], ValidationResult]):
        """Add a validation rule."""
        self.rules.append(rule)

    def validate(self, df: pd.DataFrame) -> ValidationResult:
        """Run all validation rules."""
        all_errors = []
        all_warnings = []
        all_stats = {}

        for rule in self.rules:
            result = rule(df)
            all_errors.extend(result.errors)
            all_warnings.extend(result.warnings)
            all_stats.update(result.stats)

        # Only errors fail validation; warnings are reported but do not block
        return ValidationResult(
            is_valid=len(all_errors) == 0,
            errors=all_errors,
            warnings=all_warnings,
            stats=all_stats
        )


# Common validation rules

def required_columns_rule(required: List[str]) -> Callable:
    """Validate required columns exist."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        missing = [col for col in required if col not in df.columns]
        return ValidationResult(
            is_valid=len(missing) == 0,
            errors=[f"Missing required column: {col}" for col in missing],
            stats={'columns_found': len(df.columns)}
        )
    return rule


def no_duplicates_rule(subset: Optional[List[str]] = None) -> Callable:
    """Validate no duplicate rows (duplicates are reported as warnings)."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        # int() turns the NumPy integer into a plain int so stats serialize cleanly
        duplicates = int(df.duplicated(subset=subset).sum())
        return ValidationResult(
            is_valid=duplicates == 0,
            warnings=[f"Found {duplicates} duplicate rows"] if duplicates > 0 else [],
            stats={'duplicate_count': duplicates}
        )
    return rule


def non_null_rule(columns: List[str]) -> Callable:
    """Validate specified columns have no null values."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        errors = []
        stats = {}
        for col in columns:
            if col in df.columns:
                null_count = int(df[col].isnull().sum())
                stats[f'{col}_nulls'] = null_count
                if null_count > 0:
                    errors.append(f"Column '{col}' has {null_count} null values")
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            stats=stats
        )
    return rule
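Rules are plain callables that take a DataFrame and return a `ValidationResult`, so project-specific checks can follow the same factory pattern. A minimal sketch, assuming a hypothetical `value_range_rule` (not part of the skill) and a trimmed copy of `ValidationResult` so the snippet runs on its own:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

import pandas as pd


@dataclass
class ValidationResult:
    """Trimmed stand-in for the ValidationResult defined in this section."""
    is_valid: bool
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    stats: Dict[str, Any] = field(default_factory=dict)


def value_range_rule(column: str, low: float, high: float) -> Callable[[pd.DataFrame], ValidationResult]:
    """Hypothetical rule: every non-null value in `column` lies in [low, high]."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        if column not in df.columns:
            return ValidationResult(is_valid=False, errors=[f"Missing column: {column}"])
        values = df[column].dropna()
        # between() is inclusive on both ends by default
        out_of_range = int((~values.between(low, high)).sum())
        errors = []
        if out_of_range:
            errors.append(f"Column '{column}' has {out_of_range} values outside [{low}, {high}]")
        return ValidationResult(
            is_valid=not errors,
            errors=errors,
            stats={f"{column}_out_of_range": out_of_range},
        )
    return rule


df = pd.DataFrame({"value": [1.0, 5.0, 12.0, None]})
result = value_range_rule("value", 0, 10)(df)
print(result.is_valid, result.errors)
```

A rule built this way can be registered with `validator.add_rule(value_range_rule("value", 0, 10))` alongside the built-in ones.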
Data Transformer
from typing import Any, Callable, Dict, List, Optional

import pandas as pd


class DataTransformer:
    """Apply transformations to data."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()

    def rename_columns(self, mapping: Dict[str, str]) -> 'DataTransformer':
        """Rename columns."""
        self.df = self.df.rename(columns=mapping)
        return self

    def filter_rows(self, expression: str) -> 'DataTransformer':
        """Filter rows using query expression."""
        self.df = self.df.query(expression)
        return self

    def select_columns(self, columns: List[str]) -> 'DataTransformer':
        """Select specific columns."""
        self.df = self.df[columns]
        return self

    def drop_columns(self, columns: List[str]) -> 'DataTransformer':
        """Drop specified columns."""
        self.df = self.df.drop(columns=columns, errors='ignore')
        return self

    def fill_nulls(self, value: Any = None, method: Optional[str] = None) -> 'DataTransformer':
        """Fill null values with a constant, or propagate with 'ffill' / 'bfill'."""
        # fillna(method=...) is deprecated in recent pandas; call ffill()/bfill() directly
        if method in ('ffill', 'pad'):
            self.df = self.df.ffill()
        elif method in ('bfill', 'backfill'):
            self.df = self.df.bfill()
        elif method:
            raise ValueError(f"Unsupported fill method: {method}")
        else:
            self.df = self.df.fillna(value)
        return self

    def convert_types(self, type_mapping: Dict[str, str]) -> 'DataTransformer':
        """Convert column types."""
        for col, dtype in type_mapping.items():
            if col in self.df.columns:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col])
                elif dtype == 'numeric':
                    self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
                else:
                    self.df[col] = self.df[col].astype(dtype)
        return self

    def add_column(self, name: str, expression: Callable) -> 'DataTransformer':
        """Add computed column."""
        self.df[name] = expression(self.df)
        return self

    def aggregate(self, group_by: List[str], agg_spec: Dict[str, Any]) -> 'DataTransformer':
        """Aggregate data by groups."""
        self.df = self.df.groupby(group_by).agg(agg_spec).reset_index()
        # A list of functions per column yields MultiIndex columns such as
        # ('amount', 'sum'); flatten them to 'amount_sum' so later rename and
        # sort steps can refer to them by a plain string.
        if isinstance(self.df.columns, pd.MultiIndex):
            self.df.columns = [
                '_'.join(str(part) for part in col if part != '')
                for col in self.df.columns
            ]
        return self

    def sort(self, by: List[str], ascending: bool = True) -> 'DataTransformer':
        """Sort data."""
        self.df = self.df.sort_values(by=by, ascending=ascending)
        return self

    def get_result(self) -> pd.DataFrame:
        """Get transformed DataFrame."""
        return self.df
Data Exporter
from pathlib import Path

import pandas as pd


class DataExporter:
    """Export data to various formats."""

    @staticmethod
    def to_csv(df: pd.DataFrame, path: str, **kwargs) -> str:
        """Export to CSV."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        # setdefault lets callers pass index=... (as the YAML output options do)
        # without raising "got multiple values for keyword argument 'index'"
        kwargs.setdefault('index', False)
        df.to_csv(path, **kwargs)
        return path

    @staticmethod
    def to_excel(df: pd.DataFrame, path: str, sheet_name: str = 'Sheet1', **kwargs) -> str:
        """Export to Excel."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        kwargs.setdefault('index', False)
        df.to_excel(path, sheet_name=sheet_name, **kwargs)
        return path

    @staticmethod
    def to_json(df: pd.DataFrame, path: str, orient: str = 'records', **kwargs) -> str:
        """Export to JSON."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_json(path, orient=orient, **kwargs)
        return path

    @staticmethod
    def to_parquet(df: pd.DataFrame, path: str, **kwargs) -> str:
        """Export to Parquet."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, **kwargs)
        return path
Pipeline Orchestrator
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class PipelineConfig:
    """Configuration for data pipeline."""
    input_path: str
    output_path: str
    input_options: Dict[str, Any] = field(default_factory=dict)
    validation: Dict[str, Any] = field(default_factory=dict)
    transformations: List[Dict[str, Any]] = field(default_factory=list)
    output_format: str = 'csv'
    output_options: Dict[str, Any] = field(default_factory=dict)


class DataPipeline:
    """Orchestrate data processing pipeline."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.reader = DataReader()
        self.validator = DataValidator()
        self.exporter = DataExporter()

    def _setup_validation(self):
        """Configure validation rules from config."""
        # Start from a fresh validator so repeated run() calls do not stack rules
        self.validator = DataValidator()
        validation = self.config.validation

        if 'required_columns' in validation:
            self.validator.add_rule(
                required_columns_rule(validation['required_columns'])
            )

        # Check the value, not just the key, so `no_duplicates: false` is honored
        if validation.get('no_duplicates'):
            self.validator.add_rule(
                no_duplicates_rule(validation.get('no_duplicates_subset'))
            )

        if 'non_null_columns' in validation:
            self.validator.add_rule(
                non_null_rule(validation['non_null_columns'])
            )

    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply configured transformations."""
        transformer = DataTransformer(df)

        for transform in self.config.transformations:
            op = transform['operation']

            if op == 'rename':
                transformer.rename_columns(transform['mapping'])
            elif op == 'filter':
                transformer.filter_rows(transform['expression'])
            elif op == 'select':
                transformer.select_columns(transform['columns'])
            elif op == 'drop':
                transformer.drop_columns(transform['columns'])
            elif op == 'fill_nulls':
                transformer.fill_nulls(
                    value=transform.get('value'),
                    method=transform.get('method')
                )
            elif op == 'convert_types':
                transformer.convert_types(transform['types'])
            elif op == 'aggregate':
                transformer.aggregate(
                    group_by=transform['group_by'],
                    agg_spec=transform['aggregations']
                )
            elif op == 'sort':
                transformer.sort(
                    by=transform['by'],
                    ascending=transform.get('ascending', True)
                )
            else:
                # Fail loudly instead of silently skipping a misspelled operation
                raise ValueError(f"Unknown operation: {op}")

        return transformer.get_result()

    def run(self) -> Dict[str, Any]:
        """Execute the pipeline."""
        logger.info(f"Starting pipeline: {self.config.input_path}")

        # Read input
        df = self.reader.read(self.config.input_path, **self.config.input_options)
        input_rows = len(df)  # Capture before transformations change the row count
        logger.info(f"Loaded {input_rows} rows")

        # Validate
        self._setup_validation()
        validation_result = self.validator.validate(df)

        if not validation_result.is_valid:
            logger.error(f"Validation failed: {validation_result.errors}")
            return {
                'status': 'failed',
                'stage': 'validation',
                'errors': validation_result.errors,
                'warnings': validation_result.warnings
            }

        if validation_result.warnings:
            logger.warning(f"Validation warnings: {validation_result.warnings}")

        # Transform
        df = self._apply_transformations(df)
        logger.info(f"Transformed to {len(df)} rows")

        # Export
        export_method = getattr(self.exporter, f'to_{self.config.output_format}')
        output_path = export_method(df, self.config.output_path, **self.config.output_options)
        logger.info(f"Exported to {output_path}")

        return {
            'status': 'success',
            'input_rows': input_rows,
            'output_rows': len(df),
            'output_path': output_path,
            'validation_stats': validation_result.stats,
            'warnings': validation_result.warnings
        }
YAML Configuration Format
Basic Pipeline Config
# config/pipelines/clean_data.yaml
input:
  path: data/raw/source.csv
  options:
    delimiter: ","
    skiprows: 1

validation:
  required_columns:
    - id
    - timestamp
    - value
  non_null_columns:
    - id
    - value
  no_duplicates: true
  no_duplicates_subset:
    - id

transformations:
  - operation: rename
    mapping:
      old_name: new_name
      date_col: timestamp
  - operation: filter
    expression: "value > 0 and status != 'invalid'"
  - operation: convert_types
    types:
      timestamp: datetime
      value: numeric
  - operation: fill_nulls
    value: 0
  - operation: sort
    by: [timestamp]
    ascending: true

output:
  path: data/processed/cleaned.csv
  format: csv
  options:
    index: false
Aggregation Pipeline
# config/pipelines/monthly_summary.yaml
input:
  path: data/processed/daily_data.csv

validation:
  required_columns:
    - date
    - category
    - amount

transformations:
  - operation: convert_types
    types:
      date: datetime
  - operation: aggregate
    group_by: [category]
    aggregations:
      amount:
        - sum
        - mean
        - count
  - operation: rename
    mapping:
      amount_sum: total_amount
      amount_mean: average_amount
      amount_count: transaction_count
  - operation: sort
    by: [total_amount]
    ascending: false

output:
  path: data/results/monthly_summary.csv
  format: csv
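The YAML files nest settings under `input` and `output`, while `PipelineConfig` takes flat fields such as `input_path` and `output_format`, and the skill does not show the loader that bridges the two. A minimal sketch of that mapping, assuming the file has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`); `config_kwargs_from_mapping` is a hypothetical helper, not part of the skill:

```python
from typing import Any, Dict, Mapping


def config_kwargs_from_mapping(raw: Mapping[str, Any]) -> Dict[str, Any]:
    """Flatten a parsed pipeline YAML document into PipelineConfig keyword arguments."""
    try:
        input_section = raw["input"]
        output_section = raw["output"]
        input_path = input_section["path"]
        output_path = output_section["path"]
    except KeyError as exc:
        raise ValueError(f"Pipeline config is missing required key: {exc}") from exc

    # `or {}` / `or []` also covers sections that are present but empty (parsed as None)
    return {
        "input_path": input_path,
        "output_path": output_path,
        "input_options": dict(input_section.get("options") or {}),
        "validation": dict(raw.get("validation") or {}),
        "transformations": list(raw.get("transformations") or []),
        "output_format": output_section.get("format", "csv"),
        "output_options": dict(output_section.get("options") or {}),
    }


# Equivalent to what yaml.safe_load would return for a small config file
parsed = {
    "input": {"path": "data/raw/source.csv", "options": {"delimiter": ","}},
    "validation": {"required_columns": ["id", "value"]},
    "transformations": [{"operation": "filter", "expression": "value > 0"}],
    "output": {"path": "data/processed/cleaned.csv", "format": "csv"},
}

kwargs = config_kwargs_from_mapping(parsed)
print(kwargs["input_path"], kwargs["output_format"])
```

`PipelineConfig(**kwargs)` would then build the config object that `DataPipeline` expects.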
Usage Examples
Example 1: Simple CSV Processing
# Process CSV with config
python -m data_pipeline config/pipelines/clean_data.yaml

# Override input/output
python -m data_pipeline config/pipelines/clean_data.yaml \
    --input data/custom_input.csv \
    --output data/custom_output.csv

# Dry run (validate only)
python -m data_pipeline config/pipelines/clean_data.yaml --dry-run
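These commands assume a `data_pipeline` package that exposes a command-line entry point, which the skill itself does not include. A minimal `argparse` sketch of such an entry point, using only the flags shown in the commands; the function names and the exact behaviour of `--dry-run` are assumptions:

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for: python -m data_pipeline CONFIG [--input] [--output] [--dry-run]."""
    parser = argparse.ArgumentParser(prog="data_pipeline")
    parser.add_argument("config", help="Path to a pipeline YAML config")
    parser.add_argument("--input", help="Override the input path from the config")
    parser.add_argument("--output", help="Override the output path from the config")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate only; skip transform and export",
    )
    return parser


def parse_cli(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse arguments; argv=None falls back to sys.argv[1:]."""
    return build_parser().parse_args(argv)


args = parse_cli([
    "config/pipelines/clean_data.yaml",
    "--input", "data/custom_input.csv",
    "--dry-run",
])
print(args.config, args.input, args.output, args.dry_run)
```

A `__main__.py` could apply `args.input` and `args.output` on top of the loaded config before constructing `DataPipeline`, and stop after validation when `args.dry_run` is set.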
Example 2: Programmatic Usage
from data_pipeline import DataPipeline, PipelineConfig

config = PipelineConfig(
    input_path='data/raw/sales.csv',
    output_path='data/processed/sales_clean.csv',
    validation={
        'required_columns': ['date', 'product', 'amount'],
        'non_null_columns': ['amount']
    },
    transformations=[
        {'operation': 'filter', 'expression': 'amount > 0'},
        {'operation': 'sort', 'by': ['date']}
    ]
)

pipeline = DataPipeline(config)
result = pipeline.run()
print(f"Processed {result['output_rows']} rows")
Example 3: Batch Processing
from pathlib import Path
from data_pipeline import DataReader, DataTransformer, DataExporter

reader = DataReader()
exporter = DataExporter()

# Process all CSV files in directory
input_dir = Path('data/raw/')
output_dir = Path('data/processed/')

for csv_file in input_dir.glob('*.csv'):
    df = reader.read(str(csv_file))

    # Apply transformations
    df_clean = (DataTransformer(df)
                .fill_nulls(value=0)
                .filter_rows('value > 0')
                .sort(['timestamp'])
                .get_result())

    # Export
    output_path = output_dir / csv_file.name
    exporter.to_csv(df_clean, str(output_path))
    print(f"Processed: {csv_file.name}")
Example 4: Multi-Format Export
from typing import Dict

import pandas as pd
from data_pipeline import DataExporter


def export_all_formats(df: pd.DataFrame, base_path: str) -> Dict[str, str]:
    """Export data to multiple formats and return the written paths."""
    exporter = DataExporter()
    outputs = {
        'csv': exporter.to_csv(df, f"{base_path}.csv"),
        'json': exporter.to_json(df, f"{base_path}.json"),
        'parquet': exporter.to_parquet(df, f"{base_path}.parquet"),
        'excel': exporter.to_excel(df, f"{base_path}.xlsx")
    }
    return outputs
Best Practices
Do
- Always detect encoding before reading CSV
- Use chunked reading for large files (>100MB)
- Specify dtypes to reduce memory usage
- Handle missing values explicitly
- Validate early in the pipeline
- Fail fast on critical errors
- Log warnings for non-critical issues
- Track validation statistics
Don't
- Assume encoding is always UTF-8
- Load entire large files into memory
- Skip validation steps
- Ignore encoding errors
- Mix transformation and validation
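For the chunked-reading and dtype advice, pandas can stream a CSV in fixed-size pieces through the `chunksize` argument of `read_csv`, so only one chunk is held in memory at a time. A minimal sketch, assuming a running per-category total is all that is needed; the column names and the tiny in-memory file are illustrative only:

```python
import io
from collections import defaultdict

import pandas as pd

# Stand-in for a large file on disk
csv_data = io.StringIO(
    "category,amount\n"
    "a,1.5\n"
    "b,2.0\n"
    "a,3.5\n"
    "b,4.0\n"
    "a,1.0\n"
)

totals = defaultdict(float)
rows_seen = 0

# chunksize makes read_csv return an iterator of DataFrames;
# explicit dtypes skip per-chunk type inference and reduce memory use.
for chunk in pd.read_csv(
    csv_data,
    chunksize=2,
    dtype={"category": "category", "amount": "float64"},
):
    rows_seen += len(chunk)
    per_category = chunk.groupby("category", observed=True)["amount"].sum()
    for category, amount in per_category.items():
        totals[category] += float(amount)

print(rows_seen, dict(totals))
```

Aggregations that cannot be combined chunk by chunk (a median, for instance) need a different strategy, so this pattern fits sums, counts, and filters best.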
Transformation
- Use method chaining for readability
- Apply filters before expensive operations
- Convert types early to catch errors
- Document transformation logic
Export
- Create output directories automatically
- Use appropriate formats (Parquet for large data)
- Include metadata in output
- Verify output integrity
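One way to act on the last two points is to write a small sidecar file next to each export that records a row count and checksum, then re-check it before downstream use. A minimal standard-library sketch; the `.meta.json` naming and both helper names are assumptions, not part of the skill:

```python
import csv
import hashlib
import json
import tempfile
from pathlib import Path


def write_metadata(csv_path: Path) -> Path:
    """Write <name>.meta.json with the row count and SHA-256 of an exported CSV."""
    data = csv_path.read_bytes()
    with csv_path.open(newline="", encoding="utf-8") as f:
        row_count = sum(1 for _ in csv.reader(f)) - 1  # exclude the header row
    meta = {
        "file": csv_path.name,
        "rows": row_count,
        "sha256": hashlib.sha256(data).hexdigest(),
    }
    meta_path = csv_path.with_name(csv_path.name + ".meta.json")
    meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
    return meta_path


def verify_output(csv_path: Path) -> bool:
    """Return True if the CSV still matches the checksum in its sidecar file."""
    meta_path = csv_path.with_name(csv_path.name + ".meta.json")
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    return hashlib.sha256(csv_path.read_bytes()).hexdigest() == meta["sha256"]


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "cleaned.csv"
    out.write_text("id,value\n1,10\n2,20\n", encoding="utf-8")
    meta_file = write_metadata(out)
    intact = verify_output(out)
    rows = json.loads(meta_file.read_text(encoding="utf-8"))["rows"]
    out.write_text("id,value\n1,10\n", encoding="utf-8")  # simulate truncation
    intact_after_change = verify_output(out)

print(intact, rows, intact_after_change)
```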
File Organization
project/
  config/
    pipelines/            # Pipeline configs
      clean_data.yaml
      aggregate.yaml
  data/
    raw/                  # Raw input data
    processed/            # Cleaned data
    results/              # Analysis results
  src/
    data_pipeline/        # Pipeline code
  scripts/
    run_pipeline.sh       # CLI wrapper
Error Handling
Common Errors
| Cause | Solution |
|---|---|
| Wrong encoding | Use DataReader with encoding fallback |
| Missing column | Check column names in config |
| Type conversion failed | Use errors='coerce' or validate first |
| File too large | Use chunked reading |
| Input file missing | Verify file path |
Error Template
from pathlib import Path

from data_pipeline import DataPipeline, PipelineConfig


def safe_pipeline_run(config: PipelineConfig) -> dict:
    """Run pipeline with comprehensive error handling."""
    try:
        # Validate input exists
        if not Path(config.input_path).exists():
            return {'status': 'error', 'stage': 'input', 'message': 'File not found'}

        pipeline = DataPipeline(config)
        return pipeline.run()

    except UnicodeDecodeError as e:
        return {'status': 'error', 'stage': 'read', 'message': f'Encoding error: {e}'}
    except KeyError as e:
        return {'status': 'error', 'stage': 'transform', 'message': f'Missing column: {e}'}
    except Exception as e:
        # Catch-all so callers always receive a status dict instead of a traceback
        return {'status': 'error', 'stage': 'unknown', 'message': str(e)}
Execution Checklist
- Input file exists and is readable
- Encoding detected or specified
- Required columns present
- Validation rules configured
- Transformations in correct order
- Output directory exists or is created
- Export format appropriate for data size
- Error handling covers all failure modes
- Logging configured for debugging
Metrics
| Metric | Target | Description |
|---|---|---|
| Read Time | <1s per 100MB | Data loading speed |
| Validation Time | <500ms | Rule checking duration |
| Transform Time | Varies | Depends on operations |
| Export Time | <1s per 100MB | File writing speed |
| Memory Usage | <2x file size | Peak memory consumption |
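To compare a run against these targets, each stage can be wrapped in a timer. A minimal sketch using `time.perf_counter` and `tracemalloc` from the standard library; `stage_metrics` is a hypothetical helper, and because `tracemalloc` only reports memory it is able to trace, the peak figure is best treated as a rough indicator:

```python
import time
import tracemalloc
from contextlib import contextmanager
from typing import Dict, Iterator

metrics: Dict[str, Dict[str, float]] = {}


@contextmanager
def stage_metrics(name: str) -> Iterator[None]:
    """Record wall-clock seconds and peak traced memory (bytes) for one stage.

    Not nestable: the inner block would stop tracing for the outer one.
    """
    tracemalloc.start()
    started = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - started
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        metrics[name] = {"seconds": elapsed, "peak_bytes": float(peak)}


with stage_metrics("transform"):
    squares = [n * n for n in range(100_000)]  # stand-in for a pipeline stage

print(metrics["transform"])
```

Wrapping the read, validate, transform, and export calls in `DataPipeline.run` this way would give one entry per row of the table.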
Related Skills
- yaml-workflow-executor - Workflow orchestration
- engineering-report-generator - Report generation
- parallel-file-processor - Parallel file operations
Version History
- 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples
- 1.0.0 (2024-10-15): Initial release with DataReader, DataValidator, DataTransformer, pipeline orchestration