Claude-skill-registry airflow-3x-migration
Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/airflow-3x-migration" ~/.claude/skills/majiayu000-claude-skill-registry-airflow-3x-migration && rm -rf "$T"
manifest:
skills/data/airflow-3x-migration/SKILL.mdsafety · automated scan (low risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- makes HTTP requests (curl)
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
Airflow 3.x Skills
Import Path Changes
Operators
# Airflow 2.x from airflow.operators.python import PythonOperator # Airflow 3.x from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator
Sensors
# Airflow 3.x from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.sensors.time import TimeSensor
Removed Features
| Removed | Replacement |
|---|---|
| |
| Use standard DAG loading |
| Provider packages |
param | param |
DAG Definition Changes
# Airflow 3.x preferred from airflow import DAG from datetime import datetime with DAG( dag_id="my_dag", schedule="@daily", # Not schedule_interval start_date=datetime(2024, 1, 1), catchup=False, tags=["betting", "sports"], ) as dag: ...
TaskFlow API (Preferred)
from airflow.decorators import dag, task @dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False) def betting_workflow(): @task def download_games(sport: str) -> list: # Returns are automatically passed via XCom return fetch_games(sport) @task def update_elo(games: list) -> dict: return calculate_elo(games) # Chain tasks games = download_games("nba") ratings = update_elo(games) betting_dag = betting_workflow()
Asset-Based Scheduling (Replaces Dataset)
from airflow.sdk import Asset # Define assets games_data = Asset("games_data") elo_ratings = Asset("elo_ratings") # Producer DAG @dag(schedule="@daily") def download_dag(): @task(outlets=[games_data]) def download(): ... # Consumer DAG - triggers when asset updates @dag(schedule=[games_data]) def process_dag(): @task def process(): ...
Setup/Teardown Tasks
@task def setup_db_connection(): return create_connection() @task def cleanup_connection(conn): conn.close() @task def process_data(conn): ... # Define setup/teardown relationship with dag: conn = setup_db_connection() process_data(conn) >> cleanup_connection(conn) # Or use context manager style conn.as_setup() >> process_data(conn) >> conn.as_teardown()
DAG Versioning
from airflow import DAG with DAG( dag_id="betting_workflow", version="2.0.0", # New in 3.x schedule="@daily", ) as dag: ...
Backfill Changes
# Airflow 3.x - use REST API curl -X POST "http://localhost:8080/api/v1/dags/my_dag/dagRuns" \ -H "Content-Type: application/json" \ -d '{"logical_date": "2024-01-15T00:00:00Z"}' # Or use new backfill command airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-15
New REST API Endpoints
import requests # Get DAG runs response = requests.get( "http://localhost:8080/api/v1/dags/betting_workflow/dagRuns", auth=("admin", "admin") ) # Trigger DAG response = requests.post( "http://localhost:8080/api/v1/dags/betting_workflow/dagRuns", json={"conf": {"sport": "nba"}}, auth=("admin", "admin") )
Edge Labels
from airflow.utils.edgemodifier import Label download >> Label("success") >> process download >> Label("failure") >> alert
Migration Checklist
- Update all operator imports to provider packages
- Replace
withschedule_intervalschedule - Convert SubDags to TaskGroups
- Replace Dataset with Asset
- Test DAG parsing with
python dags/my_dag.py - Update docker-compose to Airflow 3.x image
Files to Reference
- Already uses 3.x importsdags/multi_sport_betting_workflow.py- Airflow 3.0 Migration Guide
Airflow 3.x CLI
- Has changed significantly since Airflow 2.
- Please look at latest docs before running CLI commands