Agents openmetadata-sdk-dev

Develop and contribute to OpenMetadata SDKs, connectors, and core platform. Use when implementing new language SDKs, building connectors for contribution, extending SDK capabilities, or setting up the OpenMetadata development environment.

install
source · Clone the upstream repo
git clone https://github.com/aRustyDev/agents
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/aRustyDev/agents "$T" && mkdir -p ~/.claude/skills && cp -r "$T/content/skills/openmetadata-sdk-dev" ~/.claude/skills/arustydev-agents-openmetadata-sdk-dev && rm -rf "$T"
manifest: content/skills/openmetadata-sdk-dev/SKILL.md

OpenMetadata SDK & Connector Development

Guide for developing OpenMetadata SDKs, connectors, and contributions to the core platform. All SDK and connector development is intended to be contributed back to the community.

Note: This skill extends patterns from meta-sdk-patterns-eng. See that skill for foundational SDK patterns (architecture, error handling, configuration, testing strategies, packaging).

When to Use This Skill

  • Implementing OpenMetadata SDK for a new language
  • Extending existing Python or Java SDK with new features
  • Contributing new connectors to OpenMetadata
  • Adding new entity type support
  • Implementing authentication providers
  • Setting up OpenMetadata development environment
  • Generating entity models from JSON Schemas

This Skill Does NOT Cover

  • Using the existing Python/Java SDK to interact with OpenMetadata (see openmetadata-dev)
  • Deploying or operating OpenMetadata
  • Administering users, bots, and policies (see openmetadata-ops)

OpenMetadata SDK Architecture

Core Components

Every OpenMetadata SDK implements these components:

┌─────────────────────────────────────────────────────────────┐
│                    OpenMetadata Client                       │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │ Connection  │  │    Auth     │  │   API Clients       │  │
│  │   Config    │  │  Provider   │  │  (Tables, Dashes..) │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────┐│
│  │              Entity Models (Generated)                  ││
│  │   Table, Database, Dashboard, Pipeline, MlModel, etc.   ││
│  └─────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────┐│
│  │              HTTP Client / Transport Layer              ││
│  └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘

Pattern: Gateway with Typed API Clients

OpenMetadata SDKs use a gateway pattern where the main client builds typed API clients:

# Python Pattern
from typing import Optional, Type, TypeVar

T = TypeVar("T")  # any generated entity model (Table, Dashboard, ...)

class OpenMetadata:
    def __init__(self, config: OpenMetadataConnection):
        self._config = config
        self._client = self._build_client()

    def get_by_name(self, entity: Type[T], fqn: str) -> Optional[T]:
        """Generic lookup; the TypeVar ties the return type to the entity class."""
        ...

    def create_or_update(self, data: CreateEntity) -> Entity:
        """PUT the create request: creates the entity, or updates it if it exists."""
        ...
// Java Pattern
public class OpenMetadata {
    private final OpenMetadataConnection config;

    public <T> T buildClient(Class<T> apiClass) {
        // Build typed API client
        return clientBuilder.build(apiClass);
    }
}

// Usage
TablesApi tablesApi = openMetadata.buildClient(TablesApi.class);
DashboardsApi dashboardApi = openMetadata.buildClient(DashboardsApi.class);
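A minimal, self-contained sketch of the gateway idea in Python, assuming plain dataclasses as stand-ins for the generated models and a hypothetical `ENTITY_PATHS` registry that maps each entity class to its REST collection. The transport is injected so the typed lookup can be exercised without a server.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Type, TypeVar
from urllib.parse import quote


@dataclass
class Table:
    name: str
    fullyQualifiedName: str


@dataclass
class Dashboard:
    name: str
    fullyQualifiedName: str


T = TypeVar("T")

# Hypothetical registry: entity class -> REST collection under /api/v1.
ENTITY_PATHS: Dict[type, str] = {Table: "tables", Dashboard: "dashboards"}

# A transport takes a path relative to hostPort (which already ends in /api)
# and returns the decoded JSON body, or None when the server answers 404.
Transport = Callable[[str], Optional[dict]]


class OpenMetadataGateway:
    """One client object; typed lookups for every registered entity class."""

    def __init__(self, transport: Transport):
        self._transport = transport

    def get_by_name(self, entity: Type[T], fqn: str) -> Optional[T]:
        # safe="" also escapes "/" so an FQN can never change the route.
        path = f"/v1/{ENTITY_PATHS[entity]}/name/{quote(fqn, safe='')}"
        payload = self._transport(path)
        return None if payload is None else entity(**payload)


# Usage with an in-memory transport instead of HTTP
calls = []


def fake_transport(path: str) -> Optional[dict]:
    calls.append(path)
    if path.endswith("/missing"):
        return None
    return {"name": "orders", "fullyQualifiedName": "svc.db.sales.orders"}


om = OpenMetadataGateway(fake_transport)
table = om.get_by_name(Table, "svc.db.sales.orders")
```

Keeping the entity-to-path mapping in one place means adding an entity type is a one-line registry change rather than a new client class.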

Connection Configuration

Configuration Object

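# Python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
    AuthProvider,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.security.ssl.validateSSLClientConfig import (
    ValidateSslClientConfig,
)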

server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider=AuthProvider.openmetadata,
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<token>"),
    verifySSL="validate",  # or "ignore", "no-ssl"
    sslConfig=ValidateSslClientConfig(caCertificate="/path/to/cert"),
)
// Java
OpenMetadataJWTClientConfig jwtClientConfig = new OpenMetadataJWTClientConfig();
jwtClientConfig.setJwtToken("<token>");

OpenMetadataConnection server = new OpenMetadataConnection();
server.setHostPort("http://localhost:8585/api");
server.setApiVersion("v1");
server.setAuthProvider(OpenMetadataConnection.AuthProvider.OPENMETADATA);
server.setSecurityConfig(jwtClientConfig);

Configuration Fields

| Field | Required | Description |
|---|---|---|
| hostPort | Yes | Base URL including /api |
| authProvider | Yes | Authentication provider type |
| securityConfig | Yes | Provider-specific auth config |
| apiVersion | No | API version (default: v1) |
| verifySSL | No | SSL verification mode |
| sslConfig | No | Custom SSL certificates |
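A sketch of how an SDK might enforce these fields when the config object is built, assuming a plain dataclass (`ConnectionConfig` is a hypothetical name, not the generated model). It checks the `/api` suffix, restricts `verifySSL` to the modes listed in the Python example, and derives the versioned base URL. The `no-ssl` default is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

# Modes mirror the verifySSL values shown in the Python example above.
VERIFY_SSL_MODES = {"validate", "ignore", "no-ssl"}


@dataclass
class ConnectionConfig:
    hostPort: str                     # required, must end with /api
    authProvider: str                 # required
    securityConfig: Optional[dict]    # required (provider-specific)
    apiVersion: str = "v1"            # optional, defaults to v1
    verifySSL: str = "no-ssl"         # optional; this default is an assumption
    sslConfig: Optional[dict] = None  # optional custom certificates

    def __post_init__(self) -> None:
        if not self.hostPort.rstrip("/").endswith("/api"):
            raise ValueError(f"hostPort must include /api: {self.hostPort!r}")
        if self.verifySSL not in VERIFY_SSL_MODES:
            raise ValueError(f"unknown verifySSL mode: {self.verifySSL!r}")

    @property
    def base_url(self) -> str:
        """Prefix for every REST call, e.g. http://localhost:8585/api/v1."""
        return f"{self.hostPort.rstrip('/')}/{self.apiVersion}"
```

Failing fast on a missing `/api` suffix surfaces the most common misconfiguration at construction time instead of as a confusing 404 on the first request.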

Authentication Providers

Provider Architecture

Implement pluggable authentication with a provider interface:

# Python
from abc import ABC, abstractmethod

class AuthenticationProvider(ABC):
    @abstractmethod
    def get_access_token(self) -> str:
        """Return a valid access token."""

class OpenMetadataJWTProvider(AuthenticationProvider):
    def __init__(self, config: OpenMetadataJWTClientConfig):
        self._token = config.jwtToken

    def get_access_token(self) -> str:
        return self._token

class OktaProvider(AuthenticationProvider):
    def __init__(self, config: OktaClientConfig):
        self._client_id = config.clientId
        self._org_url = config.orgURL
        self._scopes = config.scopes

    def get_access_token(self) -> str:
        # OAuth2 token exchange
        ...
// Java
public interface AuthenticationProvider {
    String getAccessToken();
}

public class NoOpAuthenticationProvider implements AuthenticationProvider {
    @Override
    public String getAccessToken() {
        return "";
    }
}

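public class GoogleAuthenticationProvider implements AuthenticationProvider {
    private final GoogleSSOClientConfig config;

    public GoogleAuthenticationProvider(GoogleSSOClientConfig config) {
        this.config = config;
    }

    @Override
    public String getAccessToken() {
        // OAuth2 flow with Google (omitted in this sketch)
        throw new UnsupportedOperationException("Google OAuth2 flow not shown");
    }
}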

Supported Providers

| Provider | Config Class | Auth Flow |
|---|---|---|
| openmetadata | OpenMetadataJWTClientConfig | Static JWT token |
| google | GoogleSSOClientConfig | OAuth2 OIDC |
| okta | OktaClientConfig | OAuth2 OIDC |
| auth0 | Auth0ClientConfig | OAuth2 OIDC |
| azure | AzureClientConfig | OAuth2 OIDC |
| custom-oidc | CustomOIDCClientConfig | OAuth2 OIDC |
| no-auth | None | No authentication |

Implementing New Provider

  1. Define configuration schema (JSON Schema)
  2. Generate config class from schema
  3. Implement
    AuthenticationProvider
    interface
  4. Register in provider factory
  5. Add to
    AuthProvider
    enum
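The five steps above can be sketched end to end in plain Python. Everything here is hypothetical: `MyProvider`, its `apiKey` field, and the `my-provider` enum value stand in for a real provider, and configs are plain dicts instead of generated classes.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Type


class AuthProvider(str, Enum):
    openmetadata = "openmetadata"
    no_auth = "no-auth"
    my_provider = "my-provider"  # step 5: hypothetical new enum member


class AuthenticationProvider(ABC):
    @abstractmethod
    def get_access_token(self) -> str:
        """Return a valid access token."""


class OpenMetadataJWTProvider(AuthenticationProvider):
    def __init__(self, security_config: dict):
        self._token = security_config["jwtToken"]

    def get_access_token(self) -> str:
        return self._token


class NoOpProvider(AuthenticationProvider):
    def __init__(self, security_config: dict):
        pass  # nothing to configure

    def get_access_token(self) -> str:
        return ""


class MyProvider(AuthenticationProvider):
    """Step 3: hypothetical provider configured with an apiKey."""

    def __init__(self, security_config: dict):
        self._api_key = security_config["apiKey"]

    def get_access_token(self) -> str:
        # A real provider would call its token endpoint here.
        return f"token-for-{self._api_key}"


# Step 4: the factory maps each enum member to its implementation.
AUTH_PROVIDERS: Dict[AuthProvider, Type[AuthenticationProvider]] = {
    AuthProvider.openmetadata: OpenMetadataJWTProvider,
    AuthProvider.no_auth: NoOpProvider,
    AuthProvider.my_provider: MyProvider,
}


def build_auth_provider(provider: AuthProvider, security_config: dict) -> AuthenticationProvider:
    try:
        provider_cls = AUTH_PROVIDERS[provider]
    except KeyError:
        raise ValueError(f"unsupported auth provider: {provider!r}") from None
    return provider_cls(security_config)
```

Because `AuthProvider` subclasses `str`, the raw value from a config file (for example `"no-auth"`) converts directly with `AuthProvider("no-auth")`.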

Bot Token Internals

OpenMetadata bots are service accounts that provide JWT tokens for SDK authentication. An SDK's auth layer needs to handle their structure, validation, and expiry.

Bot Token Structure

Bot tokens are JWTs with specific claims:

{
  "sub": "ingestion-bot",
  "iss": "open-metadata.org",
  "iat": 1234567890,
  "exp": 1234567890,
  "email": "ingestion-bot@openmetadata.org",
  "isBot": true
}
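When debugging auth issues it helps to inspect these claims. A stdlib-only sketch that decodes the payload segment of a JWT **without verifying the signature**; it is for inspection only and is no substitute for the validation shown next. `peek_claims` is a hypothetical helper name, and the sample token is unsigned.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are unpadded base64url; restore the padding first.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def peek_claims(token: str) -> dict:
    """Decode the payload WITHOUT verifying the signature (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("a JWT has exactly three dot-separated segments")
    return json.loads(_b64url_decode(parts[1]))


# Usage: build an unsigned sample token with the claims shown above
def _b64url_encode(data: dict) -> str:
    return base64.urlsafe_b64encode(json.dumps(data).encode()).rstrip(b"=").decode()


sample = ".".join([
    _b64url_encode({"alg": "RS256", "typ": "JWT"}),
    _b64url_encode({"sub": "ingestion-bot", "iss": "open-metadata.org", "isBot": True}),
    "not-a-real-signature",
])
claims = peek_claims(sample)
```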

SDK Token Validation

When implementing an auth provider, validate bot tokens:

# Python (PyJWT; RS256 needs the "crypto" extra: pip install "pyjwt[crypto]")
import jwt


class AuthenticationError(Exception):
    """In the SDK this subclasses OpenMetadataException (see Error Handling)."""


class BotTokenValidator:
    def __init__(self, public_key: str, issuer: str = "open-metadata.org"):
        self._public_key = public_key
        self._issuer = issuer

    def validate(self, token: str) -> dict:
        """Verify signature, expiry, and issuer, then require the isBot claim."""
        try:
            payload = jwt.decode(
                token,
                self._public_key,
                algorithms=["RS256"],
                issuer=self._issuer,
            )
        except jwt.ExpiredSignatureError as e:
            raise AuthenticationError("Bot token expired") from e
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(f"Invalid bot token: {e}") from e
        if not payload.get("isBot", False):
            raise AuthenticationError("Token is not a bot token")
        return payload
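// Rust
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct BotClaims {
    pub sub: String,
    pub iss: String,
    pub exp: u64,
    // The JWT payload uses camelCase "isBot"
    #[serde(rename = "isBot")]
    pub is_bot: bool,
}

pub struct BotTokenValidator {
    public_key: String, // PEM-encoded RSA public key
}

// AuthError is assumed to implement From<jsonwebtoken::errors::Error>
// so that `?` can convert decoding failures.
impl BotTokenValidator {
    pub fn validate(&self, token: &str) -> Result<BotClaims, AuthError> {
        let mut validation = Validation::new(Algorithm::RS256);
        validation.set_issuer(&["open-metadata.org"]);

        let token_data = decode::<BotClaims>(
            token,
            &DecodingKey::from_rsa_pem(self.public_key.as_bytes())?,
            &validation,
        )?;

        if !token_data.claims.is_bot {
            return Err(AuthError::NotBotToken);
        }

        Ok(token_data.claims)
    }
}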

Token Refresh Handling

Bot tokens may carry an expiration (the exp claim), so SDKs should refresh them before they lapse:

from datetime import datetime, timedelta, timezone
from typing import Optional


class BotAuthProvider(AuthenticationProvider):
    def __init__(self, config: BotConfig):
        self._config = config
        self._cached_token: Optional[str] = None
        self._expires_at: Optional[datetime] = None

    def get_access_token(self) -> str:
        if self._is_token_valid():
            return self._cached_token

        # Refresh token from OpenMetadata API. _refresh_token and _parse_expiry are
        # SDK-specific helpers (not shown); _parse_expiry returns an aware UTC datetime.
        self._cached_token = self._refresh_token()
        self._expires_at = self._parse_expiry(self._cached_token)
        return self._cached_token

    def _is_token_valid(self) -> bool:
        if not self._cached_token or not self._expires_at:
            return False
        # Refresh 5 minutes before expiry
        return datetime.now(timezone.utc) < (self._expires_at - timedelta(minutes=5))
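The `_parse_expiry` step and the 5-minute margin can be sketched with the standard library alone. This is a minimal version under two assumptions: the token is a standard three-segment JWT, and a token without an `exp` claim is treated as non-expiring (the class above would instead refetch such a token on every call). The signature is not checked here; validation belongs in the validator.

```python
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import Optional

REFRESH_MARGIN = timedelta(minutes=5)


def parse_expiry(token: str) -> Optional[datetime]:
    """Return the `exp` claim as an aware UTC datetime (signature NOT checked)."""
    payload = token.split(".")[1]
    claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    exp = claims.get("exp")
    # No exp claim: treat the token as non-expiring.
    return None if exp is None else datetime.fromtimestamp(exp, tz=timezone.utc)


def needs_refresh(expires_at: Optional[datetime], now: datetime,
                  margin: timedelta = REFRESH_MARGIN) -> bool:
    """True once `now` is inside the refresh margin before expiry."""
    if expires_at is None:
        return False
    return now >= expires_at - margin


# Usage with an unsigned sample token
def make_token(claims: dict) -> str:
    def enc(part: dict) -> str:
        return base64.urlsafe_b64encode(json.dumps(part).encode()).rstrip(b"=").decode()
    return f"{enc({'alg': 'RS256'})}.{enc(claims)}.sig"


expiry = parse_expiry(make_token({"sub": "ingestion-bot", "exp": 1_700_000_000}))
```

Passing `now` in explicitly keeps the refresh decision deterministic and easy to unit test.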

Entity Models

Schema-Driven Generation

OpenMetadata entities are defined as JSON Schemas, and the SDK models are generated from them:

json-schemas/
├── entity/
│   ├── data/
│   │   ├── table.json
│   │   ├── database.json
│   │   └── dashboard.json
│   ├── services/
│   │   └── databaseService.json
│   └── teams/
│       └── user.json
└── api/
    ├── data/
    │   ├── createTable.json
    │   └── createDatabase.json
    └── services/
        └── createDatabaseService.json

Entity vs API Models

OpenMetadata separates entity definitions from API request models:

| Type | Purpose | Example |
|---|---|---|
| Entity | Response/read models | Table, Database, Dashboard |
| Create | POST/PUT request body | CreateTable, CreateDatabase |
| Update | PATCH request body | JSON Patch (RFC 6902) operations on entity fields |
# Entity model (response)
class Table(BaseModel):
    id: UUID
    name: str
    fullyQualifiedName: str
    columns: List[Column]
    database: EntityReference
    ...

# API model (request)
class CreateTable(BaseModel):
    name: str
    columns: List[Column]
    databaseSchema: FullyQualifiedEntityName
    ...

Entity Hierarchy

DatabaseService
    └── Database
        └── DatabaseSchema
            └── Table
                └── Column

DashboardService
    └── Dashboard
        └── Chart

PipelineService
    └── Pipeline
        └── Task

MessagingService
    └── Topic
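Each level of this hierarchy contributes one segment to an entity's fully qualified name (FQN), e.g. `prod.sales.public.orders` for a table. A simplified sketch, assuming the convention that a name containing the `.` separator is wrapped in double quotes; the server's own FQN rules remain the authority and may cover more edge cases. `build_fqn` and `split_fqn` are hypothetical helpers.

```python
from typing import List


def quote_name(name: str) -> str:
    """Quote a single FQN segment if it contains the '.' separator."""
    if '"' in name:
        raise ValueError(f"names containing double quotes are not handled here: {name!r}")
    return f'"{name}"' if "." in name else name


def build_fqn(*segments: str) -> str:
    """Join hierarchy names, e.g. service, database, schema, table."""
    if not segments or any(s == "" for s in segments):
        raise ValueError("FQN segments must be non-empty")
    return ".".join(quote_name(s) for s in segments)


def split_fqn(fqn: str) -> List[str]:
    """Inverse of build_fqn: split on dots that are outside double quotes."""
    parts, current, in_quotes = [], [], False
    for ch in fqn:
        if ch == '"':
            in_quotes = not in_quotes  # quotes delimit, they are not kept
        elif ch == "." and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    parts.append("".join(current))
    return parts
```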

Entity References

Link entities using references:

# By fully qualified name
table = CreateTable(
    name="orders",
    databaseSchema="prod.sales.public",  # FQN string
    columns=[...],
)

# By EntityReference
table.owner = EntityReference(
    id=user_uuid,
    type="user",
)

Custom Property Model Handling

OpenMetadata supports user-defined custom properties on entities. SDKs must handle these dynamic fields.

Schema Definition

Custom properties are defined per entity type:

{
  "name": "customField",
  "propertyType": {
    "id": "uuid",
    "type": "type",
    "name": "string"
  },
  "description": "Custom field description"
}

SDK Model Strategy

Option 1: Extension Dictionary (Recommended)

Keep generated models clean, store custom properties separately:

# Python
class Table(BaseModel):
    id: UUID
    name: str
    columns: List[Column]
    # ... standard fields

    extension: Optional[Dict[str, Any]] = None  # Custom properties

    def get_custom_property(self, name: str) -> Any:
        if self.extension is None:
            return None
        return self.extension.get(name)

    def set_custom_property(self, name: str, value: Any) -> None:
        if self.extension is None:
            self.extension = {}
        self.extension[name] = value
// TypeScript
interface Table {
    id: string;
    name: string;
    columns: Column[];
    // ... standard fields

    extension?: Record<string, unknown>;  // Custom properties
}

function getCustomProperty<T>(entity: Table, name: string): T | undefined {
    return entity.extension?.[name] as T | undefined;
}
// Rust
#[derive(Debug, Serialize, Deserialize)]
pub struct Table {
    pub id: Uuid,
    pub name: String,
    pub columns: Vec<Column>,
    // ... standard fields

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub extension: Option<HashMap<String, serde_json::Value>>,
}

impl Table {
    pub fn get_custom_property<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
        self.extension
            .as_ref()?
            .get(name)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
    }
}

Option 2: Dynamic Model Generation

Generate models at runtime based on custom property definitions:

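# Python - dynamic model creation
from typing import Any, List, Optional, Type

from pydantic import BaseModel, create_model

# Assumed to exist: Table and CustomProperty (generated models) and
# PROPERTY_TYPE_MAP (OpenMetadata type name -> Python type, per the mapping table).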

def build_table_model(custom_properties: List[CustomProperty]) -> Type[BaseModel]:
    """Build Table model with custom properties as typed fields."""
    extra_fields = {}
    for prop in custom_properties:
        field_type = PROPERTY_TYPE_MAP.get(prop.propertyType.name, Any)
        extra_fields[prop.name] = (Optional[field_type], None)

    return create_model(
        'TableWithCustomProperties',
        __base__=Table,
        **extra_fields,
    )

Type Mapping for Custom Properties

| OpenMetadata Type | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| string | str | string | String | string |
| integer | int | number | i64 | int64 |
| number | float | number | f64 | float64 |
| markdown | str | string | String | string |
| enum | Enum | string | enum | string |
| date | date | string | NaiveDate | time.Time |
| dateTime | datetime | string | DateTime<Utc> | time.Time |
| time | time | string | NaiveTime | time.Time |
| duration | timedelta | string | Duration | time.Duration |
| entityReference | EntityReference | EntityReference | EntityReference | EntityReference |
| entityReferenceList | List[EntityReference] | EntityReference[] | Vec<EntityReference> | []EntityReference |
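The Python column of this table translates directly into a lookup map, which is also what `PROPERTY_TYPE_MAP` in the dynamic-model example needs. A sketch, assuming date/dateTime/time values arrive as ISO-8601 strings (the exact wire format is an assumption). Types without a parser here (duration, enum, entity references) are passed through unchanged so round-tripping stays lossless.

```python
from datetime import date, datetime, time, timedelta
from typing import Any, Callable, Dict

# OpenMetadata property type -> Python type (scalar rows of the table above).
PROPERTY_TYPE_MAP: Dict[str, type] = {
    "string": str,
    "markdown": str,
    "integer": int,
    "number": float,
    "date": date,
    "dateTime": datetime,
    "time": time,
    "duration": timedelta,
}


def _to_int(raw: Any) -> int:
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(raw, bool) or not isinstance(raw, int):
        raise TypeError(f"expected an integer, got {raw!r}")
    return raw


def _to_float(raw: Any) -> float:
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        raise TypeError(f"expected a number, got {raw!r}")
    return float(raw)


# Parsers assume ISO-8601 strings on the wire for date/dateTime/time.
_PARSERS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "markdown": str,
    "integer": _to_int,
    "number": _to_float,
    "date": date.fromisoformat,
    "dateTime": datetime.fromisoformat,
    "time": time.fromisoformat,
}


def coerce_custom_property(type_name: str, raw: Any) -> Any:
    """Convert a raw `extension` value; unknown types are returned unchanged."""
    if raw is None:
        return None
    parser = _PARSERS.get(type_name)
    return parser(raw) if parser else raw
```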

Serialization Considerations

Custom properties use the extension field in API payloads:

{
  "id": "uuid",
  "name": "orders",
  "columns": [...],
  "extension": {
    "customField1": "value",
    "customField2": 123,
    "customEntityRef": {
      "id": "uuid",
      "type": "user",
      "name": "john"
    }
  }
}

SDKs should:

  1. Preserve unknown fields during round-trip (deserialize → serialize)
  2. Validate custom property types if the schema is available
  3. Handle missing custom properties gracefully (return None / null / Option::None)
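Points 1 and 3 can be shown together with a stdlib-only sketch: a hypothetical `TableRecord` that models a few fields, stashes everything else in `unknown`, and re-emits it untouched. With generated Pydantic v2 models, `model_config = ConfigDict(extra="allow")` gives similar preservation; the explicit version below just makes the mechanics visible.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

MODELLED_FIELDS = ("id", "name", "columns", "extension")


@dataclass
class TableRecord:
    id: str
    name: str
    columns: List[Dict[str, Any]]
    extension: Optional[Dict[str, Any]] = None             # custom properties
    unknown: Dict[str, Any] = field(default_factory=dict)  # fields this SDK doesn't model

    @classmethod
    def from_json(cls, raw: str) -> "TableRecord":
        data = json.loads(raw)
        known = {k: data[k] for k in MODELLED_FIELDS if k in data}
        unknown = {k: v for k, v in data.items() if k not in MODELLED_FIELDS}
        return cls(**known, unknown=unknown)

    def to_json(self) -> str:
        data: Dict[str, Any] = dict(self.unknown)  # re-emit unknown fields untouched
        data.update(id=self.id, name=self.name, columns=self.columns)
        if self.extension is not None:
            data["extension"] = self.extension
        return json.dumps(data)

    def get_custom_property(self, name: str) -> Any:
        # Missing property (or no extension at all) -> None, never KeyError
        return (self.extension or {}).get(name)


payload = json.dumps({
    "id": "uuid", "name": "orders", "columns": [],
    "extension": {"customField1": "value", "customField2": 123},
    "newServerField": True,
})
record = TableRecord.from_json(payload)
```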

API Client Implementation

Standard CRUD Operations

Every entity API should implement:

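from typing import Generic, List, Optional, TypeVar
from uuid import UUID

T = TypeVar("T")              # entity model, e.g. Table
CreateT = TypeVar("CreateT")  # create request model, e.g. CreateTable


class EntityAPI(Generic[T, CreateT]):
    def create_or_update(self, entity: CreateT) -> T:
        """PUT /api/v1/{entities} (creates, or updates if it already exists)"""
        ...

    def get_by_id(self, entity_id: UUID) -> Optional[T]:
        """GET /api/v1/{entities}/{id}"""
        ...

    def get_by_name(self, fqn: str, fields: Optional[List[str]] = None) -> Optional[T]:
        """GET /api/v1/{entities}/name/{fqn}"""
        ...

    def list(self, limit: int = 10, fields: Optional[List[str]] = None) -> "ResultList[T]":
        """GET /api/v1/{entities} (ResultList wraps the page data plus paging cursors)"""
        ...

    def delete(
        self,
        entity_id: UUID,
        recursive: bool = False,
        hard_delete: bool = False,
    ) -> None:
        """DELETE /api/v1/{entities}/{id}?recursive=...&hardDelete=..."""
        ...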

API Endpoints Pattern

| Operation | Method | Endpoint |
|---|---|---|
| List | GET | /api/v1/{entities} |
| Get by ID | GET | /api/v1/{entities}/{id} |
| Get by Name | GET | /api/v1/{entities}/name/{fqn} |
| Create/Update | PUT | /api/v1/{entities} |
| Patch | PATCH | /api/v1/{entities}/{id} |
| Delete | DELETE | /api/v1/{entities}/{id} |

Query Parameters

| Parameter | Description | Example |
|---|---|---|
| fields | Include optional fields | ?fields=columns,owner |
| limit | Pagination limit | ?limit=100 |
| before / after | Cursor pagination | ?after={cursor} |
| include | Include soft-deleted entities (all, deleted, or non-deleted) | ?include=deleted |
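Cursor pagination is easiest to consume as a generator. A sketch, assuming list responses carry the page in `data` and the next cursor in `paging.after` (absent on the last page); `fetch` is a hypothetical callable that performs `GET /api/v1/{entities}` with the given query parameters.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# fetch(params) -> decoded JSON of one list page
Fetch = Callable[[Dict[str, Any]], Dict[str, Any]]


def iterate_all(fetch: Fetch, limit: int = 100,
                fields: Optional[List[str]] = None) -> Iterator[dict]:
    """Yield every entity, following `after` cursors until they run out."""
    params: Dict[str, Any] = {"limit": limit}
    if fields:
        params["fields"] = ",".join(fields)
    while True:
        page = fetch(dict(params))  # pass a copy so callers can't mutate our state
        yield from page.get("data", [])
        after = (page.get("paging") or {}).get("after")
        if not after:
            return
        params["after"] = after


# Usage with two canned pages instead of HTTP
PAGES = {
    None: {"data": [{"name": "a"}, {"name": "b"}], "paging": {"after": "c1", "total": 3}},
    "c1": {"data": [{"name": "c"}], "paging": {"total": 3}},
}
seen_params = []


def fake_fetch(params: Dict[str, Any]) -> Dict[str, Any]:
    seen_params.append(params)
    return PAGES[params.get("after")]


names = [entity["name"] for entity in iterate_all(fake_fetch, limit=2, fields=["columns"])]
```

Being lazy, the generator only requests the next page when the caller actually iterates that far.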

Mixins for Special Behaviors

Lineage Mixin

class LineageMixin:
    def add_lineage(self, edge: AddLineage) -> None:
        """PUT /api/v1/lineage"""
        ...

    def get_lineage(
        self,
        entity_type: str,
        entity_id: UUID,
        up_depth: int = 1,
        down_depth: int = 1,
    ) -> EntityLineage:
        """GET /api/v1/lineage/{type}/{id}"""
        ...

Tag Mixin

class TagMixin:
    def add_tag(self, entity_id: UUID, tag_fqn: str) -> None:
        """PATCH /api/v1/{entities}/{id}"""
        ...

    def remove_tag(self, entity_id: UUID, tag_fqn: str) -> None:
        ...
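Because tag changes go through PATCH, a tag mixin mostly builds JSON Patch (RFC 6902) documents. A sketch, assuming `current_tags` was fetched with `?fields=tags` and that the patch is sent as `application/json-patch+json`. The `TagLabel` field names and values below are assumptions modelled on OpenMetadata's tag labels.

```python
from typing import Any, Dict, List


def tag_label(tag_fqn: str) -> Dict[str, Any]:
    # Field names and values are assumptions modelled on OpenMetadata's TagLabel.
    return {"tagFQN": tag_fqn, "source": "Classification",
            "labelType": "Manual", "state": "Confirmed"}


def add_tag_patch(current_tags: List[Dict[str, Any]], tag_fqn: str) -> List[Dict[str, Any]]:
    """JSON Patch that appends a tag; empty list if the tag is already present."""
    if any(t.get("tagFQN") == tag_fqn for t in current_tags):
        return []
    # "/tags/-" is RFC 6902 syntax for "append to the end of the array"
    return [{"op": "add", "path": "/tags/-", "value": tag_label(tag_fqn)}]


def remove_tag_patch(current_tags: List[Dict[str, Any]], tag_fqn: str) -> List[Dict[str, Any]]:
    """JSON Patch that removes a tag by its array index; empty list if absent."""
    for index, tag in enumerate(current_tags):
        if tag.get("tagFQN") == tag_fqn:
            return [{"op": "remove", "path": f"/tags/{index}"}]
    return []
```

Index-based removal is only correct against the tag list it was computed from, so fetch and patch in quick succession (or retry on a conflict).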

Owner Mixin

class OwnerMixin:
    def set_owner(self, entity_id: UUID, owner: EntityReference) -> None:
        ...

Composing Mixins

class OpenMetadata(LineageMixin, TagMixin, OwnerMixin):
    """Main client composes all mixins."""

    def __init__(self, config: OpenMetadataConnection):
        self._config = config
        self._client = self._build_http_client()

Error Handling

Exception Hierarchy

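class OpenMetadataException(Exception):
    """Base exception for all SDK errors."""

class AuthenticationError(OpenMetadataException):
    """Authentication failed (HTTP 401)."""

class AuthorizationError(OpenMetadataException):
    """Authenticated, but not permitted (HTTP 403)."""

class EntityNotFoundError(OpenMetadataException):
    """Entity does not exist (HTTP 404)."""

class ValidationError(OpenMetadataException):
    """Request validation failed (HTTP 422)."""

class ConflictError(OpenMetadataException):
    """Entity already exists or version conflict (HTTP 409)."""

class RateLimitError(OpenMetadataException):
    """Rate limit exceeded (HTTP 429)."""

    def __init__(self, message: str, retry_after: int = 0):
        super().__init__(message)
        self.retry_after = retry_after  # seconds to wait, from the Retry-After header

class ServerError(OpenMetadataException):
    """Server-side failure (HTTP 5xx)."""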

HTTP Status Mapping

| Status | Exception | Action |
|---|---|---|
| 401 | AuthenticationError | Re-authenticate |
| 403 | AuthorizationError | Check permissions |
| 404 | EntityNotFoundError | Return None or raise |
| 409 | ConflictError | Handle version conflict |
| 422 | ValidationError | Fix request payload |
| 429 | RateLimitError | Retry with backoff |
| 5xx | ServerError | Retry with backoff |
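The table translates into a single mapping function. A self-contained sketch: the exception classes are redeclared here with an optional `status` attribute (an addition for illustration), and any status not in the table falls back to the base exception. Only the delta-seconds form of `Retry-After` is parsed.

```python
from typing import Dict, Optional, Type


class OpenMetadataException(Exception):
    """Base SDK exception; carries the HTTP status when there is one."""

    def __init__(self, message: str, status: Optional[int] = None):
        super().__init__(message)
        self.status = status


class AuthenticationError(OpenMetadataException): ...
class AuthorizationError(OpenMetadataException): ...
class EntityNotFoundError(OpenMetadataException): ...
class ConflictError(OpenMetadataException): ...
class ValidationError(OpenMetadataException): ...
class ServerError(OpenMetadataException): ...


class RateLimitError(OpenMetadataException):
    def __init__(self, message: str, retry_after: Optional[int] = None):
        super().__init__(message, status=429)
        self.retry_after = retry_after


_STATUS_MAP: Dict[int, Type[OpenMetadataException]] = {
    401: AuthenticationError,
    403: AuthorizationError,
    404: EntityNotFoundError,
    409: ConflictError,
    422: ValidationError,
}


def map_http_error(status: int, message: str = "",
                   retry_after: Optional[str] = None) -> OpenMetadataException:
    """Translate an HTTP error status into the SDK exception from the table."""
    if status == 429:
        # Only the delta-seconds form of Retry-After is handled here.
        seconds = int(retry_after) if retry_after and retry_after.isdigit() else None
        return RateLimitError(message or "Rate limit exceeded", retry_after=seconds)
    if 500 <= status <= 599:
        return ServerError(message or f"Server error {status}", status)
    exc_type = _STATUS_MAP.get(status, OpenMetadataException)
    return exc_type(message or f"HTTP {status}", status)


def is_retryable(exc: Exception) -> bool:
    """Only the rows whose action is 'Retry with backoff'."""
    return isinstance(exc, (RateLimitError, ServerError))
```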

Return None vs Raise

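from typing import Optional, Type, TypeVar
from urllib.parse import quote

from requests import HTTPError  # assumes a requests-based HTTP client

T = TypeVar("T")


def get_by_name(self, entity: Type[T], fqn: str) -> Optional[T]:
    """Return None for 404, raise a mapped SDK exception for other errors."""
    try:
        # Encode the FQN so quoted names containing "/" or spaces stay one path segment
        response = self._client.get(f"/api/v1/{entity.path}/name/{quote(fqn, safe='')}")
        response.raise_for_status()
        return entity.model_validate(response.json())  # Pydantic v2 (parse_obj in v1)
    except HTTPError as e:
        if e.response is not None and e.response.status_code == 404:
            return None
        raise self._map_exception(e) from e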

Implementing a New Language SDK

Step 1: Project Setup

# Directory structure
openmetadata-sdk-{lang}/
├── src/
│   ├── client/
│   │   ├── openmetadata.{ext}
│   │   └── connection.{ext}
│   ├── auth/
│   │   ├── provider.{ext}
│   │   └── jwt.{ext}
│   ├── api/
│   │   ├── tables.{ext}
│   │   ├── databases.{ext}
│   │   └── ...
│   ├── models/
│   │   └── generated/      # From JSON schemas
│   └── mixins/
│       ├── lineage.{ext}
│       └── tags.{ext}
├── tests/
├── examples/
└── README.md

Step 2: Model Generation

Use JSON Schema to generate models:

# Python: datamodel-codegen
datamodel-codegen \
    --input json-schemas/ \
    --input-file-type jsonschema \
    --output src/models/generated/ \
    --output-model-type pydantic_v2.BaseModel

# TypeScript: json-schema-to-typescript (its CLI binary is json2ts)
npx --package=json-schema-to-typescript json2ts \
    -i 'json-schemas/**/*.json' \
    -o src/models/

# Rust: schemafy or typify (generate-models is a project-specific wrapper binary)
cargo run --bin generate-models -- \
    --schema-dir json-schemas/ \
    --out-dir src/models/

Step 3: Implement Core Client

// Rust Example (blocking reqwest; async SDKs would use reqwest::Client with .await)
pub struct OpenMetadata {
    config: OpenMetadataConnection,
    client: reqwest::blocking::Client,
    auth: Box<dyn AuthenticationProvider>,
}

impl OpenMetadata {
    pub fn new(config: OpenMetadataConnection) -> Result<Self, Error> {
        let auth = Self::build_auth_provider(&config)?;
        let client = Self::build_http_client(&config)?;

        Ok(Self { config, client, auth })
    }

    pub fn health_check(&self) -> Result<(), Error> {
        let response = self.client
            .get(format!("{}/health-check", self.config.host_port))
            .send()?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(Error::HealthCheckFailed)
        }
    }

    pub fn tables(&self) -> TablesApi {
        TablesApi::new(&self.client, &self.auth)
    }
}

Step 4: Implement Entity APIs

// TypeScript Example
export class TablesApi {
    constructor(
        private client: HttpClient,
        private auth: AuthenticationProvider,
    ) {}

    async getByName(fqn: string, fields?: string[]): Promise<Table | null> {
        const params = fields ? { fields: fields.join(',') } : {};
        try {
            const response = await this.client.get(
                `/api/v1/tables/name/${encodeURIComponent(fqn)}`,
                { params },
            );
            return response.data as Table;
        } catch (e: any) {
            if (e.response?.status === 404) return null;
            throw this.mapError(e);
        }
    }

    async createOrUpdate(table: CreateTable): Promise<Table> {
        const response = await this.client.put('/api/v1/tables', table);
        return response.data as Table;
    }

    async delete(
        id: string,
        options: { recursive?: boolean; hardDelete?: boolean } = {},
    ): Promise<void> {
        await this.client.delete(`/api/v1/tables/${id}`, {
            params: {
                recursive: options.recursive ?? false,
                hardDelete: options.hardDelete ?? false,
            },
        });
    }
}

Step 5: Add Authentication Providers

// Go Example
type AuthenticationProvider interface {
    GetAccessToken() (string, error)
}

type JWTProvider struct {
    token string
}

func (p *JWTProvider) GetAccessToken() (string, error) {
    return p.token, nil
}

type OktaProvider struct {
    mu          sync.Mutex // guards cachedToken/expiresAt across goroutines
    clientID    string
    orgURL      string
    privateKey  string
    scopes      []string
    cachedToken string
    expiresAt   time.Time
}

func (p *OktaProvider) GetAccessToken() (string, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.cachedToken != "" && time.Now().Before(p.expiresAt) {
        return p.cachedToken, nil
    }
    // Refresh token via OAuth2 (refreshToken is not shown)
    token, expiry, err := p.refreshToken()
    if err != nil {
        return "", err
    }
    p.cachedToken = token
    p.expiresAt = expiry
    return token, nil
}

Step 6: Implement Mixins

// Kotlin Example
interface LineageMixin {
    val client: HttpClient

    suspend fun addLineage(edge: AddLineage) {
        client.put("/api/v1/lineage", edge)
    }

    suspend fun getLineage(
        entityType: String,
        entityId: UUID,
        upDepth: Int = 1,
        downDepth: Int = 1,
    ): EntityLineage {
        return client.get(
            "/api/v1/lineage/$entityType/$entityId",
            mapOf("upDepth" to upDepth, "downDepth" to downDepth),
        )
    }
}

class OpenMetadata(
    private val config: OpenMetadataConnection,
) : LineageMixin, TagMixin {
    override val client = buildHttpClient()
    // ...
}

Extending Existing SDKs

Adding New Entity Type

  1. Add JSON Schema:

    // json-schemas/entity/data/newEntity.json
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "NewEntity",
      "type": "object",
      "properties": {
        "id": { "type": "string", "format": "uuid" },
        "name": { "type": "string" },
        ...
      }
    }
    
  2. Generate Models:

    make generate
    
  3. Add API Client:

    class NewEntityAPI:
        ENTITY_PATH = "newEntities"
    
        def get_by_name(self, fqn: str) -> Optional[NewEntity]:
            ...
    
  4. Register in Main Client:

    class OpenMetadata:
        def new_entities(self) -> NewEntityAPI:
            return NewEntityAPI(self._client)
    

Adding New Mixin

  1. Define Interface:

    class CustomBehaviorMixin:
        def custom_operation(self, entity_id: UUID) -> Result:
            ...
    
  2. Add to Main Client:

    class OpenMetadata(LineageMixin, TagMixin, CustomBehaviorMixin):
        ...
    

Adding New Auth Provider

  1. Define Config Schema:

    {
      "title": "NewProviderConfig",
      "properties": {
        "apiKey": { "type": "string" },
        "endpoint": { "type": "string" }
      }
    }
    
  2. Implement Provider:

    class NewProvider(AuthenticationProvider):
        def __init__(self, config: NewProviderConfig):
            self._api_key = config.apiKey
            self._endpoint = config.endpoint
    
        def get_access_token(self) -> str:
            # Custom auth flow
            ...
    
  3. Register in Factory:

    AUTH_PROVIDERS = {
        AuthProvider.openmetadata: OpenMetadataJWTProvider,
        AuthProvider.google: GoogleProvider,
        AuthProvider.new_provider: NewProvider,  # Add here
    }
    

Testing Strategy

Unit Tests

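import responses

# test_config, OpenMetadata and Table come from the SDK under test.
def test_table_get_by_name():
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.GET,
            "http://localhost:8585/api/v1/tables/name/db.schema.table",
            # Add any other fields your Table model requires
            json={
                "id": "2f6c1f0e-8a3b-4f0c-9d7e-1b2c3d4e5f60",
                "name": "table",
                "fullyQualifiedName": "db.schema.table",
                "columns": [],
            },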
            status=200,
        )

        client = OpenMetadata(test_config)
        table = client.get_by_name(Table, "db.schema.table")

        assert table.name == "table"

def test_table_get_by_name_not_found():
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.GET,
            "http://localhost:8585/api/v1/tables/name/missing",
            status=404,
        )

        client = OpenMetadata(test_config)
        table = client.get_by_name(Table, "missing")

        assert table is None

Integration Tests

import os
from uuid import uuid4

import pytest


@pytest.fixture
def openmetadata():
    """Connect to test OpenMetadata instance."""
    config = OpenMetadataConnection(
        hostPort=os.getenv("OM_HOST", "http://localhost:8585/api"),
        authProvider=AuthProvider.openmetadata,
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken=os.getenv("OM_TOKEN"),
        ),
    )
    client = OpenMetadata(config)
    client.health_check()
    return client

def test_create_and_get_table(openmetadata):
    create = CreateTable(
        name=f"test_table_{uuid4().hex[:8]}",
        databaseSchema="default.default",
        columns=[
            Column(name="id", dataType=DataType.INT),
            Column(name="name", dataType=DataType.STRING),
        ],
    )

    table = openmetadata.create_or_update(create)
    try:
        assert table.id is not None

        fetched = openmetadata.get_by_name(Table, table.fullyQualifiedName)
        assert fetched.name == create.name
    finally:
        # Cleanup runs even if an assertion above fails
        openmetadata.delete(Table, table.id, hard_delete=True)

SDK Implementation Checklist

Core Components

  • Connection configuration with all auth providers
  • HTTP client with retry, timeout, and error handling
  • Authentication provider interface and implementations
  • Model generation from JSON Schemas
  • Health check endpoint
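For the "retry" part of the HTTP client item, a common approach is exponential backoff with full jitter, honouring the server's `Retry-After` hint when present. A sketch, assuming the transport raises a hypothetical `TransientHTTPError` for 429 and 5xx responses; `sleep` and `rng` are injectable so the behaviour is testable without waiting.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class TransientHTTPError(Exception):
    """Hypothetical error the transport raises for 429 and 5xx responses."""

    def __init__(self, status: int, retry_after: Optional[float] = None):
        super().__init__(f"transient HTTP {status}")
        self.status = status
        self.retry_after = retry_after


def with_retries(call: Callable[[], T], max_attempts: int = 4,
                 base_delay: float = 0.5, max_delay: float = 8.0,
                 sleep: Callable[[float], None] = time.sleep,
                 rng: Optional[random.Random] = None) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    rng = rng or random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except TransientHTTPError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            if exc.retry_after is not None:
                delay = exc.retry_after  # honour the server's Retry-After hint
            else:
                # Full jitter: uniform in [0, min(max_delay, base * 2^(attempt-1))]
                delay = rng.uniform(0, min(max_delay, base_delay * 2 ** (attempt - 1)))
            sleep(delay)
    raise AssertionError("unreachable")


# Usage: a call that fails twice with 503, then succeeds
attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientHTTPError(503)
    return "ok"


delays = []
result = with_retries(flaky, sleep=delays.append, rng=random.Random(0))
```

Retrying is only safe for idempotent requests (GET, PUT, DELETE); a blind retry of a PATCH or POST can apply the change twice.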

Entity APIs

  • Tables API
  • Databases API
  • Database Schemas API
  • Database Services API
  • Dashboard API
  • Dashboard Services API
  • Pipeline API
  • Pipeline Services API
  • Topic API
  • Messaging Services API
  • ML Model API
  • ML Model Services API
  • User/Team APIs
  • Tag/Classification APIs

Mixins

  • Lineage operations
  • Tag operations
  • Owner operations
  • Custom properties operations

Quality

  • Type safety throughout
  • Comprehensive error handling
  • Unit test coverage > 80%
  • Integration test suite
  • API documentation
  • Usage examples

Contributing to OpenMetadata

All SDK and connector development should be contributed back to the OpenMetadata community. This section covers setting up the development environment and contribution workflows.

Development Environment Setup

Prerequisites

| Tool | Version | Installation |
|---|---|---|
| Docker | 20+ | docs.docker.com |
| Java JDK | 21 | brew install openjdk@21 or SDKMAN |
| Maven | 3.5+ | brew install maven |
| Python | 3.9-3.11 | System or pyenv |
| Node.js | 18.x | brew install node@18 |
| Yarn | 1.22+ | npm install -g yarn |
| Antlr | 4.9.2 | sudo make install_antlr_cli |
| JQ | Latest | brew install jq |

Verify Prerequisites

make prerequisites

Clone and Setup

# Clone repository
git clone https://github.com/open-metadata/OpenMetadata
cd OpenMetadata

# Setup Python environment
python3 -m venv env
source env/bin/activate
pip install pre-commit

# Install development dependencies
make install_dev
make install_test
make precommit_install

# Generate models from schemas
make generate

Start Development Stack

# MySQL + Elasticsearch (default)
docker compose -f docker/development/docker-compose.yml up mysql elasticsearch --build -d

# OR PostgreSQL + OpenSearch
docker compose -f docker/development/docker-compose-postgres.yml up postgresql opensearch --build -d

Build and Run Server

# Build (skip tests for speed)
mvn clean install -DskipTests

# Bootstrap database
cd openmetadata-dist/target/openmetadata-*/
sh bootstrap/openmetadata-ops.sh drop-create

# Start server
sh bin/openmetadata-server-start.sh conf/openmetadata.yaml

Access the server at http://localhost:8585


Repository Structure

OpenMetadata/
├── openmetadata-spec/                    # JSON Schemas (source of truth)
│   └── src/main/resources/json/schema/
│       ├── entity/                       # Entity definitions
│       │   ├── data/                     # Table, Database, Dashboard...
│       │   ├── services/                 # Service definitions
│       │   └── teams/                    # User, Team...
│       ├── api/                          # API request schemas
│       └── type/                         # Common types
│
├── openmetadata-service/                 # Java backend
│   └── src/main/java/org/openmetadata/service/
│       ├── resources/                    # REST API endpoints (Dropwizard)
│       ├── jdbi3/                        # Database access layer
│       ├── events/                       # Change event handlers
│       ├── security/                     # Auth & authorization
│       └── secrets/converter/            # ClassConverters for oneOf
│
├── ingestion/                            # Python ingestion framework
│   └── src/metadata/
│       ├── ingestion/
│       │   ├── source/                   # Source connectors
│       │   ├── processor/                # Processors
│       │   ├── sink/                     # Sinks
│       │   └── api/                      # Workflow APIs
│       └── generated/                    # Generated Pydantic models
│
└── openmetadata-ui/                      # React frontend
    └── src/main/resources/ui/
        ├── src/
        │   ├── utils/                    # ServiceUtils files
        │   └── locale/languages/         # i18n translations
        └── public/locales/               # Entity documentation

Key Directories for Contributions

| Contribution Type | Primary Directory |
|---|---|
| New connector schema | openmetadata-spec/.../connections/ |
| Connector Python code | ingestion/src/metadata/ingestion/source/ |
| Java ClassConverter | openmetadata-service/.../secrets/converter/ |
| UI connector config | openmetadata-ui/.../utils/ |

Contributing New Connectors

When to Contribute vs Custom Connector

| Scenario | Approach |
|---|---|
| Connector useful to many users | Contribute to OpenMetadata |
| Single-use, custom data source | Build Custom Connector (not contributed) |

Connector Development Workflow

1. Define JSON Schema
       ↓
2. Generate Types (Java/Python/TS)
       ↓
3. Implement Python Ingestion Code
       ↓
4. Create Java ClassConverter (if oneOf used)
       ↓
5. Apply UI Changes
       ↓
6. Write Tests
       ↓
7. Update Documentation
       ↓
8. Submit PR

Step 1: Define JSON Schema

Create connection schema at:

openmetadata-spec/src/main/resources/json/schema/entity/services/connections/{source_type}/

Example: myDatabaseConnection.json

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://open-metadata.org/schema/entity/services/connections/database/myDatabaseConnection.json",
  "title": "MyDatabaseConnection",
  "description": "Connection to MyDatabase",
  "type": "object",
  "javaType": "org.openmetadata.schema.services.connections.database.MyDatabaseConnection",
  "definitions": {
    "myDatabaseType": {
      "description": "Service type",
      "type": "string",
      "enum": ["MyDatabase"],
      "default": "MyDatabase"
    },
    "myDatabaseScheme": {
      "description": "SQLAlchemy driver scheme",
      "type": "string",
      "enum": ["mydatabase+driver"],
      "default": "mydatabase+driver"
    }
  },
  "properties": {
    "type": {
      "$ref": "#/definitions/myDatabaseType",
      "default": "MyDatabase"
    },
    "scheme": {
      "$ref": "#/definitions/myDatabaseScheme",
      "default": "mydatabase+driver"
    },
    "hostPort": {
      "description": "Host and port",
      "type": "string"
    },
    "username": {
      "description": "Username",
      "type": "string"
    },
    "password": {
      "description": "Password",
      "type": "string",
      "format": "password"
    },
    "database": {
      "description": "Database name",
      "type": "string"
    },
    "supportsMetadataExtraction": {
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["hostPort"]
}
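The `required` list and `"additionalProperties": false` are what reject a malformed connection before ingestion starts. Below is a minimal sketch of just those two rules plus the `type` enum. It assumes the property names from the example schema above. `schema_errors` is a hypothetical helper, not part of OpenMetadata. Real validation should use a full JSON Schema validator, such as the `jsonschema` Python package.

```python
"""Sketch: the subset of JSON Schema rules enforced by myDatabaseConnection.json."""
from typing import List

# Property names copied from the example schema; with "additionalProperties": false,
# any key outside this set is rejected.
ALLOWED_PROPERTIES = {
    "type",
    "scheme",
    "hostPort",
    "username",
    "password",
    "database",
    "supportsMetadataExtraction",
}
REQUIRED_PROPERTIES = {"hostPort"}
TYPE_ENUM = {"MyDatabase"}


def schema_errors(payload: dict) -> List[str]:
    """Return human-readable violations; an empty list means the payload passes."""
    keys = set(payload)
    errors = [f"missing required property: {k}" for k in sorted(REQUIRED_PROPERTIES - keys)]
    errors += [f"unexpected property: {k}" for k in sorted(keys - ALLOWED_PROPERTIES)]
    # "default" is only an annotation in JSON Schema, so an absent "type" is not an error.
    if "type" in payload and payload["type"] not in TYPE_ENUM:
        errors.append(f"type must be one of {sorted(TYPE_ENUM)}")
    return errors


if __name__ == "__main__":
    print(schema_errors({"hostPort": "localhost:5432", "database": "mydb"}))  # []
    print(schema_errors({"host": "localhost", "type": "Postgres"}))
```

Because `additionalProperties` is `false`, a typo such as `host` instead of `hostPort` is reported as two problems: the required key is missing and an unknown key is present.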

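Register the connector in the service schema (`databaseService.json`). Add the new value to the service type enum, which is what makes `DatabaseServiceType.MyDatabase` available to the UI. Then add a `$ref` to the connection `oneOf` list. Abridged, with other existing entries omitted:

{
  "definitions": {
    "databaseServiceType": {
      "enum": ["Mysql", "MyDatabase"]
    },
    "databaseConnection": {
      "properties": {
        "config": {
          "oneOf": [
            { "$ref": "./connections/database/mysqlConnection.json" },
            { "$ref": "./connections/database/myDatabaseConnection.json" }
          ]
        }
      }
    }
  }
}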

Step 2: Generate Types

# Build the project and regenerate Java POJOs (run from the repository root)
mvn clean install -DskipTests

# Python (Pydantic) models
cd ingestion
make generate
cd ..

# TypeScript models (for UI)
cd openmetadata-ui/src/main/resources/ui
yarn install
./json2ts.sh path/to/myDatabaseConnection.json

Step 3: Implement Python Ingestion

Create connector at:

ingestion/src/metadata/ingestion/source/database/mydatabase/
├── __init__.py
├── connection.py
├── metadata.py
└── service_spec.py
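Forgetting one of these files is an easy mistake to catch before opening a PR. Below is a minimal sketch of a layout check. It assumes the directory layout shown above and a path relative to the repository root. `missing_connector_files` is a hypothetical helper, not an OpenMetadata utility.

```python
from pathlib import Path
from typing import List

# File names taken from the connector layout above.
EXPECTED_FILES = ("__init__.py", "connection.py", "metadata.py", "service_spec.py")


def missing_connector_files(repo_root: Path, source_type: str, connector: str) -> List[str]:
    """Return the expected connector files that do not exist yet, in layout order."""
    connector_dir = (
        repo_root / "ingestion" / "src" / "metadata" / "ingestion" / "source" / source_type / connector
    )
    return [name for name in EXPECTED_FILES if not (connector_dir / name).is_file()]


if __name__ == "__main__":
    print(missing_connector_files(Path("."), "database", "mydatabase"))
```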

`service_spec.py`:

from metadata.ingestion.source.database.mydatabase.metadata import MydatabaseSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=MydatabaseSource)

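`connection.py`:

from typing import Optional

from sqlalchemy.engine import Engine

from metadata.generated.schema.entity.automations.workflow import Workflow as AutomationWorkflow
from metadata.generated.schema.entity.services.connections.database.myDatabaseConnection import (
    MyDatabaseConnection,
)
from metadata.ingestion.connections.builders import create_generic_db_connection
from metadata.ingestion.connections.test_connections import test_connection_db_schema_sources
from metadata.ingestion.ometa.ometa_api import OpenMetadata

def get_connection(connection: MyDatabaseConnection) -> Engine:
    return create_generic_db_connection(
        connection=connection,
        get_connection_url_fn=get_connection_url,
        get_connection_args_fn=get_connection_args,
    )

def get_connection_args(connection: MyDatabaseConnection) -> dict:
    # The example schema defines no connectionArguments, so pass no extra driver args
    return {}

def get_connection_url(connection: MyDatabaseConnection) -> str:
    # password is generated as a secret type: unwrap it, otherwise the URL contains the mask
    password = connection.password.get_secret_value() if connection.password else ""
    return f"{connection.scheme.value}://{connection.username}:{password}@{connection.hostPort}/{connection.database}"

def test_connection(metadata: OpenMetadata, engine: Engine, service_connection: MyDatabaseConnection,
                    automation_workflow: Optional[AutomationWorkflow] = None) -> None:
    test_connection_db_schema_sources(
        metadata=metadata, engine=engine, service_connection=service_connection,
        automation_workflow=automation_workflow,
    )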
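Putting credentials straight into the URL breaks as soon as a password contains `@`, `:` or `/`. SQLAlchemy's documentation recommends percent-encoding such values with `urllib.parse.quote_plus`. Below is a minimal sketch of a safer URL builder. `build_url` is a hypothetical stand-in for `get_connection_url`. It takes plain arguments named after the `MyDatabaseConnection` fields, so it runs without the generated models.

```python
from typing import Optional
from urllib.parse import quote_plus


def build_url(
    scheme: str,
    host_port: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
) -> str:
    """Build a SQLAlchemy-style URL, percent-encoding user and password."""
    credentials = ""
    if username:
        credentials = quote_plus(username)
        if password:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    url = f"{scheme}://{credentials}{host_port}"
    if database:
        url += f"/{database}"
    return url


if __name__ == "__main__":
    # Special characters in the password are escaped instead of splitting the URL.
    print(build_url("mydatabase+driver", "localhost:5432", "user", "p@ss:w/rd", "mydb"))
```

Inside `connection.py`, you would call this with `connection.scheme.value`, `connection.hostPort`, `connection.username`, the unwrapped `connection.password.get_secret_value()`, and `connection.database`.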

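`metadata.py`:

from typing import Optional

from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService


class MydatabaseSource(CommonDbSourceService):
    """MyDatabase metadata extraction source."""

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None):
        # Parse the raw workflow "source" section into the generated Pydantic model
        config = WorkflowSource.parse_obj(config_dict)
        return cls(config, metadata)

    # Override methods as needed for custom extraction logic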
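The `config_dict` passed to `create()` corresponds to the `source` section of an ingestion workflow. Below is a sketch of its general shape for this connector as a plain Python dict. It also includes a hypothetical helper, `extract_connection`, that pulls out the connection block and fails fast if the `type` belongs to another connector. The service name, host, and credentials are placeholder values.

```python
from typing import Any, Dict

# Placeholder values; keys follow the workflow "source" section layout.
SOURCE_CONFIG: Dict[str, Any] = {
    "type": "mydatabase",
    "serviceName": "local_mydatabase",
    "serviceConnection": {
        "config": {
            "type": "MyDatabase",
            "hostPort": "localhost:5432",
            "username": "user",
            "password": "pass",
            "database": "mydb",
        }
    },
    "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}


def extract_connection(config_dict: Dict[str, Any], expected_type: str) -> Dict[str, Any]:
    """Return the connection block, raising if it belongs to a different connector."""
    connection = config_dict["serviceConnection"]["config"]
    if connection.get("type") != expected_type:
        raise ValueError(f"Expected {expected_type} connection, got {connection.get('type')!r}")
    return connection


if __name__ == "__main__":
    print(extract_connection(SOURCE_CONFIG, "MyDatabase")["hostPort"])
```

Checking the connection type inside `create()` turns a misconfigured workflow into an immediate, readable error instead of a failure deep inside extraction.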

Step 4: Create Java ClassConverter (if using `oneOf`)

Only needed if your schema uses `oneOf` for auth types:

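// openmetadata-service/.../secrets/converter/MyDatabaseConnectionClassConverter.java
package org.openmetadata.service.secrets.converter;

import org.openmetadata.schema.services.connections.database.MyDatabaseConnection;
// JsonUtils has moved between packages across releases; use the import your sibling converters use
import org.openmetadata.service.util.JsonUtils;

public class MyDatabaseConnectionClassConverter extends ClassConverter {

    public MyDatabaseConnectionClassConverter() {
        // The base class keeps the target class for conversion
        super(MyDatabaseConnection.class);
    }

    @Override
    public Object convert(Object object) {
        MyDatabaseConnection connection = JsonUtils.convertValue(object, MyDatabaseConnection.class);
        // Resolve oneOf auth fields to their concrete classes here, if needed
        return connection;
    }
}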

Register it in `ClassConverterFactory.java`:

Map.entry(MyDatabaseConnection.class, new MyDatabaseConnectionClassConverter())
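A `oneOf` property (for example, several auth types) is typically generated as an untyped object on the Java side. The converter therefore has to try the candidate classes and keep the first one that fits. Below is a Python sketch of that resolution idea only; it is not the Java implementation. The `BasicAuth` and `IamAuthConfig` dataclasses are hypothetical stand-ins for generated auth classes.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional, Sequence, Type


# Hypothetical auth variants standing in for classes generated from a oneOf.
@dataclass
class BasicAuth:
    password: str


@dataclass
class IamAuthConfig:
    awsRegion: str
    awsAccessKeyId: Optional[str] = None


def try_to_convert(raw: Dict[str, Any], candidates: Sequence[Type[Any]]) -> Optional[Any]:
    """Return an instance of the first candidate that accepts every key in raw."""
    for cls in candidates:
        names = {f.name for f in fields(cls)}
        if not set(raw) <= names:
            continue  # raw contains a key this variant does not define
        try:
            return cls(**raw)
        except TypeError:
            continue  # a required field of this variant is missing
    return None


if __name__ == "__main__":
    print(try_to_convert({"awsRegion": "us-east-1"}, [BasicAuth, IamAuthConfig]))
```

Candidate order matters: if two variants accept the same keys, the first listed wins. Keep the candidate list ordered from most to least specific.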

Step 5: Apply UI Changes

Update ServiceUtils (`DatabaseServiceUtils.ts`):

import myDatabaseConnection from '../jsons/connectionSchemas/connections/database/myDatabaseConnection.json';

// In getDatabaseConfig switch:
case DatabaseServiceType.MyDatabase: {
    schema = myDatabaseConnection;
    break;
}

Create documentation at:

openmetadata-ui/.../public/locales/en-US/Database/MyDatabase.md

Step 6: Write Tests

# ingestion/tests/unit/source/database/test_mydatabase.py
from metadata.generated.schema.entity.services.connections.database.myDatabaseConnection import (
    MyDatabaseConnection,
)
from metadata.ingestion.source.database.mydatabase.connection import get_connection_url


def test_connection_url():
    # scheme is omitted, so the schema default "mydatabase+driver" applies
    connection = MyDatabaseConnection(
        hostPort="localhost:5432",
        username="user",
        password="pass",
        database="mydb",
    )
    url = get_connection_url(connection)
    assert url == "mydatabase+driver://user:pass@localhost:5432/mydb"

Step 7: Update Documentation

Create comprehensive docs following OpenMetadata patterns:

  • Connector overview
  • Prerequisites
  • Configuration steps
  • Troubleshooting

Type Generation

JSON Schema → Multi-Language Models

JSON Schema (source of truth)
    ↓
┌───────────────────────────────────────────┐
│                                           │
↓               ↓               ↓           ↓
Java          Python        TypeScript    (Others)
POJOs         Pydantic      Interfaces
              Models

Generation Commands

| Language | Tool | Command |
|---|---|---|
| Java | jsonschema2pojo | `mvn clean install` |
| Python | datamodel-codegen | `make generate` |
| TypeScript | quicktype | `./json2ts.sh <schema>` |

Generated Output Locations

| Language | Output Directory |
|---|---|
| Java | `openmetadata-spec/target/classes/org/openmetadata/schema/` |
| Python | `ingestion/src/metadata/generated/` |
| TypeScript | `openmetadata-ui/.../src/generated/` |
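The generated Python modules mirror the schema tree, so the import path used in `connection.py` can be derived from the schema file location. Below is a minimal sketch of that mapping. It assumes the schema root from Step 1, the `metadata.generated.schema` package prefix seen in the connector imports, and file names that stay camelCase. `generated_module` is a hypothetical helper, not a documented guarantee of the generator.

```python
from pathlib import PurePosixPath

# Both prefixes are taken from the paths and imports used in this guide.
SCHEMA_ROOT = PurePosixPath("openmetadata-spec/src/main/resources/json/schema")
PYTHON_PACKAGE = "metadata.generated.schema"


def generated_module(schema_path: str) -> str:
    """Map a JSON Schema file to the dotted module path of its generated Pydantic model."""
    relative = PurePosixPath(schema_path).relative_to(SCHEMA_ROOT)  # ValueError if outside root
    return ".".join((PYTHON_PACKAGE, *relative.with_suffix("").parts))


if __name__ == "__main__":
    print(generated_module(
        "openmetadata-spec/src/main/resources/json/schema/"
        "entity/services/connections/database/myDatabaseConnection.json"
    ))
```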

Testing

Python Tests

cd ingestion

# Install test dependencies
make install_test

# Run all tests with coverage
make coverage

# Run specific tests
pytest tests/unit/source/database/test_mydatabase.py -v

# Lint and format (py_format runs the formatters, including black and isort)
make lint
make py_format

Java Tests

# Run all tests
mvn test

# Run specific test class
mvn test -Dtest=MyDatabaseConnectionTest

# Skip tests during build
mvn clean install -DskipTests

Integration Tests

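Integration tests require a running OpenMetadata server:

# Start the server first (from an OpenMetadata distribution directory)
sh bin/openmetadata-server-start.sh conf/openmetadata.yaml

# Run integration tests (from the ingestion directory)
pytest tests/integration/ -v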
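Integration tests fail confusingly if they start before the server has finished booting. Below is a minimal sketch of a readiness poll to run first. It assumes a health-check endpoint at `http://localhost:8586/healthcheck`. The port and path are assumptions, so check your `openmetadata.yaml`. `http_ok` and `wait_until_ready` are hypothetical helpers. The probe and sleep functions are injectable so the polling logic can be tested without a server.

```python
import time
import urllib.error
import urllib.request
from typing import Callable

# Assumed admin health-check endpoint; adjust to your server configuration.
HEALTHCHECK_URL = "http://localhost:8586/healthcheck"


def http_ok(url: str = HEALTHCHECK_URL) -> bool:
    """Return True if the URL answers with HTTP 200, False on any connection or HTTP error."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(
    probe: Callable[[], bool] = http_ok,
    timeout_s: float = 120.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll probe() until it succeeds or timeout_s elapses; return whether it succeeded."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


if __name__ == "__main__":
    raise SystemExit(0 if wait_until_ready() else 1)
```

Running it as a script before `pytest` gives a non-zero exit code when the server never becomes healthy, so a CI job stops at the real cause.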

Pre-commit Hooks

# Install hooks
make precommit_install

# Run manually
pre-commit run --all-files

Contribution Checklist

New Connector

  • JSON Schema defined with all required properties
  • Schema registered in service type file
  • Java/Python/TypeScript types generated
  • Python Source implemented
  • Java ClassConverter (if oneOf used)
  • UI ServiceUtils updated
  • UI documentation created
  • Unit tests written
  • Integration tests passing
  • Documentation updated
  • Pre-commit hooks passing
  • PR submitted with description

SDK Extension

  • JSON Schema updated/created
  • Types regenerated
  • Python/Java code implemented
  • Tests written
  • Documentation updated

References

SDK Documentation

Contributing

Source Code

Related Skills

  • meta-sdk-patterns-eng
    - Foundational SDK patterns
  • openmetadata-dev
    - Using OpenMetadata SDKs/APIs
  • openmetadata-ops
    - Administering OpenMetadata