Claude-skill-registry airflow-workflows
Apache Airflow DAG design, operators, and scheduling best practices.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/airflow-workflows" ~/.claude/skills/majiayu000-claude-skill-registry-airflow-workflows && rm -rf "$T"
manifest: skills/data/airflow-workflows/SKILL.md
Airflow Workflows
DAG Structure

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


    # Placeholder callables: replace the bodies with real extract/load logic.
    def extract_function():
        pass


    def load_function():
        pass


    # Defaults applied to every task in the DAG
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'email_on_failure': True,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    with DAG(
        'daily_etl',
        default_args=default_args,
        description='Daily ETL pipeline',
        schedule='0 6 * * *',  # 6 AM daily (named schedule_interval before Airflow 2.4)
        start_date=datetime(2024, 1, 1),
        catchup=False,  # do not backfill runs between start_date and now
        tags=['etl', 'daily'],
    ) as dag:
        extract = PythonOperator(
            task_id='extract_data',
            python_callable=extract_function,
        )

        transform = SQLExecuteQueryOperator(
            task_id='transform_data',
            conn_id='warehouse',
            sql='sql/transform.sql',  # templated .sql file, resolved relative to the DAG folder
        )

        load = PythonOperator(
            task_id='load_data',
            python_callable=load_function,
        )

        # Run order: extract, then transform, then load
        extract >> transform >> load
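To make the retry settings in `default_args` concrete, here is a small sketch of the retry budget they imply. It assumes the default fixed delay between attempts (no exponential backoff) and counts only waiting time, not task run time; the variable names are illustrative.

```python
from datetime import timedelta

# Same retry settings as default_args in the DAG above
default_args = {"retries": 3, "retry_delay": timedelta(minutes=5)}

# One initial attempt plus one attempt per retry
max_attempts = default_args["retries"] + 1

# Waiting time added if every attempt fails (fixed delay assumed)
max_retry_wait = default_args["retries"] * default_args["retry_delay"]

print(max_attempts, max_retry_wait)
```

If that worst-case wait is too long for a downstream deadline, lowering `retries` or `retry_delay` for the specific task is usually simpler than changing the DAG-wide defaults.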
Common Operators
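| Operator | Use Case |
|---|---|
| `PythonOperator` | Custom Python code |
| `BashOperator` | Shell commands |
| `SQLExecuteQueryOperator` | Database queries |
| Provider transfer operators | Cloud data transfers |
| `DbtCloudRunJobOperator` | dbt Cloud jobs |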
Best Practices
- Idempotent tasks - Safe to re-run
- Small tasks - Easy to debug, retry
- XCom sparingly - Only small data
- Templating - Use `{{ ds }}` for dates
- Sensors wisely - Avoid blocking workers
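The idempotency and templating points can be combined in one pattern: key each load on the run's logical date and replace that date's partition instead of appending to it. The following is a minimal sketch using an in-memory SQLite database so it runs without Airflow; the `daily_sales` table and the `load_partition` function are hypothetical names chosen for illustration.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Idempotently load one day's rows by replacing the partition for `ds`."""
    with conn:  # single transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM daily_sales WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_sales (ds, amount) VALUES (?, ?)",
            [(ds, amount) for amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (ds TEXT, amount REAL)")

# Running the same load twice, as a retry or manual re-run would,
# leaves exactly one copy of the partition.
load_partition(conn, "2024-01-01", [10.0, 20.0])
load_partition(conn, "2024-01-01", [10.0, 20.0])

row_count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(row_count)
```

In a DAG, a callable like this could receive the date through `op_kwargs={'ds': '{{ ds }}'}` on a `PythonOperator`, since `op_kwargs` is a templated field; how the connection is obtained is left open here.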
Task Dependencies

    # Linear
    task1 >> task2 >> task3

    # Parallel
    [task1, task2] >> task3

    # Complex
    task1 >> [task2, task3]
    [task2, task3] >> task4
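To see why a plain Python list can appear on either side of `>>`, here is a toy model of the operator semantics. It is not Airflow's implementation; `ToyTask` is a hypothetical stand-in that only records downstream task IDs, relying on Python's fallback to `__rrshift__` when the left operand (a list) does not define `>>`.

```python
class ToyTask:
    """Toy stand-in for an Airflow task; records downstream task IDs only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    @staticmethod
    def _as_list(obj):
        return list(obj) if isinstance(obj, (list, tuple)) else [obj]

    def __rshift__(self, other):
        # task >> other, where other is a task or a list of tasks
        for t in self._as_list(other):
            self.downstream.add(t.task_id)
        return other  # returning the right side allows chaining

    def __rrshift__(self, other):
        # [a, b] >> task: lists have no __rshift__, so Python calls this
        for t in self._as_list(other):
            t.downstream.add(self.task_id)
        return self


task1, task2, task3, task4 = (ToyTask(f"task{i}") for i in range(1, 5))

# The "Complex" pattern: fan out from task1, then fan in to task4
task1 >> [task2, task3]
[task2, task3] >> task4

edges = sorted(
    (t.task_id, d) for t in (task1, task2, task3, task4) for d in t.downstream
)
print(edges)
```

One consequence of this design is that a list cannot sit on both sides: `[task1, task2] >> [task3, task4]` fails because neither operand defines the operator.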
Dynamic DAGs
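
    # Inside the `with DAG(...)` block: create one task per table
    for table in ['users', 'orders', 'products']:
        task = PythonOperator(
            task_id=f'process_{table}',  # task IDs must be unique within the DAG
            python_callable=process_table,
            op_kwargs={'table': table},  # passed to process_table as keyword arguments
        )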
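Because Airflow rejects duplicate task IDs within a DAG, it can help to validate the generated IDs before creating operators. The sketch below does this in plain Python, with no Airflow import; `build_task_specs` is a hypothetical helper, and the spec dictionaries simply mirror the `task_id` and `op_kwargs` arguments used in the loop above.

```python
def build_task_specs(tables):
    """Return one {'task_id', 'op_kwargs'} spec per table, rejecting duplicates."""
    specs = []
    seen = set()
    for table in tables:
        task_id = f"process_{table}"
        if task_id in seen:
            raise ValueError(f"duplicate task_id: {task_id}")
        seen.add(task_id)
        specs.append({"task_id": task_id, "op_kwargs": {"table": table}})
    return specs


specs = build_task_specs(["users", "orders", "products"])
task_ids = [spec["task_id"] for spec in specs]
print(task_ids)
```

Each spec could then be unpacked into an operator, for example `PythonOperator(python_callable=process_table, **spec)`, inside the DAG context.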