Claude-skill-registry airflow-3x-migration

Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/airflow-3x-migration" ~/.claude/skills/majiayu000-claude-skill-registry-airflow-3x-migration && rm -rf "$T"
manifest: skills/data/airflow-3x-migration/SKILL.md
safety · automated scan (low risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
  • makes HTTP requests (curl)
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content

Airflow 3.x Skills

Import Path Changes

Operators

# Airflow 2.x
from airflow.operators.python import PythonOperator

# Airflow 3.x
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator

Sensors

# Airflow 3.x
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.time import TimeSensor

Removed Features

Removed                    Replacement
SubDagOperator             TaskGroup
packaged_dag_processor     Standard DAG loading
airflow.contrib.*          Provider packages
schedule_interval param    schedule param
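The removals above can also be caught mechanically before you touch any DAG. A minimal stdlib-only sketch (the patterns and the `scan` helper are illustrative, not an official Airflow tool) that flags removed 2.x constructs in DAG source text:

```python
import re

# Illustrative mapping of removed 2.x constructs to their 3.x replacements
REMOVED = {
    r"\bSubDagOperator\b": "TaskGroup",
    r"\bairflow\.contrib\.": "a provider package import",
    r"\bschedule_interval\s*=": "schedule=",
}

def scan(source: str) -> list[str]:
    """Return one warning per removed 2.x construct found in the source text."""
    return [
        f"{pattern} -> use {replacement}"
        for pattern, replacement in REMOVED.items()
        if re.search(pattern, source)
    ]

warnings = scan("dag = DAG('x', schedule_interval='@daily')")
# one warning, pointing schedule_interval= at schedule=
```

Running this over each file in your dags/ folder gives a quick inventory of what the migration will touch.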

DAG Definition Changes

# Airflow 3.x preferred
from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="my_dag",
    schedule="@daily",  # Not schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["betting", "sports"],
) as dag:
    ...

TaskFlow API (Preferred)

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def betting_workflow():

    @task
    def download_games(sport: str) -> list:
        # Returns are automatically passed via XCom
        return fetch_games(sport)

    @task
    def update_elo(games: list) -> dict:
        return calculate_elo(games)

    # Chain tasks
    games = download_games("nba")
    ratings = update_elo(games)

betting_dag = betting_workflow()

Asset-Based Scheduling (Replaces Dataset)

from datetime import datetime

from airflow.sdk import Asset, dag, task

# Define assets
games_data = Asset("games_data")
elo_ratings = Asset("elo_ratings")

# Producer DAG
@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def download_dag():
    @task(outlets=[games_data])
    def download():
        ...

    download()

download_dag()

# Consumer DAG - triggers when games_data is updated
@dag(schedule=[games_data], start_date=datetime(2024, 1, 1))
def process_dag():
    @task
    def process():
        ...

    process()

process_dag()

Setup/Teardown Tasks

@task
def setup_db_connection():
    return create_connection()

@task
def cleanup_connection(conn):
    conn.close()

@task
def process_data(conn):
    ...

# Define setup/teardown relationship
with dag:
    conn = setup_db_connection()
    done = cleanup_connection(conn)
    # Marking the teardown and linking it to its setup ensures cleanup
    # runs even if process_data fails
    process_data(conn) >> done.as_teardown(setups=conn)

# Airflow also provides @setup and @teardown decorators
# (from airflow.decorators import setup, teardown) for the same pattern

DAG Versioning

Airflow 3.x tracks DAG versions automatically: whenever a DAG's structure changes, the scheduler records a new version, and the UI shows which version each run executed with. Versioning is built in; DAG() takes no version argument, so no code change is required to use it.

Backfill Changes

# Airflow 3.x - trigger a run via the REST API (v2, not v1; add an
# Authorization header, see the REST API section)
curl -X POST "http://localhost:8080/api/v2/dags/my_dag/dagRuns" \
  -H "Content-Type: application/json" \
  -d '{"logical_date": "2024-01-15T00:00:00Z"}'

# Or use the new backfill command (replaces "airflow dags backfill")
airflow backfill create --dag-id my_dag --from-date 2024-01-01 --to-date 2024-01-15

New REST API Endpoints

import requests

BASE = "http://localhost:8080"

# Airflow 3.x serves the stable API under /api/v2 and uses token auth;
# with the default SimpleAuthManager, fetch a JWT first
token = requests.post(
    f"{BASE}/auth/token",
    json={"username": "admin", "password": "admin"},
).json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}

# Get DAG runs
response = requests.get(
    f"{BASE}/api/v2/dags/betting_workflow/dagRuns", headers=headers
)

# Trigger DAG
response = requests.post(
    f"{BASE}/api/v2/dags/betting_workflow/dagRuns",
    json={"logical_date": None, "conf": {"sport": "nba"}},
    headers=headers,
)

Edge Labels

from airflow.sdk import Label  # airflow.utils.edgemodifier.Label in 2.x

download >> Label("success") >> process
download >> Label("failure") >> alert

Migration Checklist

  • Update all operator imports to provider packages
  • Replace schedule_interval with schedule
  • Convert SubDags to TaskGroups
  • Replace Dataset with Asset
  • Test DAG parsing with python dags/my_dag.py
  • Update docker-compose to the Airflow 3.x image
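The parse-test step in the checklist can be scripted across a whole dags/ folder. A sketch (the directory layout and helper name are illustrative); note that byte-compiling catches syntax errors only, so still run each file with python to surface broken 3.x import paths:

```python
import subprocess
import sys
from pathlib import Path

def check_dags(dag_dir: str) -> list[str]:
    """Byte-compile every .py file in dag_dir; return names that fail."""
    failures = []
    for path in sorted(Path(dag_dir).glob("*.py")):
        result = subprocess.run(
            [sys.executable, "-m", "py_compile", str(path)],
            capture_output=True,
        )
        if result.returncode != 0:
            failures.append(path.name)
    return failures
```

Run it as check_dags("dags") in CI to fail fast before deploying to the 3.x scheduler.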

Files to Reference

Airflow 3.x CLI

  • The CLI has changed significantly since Airflow 2.
  • Check the latest Airflow 3.x docs before running CLI commands.