Develop and contribute to OpenMetadata SDKs, connectors, and core platform. Use when implementing new language SDKs, building connectors for contribution, extending SDK capabilities, or setting up the OpenMetadata development environment.
OpenMetadata SDK & Connector Development
Guide for developing OpenMetadata SDKs, connectors, and contributions to the core platform. All SDK and connector development is intended to be contributed back to the community.
Note: This skill extends patterns from `meta-sdk-patterns-eng`. See that skill for foundational SDK patterns (architecture, error handling, configuration, testing strategies, packaging).
When to Use This Skill
- Implementing OpenMetadata SDK for a new language
- Extending existing Python or Java SDK with new features
- Contributing new connectors to OpenMetadata
- Adding new entity type support
- Implementing authentication providers
- Setting up OpenMetadata development environment
- Generating entity models from JSON Schemas
This Skill Does NOT Cover
- Using the existing Python/Java SDK to interact with OpenMetadata (see `openmetadata-dev`)
- Deploying or operating OpenMetadata
- Administering users, bots, and policies (see `openmetadata-ops`)
OpenMetadata SDK Architecture
Core Components
Every OpenMetadata SDK implements these components:
```
┌─────────────────────────────────────────────────────────────┐
│                     OpenMetadata Client                     │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │ Connection  │  │    Auth     │  │    API Clients      │  │
│  │   Config    │  │  Provider   │  │ (Tables, Dashes..)  │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────┐│
│  │                Entity Models (Generated)                ││
│  │   Table, Database, Dashboard, Pipeline, MlModel, etc.   ││
│  └─────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────┐│
│  │              HTTP Client / Transport Layer              ││
│  └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘
```
Pattern: Gateway with Typed API Clients
OpenMetadata SDKs use a gateway pattern where the main client builds typed API clients:
```python
# Python Pattern
from typing import Optional, Type, TypeVar

T = TypeVar("T")


class OpenMetadata:
    # OpenMetadataConnection, CreateEntity and Entity stand for generated model types
    def __init__(self, config: OpenMetadataConnection):
        self._config = config
        self._client = self._build_client()

    def get_by_name(self, entity: Type[T], fqn: str) -> Optional[T]:
        """Generic method using TypeVar for type safety."""
        ...

    def create_or_update(self, data: CreateEntity) -> Entity:
        """Handles both create and update operations."""
        ...
```
```java
// Java Pattern
public class OpenMetadata {
    private final OpenMetadataConnection config;

    public <T> T buildClient(Class<T> apiClass) {
        // Build typed API client
        return clientBuilder.build(apiClass);
    }
}

// Usage
TablesApi tablesApi = openMetadata.buildClient(TablesApi.class);
DashboardsApi dashboardApi = openMetadata.buildClient(DashboardsApi.class);
```
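The gateway idea can be shown without any HTTP library. In this minimal Python sketch, `Gateway`, `TablesApi`, `DashboardsApi` and the `Transport` callable are hypothetical names, not the real SDK classes. The gateway owns one shared transport (base URL, auth, retries) and hands out typed clients. Caching one instance per API class is a design choice of this sketch.

```python
from typing import Callable, Dict, Type, TypeVar

# Hypothetical transport: (HTTP method, path) -> parsed JSON body.
Transport = Callable[[str, str], dict]
T = TypeVar("T")


class TablesApi:
    """Hypothetical typed client for the tables collection."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport

    def get_by_name(self, fqn: str) -> dict:
        return self._transport("GET", f"/api/v1/tables/name/{fqn}")


class DashboardsApi:
    """Hypothetical typed client for the dashboards collection."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport

    def get_by_name(self, fqn: str) -> dict:
        return self._transport("GET", f"/api/v1/dashboards/name/{fqn}")


class Gateway:
    """Builds typed API clients on demand; all of them share one transport."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport
        self._clients: Dict[type, object] = {}

    def build_client(self, api_class: Type[T]) -> T:
        # Cache one instance per API class so repeated calls are cheap.
        if api_class not in self._clients:
            self._clients[api_class] = api_class(self._transport)  # type: ignore[call-arg]
        return self._clients[api_class]  # type: ignore[return-value]
```

Because every typed client receives the same transport, cross-cutting concerns such as token refresh live in one place.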
Connection Configuration
Configuration Object
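```python
# Python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
    AuthProvider,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.security.ssl.validateSSLClientConfig import (
    ValidateSslClientConfig,
)

server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider=AuthProvider.openmetadata,
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<token>"),
    verifySSL="validate",  # or "ignore", "no-ssl"
    sslConfig=ValidateSslClientConfig(caCertificate="/path/to/cert"),
)
```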
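```java
// Java
OpenMetadataJWTClientConfig jwtClientConfig = new OpenMetadataJWTClientConfig();
jwtClientConfig.setJwtToken("<token>");

OpenMetadataConnection server = new OpenMetadataConnection();
server.setHostPort("http://localhost:8585/api");
server.setApiVersion("v1");
server.setAuthProvider(OpenMetadataConnection.AuthProvider.OPENMETADATA);
server.setSecurityConfig(jwtClientConfig);
```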
Configuration Fields
| Field | Required | Description |
|---|---|---|
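| `hostPort` | Yes | Base URL including `/api` |
| `authProvider` | Yes | Authentication provider type |
| `securityConfig` | Yes | Provider-specific auth config |
| `apiVersion` | No | API version (default: `v1`) |
| `verifySSL` | No | SSL verification mode (`validate`, `ignore`, `no-ssl`) |
| `sslConfig` | No | Custom SSL certificates |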
Authentication Providers
Provider Architecture
Implement pluggable authentication with a provider interface:
```python
# Python
from abc import ABC, abstractmethod


class AuthenticationProvider(ABC):
    @abstractmethod
    def get_access_token(self) -> str:
        """Return valid access token."""
        pass


class OpenMetadataJWTProvider(AuthenticationProvider):
    def __init__(self, config: OpenMetadataJWTClientConfig):
        self._token = config.jwtToken

    def get_access_token(self) -> str:
        return self._token


class OktaProvider(AuthenticationProvider):
    def __init__(self, config: OktaClientConfig):
        self._client_id = config.clientId
        self._org_url = config.orgURL
        self._scopes = config.scopes

    def get_access_token(self) -> str:
        # OAuth2 token exchange
        ...
```
```java
// Java
public interface AuthenticationProvider {
    String getAccessToken();
}

public class NoOpAuthenticationProvider implements AuthenticationProvider {
    @Override
    public String getAccessToken() {
        return "";
    }
}

public class GoogleAuthenticationProvider implements AuthenticationProvider {
    private final GoogleSSOClientConfig config;

    public GoogleAuthenticationProvider(GoogleSSOClientConfig config) {
        this.config = config;
    }

    @Override
    public String getAccessToken() {
        // OAuth2 flow with Google
        throw new UnsupportedOperationException("TODO: OAuth2 flow with Google");
    }
}
```
Supported Providers
| Provider | Config Class | Auth Flow |
|---|---|---|
| | Static JWT token |
| | OAuth2 OIDC |
| | OAuth2 OIDC |
| | OAuth2 OIDC |
| | OAuth2 OIDC |
| | OAuth2 OIDC |
| None | No authentication |
Implementing New Provider
- Define configuration schema (JSON Schema)
- Generate config class from schema
- Implement `AuthenticationProvider` interface
- Register in provider factory
- Add to `AuthProvider` enum
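The factory registration step can be sketched with a decorator-based registry. This is a minimal sketch under stated assumptions. `AuthProvider` here is an illustrative three-value subset of the generated enum. Security configs are plain dicts instead of generated config classes. `register` and `build_auth_provider` are hypothetical names. An enum value with no registered implementation fails loudly instead of silently falling back.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, Type


class AuthProvider(Enum):
    # Illustrative subset; the real enum comes from the generated connection schema.
    OPENMETADATA = "openmetadata"
    GOOGLE = "google"
    NO_AUTH = "no-auth"


class AuthenticationProvider(ABC):
    @abstractmethod
    def get_access_token(self) -> str:
        """Return a valid access token."""


# Provider factory: maps each AuthProvider value to its implementation class.
_REGISTRY: Dict[AuthProvider, Type[AuthenticationProvider]] = {}


def register(kind: AuthProvider) -> Callable[[Type[AuthenticationProvider]], Type[AuthenticationProvider]]:
    def decorator(cls: Type[AuthenticationProvider]) -> Type[AuthenticationProvider]:
        _REGISTRY[kind] = cls
        return cls
    return decorator


@register(AuthProvider.OPENMETADATA)
class StaticJWTProvider(AuthenticationProvider):
    def __init__(self, security_config: Dict[str, Any]) -> None:
        self._token = security_config["jwtToken"]

    def get_access_token(self) -> str:
        return self._token


@register(AuthProvider.NO_AUTH)
class NoOpProvider(AuthenticationProvider):
    def __init__(self, security_config: Dict[str, Any]) -> None:
        pass  # nothing to configure

    def get_access_token(self) -> str:
        return ""


def build_auth_provider(kind: AuthProvider, security_config: Dict[str, Any]) -> AuthenticationProvider:
    provider_cls = _REGISTRY.get(kind)
    if provider_cls is None:
        raise ValueError(f"No authentication provider registered for {kind.value!r}")
    return provider_cls(security_config)  # type: ignore[call-arg]
```

Adding a provider then touches only the new class, because the decorator performs the registration.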
Bot Token Internals
OpenMetadata Bots are service accounts that provide JWT tokens for SDK authentication. When implementing SDK auth:
Bot Token Structure
Bot tokens are JWTs with specific claims:
```json
{
  "sub": "ingestion-bot",
  "iss": "open-metadata.org",
  "iat": 1234567890,
  "exp": 1234567890,
  "email": "ingestion-bot@openmetadata.org",
  "isBot": true
}
```
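When you are debugging auth, it helps to read these claims without a key. The stdlib-only sketch below decodes the payload segment of a JWT. It does not verify the signature. Use it for inspection only (for example, reading `exp` or `isBot`) and never for authorization decisions. `peek_claims` is a hypothetical helper name.

```python
import base64
import json
from typing import Any, Dict


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore the padding first.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def peek_claims(token: str) -> Dict[str, Any]:
    """Return the JWT payload claims WITHOUT verifying the signature."""
    try:
        _header, payload, _signature = token.split(".")
    except ValueError:
        raise ValueError("Not a JWT: expected three dot-separated segments") from None
    return json.loads(_b64url_decode(payload))
```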
SDK Token Validation
When implementing auth provider, validate bot tokens:
```python
# Python
import jwt  # PyJWT


class BotTokenValidator:
    def __init__(self, public_key: str, issuer: str = "open-metadata.org"):
        self._public_key = public_key
        self._issuer = issuer

    def validate(self, token: str) -> dict:
        # AuthenticationError is the SDK exception defined under Error Handling
        try:
            payload = jwt.decode(
                token,
                self._public_key,
                algorithms=["RS256"],
                issuer=self._issuer,
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Bot token expired")
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(f"Invalid bot token: {e}")
        if not payload.get("isBot", False):
            raise AuthenticationError("Token is not a bot token")
        return payload
```
```rust
// Rust
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct BotClaims {
    sub: String,
    iss: String,
    exp: u64,
    // The claim is serialized as camelCase "isBot"
    #[serde(rename = "isBot", default)]
    is_bot: bool,
}

pub struct BotTokenValidator {
    public_key: String, // PEM-encoded RSA public key
}

impl BotTokenValidator {
    // AuthError is assumed to implement From<jsonwebtoken::errors::Error>
    pub fn validate(&self, token: &str) -> Result<BotClaims, AuthError> {
        let mut validation = Validation::new(Algorithm::RS256);
        validation.set_issuer(&["open-metadata.org"]);
        let token_data = decode::<BotClaims>(
            token,
            &DecodingKey::from_rsa_pem(self.public_key.as_bytes())?,
            &validation,
        )?;
        if !token_data.claims.is_bot {
            return Err(AuthError::NotBotToken);
        }
        Ok(token_data.claims)
    }
}
```
Token Refresh Handling
Bot tokens have expiration. SDKs should handle refresh:
```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class BotAuthProvider(AuthenticationProvider):
    # BotConfig, _refresh_token and _parse_expiry are left to the implementation
    def __init__(self, config: BotConfig):
        self._config = config
        self._cached_token: Optional[str] = None
        self._expires_at: Optional[datetime] = None

    def get_access_token(self) -> str:
        if self._is_token_valid():
            return self._cached_token
        # Refresh token from OpenMetadata API
        self._cached_token = self._refresh_token()
        self._expires_at = self._parse_expiry(self._cached_token)
        return self._cached_token

    def _is_token_valid(self) -> bool:
        if not self._cached_token or not self._expires_at:
            return False
        # Refresh 5 minutes before expiry (_expires_at must be timezone-aware UTC)
        return datetime.now(timezone.utc) < (self._expires_at - timedelta(minutes=5))
```
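The provider above leaves the `_parse_expiry` helper undefined. The sketch below shows one way to fill it in. It assumes the token carries a standard `exp` claim in seconds since the epoch, as in the bot token example. The helper names are hypothetical. Taking `now` as a parameter keeps the 5-minute refresh margin testable without patching the clock.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

REFRESH_MARGIN = timedelta(minutes=5)


def expiry_from_claims(claims: dict) -> Optional[datetime]:
    """Convert the JWT `exp` claim (seconds since epoch) to an aware UTC datetime."""
    exp = claims.get("exp")
    if exp is None:
        return None  # no `exp` claim: treat the token as non-expiring
    return datetime.fromtimestamp(exp, tz=timezone.utc)


def needs_refresh(
    expires_at: Optional[datetime],
    now: Optional[datetime] = None,
    margin: timedelta = REFRESH_MARGIN,
) -> bool:
    """True once the current time is inside the refresh margin before expiry."""
    if expires_at is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= expires_at - margin
```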
Entity Models
Schema-Driven Generation
OpenMetadata entities are defined as JSON Schemas and models are generated:
```
json-schemas/
├── entity/
│   ├── data/
│   │   ├── table.json
│   │   ├── database.json
│   │   └── dashboard.json
│   ├── services/
│   │   └── databaseService.json
│   └── teams/
│       └── user.json
└── api/
    ├── data/
    │   ├── createTable.json
    │   └── createDatabase.json
    └── services/
        └── createDatabaseService.json
```
Entity vs API Models
OpenMetadata separates entity definitions from API request models:
| Type | Purpose | Example |
|---|---|---|
| Entity | Response/read models | `Table`, `Database`, `Dashboard` |
| Create | POST/PUT request body | `CreateTable`, `CreateDatabase` |
| Update | PATCH request body | Partial entity fields |
```python
from typing import List
from uuid import UUID

from pydantic import BaseModel


# Entity model (response)
class Table(BaseModel):
    id: UUID
    name: str
    fullyQualifiedName: str
    columns: List[Column]
    database: EntityReference
    ...


# API model (request)
class CreateTable(BaseModel):
    name: str
    columns: List[Column]
    databaseSchema: FullyQualifiedEntityName
    ...
```
Entity Hierarchy
```
DatabaseService
└── Database
    └── DatabaseSchema
        └── Table
            └── Column

DashboardService
└── Dashboard
    └── Chart

PipelineService
└── Pipeline
    └── Task

MessagingService
└── Topic
```
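Each level of this hierarchy contributes one segment to an entity's fully qualified name, for example `prod.sales.public.orders` for a table. A name that itself contains a dot has to be quoted so the segments stay unambiguous. The sketch below assumes a double-quote convention (`svc.db."my.schema".orders`). Check the FQN rules of your target server version before relying on it. Names that contain a double quote are not handled. The function names are hypothetical.

```python
from typing import List


def quote_name(name: str) -> str:
    # Assumed convention: wrap a name in double quotes when it contains the separator.
    return f'"{name}"' if "." in name else name


def build_fqn(*parts: str) -> str:
    """Join hierarchy levels (service, database, schema, table, ...) into an FQN."""
    return ".".join(quote_name(p) for p in parts)


def split_fqn(fqn: str) -> List[str]:
    """Split an FQN on dots that are not inside double quotes; quotes are dropped."""
    parts: List[str] = []
    current: List[str] = []
    in_quotes = False
    for ch in fqn:
        if ch == '"':
            in_quotes = not in_quotes
        elif ch == "." and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    parts.append("".join(current))
    return parts
```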
Entity References
Link entities using references:
```python
# By fully qualified name
table = CreateTable(
    name="orders",
    databaseSchema="prod.sales.public",  # FQN string: service.database.schema
    columns=[...],
)

# By EntityReference
table.owner = EntityReference(
    id=user_uuid,
    type="user",
)
```
Custom Property Model Handling
OpenMetadata supports user-defined custom properties on entities. SDKs must handle these dynamic fields.
Schema Definition
Custom properties are defined per entity type:
```json
{
  "name": "customField",
  "propertyType": {
    "id": "uuid",
    "type": "type",
    "name": "string"
  },
  "description": "Custom field description"
}
```
SDK Model Strategy
Option 1: Extension Dictionary (Recommended)
Keep generated models clean, store custom properties separately:
```python
# Python
from typing import Any, Dict, List, Optional
from uuid import UUID

from pydantic import BaseModel


class Table(BaseModel):
    id: UUID
    name: str
    columns: List[Column]
    # ... standard fields
    extension: Optional[Dict[str, Any]] = None  # Custom properties

    def get_custom_property(self, name: str) -> Any:
        if self.extension is None:
            return None
        return self.extension.get(name)

    def set_custom_property(self, name: str, value: Any) -> None:
        if self.extension is None:
            self.extension = {}
        self.extension[name] = value
```
```typescript
// TypeScript
interface Table {
  id: string;
  name: string;
  columns: Column[];
  // ... standard fields
  extension?: Record<string, unknown>; // Custom properties
}

function getCustomProperty<T>(entity: Table, name: string): T | undefined {
  return entity.extension?.[name] as T | undefined;
}
```
```rust
// Rust
use std::collections::HashMap;

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use uuid::Uuid; // requires the uuid crate's "serde" feature

#[derive(Debug, Serialize, Deserialize)]
pub struct Table {
    pub id: Uuid,
    pub name: String,
    pub columns: Vec<Column>,
    // ... standard fields
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub extension: Option<HashMap<String, serde_json::Value>>,
}

impl Table {
    pub fn get_custom_property<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
        self.extension
            .as_ref()?
            .get(name)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
    }
}
```
Option 2: Dynamic Model Generation
Generate models at runtime based on custom property definitions:
```python
# Python - dynamic model creation
from typing import Any, List, Optional, Type

from pydantic import BaseModel, create_model


def build_table_model(custom_properties: List[CustomProperty]) -> Type[BaseModel]:
    """Build Table model with custom properties as typed fields."""
    extra_fields = {}
    for prop in custom_properties:
        # PROPERTY_TYPE_MAP: OpenMetadata property type name -> Python type
        field_type = PROPERTY_TYPE_MAP.get(prop.propertyType.name, Any)
        extra_fields[prop.name] = (Optional[field_type], None)
    return create_model(
        "TableWithCustomProperties",
        __base__=Table,
        **extra_fields,
    )
```
Type Mapping for Custom Properties
| OpenMetadata Type | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
| | | | |
Serialization Considerations
Custom properties use the `extension` field in API payloads:
```json
{
  "id": "uuid",
  "name": "orders",
  "columns": [...],
  "extension": {
    "customField1": "value",
    "customField2": 123,
    "customEntityRef": {
      "id": "uuid",
      "type": "user",
      "name": "john"
    }
  }
}
```
SDKs should:
- Preserve unknown fields during round-trip (deserialize → serialize)
- Validate custom property types if schema is available
- Handle missing custom properties gracefully (return `None`/`null`/`Option::None`)
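A hand-rolled model can satisfy the first and third points by keeping whatever it does not model in a side dictionary and writing it back out unchanged. The dataclass below is a hypothetical sketch: `TableModel` models only `name` and `extension`. With generated pydantic v2 models, `model_config = ConfigDict(extra="allow")` gives a similar effect.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

_KNOWN_FIELDS = {"name", "extension"}


@dataclass
class TableModel:
    name: str
    extension: Optional[Dict[str, Any]] = None  # custom properties
    unknown: Dict[str, Any] = field(default_factory=dict)  # everything not modelled

    @classmethod
    def from_json(cls, payload: str) -> "TableModel":
        data = json.loads(payload)
        unknown = {k: v for k, v in data.items() if k not in _KNOWN_FIELDS}
        return cls(name=data["name"], extension=data.get("extension"), unknown=unknown)

    def to_json(self) -> str:
        data: Dict[str, Any] = dict(self.unknown)  # write unknown fields back unchanged
        data["name"] = self.name
        if self.extension is not None:
            data["extension"] = self.extension
        return json.dumps(data)

    def get_custom_property(self, name: str) -> Any:
        # Missing properties (or no extension at all) yield None instead of raising.
        return (self.extension or {}).get(name)
```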
API Client Implementation
Standard CRUD Operations
Every entity API should implement:
```python
from typing import Generic, List, Optional, TypeVar
from uuid import UUID

T = TypeVar("T")
CreateT = TypeVar("CreateT")


class EntityAPI(Generic[T, CreateT]):
    def create_or_update(self, entity: CreateT) -> T:
        """PUT /api/v1/{entities}"""
        ...

    def get_by_id(self, entity_id: UUID) -> Optional[T]:
        """GET /api/v1/{entities}/{id}"""
        ...

    def get_by_name(self, fqn: str, fields: Optional[List[str]] = None) -> Optional[T]:
        """GET /api/v1/{entities}/name/{fqn}"""
        ...

    def list(self, limit: int = 10, fields: Optional[List[str]] = None) -> "ResultList[T]":
        """GET /api/v1/{entities}"""
        ...

    def delete(
        self,
        entity_id: UUID,
        recursive: bool = False,
        hard_delete: bool = False,
    ) -> None:
        """DELETE /api/v1/{entities}/{id}"""
        ...
```
API Endpoints Pattern
| Operation | Method | Endpoint |
|---|---|---|
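| List | GET | `/api/v1/{entities}` |
| Get by ID | GET | `/api/v1/{entities}/{id}` |
| Get by Name | GET | `/api/v1/{entities}/name/{fqn}` |
| Create/Update | PUT | `/api/v1/{entities}` |
| Patch | PATCH | `/api/v1/{entities}/{id}` |
| Delete | DELETE | `/api/v1/{entities}/{id}` |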
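This table maps onto a small routing helper that every entity API can share. The sketch is illustrative: `endpoint` and its operation names are hypothetical. It percent-encodes the FQN (quotes, slashes, spaces) so the FQN stays a single path segment.

```python
from typing import Optional, Tuple
from urllib.parse import quote

API_PREFIX = "/api/v1"


def endpoint(
    operation: str,
    collection: str,
    *,
    entity_id: Optional[str] = None,
    fqn: Optional[str] = None,
) -> Tuple[str, str]:
    """Return (HTTP method, path) for a standard entity operation."""
    base = f"{API_PREFIX}/{collection}"
    if operation == "list":
        return "GET", base
    if operation == "create_or_update":
        return "PUT", base
    if operation == "get_by_name":
        if fqn is None:
            raise ValueError("get_by_name requires fqn")
        # safe='' also encodes '/', so the FQN cannot split into extra path segments.
        return "GET", f"{base}/name/{quote(fqn, safe='')}"
    if operation in ("get_by_id", "patch", "delete"):
        if entity_id is None:
            raise ValueError(f"{operation} requires entity_id")
        method = {"get_by_id": "GET", "patch": "PATCH", "delete": "DELETE"}[operation]
        return method, f"{base}/{entity_id}"
    raise ValueError(f"unknown operation: {operation}")
```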
Query Parameters
| Parameter | Description | Example |
|---|---|---|
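| `fields` | Include optional fields | `fields=columns,tags` |
| `limit` | Pagination limit | `limit=100` |
| `before` / `after` | Cursor pagination | `after=<cursor>` |
| `include` | Include deleted | `include=deleted` |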
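Cursor pagination is easiest to consume as a generator that keeps following `paging.after` until the server stops returning one. The sketch assumes list responses shaped like `{"data": [...], "paging": {"after": ...}}`. It takes the HTTP call as a hypothetical `fetch` callable, so you can exercise it without a server. The default `include` value is also an assumption.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# Hypothetical fetch signature: query params -> parsed list-response JSON.
FetchPage = Callable[[Dict[str, Any]], Dict[str, Any]]


def iterate_all(
    fetch: FetchPage,
    *,
    fields: Optional[List[str]] = None,
    limit: int = 100,
    include: str = "non-deleted",
) -> Iterator[Dict[str, Any]]:
    """Yield every entity across pages by following the `after` cursor."""
    params: Dict[str, Any] = {"limit": limit, "include": include}
    if fields:
        params["fields"] = ",".join(fields)
    while True:
        page = fetch(params)
        yield from page.get("data", [])
        after = page.get("paging", {}).get("after")
        if not after:
            return
        params = {**params, "after": after}  # new dict; earlier params stay untouched
```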
Mixins for Special Behaviors
Lineage Mixin
```python
class LineageMixin:
    def add_lineage(self, edge: AddLineage) -> None:
        """PUT /api/v1/lineage"""
        ...

    def get_lineage(
        self,
        entity_type: str,
        entity_id: UUID,
        up_depth: int = 1,
        down_depth: int = 1,
    ) -> EntityLineage:
        """GET /api/v1/lineage/{type}/{id}"""
        ...
```
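Under the hood, the lineage mixin needs only two request shapes. The sketch below makes two assumptions. First, the `AddLineage` body has an `edge` holding `fromEntity`/`toEntity` entity references. Second, the GET takes `upstreamDepth`/`downstreamDepth` query parameters. Confirm both against the Swagger docs for your server version. The function names are hypothetical.

```python
from typing import Any, Dict
from urllib.parse import urlencode


def add_lineage_body(from_id: str, from_type: str, to_id: str, to_type: str) -> Dict[str, Any]:
    """Body for PUT /api/v1/lineage: one directed edge, upstream -> downstream."""
    return {
        "edge": {
            "fromEntity": {"id": from_id, "type": from_type},
            "toEntity": {"id": to_id, "type": to_type},
        }
    }


def get_lineage_path(entity_type: str, entity_id: str, up_depth: int = 1, down_depth: int = 1) -> str:
    """Path for GET /api/v1/lineage/{type}/{id} with traversal depths as query parameters."""
    query = urlencode({"upstreamDepth": up_depth, "downstreamDepth": down_depth})
    return f"/api/v1/lineage/{entity_type}/{entity_id}?{query}"
```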
Tag Mixin
```python
class TagMixin:
    def add_tag(self, entity_id: UUID, tag_fqn: str) -> None:
        """PATCH /api/v1/{entities}/{id}"""
        ...

    def remove_tag(self, entity_id: UUID, tag_fqn: str) -> None:
        ...
```
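Because tag changes go through PATCH, a tag mixin mostly builds JSON Patch (RFC 6902) documents, sent with `Content-Type: application/json-patch+json`. In this sketch, the tag label fields (`tagFQN`, `source`, `labelType`, `state`) and their values are assumptions; check them against the generated schema. The path `/tags/-` appends to an existing array. If an entity has no `tags` field, you need an `add` of the whole array instead. Removals run from the highest index down, so earlier indexes stay valid while the patch is applied.

```python
from typing import Any, Dict, List

PatchOps = List[Dict[str, Any]]


def add_tag_patch(tag_fqn: str, source: str = "Classification") -> PatchOps:
    """RFC 6902 operations appending one tag label to the entity's `tags` array."""
    label = {
        "tagFQN": tag_fqn,
        "source": source,        # assumed values: "Classification" or "Glossary"
        "labelType": "Manual",   # assumed label metadata
        "state": "Confirmed",
    }
    # "/tags/-" is the JSON Pointer for "after the last element".
    return [{"op": "add", "path": "/tags/-", "value": label}]


def remove_tag_patch(current_tags: List[Dict[str, Any]], tag_fqn: str) -> PatchOps:
    """Remove every label with this FQN, highest index first."""
    indexes = [i for i, tag in enumerate(current_tags) if tag.get("tagFQN") == tag_fqn]
    return [{"op": "remove", "path": f"/tags/{i}"} for i in reversed(indexes)]
```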
Owner Mixin
```python
class OwnerMixin:
    def set_owner(self, entity_id: UUID, owner: EntityReference) -> None:
        ...
```
Composing Mixins
```python
class OpenMetadata(LineageMixin, TagMixin, OwnerMixin):
    """Main client composes all mixins."""

    def __init__(self, config: OpenMetadataConnection):
        self._config = config
        self._client = self._build_http_client()
```
Error Handling
Exception Hierarchy
```python
class OpenMetadataException(Exception):
    """Base exception for all SDK errors."""
    pass


class AuthenticationError(OpenMetadataException):
    """Authentication failed."""
    pass


class EntityNotFoundError(OpenMetadataException):
    """Entity does not exist."""
    pass


class ValidationError(OpenMetadataException):
    """Request validation failed."""
    pass


class ConflictError(OpenMetadataException):
    """Entity already exists or version conflict."""
    pass


class RateLimitError(OpenMetadataException):
    """Rate limit exceeded."""
    retry_after: int
```
HTTP Status Mapping
| Status | Exception | Action |
|---|---|---|
| 401 | `AuthenticationError` | Re-authenticate |
| 403 | | Check permissions |
| 404 | `EntityNotFoundError` | Return None or raise |
| 409 | `ConflictError` | Handle version conflict |
| 422 | `ValidationError` | Fix request payload |
| 429 | `RateLimitError` | Retry with backoff |
| 5xx | | Retry with backoff |
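You can combine the status mapping and the two retry rows in one helper. This sketch repeats the exception hierarchy so it runs on its own. It adds a hypothetical `ServerError` for 5xx responses. It maps 403 to `AuthenticationError` only for brevity; a separate authorization error is a reasonable alternative. Retries honour a `retry_after` hint when present. Otherwise they use exponential backoff with full jitter. `sleep` is injectable, so tests do not wait.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class OpenMetadataException(Exception): ...
class AuthenticationError(OpenMetadataException): ...
class EntityNotFoundError(OpenMetadataException): ...
class ValidationError(OpenMetadataException): ...
class ConflictError(OpenMetadataException): ...
class ServerError(OpenMetadataException): ...  # hypothetical: any 5xx


class RateLimitError(OpenMetadataException):
    def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
        super().__init__(message)
        self.retry_after = retry_after


_STATUS_MAP = {401: AuthenticationError, 403: AuthenticationError, 404: EntityNotFoundError,
               409: ConflictError, 422: ValidationError}


def map_status(status: int, message: str, retry_after: Optional[float] = None) -> OpenMetadataException:
    """Turn an HTTP error status into the matching SDK exception instance."""
    if status == 429:
        return RateLimitError(message, retry_after)
    if 500 <= status < 600:
        return ServerError(message)
    return _STATUS_MAP.get(status, OpenMetadataException)(message)


def with_retries(call: Callable[[], T], max_attempts: int = 5, base_delay: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry only retryable errors (429, 5xx); everything else propagates at once."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except (RateLimitError, ServerError) as exc:
            if attempt == max_attempts:
                raise  # give up: surface the last error
            hint = getattr(exc, "retry_after", None)
            delay = hint if hint is not None else random.uniform(0, base_delay * 2 ** (attempt - 1))
            sleep(delay)
    raise AssertionError("unreachable")
```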
Return None vs Raise
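```python
from urllib.parse import quote

from requests.exceptions import HTTPError


# Assumes self._client is a requests.Session bound to the server base URL
def get_by_name(self, entity: Type[T], fqn: str) -> Optional[T]:
    """Return None for 404, raise for other errors."""
    try:
        # URL-encode the FQN so quoted names and special characters survive
        response = self._client.get(f"/api/v1/{entity.path}/name/{quote(fqn, safe='')}")
        response.raise_for_status()  # requests only raises HTTPError when asked
        return entity.model_validate(response.json())  # pydantic v2
    except HTTPError as e:
        if e.response is not None and e.response.status_code == 404:
            return None
        raise self._map_exception(e) from e
```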
Implementing a New Language SDK
Step 1: Project Setup
```
# Directory structure
openmetadata-sdk-{lang}/
├── src/
│   ├── client/
│   │   ├── openmetadata.{ext}
│   │   └── connection.{ext}
│   ├── auth/
│   │   ├── provider.{ext}
│   │   └── jwt.{ext}
│   ├── api/
│   │   ├── tables.{ext}
│   │   ├── databases.{ext}
│   │   └── ...
│   ├── models/
│   │   └── generated/        # From JSON schemas
│   └── mixins/
│       ├── lineage.{ext}
│       └── tags.{ext}
├── tests/
├── examples/
└── README.md
```
Step 2: Model Generation
Use JSON Schema to generate models:
```bash
# Python: datamodel-codegen
datamodel-codegen \
  --input json-schemas/ \
  --input-file-type jsonschema \
  --output src/models/generated/ \
  --output-model-type pydantic_v2.BaseModel

# TypeScript: json-schema-to-typescript (its CLI is json2ts)
npx json-schema-to-typescript \
  -i 'json-schemas/**/*.json' \
  -o src/models/

# Rust: schemafy or typify
# (generate-models is a project-local binary you would write around one of them)
cargo run --bin generate-models -- \
  --schema-dir json-schemas/ \
  --out-dir src/models/
```
Step 3: Implement Core Client
```rust
// Rust Example (blocking reqwest client; with reqwest::Client these methods must be async)
pub struct OpenMetadata {
    config: OpenMetadataConnection,
    client: reqwest::blocking::Client,
    auth: Box<dyn AuthenticationProvider>,
}

impl OpenMetadata {
    pub fn new(config: OpenMetadataConnection) -> Result<Self, Error> {
        let auth = Self::build_auth_provider(&config)?;
        let client = Self::build_http_client(&config)?;
        Ok(Self { config, client, auth })
    }

    pub fn health_check(&self) -> Result<(), Error> {
        let response = self
            .client
            .get(format!("{}/health-check", self.config.host_port))
            .send()?;
        if response.status().is_success() {
            Ok(())
        } else {
            Err(Error::HealthCheckFailed)
        }
    }

    pub fn tables(&self) -> TablesApi {
        TablesApi::new(&self.client, &self.auth)
    }
}
```
Step 4: Implement Entity APIs
```typescript
// TypeScript Example
export class TablesApi {
  constructor(
    private client: HttpClient,
    private auth: AuthenticationProvider,
  ) {}

  async getByName(fqn: string, fields?: string[]): Promise<Table | null> {
    const params = fields ? { fields: fields.join(',') } : {};
    try {
      const response = await this.client.get(
        `/api/v1/tables/name/${encodeURIComponent(fqn)}`,
        { params },
      );
      return response.data as Table;
    } catch (e: any) {
      if (e.response?.status === 404) return null;
      throw this.mapError(e);
    }
  }

  async createOrUpdate(table: CreateTable): Promise<Table> {
    const response = await this.client.put('/api/v1/tables', table);
    return response.data as Table;
  }

  async delete(
    id: string,
    options: { recursive?: boolean; hardDelete?: boolean } = {},
  ): Promise<void> {
    await this.client.delete(`/api/v1/tables/${id}`, {
      params: {
        recursive: options.recursive ?? false,
        hardDelete: options.hardDelete ?? false,
      },
    });
  }
}
```
Step 5: Add Authentication Providers
```go
// Go Example
type AuthenticationProvider interface {
	GetAccessToken() (string, error)
}

type JWTProvider struct {
	token string
}

func (p *JWTProvider) GetAccessToken() (string, error) {
	return p.token, nil
}

type OktaProvider struct {
	clientID    string
	orgURL      string
	privateKey  string
	scopes      []string
	cachedToken string
	expiresAt   time.Time
}

func (p *OktaProvider) GetAccessToken() (string, error) {
	if time.Now().Before(p.expiresAt) {
		return p.cachedToken, nil
	}
	// Refresh token via OAuth2
	token, expiry, err := p.refreshToken()
	if err != nil {
		return "", err
	}
	p.cachedToken = token
	p.expiresAt = expiry
	return token, nil
}
```
Step 6: Implement Mixins
```kotlin
// Kotlin Example
interface LineageMixin {
    val client: HttpClient

    suspend fun addLineage(edge: AddLineage) {
        client.put("/api/v1/lineage", edge)
    }

    suspend fun getLineage(
        entityType: String,
        entityId: UUID,
        upDepth: Int = 1,
        downDepth: Int = 1,
    ): EntityLineage {
        return client.get(
            "/api/v1/lineage/$entityType/$entityId",
            mapOf("upstreamDepth" to upDepth, "downstreamDepth" to downDepth),
        )
    }
}

class OpenMetadata(
    private val config: OpenMetadataConnection,
) : LineageMixin, TagMixin {
    override val client = buildHttpClient()
    // ...
}
```
Extending Existing SDKs
Adding New Entity Type
- Add JSON Schema (`json-schemas/entity/data/newEntity.json`):

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "NewEntity",
  "type": "object",
  "properties": {
    "id": { "type": "string", "format": "uuid" },
    "name": { "type": "string" },
    ...
  }
}
```

- Generate Models:

```bash
make generate
```

- Add API Client:

```python
class NewEntityAPI:
    ENTITY_PATH = "newEntities"

    def __init__(self, client):
        self._client = client

    def get_by_name(self, fqn: str) -> Optional[NewEntity]:
        ...
```

- Register in Main Client:

```python
class OpenMetadata:
    def new_entities(self) -> NewEntityAPI:
        return NewEntityAPI(self._client)
```
Adding New Mixin
- Define Interface:

```python
class CustomBehaviorMixin:
    def custom_operation(self, entity_id: UUID) -> Result:
        ...
```

- Add to Main Client:

```python
class OpenMetadata(LineageMixin, TagMixin, CustomBehaviorMixin):
    ...
```
Adding New Auth Provider
- Define Config Schema:

```json
{
  "title": "NewProviderConfig",
  "properties": {
    "apiKey": { "type": "string" },
    "endpoint": { "type": "string" }
  }
}
```

- Implement Provider:

```python
class NewProvider(AuthenticationProvider):
    def __init__(self, config: NewProviderConfig):
        self._api_key = config.apiKey
        self._endpoint = config.endpoint

    def get_access_token(self) -> str:
        # Custom auth flow
        ...
```

- Register in Factory:

```python
AUTH_PROVIDERS = {
    AuthProvider.openmetadata: OpenMetadataJWTProvider,
    AuthProvider.google: GoogleProvider,
    AuthProvider.new_provider: NewProvider,  # Add here
}
```
Testing Strategy
Unit Tests
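```python
import responses

# OpenMetadata, Table and test_config come from the SDK under test / a fixtures module.
# If client construction calls other endpoints (e.g. a version check), mock those too.
TABLE_JSON = {
    "id": "3f2b8c1e-9d4a-4c6e-8b7a-1a2b3c4d5e6f",
    "name": "table",
    "fullyQualifiedName": "db.schema.table",
    "columns": [],
    # ... any other fields your Table model requires
}


def test_table_get_by_name():
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.GET,
            "http://localhost:8585/api/v1/tables/name/db.schema.table",
            json=TABLE_JSON,
            status=200,
        )
        client = OpenMetadata(test_config)
        table = client.get_by_name(Table, "db.schema.table")
        assert table.name == "table"


def test_table_get_by_name_not_found():
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.GET,
            "http://localhost:8585/api/v1/tables/name/missing",
            status=404,
        )
        client = OpenMetadata(test_config)
        table = client.get_by_name(Table, "missing")
        assert table is None
```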
Integration Tests
```python
import os
from uuid import uuid4

import pytest

# OpenMetadata, OpenMetadataConnection, AuthProvider, OpenMetadataJWTClientConfig,
# CreateTable, Column, DataType and Table are imported from the SDK under test.


@pytest.fixture
def openmetadata():
    """Connect to test OpenMetadata instance."""
    config = OpenMetadataConnection(
        hostPort=os.getenv("OM_HOST", "http://localhost:8585/api"),
        authProvider=AuthProvider.openmetadata,
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken=os.getenv("OM_TOKEN"),
        ),
    )
    client = OpenMetadata(config)
    client.health_check()
    return client


def test_create_and_get_table(openmetadata):
    create = CreateTable(
        name=f"test_table_{uuid4().hex[:8]}",
        # Existing schema FQN (service.database.schema); the default assumes sample data is loaded
        databaseSchema=os.getenv("OM_TEST_SCHEMA", "sample_data.ecommerce_db.shopify"),
        columns=[
            Column(name="id", dataType=DataType.INT),
            Column(name="name", dataType=DataType.STRING),
        ],
    )
    table = openmetadata.create_or_update(create)
    try:
        assert table.id is not None
        fetched = openmetadata.get_by_name(Table, table.fullyQualifiedName)
        assert fetched.name == create.name
    finally:
        # Cleanup, even if an assertion fails
        openmetadata.delete(Table, table.id, hard_delete=True)
```
SDK Implementation Checklist
Core Components
- Connection configuration with all auth providers
- HTTP client with retry, timeout, and error handling
- Authentication provider interface and implementations
- Model generation from JSON Schemas
- Health check endpoint
Entity APIs
- Tables API
- Databases API
- Database Schemas API
- Database Services API
- Dashboard API
- Dashboard Services API
- Pipeline API
- Pipeline Services API
- Topic API
- Messaging Services API
- ML Model API
- ML Model Services API
- User/Team APIs
- Tag/Classification APIs
Mixins
- Lineage operations
- Tag operations
- Owner operations
- Custom properties operations
Quality
- Type safety throughout
- Comprehensive error handling
- Unit test coverage > 80%
- Integration test suite
- API documentation
- Usage examples
Contributing to OpenMetadata
All SDK and connector development should be contributed back to the OpenMetadata community. This section covers setting up the development environment and contribution workflows.
Development Environment Setup
Prerequisites
| Tool | Version | Installation |
|---|---|---|
| Docker | 20+ | docs.docker.com |
| Java JDK | 21 | or SDKMAN |
| Maven | 3.5+ | |
| Python | 3.9-3.11 | System or pyenv |
| Node.js | 18.x | |
| Yarn | 1.22+ | |
| Antlr | 4.9.2 | |
| JQ | Latest | |
Verify Prerequisites
```bash
make prerequisites
```
Clone and Setup
```bash
# Clone repository
git clone https://github.com/open-metadata/OpenMetadata
cd OpenMetadata

# Setup Python environment
python3 -m venv env
source env/bin/activate
pip install pre-commit

# Install development dependencies
make install_dev
make install_test
make precommit_install

# Generate models from schemas
make generate
```
Start Development Stack
```bash
# MySQL + Elasticsearch (default)
docker compose -f docker/development/docker-compose.yml up mysql elasticsearch --build -d

# OR PostgreSQL + OpenSearch
docker compose -f docker/development/docker-compose-postgres.yml up postgresql opensearch --build -d
```
Build and Run Server
```bash
# Build (skip tests for speed)
mvn clean install -DskipTests

# Bootstrap database
cd openmetadata-dist/target/openmetadata-*/
sh bootstrap/openmetadata-ops.sh drop-create

# Start server
sh bin/openmetadata-server-start.sh conf/openmetadata.yaml
```
Access the server at http://localhost:8585.
Repository Structure
```
OpenMetadata/
├── openmetadata-spec/                 # JSON Schemas (source of truth)
│   └── src/main/resources/json/schema/
│       ├── entity/                    # Entity definitions
│       │   ├── data/                  # Table, Database, Dashboard...
│       │   ├── services/              # Service definitions
│       │   └── teams/                 # User, Team...
│       ├── api/                       # API request schemas
│       └── type/                      # Common types
│
├── openmetadata-service/              # Java backend
│   └── src/main/java/org/openmetadata/service/
│       ├── resources/                 # REST API endpoints (Dropwizard)
│       ├── jdbi3/                     # Database access layer
│       ├── events/                    # Change event handlers
│       ├── security/                  # Auth & authorization
│       └── secrets/converter/         # ClassConverters for oneOf
│
├── ingestion/                         # Python ingestion framework
│   └── src/metadata/
│       ├── ingestion/
│       │   ├── source/                # Source connectors
│       │   ├── processor/             # Processors
│       │   ├── sink/                  # Sinks
│       │   └── api/                   # Workflow APIs
│       └── generated/                 # Generated Pydantic models
│
└── openmetadata-ui/                   # React frontend
    └── src/main/resources/ui/
        ├── src/
        │   ├── utils/                 # ServiceUtils files
        │   └── locale/languages/      # i18n translations
        └── public/locales/            # Entity documentation
```
Key Directories for Contributions
| Contribution Type | Primary Directory |
|---|---|
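| New connector schema | `openmetadata-spec/src/main/resources/json/schema/entity/services/connections/` |
| Connector Python code | `ingestion/src/metadata/ingestion/source/` |
| Java ClassConverter | `openmetadata-service/src/main/java/org/openmetadata/service/secrets/converter/` |
| UI connector config | `openmetadata-ui/src/main/resources/ui/src/utils/` |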
Contributing New Connectors
When to Contribute vs Custom Connector
| Scenario | Approach |
|---|---|
| Connector useful to many users | Contribute to OpenMetadata |
| Single-use, custom data source | Build Custom Connector (not contributed) |
Connector Development Workflow
```
1. Define JSON Schema
        ↓
2. Generate Types (Java/Python/TS)
        ↓
3. Implement Python Ingestion Code
        ↓
4. Create Java ClassConverter (if oneOf used)
        ↓
5. Apply UI Changes
        ↓
6. Write Tests
        ↓
7. Update Documentation
        ↓
8. Submit PR
```
Step 1: Define JSON Schema
Create connection schema at:
openmetadata-spec/src/main/resources/json/schema/entity/services/connections/{source_type}/
Example: myDatabaseConnection.json
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://open-metadata.org/schema/entity/services/connections/database/myDatabaseConnection.json",
  "title": "MyDatabaseConnection",
  "description": "Connection to MyDatabase",
  "type": "object",
  "javaType": "org.openmetadata.schema.services.connections.database.MyDatabaseConnection",
  "definitions": {
    "myDatabaseType": {
      "description": "Service type",
      "type": "string",
      "enum": ["MyDatabase"],
      "default": "MyDatabase"
    },
    "myDatabaseScheme": {
      "description": "SQLAlchemy driver scheme",
      "type": "string",
      "enum": ["mydatabase+driver"],
      "default": "mydatabase+driver"
    }
  },
  "properties": {
    "type": { "$ref": "#/definitions/myDatabaseType" },
    "scheme": { "$ref": "#/definitions/myDatabaseScheme" },
    "hostPort": { "description": "Host and port", "type": "string" },
    "username": { "description": "Username", "type": "string" },
    "password": { "description": "Password", "type": "string", "format": "password" },
    "database": { "description": "Database name", "type": "string" },
    "supportsMetadataExtraction": {
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["hostPort"]
}
```
Register in service schema (`databaseService.json`):
```json
{
  "config": {
    "oneOf": [
      { "$ref": "./connections/database/myDatabaseConnection.json" }
    ]
  }
}
```
Step 2: Generate Types
```bash
# Regenerate all models (from the repository root)
mvn clean install -DskipTests

# Python models
(cd ingestion && make generate)

# TypeScript models (for UI)
(cd openmetadata-ui/src/main/resources/ui && yarn install && ./json2ts.sh path/to/myDatabaseConnection.json)
```
Step 3: Implement Python Ingestion
Create connector at:
```
ingestion/src/metadata/ingestion/source/database/mydatabase/
├── __init__.py
├── connection.py
├── metadata.py
└── service_spec.py
```
`service_spec.py`:

```python
from metadata.ingestion.source.database.mydatabase.metadata import MydatabaseSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=MydatabaseSource)
```
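`connection.py`:

```python
from urllib.parse import quote_plus

from metadata.generated.schema.entity.services.connections.database.myDatabaseConnection import (
    MyDatabaseConnection,
)
from metadata.ingestion.connections.builders import (
    create_generic_db_connection,
    get_connection_args_common,
)
from metadata.ingestion.connections.test_connections import test_connection_db_schema_sources


def get_connection(connection: MyDatabaseConnection):
    return create_generic_db_connection(
        connection=connection,
        get_connection_url_fn=get_connection_url,
        get_connection_args_fn=get_connection_args_common,
    )


def get_connection_url(connection: MyDatabaseConnection) -> str:
    # "format": "password" fields are generated as secret strings: unwrap, then URL-escape
    password = connection.password.get_secret_value() if connection.password else ""
    return (
        f"{connection.scheme.value}://{quote_plus(connection.username or '')}:"
        f"{quote_plus(password)}@{connection.hostPort}/{connection.database}"
    )


def test_connection(engine) -> None:
    # Newer releases may require extra arguments (metadata, service_connection,
    # automation_workflow); mirror an existing connector's connection.py.
    test_connection_db_schema_sources(engine)
```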
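Credentials end up inside the connection URL, so any `@`, `:` or `/` in a password has to be percent-encoded, or the URL is misparsed. The minimal standalone sketch below shows that escaping step. It uses `urllib.parse.quote_plus`, which SQLAlchemy's documentation recommends for passwords. `build_connection_url` is a hypothetical helper, not an OpenMetadata API.

```python
from typing import Optional
from urllib.parse import quote_plus


def build_connection_url(
    scheme: str,
    host_port: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
) -> str:
    """SQLAlchemy-style URL with the username and password percent-encoded."""
    auth = ""
    if username:
        auth = quote_plus(username)
        if password:
            auth += ":" + quote_plus(password)
        auth += "@"
    url = f"{scheme}://{auth}{host_port}"
    if database:
        url += f"/{database}"
    return url
```

A password such as `p@ss:w/rd` becomes `p%40ss%3Aw%2Frd`. Without escaping, the `@` would be taken as the end of the credentials.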
`metadata.py`:

```python
from typing import Optional

from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService


class MydatabaseSource(CommonDbSourceService):
    """MyDatabase metadata extraction source."""

    @classmethod
    def create(cls, config_dict, metadata, pipeline_name: Optional[str] = None):
        config = WorkflowSource.model_validate(config_dict)  # pydantic v2
        return cls(config, metadata)

    # Override methods as needed for custom extraction logic
```
Step 4: Create Java ClassConverter (if using `oneOf`)
Only needed if your schema uses `oneOf` for auth types:

```java
// openmetadata-service/.../secrets/converter/MyDatabaseConnectionClassConverter.java
package org.openmetadata.service.secrets.converter;

import org.openmetadata.schema.services.connections.database.MyDatabaseConnection;
// The JsonUtils import path differs between releases; copy it from a sibling converter.

public class MyDatabaseConnectionClassConverter extends ClassConverter {

    public MyDatabaseConnectionClassConverter() {
        super(MyDatabaseConnection.class);
    }

    @Override
    public Object convert(Object object) {
        MyDatabaseConnection connection =
            (MyDatabaseConnection) JsonUtils.convertValue(object, MyDatabaseConnection.class);
        // Handle oneOf auth types if needed
        return connection;
    }
}
```
Register in `ClassConverterFactory.java`:

```java
Map.entry(MyDatabaseConnection.class, new MyDatabaseConnectionClassConverter())
```
Step 5: Apply UI Changes
Update ServiceUtils (`DatabaseServiceUtils.ts`):

```typescript
import myDatabaseConnection from '../jsons/connectionSchemas/connections/database/myDatabaseConnection.json';

// In getDatabaseConfig switch:
case DatabaseServiceType.MyDatabase: {
  schema = myDatabaseConnection;
  break;
}
```
Create documentation at:
openmetadata-ui/.../public/locales/en-US/Database/MyDatabase.md
Step 6: Write Tests
```python
# ingestion/tests/unit/source/database/test_mydatabase.py
from metadata.generated.schema.entity.services.connections.database.myDatabaseConnection import (
    MyDatabaseConnection,
)
from metadata.ingestion.source.database.mydatabase.connection import get_connection_url


def test_connection_url():
    connection = MyDatabaseConnection(
        hostPort="localhost:5432",
        username="user",
        password="pass",
        database="mydb",
    )
    url = get_connection_url(connection)
    assert url == "mydatabase+driver://user:pass@localhost:5432/mydb"
```
Step 7: Update Documentation
Create comprehensive docs following OpenMetadata patterns:
- Connector overview
- Prerequisites
- Configuration steps
- Troubleshooting
Type Generation
JSON Schema → Multi-Language Models
```
           JSON Schema (source of truth)
                         ↓
    ┌─────────────┬──────┴──────┬─────────────┐
    ↓             ↓             ↓             ↓
   Java         Python      TypeScript     (Others)
  POJOs        Pydantic     Interfaces      Models
```
Generation Commands
| Language | Tool | Command |
|---|---|---|
| Java | jsonschema2pojo | `mvn clean install -DskipTests` |
| Python | datamodel-codegen | `make generate` |
| TypeScript | quicktype | `./json2ts.sh <schema.json>` |
Generated Output Locations
| Language | Output Directory |
|---|---|
| Java | |
| Python | |
| TypeScript | |
Testing
Python Tests
```bash
cd ingestion

# Install test dependencies
make install_test

# Run all tests with coverage
make coverage

# Run specific tests
pytest tests/unit/source/database/test_mydatabase.py -v

# Lint and format
make lint
make black
make isort
```
Java Tests
```bash
# Run all tests
mvn test

# Run specific test class
mvn test -Dtest=MyDatabaseConnectionTest

# Skip tests during build
mvn clean install -DskipTests
```
Integration Tests
Require running OpenMetadata server:
```bash
# Start server first
sh bin/openmetadata-server-start.sh conf/openmetadata.yaml

# Run integration tests
pytest tests/integration/ -v
```
Pre-commit Hooks
```bash
# Install hooks
make precommit_install

# Run manually
pre-commit run --all-files
```
Contribution Checklist
New Connector
- JSON Schema defined with all required properties
- Schema registered in service type file
- Java/Python/TypeScript types generated
- Python Source implemented
- Java ClassConverter (if oneOf used)
- UI ServiceUtils updated
- UI documentation created
- Unit tests written
- Integration tests passing
- Documentation updated
- Pre-commit hooks passing
- PR submitted with description
SDK Extension
- JSON Schema updated/created
- Types regenerated
- Python/Java code implemented
- Tests written
- Documentation updated
References
SDK Documentation
- OpenMetadata SDK Documentation
- OpenMetadata Python SDK
- OpenMetadata Java SDK
- OpenMetadata API (Swagger)
Contributing
- Build Prerequisites
- Build & Run Server
- Ingestion Framework
- Developing New Connectors
- Architecture Overview
- Code Layout
Source Code
Related Skills
- `meta-sdk-patterns-eng`: Foundational SDK patterns
- `openmetadata-dev`: Using OpenMetadata SDKs/APIs
- `openmetadata-ops`: Administering OpenMetadata