Skillshub airflow

Airflow

Install
source · Clone the upstream repo
git clone https://github.com/ComeOnOliver/skillshub
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/ComeOnOliver/skillshub "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/TerminalSkills/skills/airflow" ~/.claude/skills/comeonoliver-skillshub-airflow && rm -rf "$T"
manifest: skills/TerminalSkills/skills/airflow/SKILL.md
Source content

Airflow

Apache Airflow lets you define workflows as Directed Acyclic Graphs (DAGs) in Python. Each DAG consists of tasks connected by dependencies, scheduled and monitored via a web UI.

Installation

# docker-compose.yml: Airflow with LocalExecutor (simplified)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data
    # Report healthy only once the server accepts connections, so the
    # Airflow containers don't race `airflow db migrate` against DB startup.
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 10

  airflow-webserver:
    image: apache/airflow:2.9.0
    # Long-form depends_on: wait for the healthcheck, not just container creation.
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''  # empty key: stored connections/variables are NOT encrypted (demo only)
      AIRFLOW__WEBSERVER__SECRET_KEY: changeme  # replace in real deployments; must match across components
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    # Migrate the metadata DB and create the admin user before serving.
    # Fine for a demo; production setups use a dedicated one-shot init step.
    command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.9.0
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: airflow scheduler

volumes:
  postgres-data:
# Start Airflow
docker compose up -d
# UI at http://localhost:8080 (admin/admin)

Basic DAG

# dags/hello_world.py: Simple DAG with PythonOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Defaults applied to every task in this DAG unless overridden per-task.
default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    def extract(**context):
        # Fetch raw records from the API and hand them to downstream
        # tasks via an explicit XCom push.
        import requests
        payload = requests.get('https://api.example.com/data').json()
        context['ti'].xcom_push(key='raw_data', value=payload)

    def transform(**context):
        # Pull the raw records pushed by `extract`, scale each amount,
        # and push the reshaped rows for the load step.
        ti = context['ti']
        raw = ti.xcom_pull(key='raw_data', task_ids='extract')
        scaled = []
        for record in raw:
            scaled.append({'id': record['id'], 'value': record['amount'] * 100})
        ti.xcom_push(key='transformed', value=scaled)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')

    # Linear pipeline: extract -> transform -> load.
    extract_task >> transform_task
    transform_task >> load_task

TaskFlow API

# dags/taskflow_etl.py: Modern TaskFlow API with decorators
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl'],
)
def taskflow_etl():
    # Tiny ETL pipeline; XComs flow implicitly through task return values.

    @task()
    def extract():
        # Stand-in for a real source query.
        return {'users': 100, 'revenue': 50000}

    @task()
    def transform(metrics: dict):
        # Derive per-user revenue from the raw totals.
        avg_revenue = metrics['revenue'] / metrics['users']
        return {'users': metrics['users'], 'avg_revenue': avg_revenue}

    @task()
    def load(summary: dict):
        # Sink step: just report the computed summary.
        print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")

    # Chaining the calls wires the dependencies: extract >> transform >> load.
    load(transform(extract()))

taskflow_etl()

Common Operators

# dags/operators_demo.py: Various operator examples
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# SQL execution
# NOTE(review): PostgresOperator is deprecated in recent postgres provider
# releases in favor of SQLExecuteQueryOperator — confirm against the provider
# version shipped in the apache/airflow:2.9.0 image before reusing.
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',  # connection registered via `airflow connections add`
    sql="""
        CREATE TABLE IF NOT EXISTS daily_stats (
            date DATE PRIMARY KEY,
            total_users INT,
            revenue NUMERIC
        );
    """,
)

# HTTP request
# NOTE(review): SimpleHttpOperator is deprecated in recent http provider
# releases in favor of HttpOperator — same version caveat as above.
fetch_api = SimpleHttpOperator(
    task_id='fetch_api',
    http_conn_id='my_api',
    endpoint='/api/stats',
    method='GET',
    # Parse the JSON body; the parsed value becomes the task's XCom result.
    response_filter=lambda r: r.json(),
)

# Wait for file
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,  # re-check every 60 seconds
    timeout=3600,      # fail the sensor after 1 hour of waiting
)

Connections and Variables

# connections.sh: Set up connections via CLI
# Register a Postgres connection addressable as conn_id 'my_postgres'.
airflow connections add 'my_postgres' \
  --conn-type 'postgres' \
  --conn-login 'user' \
  --conn-password 'pass' \
  --conn-host 'localhost' \
  --conn-port 5432 \
  --conn-schema 'mydb'

# Set variables
# One plain string variable and one stored as JSON.
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json

# Trigger a DAG
# Queue a manual run, passing a conf dict the DAG can read at runtime.
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'