**Skill:** `openmetadata-dev` — Use OpenMetadata SDK and APIs to build integrations, connectors, and automations. Use when querying metadata, creating custom properties, building ingestion pipelines, automating governance workflows, or integrating OpenMetadata with other systems.

**Install:**

```shell
git clone https://github.com/aRustyDev/agents
```

Or copy just this skill into your local skills directory:

```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/aRustyDev/agents "$T" && mkdir -p ~/.claude/skills && cp -r "$T/content/skills/openmetadata-dev" ~/.claude/skills/arustydev-agents-openmetadata-dev && rm -rf "$T"
```

**Source file:** `content/skills/openmetadata-dev/SKILL.md`

# OpenMetadata Development
Guide for using OpenMetadata Python/Java SDKs and REST APIs to build integrations, connectors, and automations.
## When to Use This Skill
- Querying and updating metadata via SDK/API
- Building custom ingestion connectors
- Creating and managing custom properties
- Automating governance workflows
- Integrating OpenMetadata with external systems
- Managing lineage programmatically
## This Skill Does NOT Cover

- Implementing new language SDKs (see `openmetadata-sdk-dev`)
- Administering OpenMetadata (bots, users, security) (see `openmetadata-ops`)
- Deploying or operating OpenMetadata infrastructure
## SDK Setup

### Python SDK Installation

```shell
pip install openmetadata-ingestion
```

### Initialize Client

```python
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
    AuthProvider,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

# Configure connection
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider=AuthProvider.openmetadata,
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<your-jwt-token>",
    ),
)

# Create client
metadata = OpenMetadata(server_config)

# Verify connection
assert metadata.health_check()
```
### Java SDK Setup

```xml
<dependency>
    <groupId>org.open-metadata</groupId>
    <artifactId>openmetadata-java-client</artifactId>
    <version>1.3.0</version>
</dependency>
```

```java
OpenMetadataConnection server = new OpenMetadataConnection();
server.setHostPort("http://localhost:8585/api");
server.setAuthProvider(AuthProvider.OPENMETADATA);
server.setSecurityConfig(new OpenMetadataJWTClientConfig().withJwtToken("<token>"));

OpenMetadata client = new OpenMetadata(server);
```
## Querying Metadata

### Get Entity by Name

```python
from metadata.generated.schema.entity.data.table import Table

# Get table by fully qualified name
table = metadata.get_by_name(
    entity=Table,
    fqn="prod.sales.public.orders",
    fields=["columns", "owner", "tags"],  # Optional: include related data
)

if table:
    print(f"Table: {table.name}")
    print(f"Columns: {[col.name for col in table.columns]}")
    print(f"Owner: {table.owner.name if table.owner else 'None'}")
```
### Get Entity by ID

```python
from uuid import UUID

table = metadata.get_by_id(
    entity=Table,
    entity_id=UUID("12345678-1234-1234-1234-123456789abc"),
    fields=["columns"],
)
```
### List Entities with Pagination

```python
from metadata.generated.schema.entity.data.table import Table

# List tables with pagination
tables = metadata.list_entities(
    entity=Table,
    limit=100,
    fields=["owner", "database"],
)

for table in tables.entities:
    print(f"{table.fullyQualifiedName}")

# Get next page
if tables.paging.after:
    next_page = metadata.list_entities(
        entity=Table,
        limit=100,
        after=tables.paging.after,
    )
```
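The cursor-following loop above generalizes into a paging helper. This is an illustrative sketch — the `list_entities`/`paging.after` shape mirrors the SDK calls above, but the helper itself is not part of the SDK:

```python
from typing import Any, Iterator


def iter_all(client, entity, page_size: int = 100) -> Iterator[Any]:
    """Yield every entity, following the `paging.after` cursor until exhausted."""
    after = None
    while True:
        page = client.list_entities(entity=entity, limit=page_size, after=after)
        yield from page.entities
        after = page.paging.after
        if after is None:  # No more pages
            break
```

Calling `iter_all(metadata, Table)` then lets you stream every table without hand-managing cursors.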
### Search Entities

```python
# Search using Elasticsearch query
results = metadata.es_search_from_fqn(
    entity_type=Table,
    fqn_search_string="*orders*",
    size=50,
)

for hit in results:
    print(hit["_source"]["fullyQualifiedName"])
```
## Creating and Updating Entities

### Create Table

```python
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType

create_request = CreateTableRequest(
    name="new_orders",
    databaseSchema="prod.sales.public",
    columns=[
        Column(name="id", dataType=DataType.BIGINT),
        Column(name="customer_id", dataType=DataType.BIGINT),
        Column(name="total", dataType=DataType.DECIMAL),
        Column(name="created_at", dataType=DataType.TIMESTAMP),
    ],
    description="Order transactions",
)

table = metadata.create_or_update(create_request)
print(f"Created table: {table.fullyQualifiedName}")
```
### Update Entity Description

```python
# Update table description
metadata.patch_description(
    entity=Table,
    source=table,
    description="Updated description for orders table",
)
```
### Add Tags to Entity

```python
from metadata.generated.schema.type.tagLabel import (
    TagLabel,
    TagSource,
    LabelType,
    State,
)

tag = TagLabel(
    tagFQN="PII.Sensitive",
    source=TagSource.Classification,
    labelType=LabelType.Manual,
    state=State.Confirmed,
)

metadata.patch_tag(
    entity=Table,
    source=table,
    tag_label=tag,
)
```
### Set Owner

```python
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference

# Get user reference
user = metadata.get_by_name(entity=User, fqn="john.doe")

# Set owner
metadata.patch_owner(
    entity=Table,
    source=table,
    owner=EntityReference(id=user.id, type="user"),
)
```
## Custom Properties

### Create Custom Property Type

```python
from metadata.generated.schema.api.data.createCustomProperty import (
    CreateCustomPropertyRequest,
)
from metadata.generated.schema.type.customProperty import PropertyType

# Create custom property on Table entity
metadata.create_or_update_custom_property(
    ometa_custom_property=CreateCustomPropertyRequest(
        name="costCenter",
        description="Cost center for billing",
        propertyType=PropertyType(
            id=metadata.get_property_type("string").id,
            type="type",
        ),
    ),
    entity_type=Table,
)
```
### Set Custom Property Value

```python
# Set custom property value using extension
table = metadata.get_by_name(entity=Table, fqn="prod.sales.orders")

# Patch extension with custom property
metadata.patch(
    entity=Table,
    source=table,
    destination=table.copy(
        update={"extension": {"costCenter": "SALES-001"}}
    ),
)
```
### Read Custom Property Value

```python
table = metadata.get_by_name(entity=Table, fqn="prod.sales.orders")

if table.extension:
    cost_center = table.extension.get("costCenter")
    print(f"Cost Center: {cost_center}")
```
### Supported Property Types

| Type | Description | Example Value |
|---|---|---|
| `string` | Text value | `"SALES-001"` |
| `integer` | Whole number | `42` |
| `number` | Decimal number | `3.14` |
| `markdown` | Rich text | `"**Critical** table"` |
| `enum` | Predefined values | `"Tier1"` |
| `date-cp` | Date only | `"2024-01-15"` |
| `dateTime-cp` | Date and time | `"2024-01-15T10:30:00"` |
| `time-cp` | Time only | `"10:30:00"` |
| `duration` | Time duration | `"PT2H30M"` |
| `entityReference` | Link to entity | reference to a user or team |
| `entityReferenceList` | Multiple links | list of entity references |
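Before patching an `extension`, it can help to sanity-check values client-side. The helper below is illustrative (not part of the SDK) and maps a few of the property types above to Python-level checks:

```python
from datetime import datetime


def _parses(value: str, fmt: str) -> bool:
    """Return True if value parses with the given strptime format."""
    try:
        datetime.strptime(value, fmt)
        return True
    except (TypeError, ValueError):
        return False


# Illustrative client-side checks for a few custom property types.
CHECKS = {
    "string": lambda v: isinstance(v, str),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "date-cp": lambda v: _parses(v, "%Y-%m-%d"),
}


def validate_extension(extension: dict, schema: dict) -> list:
    """Return names of properties whose values fail their type check."""
    return [
        name for name, type_name in schema.items()
        if name in extension and not CHECKS[type_name](extension[name])
    ]


errors = validate_extension(
    {"costCenter": "SALES-001", "reviewDate": "2024-13-01"},
    {"costCenter": "string", "reviewDate": "date-cp"},
)
print(errors)  # reviewDate fails: month 13 is not a valid date
```

Catching a bad value locally is cheaper than debugging a rejected PATCH against the server.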
## Lineage Management

### Add Lineage Edge

```python
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference

# Get source and target tables
source = metadata.get_by_name(entity=Table, fqn="raw.events")
target = metadata.get_by_name(entity=Table, fqn="analytics.user_events")

# Add lineage
metadata.add_lineage(
    AddLineage(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=source.id, type="table"),
            toEntity=EntityReference(id=target.id, type="table"),
        ),
    )
)
```
### Add Column-Level Lineage

```python
from metadata.generated.schema.type.entityLineage import ColumnLineage, LineageDetails

metadata.add_lineage(
    AddLineage(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=source.id, type="table"),
            toEntity=EntityReference(id=target.id, type="table"),
            lineageDetails=LineageDetails(
                columnsLineage=[
                    ColumnLineage(
                        fromColumns=["raw.events.user_id"],
                        toColumn="analytics.user_events.user_id",
                    ),
                    ColumnLineage(
                        fromColumns=["raw.events.event_type"],
                        toColumn="analytics.user_events.event_type",
                    ),
                ],
            ),
        ),
    )
)
```
### Query Lineage

```python
lineage = metadata.get_lineage_by_name(
    entity=Table,
    fqn="analytics.user_events",
    up_depth=3,    # Upstream hops
    down_depth=3,  # Downstream hops
)

print("Upstream tables:")
for node in lineage.upstreamEdges:
    print(f"  - {node.fromEntity.name}")

print("Downstream tables:")
for node in lineage.downstreamEdges:
    print(f"  - {node.toEntity.name}")
```
## Building Custom Connectors

### Connector Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                          Workflow                           │
├─────────────────────────────────────────────────────────────┤
│   Source  →  Processor  →  Processor  →  Sink               │
│     ↓            ↓             ↓           ↓                │
│  Extract     Transform      Enrich      Load to             │
│  Records       Data          Data     OpenMetadata          │
└─────────────────────────────────────────────────────────────┘
```
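The Source → Processor → Sink flow above can be sketched framework-free as composable Python steps. The names and record shape here are illustrative stand-ins, not the actual ingestion API:

```python
from typing import Iterable

# Hypothetical record type: a plain dict standing in for CreateTableRequest.
Record = dict


def source() -> Iterable[Record]:
    """Extract: yield raw records from the upstream system."""
    yield {"name": "orders", "columns": ["id", "total"]}
    yield {"name": "customers", "columns": ["id", "email"]}


def enrich(records: Iterable[Record]) -> Iterable[Record]:
    """Transform/Enrich: add metadata to each record as it streams through."""
    for record in records:
        record["description"] = f"Auto-described table {record['name']}"
        yield record


def sink(records: Iterable[Record]) -> list:
    """Load: in the real workflow this step calls metadata.create_or_update()."""
    return list(records)


loaded = sink(enrich(source()))
print([r["name"] for r in loaded])  # → ['orders', 'customers']
```

Because each step consumes and yields an iterable, records stream through one at a time — the same property the real `Source`/`Processor`/`Sink` steps below rely on.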
### Source Implementation

```python
from typing import Iterable

from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class MyCustomSource(Source):
    """Custom source for extracting metadata."""

    def __init__(self):
        super().__init__()
        self.config = None
        self.metadata = None

    @classmethod
    def create(
        cls,
        config_dict: dict,
        metadata: OpenMetadata,
    ) -> "MyCustomSource":
        instance = cls()
        instance.config = MySourceConfig.parse_obj(config_dict)
        instance.metadata = metadata
        return instance

    def prepare(self):
        """Initialize connections before extraction."""
        self.client = MyApiClient(self.config.api_url)

    def _iter(self) -> Iterable[Either]:
        """Yield records to downstream steps."""
        for item in self.client.list_items():
            yield Either(right=self._convert_to_entity(item))

    def _convert_to_entity(self, item) -> CreateTableRequest:
        """Convert API response to OpenMetadata entity."""
        return CreateTableRequest(
            name=item["name"],
            databaseSchema=self.config.database_schema,
            columns=[
                Column(name=col["name"], dataType=self._map_type(col["type"]))
                for col in item["columns"]
            ],
        )

    def close(self):
        """Cleanup resources."""
        if self.client:
            self.client.close()

    def test_connection(self) -> None:
        """Verify connectivity to source system."""
        self.client.health_check()
```
### Processor Implementation

```python
from metadata.ingestion.api.steps import Processor


class EnrichmentProcessor(Processor):
    """Add additional metadata to records."""

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        instance = cls()
        instance.config = EnrichmentConfig.parse_obj(config_dict)
        instance.metadata = metadata
        return instance

    def _run(self, record: CreateTableRequest) -> Either:
        """Process each record."""
        # Add custom enrichment
        record.description = self._generate_description(record)
        record.tags = self._auto_classify(record)
        return Either(right=record)

    def close(self):
        pass
```
### Sink Implementation

```python
from metadata.ingestion.api.steps import Sink


class OpenMetadataSink(Sink):
    """Write records to OpenMetadata."""

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        instance = cls()
        instance.metadata = metadata
        return instance

    def _run(self, record: CreateTableRequest) -> Either:
        """Write record to OpenMetadata."""
        try:
            entity = self.metadata.create_or_update(record)
            return Either(right=entity)
        except Exception as e:
            return Either(left=StackTraceError(str(e)))

    def close(self):
        pass
```
### Workflow Configuration

```yaml
# connector.yaml
source:
  type: MyCustomSource
  serviceName: my-source
  serviceConnection:
    config:
      api_url: https://api.example.com
      database_schema: prod.my_db.public

processor:
  type: EnrichmentProcessor
  config:
    auto_classify: true

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OM_JWT_TOKEN}
```
### Run Workflow

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow

with open("connector.yaml") as f:
    config = yaml.safe_load(f)

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.print_status()
workflow.stop()
```
## Automation Patterns

### Bulk Tagging

```python
def bulk_tag_tables(metadata: OpenMetadata, pattern: str, tag_fqn: str):
    """Apply tag to all tables matching pattern."""
    tables = metadata.es_search_from_fqn(
        entity_type=Table,
        fqn_search_string=pattern,
    )

    tag = TagLabel(
        tagFQN=tag_fqn,
        source=TagSource.Classification,
        labelType=LabelType.Automated,
        state=State.Confirmed,
    )

    for hit in tables:
        table = metadata.get_by_id(
            entity=Table,
            entity_id=UUID(hit["_source"]["id"]),
        )
        metadata.patch_tag(entity=Table, source=table, tag_label=tag)
        print(f"Tagged: {table.fullyQualifiedName}")


# Tag all PII tables
bulk_tag_tables(metadata, "*customer*", "PII.Sensitive")
```
### Auto-Assign Owners

```python
def auto_assign_owners(metadata: OpenMetadata, rules: dict):
    """Assign owners based on schema/database patterns."""
    for pattern, owner_fqn in rules.items():
        owner = metadata.get_by_name(entity=User, fqn=owner_fqn)
        owner_ref = EntityReference(id=owner.id, type="user")

        tables = metadata.es_search_from_fqn(
            entity_type=Table,
            fqn_search_string=pattern,
        )
        for hit in tables:
            table = metadata.get_by_id(
                entity=Table,
                entity_id=UUID(hit["_source"]["id"]),
            )
            if table.owner is None:
                metadata.patch_owner(entity=Table, source=table, owner=owner_ref)
                print(f"Assigned {owner_fqn} to {table.fullyQualifiedName}")


# Define ownership rules
rules = {
    "*.sales.*": "sales-team-lead",
    "*.analytics.*": "analytics-team-lead",
    "*.finance.*": "finance-team-lead",
}
auto_assign_owners(metadata, rules)
```
### Data Quality Automation

```python
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition


def add_null_check_tests(metadata: OpenMetadata, table_fqn: str):
    """Add null check tests to all required columns."""
    table = metadata.get_by_name(
        entity=Table,
        fqn=table_fqn,
        fields=["columns"],
    )
    null_test = metadata.get_by_name(
        entity=TestDefinition,
        fqn="columnValuesToBeNotNull",
    )

    for column in table.columns:
        if column.constraint == "NOT NULL":
            test_case = TestCase(
                name=f"{column.name}_not_null",
                testDefinition=EntityReference(id=null_test.id, type="testDefinition"),
                entityLink=f"<#E::table::{table_fqn}::columns::{column.name}>",
                parameterValues=[],
            )
            metadata.create_or_update(test_case)
            print(f"Added null check for {column.name}")
```
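The `entityLink` string in the example above follows OpenMetadata's `<#E::{entityType}::{fqn}[::columns::{column}]>` convention. A small helper (illustrative, not part of the SDK) makes the format harder to mistype:

```python
def table_entity_link(table_fqn: str) -> str:
    """Build an OpenMetadata entityLink for a table itself."""
    return f"<#E::table::{table_fqn}>"


def column_entity_link(table_fqn: str, column_name: str) -> str:
    """Build an entityLink for one column of a table."""
    return f"<#E::table::{table_fqn}::columns::{column_name}>"


print(column_entity_link("prod.sales.public.orders", "id"))
# → <#E::table::prod.sales.public.orders::columns::id>
```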
### Lineage Propagation

```python
def propagate_tags_downstream(
    metadata: OpenMetadata,
    source_fqn: str,
    tag_fqn: str,
    max_depth: int = 3,
):
    """Propagate tags through lineage."""
    source = metadata.get_by_name(entity=Table, fqn=source_fqn)
    lineage = metadata.get_lineage_by_name(
        entity=Table,
        fqn=source_fqn,
        down_depth=max_depth,
    )

    tag = TagLabel(
        tagFQN=tag_fqn,
        source=TagSource.Classification,
        labelType=LabelType.Propagated,
        state=State.Confirmed,
    )

    for edge in lineage.downstreamEdges:
        downstream = metadata.get_by_id(
            entity=Table,
            entity_id=edge.toEntity.id,
        )
        metadata.patch_tag(entity=Table, source=downstream, tag_label=tag)
        print(f"Propagated {tag_fqn} to {downstream.fullyQualifiedName}")


# Propagate PII tag through lineage
propagate_tags_downstream(metadata, "raw.customers", "PII.Sensitive")
```
## REST API Direct Usage

### Authentication

```shell
# Get JWT token
curl -X POST "http://localhost:8585/api/v1/users/login" \
  -H "Content-Type: application/json" \
  -d '{"email": "admin@openmetadata.org", "password": "admin"}'
```
### Common Endpoints

| Operation | Method | Endpoint |
|---|---|---|
| List tables | GET | `/api/v1/tables` |
| Get table | GET | `/api/v1/tables/{id}` |
| Get by name | GET | `/api/v1/tables/name/{fqn}` |
| Create/Update | PUT | `/api/v1/tables` |
| Patch | PATCH | `/api/v1/tables/{id}` |
| Delete | DELETE | `/api/v1/tables/{id}` |
| Search | GET | `/api/v1/search/query` |
| Lineage | GET | `/api/v1/lineage/table/name/{fqn}` |
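When calling these endpoints from code, the FQN path segment should be percent-encoded. The helper below is an illustrative sketch (the base URL and paths follow the table above; the functions themselves are not part of any SDK):

```python
from urllib.parse import quote

BASE = "http://localhost:8585/api/v1"


def table_by_name_url(fqn: str) -> str:
    """Build the get-by-name URL; the FQN is percent-encoded."""
    return f"{BASE}/tables/name/{quote(fqn, safe='')}"


def lineage_url(fqn: str) -> str:
    """Build the lineage URL for a table."""
    return f"{BASE}/lineage/table/name/{quote(fqn, safe='')}"


print(table_by_name_url("prod.sales.public.orders"))
# → http://localhost:8585/api/v1/tables/name/prod.sales.public.orders
```

Encoding matters once FQNs contain quoted parts or spaces — `quote(..., safe='')` turns a space into `%20` rather than producing a malformed URL.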
### Example: Create Table via REST

```shell
curl -X PUT "http://localhost:8585/api/v1/tables" \
  -H "Authorization: Bearer ${JWT_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "new_table",
    "databaseSchema": "prod.db.schema",
    "columns": [
      {"name": "id", "dataType": "BIGINT"},
      {"name": "name", "dataType": "VARCHAR"}
    ]
  }'
```
## Error Handling

### Common Exceptions

```python
from metadata.ingestion.ometa.client import APIError

try:
    table = metadata.get_by_name(entity=Table, fqn="nonexistent.table")
except APIError as e:
    if e.status_code == 404:
        print("Table not found")
    elif e.status_code == 403:
        print("Permission denied")
    else:
        raise
```
### Retry Pattern

```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
def resilient_create(metadata: OpenMetadata, entity):
    return metadata.create_or_update(entity)
```
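If `tenacity` is not available, the same behavior can be hand-rolled. This sketch retries any callable with exponential backoff (the parameter defaults are illustrative):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
) -> T:
    """Call fn, retrying on any exception with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: propagate the last error
            # Sleep 1s, 2s, 4s, ... capped at max_delay
            time.sleep(min(base_delay * (2 ** attempt), max_delay))


# Usage: with_retries(lambda: metadata.create_or_update(entity))
```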
## Best Practices

### Connection Management

```python
# Use context manager pattern
class OpenMetadataSession:
    def __init__(self, config: OpenMetadataConnection):
        self.config = config
        self.client = None

    def __enter__(self) -> OpenMetadata:
        self.client = OpenMetadata(self.config)
        self.client.health_check()
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Client cleanup if needed
        pass


# Usage
with OpenMetadataSession(config) as metadata:
    table = metadata.get_by_name(entity=Table, fqn="prod.sales.orders")
```
### Batch Operations

```python
import time


def batch_update(metadata: OpenMetadata, entities: list, batch_size: int = 50):
    """Update entities in batches to avoid rate limits."""
    for i in range(0, len(entities), batch_size):
        batch = entities[i:i + batch_size]
        for entity in batch:
            metadata.create_or_update(entity)
        time.sleep(0.5)  # Rate limit protection
```
### Idempotent Operations

```python
def ensure_table_exists(metadata: OpenMetadata, create_request: CreateTableRequest):
    """Create table if not exists, otherwise return existing."""
    existing = metadata.get_by_name(
        entity=Table,
        fqn=f"{create_request.databaseSchema}.{create_request.name}",
    )
    if existing:
        return existing
    return metadata.create_or_update(create_request)
```
## MCP Server Integration
OpenMetadata provides a Model Context Protocol (MCP) server that enables AI assistants (like Claude and ChatGPT) to interact with your metadata catalog using natural language.
### What is MCP?
MCP (Model Context Protocol) is an open standard that allows AI systems to interact with external tools and data sources in a uniform, secure way. OpenMetadata's MCP server exposes its metadata knowledge graph to AI tools.
### Use Cases
- Natural language queries about data assets
  - "What tables contain customer data?"
  - "Who owns the orders table?"
  - "Show me the lineage for the sales dashboard"
- AI-powered data discovery
- Conversational data governance
### Setting Up MCP

#### 1. Install MCP Application
- Navigate to Settings → Applications → Marketplace
- Find the MCP application
- Click Install
- Configure the application settings
#### 2. Generate Personal Access Token
- Go to Profile → Access Token
- Click Generate New Token
- Set appropriate expiration
- Copy token (shown only once)
#### 3. Configure MCP Client

For Claude Desktop, add to your `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "openmetadata": {
      "url": "http://localhost:8585/api/v1/mcp",
      "headers": {
        "Authorization": "Bearer <your-token>"
      }
    }
  }
}
```
For API Integration:
```python
import requests

MCP_ENDPOINT = "http://localhost:8585/api/v1/mcp"
TOKEN = "<your-token>"


def query_mcp(prompt: str) -> dict:
    """Send natural language query to OpenMetadata MCP."""
    response = requests.post(
        f"{MCP_ENDPOINT}/query",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
        json={"prompt": prompt},
    )
    return response.json()


# Example queries
result = query_mcp("What tables are in the sales database?")
result = query_mcp("Show me the owner of the customers table")
result = query_mcp("What is the lineage for the revenue dashboard?")
```
### Available MCP Tools
The MCP server exposes tools for:

| Tool | Description |
|---|---|
| `search_assets` | Search for data assets by keyword |
| `get_asset_details` | Get detailed metadata for an asset |
| `get_lineage` | Retrieve lineage for an entity |
| `get_owner` | Find asset ownership |
| `list_tables` | List tables in a database |
| `get_schema` | Get table schema details |
### Security Considerations

- **Use dedicated tokens** - Don't share personal tokens
- **Set appropriate permissions** - MCP operates with the token's permissions
- **Rotate tokens regularly** - Follow your security policies
- **Audit usage** - Monitor MCP queries in logs
## References
- OpenMetadata Python SDK
- OpenMetadata Java SDK
- OpenMetadata API Reference
- Building Connectors
- Data Governance Automation
- MCP Server Guide
## Related Skills

- `openmetadata-sdk-dev` - Implementing SDKs for new languages
- `openmetadata-ops` - Administering OpenMetadata
- `openmetadata-user` - UI navigation and discovery
- `openmetadata-dq` - Data quality and observability