Annotate Airflow tasks with data lineage using inlets and outlets. Use when the user wants to add lineage metadata to tasks, specify input/output datasets, or enable lineage tracking for operators without built-in OpenLineage extraction.
```bash
# Clone the full agents repository
git clone https://github.com/astronomer/agents

# Or copy only this skill into your local Claude skills directory
T=$(mktemp -d) && git clone --depth=1 https://github.com/astronomer/agents "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/annotating-task-lineage" ~/.claude/skills/astronomer-agents-annotating-task-lineage && rm -rf "$T"
```
`skills/annotating-task-lineage/SKILL.md`

# Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using `inlets` and `outlets`.
Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.
## On Astro
Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced Lineage tab, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.
## When to Use This Approach

| Scenario | Use Inlets/Outlets? |
|---|---|
| Operator has OpenLineage methods (`get_openlineage_facets_on_*`) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or custom extractor |
Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.
## Supported Types for Inlets/Outlets

You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:

### OpenLineage Datasets (Recommended)

```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```
### Airflow Assets (Airflow 3+)

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```
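Assets are attached to tasks the same way as OpenLineage datasets. A minimal sketch, assuming Airflow 3 import paths and an illustrative URI:

```python
from airflow.providers.standard.operators.bash import BashOperator  # Airflow 3 import path
from airflow.sdk import Asset

orders_asset = Asset(uri="s3://my-bucket/data/orders")

# Passing the Asset as an outlet records this task as a producer of the asset,
# so it shows up in lineage (and in asset-aware scheduling, if used).
load_orders = BashOperator(
    task_id="load_orders",
    bash_command="echo 'loading orders...'",
    outlets=[orders_asset],
)
```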
### Airflow Datasets (Airflow 2.4+)

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```
## Basic Usage

### Setting Inlets and Outlets on Operators

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],   # What this task reads
        outlets=[target_table],  # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],  # Reads from previous output
        outlets=[output_file],  # Writes to S3
    )

    transform >> export
```
### Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

```python
from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,  # build_aggregates is assumed to be defined elsewhere in the DAG file
    inlets=[customers, orders, products],        # All inputs
    outlets=[daily_summary, customer_metrics],   # All outputs
)
```
## Setting Lineage in Custom Operators

When building custom operators, you have two options:

### Option 1: Implement OpenLineage Methods (Recommended)

This is the preferred approach as it gives you full control over lineage extraction:

```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )
```
### Option 2: Set Inlets/Outlets Dynamically

For simpler cases, set lineage within the `execute` method (non-deferrable operators only):

```python
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]

        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")
```
## Dataset Naming Helpers

Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms:

```python
from openlineage.client.event_v2 import Dataset

# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"

# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"

# S3
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"

# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
```
Note: Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the OpenLineage repo or request it.
## Precedence Rules

OpenLineage uses this precedence for lineage extraction:

- Custom Extractors (highest) - User-registered extractors
- OpenLineage Methods - `get_openlineage_facets_on_*` in the operator
- Hook-Level Lineage - Lineage collected from hooks via `HookLineageCollector`
- Inlets/Outlets (lowest) - Falls back to these if nothing else extracts lineage
Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.
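As a concrete illustration of the fallback, here is a minimal sketch; the operator choices, connection ID, SQL, and dataset names are illustrative assumptions rather than part of this skill:

```python
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from openlineage.client.event_v2 import Dataset

raw_events = Dataset(namespace="postgres://mydb:5432", name="public.raw_events")
clean_events = Dataset(namespace="postgres://mydb:5432", name="public.clean_events")

# BashOperator has no built-in OpenLineage extraction, so the manual
# inlets/outlets below are what OpenLineage reports for this task.
clean_with_script = BashOperator(
    task_id="clean_with_script",
    bash_command="python /scripts/clean_events.py",
    inlets=[raw_events],
    outlets=[clean_events],
)

# SQLExecuteQueryOperator ships with its own OpenLineage extraction from the SQL,
# so any inlets/outlets set here would be superseded by the extracted lineage.
clean_with_sql = SQLExecuteQueryOperator(
    task_id="clean_with_sql",
    conn_id="postgres_default",
    sql="INSERT INTO public.clean_events SELECT * FROM public.raw_events",
)
```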
## Best Practices

### Use the Naming Helpers

Always use OpenLineage naming helpers for consistent dataset creation:

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
```
### Document Your Lineage

Add comments explaining the data flow:

```python
transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)
```
### Keep Lineage Accurate

- Update inlets/outlets when SQL queries change
- Include all tables referenced in JOINs as inlets (see the sketch after this list)
- Include all tables written to (including temp tables if relevant)
- One-sided annotations (inlet-only or outlet-only) are valid and still encouraged: they improve lineage visibility even without a corresponding outlet or inlet elsewhere
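A sketch of mapping a multi-JOIN query onto inlets and outlets, reusing the `snowflake_dataset` helper and the placeholder `SqlOperator` from the examples above; the SQL and table names are illustrative:

```python
enrich = SqlOperator(  # placeholder operator, as in the earlier example
    task_id="enrich_orders",
    sql="""
        CREATE OR REPLACE TABLE staging.orders_enriched AS
        SELECT o.*, c.segment, p.category
        FROM raw.orders o
        JOIN raw.customers c ON o.customer_id = c.id
        JOIN raw.products p ON o.product_id = p.id
    """,
    # Every table the query reads (including JOINed tables) appears as an inlet...
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
        snowflake_dataset("raw", "products"),
    ],
    # ...and every table it writes appears as an outlet.
    outlets=[
        snowflake_dataset("staging", "orders_enriched"),
    ],
)
```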
## Limitations

| Limitation | Workaround |
|---|---|
| Table-level only (no column lineage) | Use OpenLineage methods or custom extractor |
| Overridden by extractors/methods | Only use for operators without extractors |
| Static at DAG parse time | Set dynamically in `execute` or use OL methods |
| Deferrable operators lose dynamic lineage | Use OL methods instead; attributes set in `execute` are lost when deferring |
## Related Skills

- creating-openlineage-extractors: For column-level lineage or complex extraction
- tracing-upstream-lineage: Investigate where data comes from
- tracing-downstream-lineage: Investigate what depends on data