Marketplace data-orchestrator
Coordinates data pipeline tasks (ETL, analytics, feature engineering). Use when implementing data ingestion, transformations, quality checks, or analytics. Applies data-quality-standard.md (95% minimum quality score).
install
source · Clone the upstream repo
git clone https://github.com/aiskillstore/marketplace
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/aiskillstore/marketplace "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/brownbull/data-orchestrator" ~/.claude/skills/aiskillstore-marketplace-data-orchestrator && rm -rf "$T"
manifest:
skills/brownbull/data-orchestrator/SKILL.md
Data Orchestrator Skill
Role
Acts as CTO-Data, managing all data processing, analytics, and pipeline tasks.
Responsibilities
- Data Pipeline Management
  - ETL/ELT processes
  - Data validation
  - Quality assurance
  - Pipeline monitoring
- Analytics Coordination
  - Feature engineering
  - Model integration
  - Report generation
  - Metric calculation
- Data Governance
  - Schema management
  - Data lineage tracking
  - Privacy compliance
  - Access control
- Context Maintenance

```
ai-state/active/data/
├── pipelines.json   # Pipeline definitions
├── features.json    # Feature registry
├── quality.json     # Data quality metrics
└── tasks/           # Active data tasks
```
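A minimal sketch, assuming the layout above, of how an orchestrator could read and update these state files; the helper names and the internal shape of quality.json are illustrative, not defined by the skill.

```python
import json
from pathlib import Path

STATE_DIR = Path("ai-state/active/data")  # state root shown in the tree above

def load_state(name: str) -> dict:
    """Read one of the JSON state files (pipelines, features, quality)."""
    path = STATE_DIR / f"{name}.json"
    return json.loads(path.read_text()) if path.exists() else {}

def record_quality(pipeline: str, score: float) -> None:
    """Append the latest quality score for a pipeline to quality.json (assumed shape)."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    quality = load_state("quality")
    quality.setdefault(pipeline, []).append({"score": score})
    (STATE_DIR / "quality.json").write_text(json.dumps(quality, indent=2))
```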
Skill Coordination
Available Data Skills
- etl-skill - Extract, transform, load operations
- feature-engineering-skill - Feature creation
- analytics-skill - Analysis and reporting
- quality-skill - Data quality checks
- pipeline-skill - Pipeline orchestration
Context Package to Skills
```yaml
context:
  task_id: "task-003-pipeline"
  pipelines:
    existing: ["daily_aggregation", "customer_segmentation"]
    schedule: "0 2 * * *"
  features:
    current: ["revenue_30d", "churn_risk"]
    dependencies: ["transactions", "customers"]
  standards:
    - "data-quality-standard.md"
    - "feature-engineering.md"
  test_requirements:
    quality: ["completeness", "accuracy", "timeliness"]
```
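Before handing a package like this to a skill, the orchestrator can sanity-check it. A small sketch under the assumption that the keys above are all required; validate_context is a hypothetical helper, not part of the skill.

```python
REQUIRED_KEYS = {"task_id", "pipelines", "features", "standards", "test_requirements"}

def validate_context(context: dict) -> list[str]:
    """Return a list of problems; an empty list means the package can be dispatched."""
    problems = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - context.keys())]
    if "data-quality-standard.md" not in context.get("standards", []):
        problems.append("standards must include data-quality-standard.md")
    return problems
```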
Task Processing Flow
1. Receive Task
   - Identify data sources
   - Check dependencies
   - Validate requirements
2. Prepare Context
   - Current pipeline state
   - Feature definitions
   - Quality metrics
3. Assign to Skill
   - Choose data skill
   - Set parameters
   - Define outputs
4. Monitor Execution
   - Track pipeline progress
   - Monitor resource usage
   - Check quality gates
5. Validate Results
   - Data quality checks
   - Output validation
   - Performance metrics
   - Lineage tracking
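A compact sketch of the five steps as a single function; the task and result field names, the skill registry, and the wiring of the 95% quality gate are assumptions for illustration, not the skill's actual API.

```python
def process_task(task: dict, skills: dict) -> dict:
    """One pass through the flow above; `skills` maps skill names to callables."""
    # 1. Receive: identify sources and validate requirements
    sources = task.get("sources", [])
    if not sources:
        raise ValueError("task has no data sources")

    # 2. Prepare: bundle current state into a context package
    context = {"task_id": task["id"], "sources": sources,
               "standards": task.get("standards", ["data-quality-standard.md"])}

    # 3. Assign: choose a data skill by task type and run it
    result = skills[task.get("type", "etl-skill")](context)

    # 4/5. Monitor and validate: enforce the quality gate before accepting output
    if result.get("quality_score", 0) < 95:
        raise RuntimeError(f"quality gate failed: {result.get('quality_score')}")
    return result
```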
Data-Specific Standards
Pipeline Checklist
- Input validation
- Error handling
- Checkpoint/recovery
- Monitoring enabled
- Documentation updated
- Performance optimized
Quality Checklist
- Completeness checks
- Accuracy validation
- Consistency rules
- Timeliness metrics
- Uniqueness constraints
- Validity ranges
Feature Engineering Checklist
- Business logic documented
- Dependencies tracked
- Version controlled
- Performance tested
- Edge cases handled
- Monitoring added
Integration Points
With Backend Orchestrator
- Data model alignment
- API data contracts
- Database optimization
- Cache strategies
With Frontend Orchestrator
- Dashboard data requirements
- Real-time vs batch
- Data freshness SLAs
- Visualization formats
With Human-Docs
Updates documentation with:
- Pipeline changes
- Feature definitions
- Data dictionaries
- Quality reports
Event Communication
Listening For
{ "event": "data.source.updated", "source": "transactions", "schema_change": true, "impact": ["daily_pipeline", "revenue_features"] }
Broadcasting
{ "event": "data.pipeline.completed", "pipeline": "daily_aggregation", "records_processed": 50000, "duration": "5m 32s", "quality_score": 98.5 }
Test Requirements
Every Data Task Must Include
- Unit Tests - Transformation logic
- Integration Tests - Pipeline flow
- Data Quality Tests - Accuracy, completeness
- Performance Tests - Processing speed
- Edge Case Tests - Null, empty, invalid data
- Regression Tests - Output consistency
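As a concrete example of the unit, edge-case, and regression buckets, here is a pytest-style sketch around a hypothetical normalize_revenue transformation; the function and field names are illustrative, not defined by the skill.

```python
import pytest

def normalize_revenue(row: dict) -> dict:
    """Example transformation under test: coerce revenue_30d to a non-negative float."""
    value = row.get("revenue_30d")
    return {**row, "revenue_30d": max(float(value), 0.0) if value is not None else 0.0}

def test_transformation_logic():                      # unit test
    assert normalize_revenue({"revenue_30d": "12.5"})["revenue_30d"] == 12.5

def test_edge_cases_null_and_invalid():               # edge-case tests
    assert normalize_revenue({"revenue_30d": None})["revenue_30d"] == 0.0
    with pytest.raises(ValueError):
        normalize_revenue({"revenue_30d": "not-a-number"})

def test_regression_output_is_stable():               # regression test
    assert normalize_revenue({"customer_id": 1, "revenue_30d": -3.0}) == \
        {"customer_id": 1, "revenue_30d": 0.0}
```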
Success Metrics
- Pipeline success rate > 99%
- Data quality score > 95%
- Processing time < SLA
- Zero data loss
- Feature coverage > 90%
Common Patterns
ETL Pattern
```python
class ETLOrchestrator:
    def run_pipeline(self, task):
        # 1. Extract from sources
        # 2. Validate input data
        # 3. Transform data
        # 4. Quality checks
        # 5. Load to destination
        # 6. Update lineage
        ...
```
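A slightly fleshed-out version of the same steps, assuming the extract/transform/load callables and the quality checks are supplied by the individual skills; field names like `sources` and `destination` are placeholders.

```python
def run_pipeline(extract, transform, load, quality_checks, task: dict) -> dict:
    raw = extract(task["sources"])                        # 1. extract from sources
    if not raw:                                           # 2. validate input data
        raise ValueError("no input records extracted")
    processed = [transform(row) for row in raw]           # 3. transform data
    failed = [name for name, check in quality_checks.items()
              if not check(processed)]                    # 4. quality checks
    if failed:
        raise RuntimeError(f"quality checks failed: {failed}")
    load(processed, task["destination"])                  # 5. load to destination
    return {"records_processed": len(processed),          # 6. update lineage
            "lineage": {"sources": task["sources"],
                        "destination": task["destination"]}}
```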
Feature Pattern
```python
class FeatureOrchestrator:
    def create_feature(self, task):
        # 1. Define feature logic
        # 2. Identify dependencies
        # 3. Implement calculation
        # 4. Add to feature store
        # 5. Create monitoring
        ...
```
Data Processing Guidelines
Batch Processing
- Use for large volumes
- Schedule during off-peak
- Implement checkpointing
- Monitor resource usage
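A minimal checkpointing sketch for batch runs, assuming in-memory rows and a JSON checkpoint file; a real pipeline would record offsets in its own durable store, but the recovery idea is the same.

```python
import json
from pathlib import Path

def run_in_batches(rows, process_batch, checkpoint_file: str, batch_size: int = 10_000):
    """Process rows in fixed-size batches, resuming from the last checkpoint on restart."""
    ckpt = Path(checkpoint_file)
    start = json.loads(ckpt.read_text())["next_offset"] if ckpt.exists() else 0
    for offset in range(start, len(rows), batch_size):
        process_batch(rows[offset:offset + batch_size])
        # Record progress only after the batch succeeds, so a crash re-runs this batch
        ckpt.write_text(json.dumps({"next_offset": offset + batch_size}))
```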
Stream Processing
- Use for real-time needs
- Implement windowing
- Handle late arrivals
- Maintain state
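To make windowing and late-arrival handling concrete, a pure-Python sketch of tumbling windows with an allowed-lateness budget; a real deployment would lean on a stream processor's own windowing primitives rather than this hand-rolled version.

```python
from collections import defaultdict

def tumbling_windows(events, window_seconds: int = 60, allowed_lateness: int = 30):
    """Group (timestamp, value) events into tumbling windows, tolerating late arrivals."""
    windows = defaultdict(list)   # window start -> values (the state to maintain)
    watermark = float("-inf")     # highest event timestamp seen so far
    for ts, value in events:
        watermark = max(watermark, ts)
        window_start = int(ts // window_seconds) * window_seconds
        if window_start + window_seconds + allowed_lateness < watermark:
            continue              # window already closed past the lateness budget
        windows[window_start].append(value)
    return dict(windows)
```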
Data Quality Rules
- Completeness - No missing required fields
- Accuracy - Values within expected ranges
- Consistency - Cross-dataset alignment
- Timeliness - Data freshness requirements
- Uniqueness - No unwanted duplicates
- Validity - Format and type correctness
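The six rules map naturally onto small predicate functions over a batch of records; this sketch assumes rows are dicts and leaves thresholds and field lists to the caller.

```python
def completeness(rows, required_fields):
    """No missing required fields."""
    return all(row.get(f) is not None for row in rows for f in required_fields)

def accuracy(rows, field, lo, hi):
    """Values within expected ranges."""
    return all(lo <= row[field] <= hi for row in rows if field in row)

def uniqueness(rows, key):
    """No unwanted duplicates on the given key."""
    keys = [row[key] for row in rows if key in row]
    return len(keys) == len(set(keys))

def validity(rows, field, expected_type):
    """Format and type correctness."""
    return all(isinstance(row[field], expected_type) for row in rows if field in row)
```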
Anti-Patterns to Avoid
❌ Processing without validation
❌ No error recovery mechanism
❌ Missing data lineage
❌ Hardcoded transformations
❌ No monitoring/alerting
❌ Manual intervention required