Skills airflow

install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/airflow" ~/.claude/skills/terminalskills-skills-airflow && rm -rf "$T"
manifest: skills/airflow/SKILL.md
safety · automated scan (low risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
  • references API keys
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content

Airflow

Apache Airflow lets you define workflows as Directed Acyclic Graphs (DAGs) in Python. Each DAG consists of tasks connected by dependencies, scheduled and monitored via a web UI.

Installation

# docker-compose.yml: Airflow with LocalExecutor (simplified)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: changeme
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: airflow scheduler

volumes:
  postgres-data:
# Start Airflow
docker compose up -d
# UI at http://localhost:8080 (admin/admin)

Basic DAG

# dags/hello_world.py: Simple DAG with PythonOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    def extract(**kwargs):
        import requests
        data = requests.get('https://api.example.com/data').json()
        kwargs['ti'].xcom_push(key='raw_data', value=data)

    def transform(**kwargs):
        data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
        transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
        kwargs['ti'].xcom_push(key='transformed', value=transformed)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')

    extract_task >> transform_task >> load_task

TaskFlow API

# dags/taskflow_etl.py: Modern TaskFlow API with decorators
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl'],
)
def taskflow_etl():

    @task()
    def extract():
        return {'users': 100, 'revenue': 50000}

    @task()
    def transform(data: dict):
        return {
            'users': data['users'],
            'avg_revenue': data['revenue'] / data['users'],
        }

    @task()
    def load(summary: dict):
        print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

taskflow_etl()

Common Operators

# dags/operators_demo.py: Various operator examples
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# SQL execution
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS daily_stats (
            date DATE PRIMARY KEY,
            total_users INT,
            revenue NUMERIC
        );
    """,
)

# HTTP request
fetch_api = SimpleHttpOperator(
    task_id='fetch_api',
    http_conn_id='my_api',
    endpoint='/api/stats',
    method='GET',
    response_filter=lambda r: r.json(),
)

# Wait for file
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,
    timeout=3600,
)

Connections and Variables

# connections.sh: Set up connections via CLI
airflow connections add 'my_postgres' \
  --conn-type 'postgres' \
  --conn-host 'localhost' \
  --conn-schema 'mydb' \
  --conn-login 'user' \
  --conn-password 'pass' \
  --conn-port 5432

# Set variables
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json

# Trigger a DAG
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'