Awesome-omni-skill database-engine-implementation
Guide for implementing database engines in IterableData. Use when adding support for new SQL or NoSQL databases, implementing DBDriver classes, or extending database capabilities.
install
source · Clone the upstream repo
git clone https://github.com/diegosouzapw/awesome-omni-skill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/database-engine-implementation" ~/.claude/skills/diegosouzapw-awesome-omni-skill-database-engine-implementation && rm -rf "$T"
manifest: skills/development/database-engine-implementation/SKILL.md
Database Engine Implementation Guide
Overview
Database engines provide read-only access to SQL and NoSQL databases as iterable data sources. They wrap database drivers to provide a unified BaseIterable interface.
Architecture
- DBDriver (iterable/db/base.py) - Abstract base class for database drivers
- DatabaseIterable (iterable/db/iterable.py) - Wrapper that makes DBDriver work as BaseIterable
- Driver Registry (iterable/db/__init__.py) - Registry for mapping engine names to driver classes
Adding New Database Engines
Step-by-Step Process
- Create driver file: iterable/db/<engine>.py
- Implement DBDriver: Inherit from DBDriver in iterable/db/base.py
- Required methods: connect(), iterate(), close() (close has default implementation)
- Register driver: Call register_driver() in iterable/db/__init__.py
- Add detection: Update iterable/helpers/detect.py to recognize database URLs
- Create tests: tests/test_db_engines.py or add to existing test file
- Update dependencies: Add optional dependency to pyproject.toml
- Update documentation: Add engine to docs
Implementation Pattern
    from collections.abc import Iterator
    from typing import Any

    from ..types import Row
    from .base import DBDriver


    class NewEngineDriver(DBDriver):
        """Database driver for NewEngine.

        Supports streaming queries using batch processing.
        """

        def __init__(self, source: str | Any, **kwargs: Any) -> None:
            """Initialize driver.

            Args:
                source: Connection string/URL or existing connection object
                **kwargs: Additional parameters:
                    - query: Query string or table name
                    - batch_size: Rows per batch (default: 10000)
                    - on_error: Error handling policy ('raise', 'skip', 'warn')
            """
            super().__init__(source, **kwargs)
            self._cursor: Any = None

        def connect(self) -> None:
            """Establish database connection.

            Raises:
                ImportError: If required driver library is not installed
                ValueError: If source is neither a connection string nor a connection object
                ConnectionError: If connection fails
            """
            try:
                import database_library
            except ImportError:
                raise ImportError(
                    "database-library is required. Install with: pip install database-library"
                ) from None

            # Handle existing connection object
            if hasattr(self.source, "cursor"):
                self.conn = self.source
                self._connected = True
                return

            # Parse connection string
            if not isinstance(self.source, str):
                raise ValueError("Source must be connection string or connection object")

            try:
                self.conn = database_library.connect(
                    self.source, **self.kwargs.get("connect_args", {})
                )
                self._connected = True
            except Exception as e:
                self._connected = False
                raise ConnectionError(f"Failed to connect: {e}") from e

        def iterate(self) -> Iterator[Row]:
            """Return iterator of dict rows.

            Yields:
                dict: Database row as dictionary

            Raises:
                RuntimeError: If not connected
            """
            if not self._connected:
                raise RuntimeError("Not connected. Call connect() first.")

            self._start_metrics()
            batch_size = self.kwargs.get("batch_size", 10000)

            try:
                # _build_query() is the driver's own helper (see Query Building)
                query = self._build_query()
                cursor = self.conn.cursor()

                # Execute query with batching
                cursor.execute(query)

                # Column names do not change between batches, so read them once
                column_names = [desc[0] for desc in cursor.description]

                while True:
                    rows = cursor.fetchmany(batch_size)
                    if not rows:
                        break

                    # Convert rows to dicts
                    for row in rows:
                        row_dict = dict(zip(column_names, row))
                        self._update_metrics(rows_read=1)
                        yield row_dict
            except Exception as e:
                self._handle_error(e, "during iteration")
                if self._on_error == "raise":
                    raise
Key Implementation Details
Connection Handling
- Support both connection strings and existing connection objects
- Parse connection strings appropriately (URL format)
- Handle connection errors gracefully with clear messages
- Set self._connected = True on success
Query Building
- Accept query parameter (SQL query or table name)
- If table name provided, build SELECT query
- Support filtering, projection, sorting via kwargs
- Use parameterized queries to prevent SQL injection
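The query-building rules above can be sketched as a small standalone function. This is only an illustration: `build_query`, its `columns` and `filters` parameters, and the `?` placeholder style are assumptions made for the example, not the actual helper used by IterableData drivers. Values travel as bound parameters, while identifiers (which cannot be parameterized) are checked against a strict pattern.

```python
from __future__ import annotations

import re
from typing import Any

# Hypothetical helper; a real driver may build queries differently.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")


def build_query(
    query: str,
    columns: list[str] | None = None,
    filters: dict[str, Any] | None = None,
    placeholder: str = "?",
) -> tuple[str, list[Any]]:
    """Return (sql, params) for a table name, or pass a full SQL query through."""
    if not _IDENTIFIER.match(query):
        # Anything that is not a bare identifier is treated as caller-supplied SQL.
        return query, []

    # Identifiers cannot be bound as parameters, so validate them instead.
    for name in [*(columns or []), *(filters or {})]:
        if not _IDENTIFIER.match(name):
            raise ValueError(f"Invalid identifier: {name!r}")

    projection = ", ".join(columns) if columns else "*"
    sql = f"SELECT {projection} FROM {query}"
    params: list[Any] = []
    if filters:
        clauses = [f"{column} = {placeholder}" for column in filters]
        params = list(filters.values())
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params


sql, params = build_query("users", columns=["id", "name"], filters={"active": 1})
```

The placeholder style depends on the underlying library (`?`, `%s`, and named styles are all common), so a real driver would pick the one its DB-API module declares.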
Iteration
- Use batch processing (batch_size parameter)
- Convert database rows to dict objects
- Update metrics: self._update_metrics(rows_read=1)
- Handle errors according to on_error policy
- Start metrics: self._start_metrics() before iteration
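To see the batching and row-to-dict conversion run end to end, here is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for a real engine. The function name `iter_rows` and the sample table are made up for the example; metrics and error policies are left out to keep the loop visible.

```python
import sqlite3
from collections.abc import Iterator
from typing import Any


def iter_rows(conn: Any, query: str, batch_size: int = 10000) -> Iterator[dict[str, Any]]:
    """Yield rows as dicts, fetching at most batch_size rows per round trip."""
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        column_names = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                yield dict(zip(column_names, row))
    finally:
        # Runs when the generator is exhausted or closed early.
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO items VALUES (?, ?)", [(i, f"item-{i}") for i in range(5)])
rows = list(iter_rows(conn, "SELECT id, name FROM items ORDER BY id", batch_size=2))
conn.close()
```

With `batch_size=2` the five rows arrive in three fetches, but the caller only ever sees a flat stream of dicts.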
Error Handling
Three policies (set via on_error kwarg):
- 'raise' - Raise exceptions (default)
- 'skip' - Skip problematic rows
- 'warn' - Warn and continue
Use self._handle_error(error, context) for consistent handling.
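The guide does not show what `_handle_error` does internally, so the following is a standalone sketch of how the three policies could behave for per-row failures. The function `apply_policy` and its `convert` callback are hypothetical names introduced for illustration.

```python
import logging
from collections.abc import Callable, Iterable, Iterator
from typing import Any

logger = logging.getLogger(__name__)

VALID_POLICIES = {"raise", "skip", "warn"}


def apply_policy(
    items: Iterable[Any],
    convert: Callable[[Any], dict],
    on_error: str = "raise",
) -> Iterator[dict]:
    """Convert each item, handling failures according to the on_error policy."""
    if on_error not in VALID_POLICIES:
        raise ValueError(f"Unknown on_error policy: {on_error!r}")
    for item in items:
        try:
            row = convert(item)
        except Exception as exc:
            if on_error == "raise":
                raise
            if on_error == "warn":
                logger.warning("Skipping row %r: %s", item, exc)
            # Both 'skip' and 'warn' move on to the next row.
            continue
        yield row


def to_row(value: str) -> dict:
    return {"value": int(value)}


skipped = list(apply_policy(["1", "x", "3"], to_row, on_error="skip"))
```

Keeping the `yield` outside the `try` block means exceptions raised by the consumer are never mistaken for row conversion errors.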
Metrics Tracking
Automatic metrics available via self.metrics:
- rows_read - Number of rows read
- bytes_read - Bytes read (may be None)
- elapsed_seconds - Time elapsed
Update during iteration: self._update_metrics(rows_read=count)
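As a rough picture of what such a metrics object might hold, the sketch below models the three fields with a dataclass. The `Metrics` class and its `update` method are assumptions for illustration; the real structure behind `self.metrics` is not shown in this guide.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class Metrics:
    """Hypothetical metrics container mirroring the fields listed above."""

    rows_read: int = 0
    bytes_read: int | None = None  # stays None when the driver cannot measure bytes
    started_at: float = field(default_factory=time.monotonic)

    @property
    def elapsed_seconds(self) -> float:
        return time.monotonic() - self.started_at

    def update(self, rows_read: int = 0, bytes_read: int | None = None) -> None:
        self.rows_read += rows_read
        if bytes_read is not None:
            self.bytes_read = (self.bytes_read or 0) + bytes_read


metrics = Metrics()
for batch in ([1, 2, 3], [4, 5]):
    # Updating per batch keeps the counters current while iteration is in progress.
    metrics.update(rows_read=len(batch))
```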
Registration
Register driver in iterable/db/__init__.py:

    from .newengine import NewEngineDriver

    register_driver("newengine", NewEngineDriver)
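For readers who have not seen the registry, a name-to-class mapping of this kind can be as small as the sketch below. Only the `register_driver(name, driver_class)` call shape comes from the guide; the `_DRIVERS` dict, the case-insensitive keys, and the `get_driver` lookup are assumptions for the example.

```python
_DRIVERS: dict[str, type] = {}


def register_driver(name: str, driver_class: type) -> None:
    """Map an engine name to a driver class (names compared case-insensitively)."""
    key = name.lower()
    existing = _DRIVERS.get(key)
    if existing is not None and existing is not driver_class:
        raise ValueError(f"Engine {name!r} is already registered to {existing.__name__}")
    _DRIVERS[key] = driver_class


def get_driver(name: str) -> type:
    """Hypothetical lookup; raises ValueError for unknown engine names."""
    try:
        return _DRIVERS[name.lower()]
    except KeyError:
        available = ", ".join(sorted(_DRIVERS)) or "none"
        raise ValueError(f"Unknown database engine {name!r}. Available: {available}") from None


class DummyDriver:
    """Placeholder standing in for a real DBDriver subclass."""


register_driver("NewEngine", DummyDriver)
driver_class = get_driver("newengine")
```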
Format Detection
Update iterable/helpers/detect.py to recognize database URLs:

    def detect_file_type(filename, content=None):
        # Database URL detection
        if filename.startswith(("postgresql://", "postgres://")):
            return "database"
        if filename.startswith("newengine://"):
            return "database"
        # ... existing detection
Testing Requirements
Test Structure
    import pytest

    from iterable.db.newengine import NewEngineDriver


    class TestNewEngineDriver:
        def test_connect(self):
            driver = NewEngineDriver("newengine://localhost/db")
            driver.connect()
            assert driver.is_connected
            driver.close()

        def test_iterate(self):
            driver = NewEngineDriver("newengine://localhost/db", query="SELECT * FROM table")
            driver.connect()
            rows = list(driver.iterate())
            assert len(rows) > 0
            assert isinstance(rows[0], dict)
            driver.close()

        def test_batch_processing(self):
            driver = NewEngineDriver(
                "newengine://localhost/db", query="SELECT * FROM table", batch_size=100
            )
            driver.connect()
            # Verify batching works correctly
            driver.close()

        def test_error_handling(self):
            driver = NewEngineDriver(
                "newengine://invalid", query="SELECT * FROM table", on_error="skip"
            )
            # Test error handling policies
Test Coverage
- Connection with string and object sources
- Query building (table name vs SQL)
- Batch processing
- Error handling policies
- Metrics tracking
- Context manager support (with statement)
- Missing dependencies (should raise ImportError)
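The missing-dependency case can be tested without uninstalling anything. The sketch below relies on standard import behavior: a `None` entry in `sys.modules` makes the corresponding `import` raise `ImportError`. The `load_backend` function is a stand-in for the import step inside a driver's `connect()`, and `database_library` is the guide's placeholder name, not a real package.

```python
import sys
from unittest import mock


def load_backend():
    """Stand-in for the import step inside a driver's connect()."""
    try:
        import database_library  # placeholder module name from this guide
    except ImportError:
        raise ImportError(
            "newengine support requires 'database-library'. "
            "Install with: pip install iterabledata[newengine]"
        ) from None
    return database_library


def test_missing_dependency():
    # A None entry in sys.modules forces "import database_library" to fail.
    with mock.patch.dict(sys.modules, {"database_library": None}):
        try:
            load_backend()
        except ImportError as exc:
            assert "iterabledata[newengine]" in str(exc)
        else:
            raise AssertionError("expected ImportError")


def test_dependency_present():
    # A fake module lets the happy path run without the real library installed.
    fake = mock.MagicMock(name="database_library")
    with mock.patch.dict(sys.modules, {"database_library": fake}):
        assert load_backend() is fake
```

Both functions also run unchanged under pytest, where `pytest.raises(ImportError)` would be the more idiomatic way to write the first check.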
Dependencies
Optional Dependencies
Add to pyproject.toml:

    [project.optional-dependencies]
    newengine = ["database-library>=1.0.0"]
Import Handling
Handle missing dependencies:

    try:
        import database_library
    except ImportError:
        raise ImportError(
            "newengine support requires 'database-library'. "
            "Install with: pip install iterabledata[newengine]"
        ) from None
Examples
Look at existing implementations:
- iterable/db/postgres.py - SQL database example (PostgreSQL)
- iterable/db/mongo.py - NoSQL database example (MongoDB)
- iterable/db/clickhouse.py - Columnar database example
- iterable/db/elasticsearch.py - Search engine example
Common Patterns
SQL Databases
- Use server-side cursors for memory efficiency
- Support read-only transactions
- Handle connection pooling
- Use parameterized queries
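As one concrete illustration of read-only access, the sketch below opens a SQLite file through a `mode=ro` URI so that writes fail at the database level. SQLite is used only because it ships with Python; server-side cursors and connection pooling are specific to each driver library and are not shown. The helper name `open_read_only` is made up for the example.

```python
import os
import sqlite3
import tempfile
from pathlib import Path


def open_read_only(path: str) -> sqlite3.Connection:
    """Open a SQLite database so that any write attempt fails."""
    uri = Path(path).resolve().as_uri() + "?mode=ro"
    return sqlite3.connect(uri, uri=True)


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "demo.db")

    # Create a small database with a normal read-write connection.
    writer = sqlite3.connect(db_path)
    writer.execute("CREATE TABLE t (x INTEGER)")
    writer.execute("INSERT INTO t VALUES (1)")
    writer.commit()
    writer.close()

    reader = open_read_only(db_path)
    try:
        values = reader.execute("SELECT x FROM t").fetchall()
        try:
            reader.execute("INSERT INTO t VALUES (2)")
            write_blocked = False
        except sqlite3.OperationalError:
            write_blocked = True
    finally:
        reader.close()
```

Other engines expose the same idea differently, for example through a read-only transaction or a session setting, so the mechanism should be taken from the specific library's documentation.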
NoSQL Databases
- Support query filters and projections
- Handle pagination/cursors
- Support aggregation pipelines
- Batch document retrieval
Connection String Parsing
    from urllib.parse import urlparse


    def parse_connection_string(conn_str: str) -> dict:
        parsed = urlparse(conn_str)
        return {
            "host": parsed.hostname,
            "port": parsed.port,
            "database": parsed.path.lstrip("/"),
            "username": parsed.username,
            "password": parsed.password,
        }
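One detail worth knowing when parsing connection URLs: `urlparse` returns the username and password exactly as written, so percent-encoded characters such as `%40` are not decoded. The variant below, with the hypothetical name `parse_connection_url`, decodes credentials and also collects query-string options; the example URL and its `sslmode` option are invented for illustration.

```python
from typing import Any
from urllib.parse import parse_qsl, unquote, urlparse


def parse_connection_url(conn_str: str) -> dict[str, Any]:
    """Split a database URL into parts, decoding percent-encoded credentials."""
    parsed = urlparse(conn_str)
    return {
        "scheme": parsed.scheme,
        "host": parsed.hostname,
        "port": parsed.port,
        "database": parsed.path.lstrip("/"),
        "username": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
        "options": dict(parse_qsl(parsed.query)),
    }


config = parse_connection_url(
    "newengine://reader:p%40ss@db.example.com:5432/sales?sslmode=require"
)
```

Here `config["password"]` comes back as `p@ss` rather than the encoded `p%40ss`, which is what most driver libraries expect when credentials are passed as separate arguments.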
Common Pitfalls
- Memory issues: Always use batch processing, don't load all rows at once
- Connection leaks: Always close connections, use context managers
- Error handling: Implement all three error policies consistently
- Metrics: Update metrics during iteration, not after
- Reset support: Most databases don't support reset (set _reset_supported = False)
- Read-only: Default to read-only access for safety
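The connection-leak pitfall is easiest to avoid when the driver works in a `with` statement. The toy class below sketches the protocol using sqlite3 as a stand-in; `ManagedDriver` is hypothetical, and whether the real DBDriver base class already provides `__enter__` and `__exit__` should be checked in iterable/db/base.py.

```python
import sqlite3
from typing import Any


class ManagedDriver:
    """Toy driver showing context manager support; not the real DBDriver."""

    def __init__(self, source: str) -> None:
        self.source = source
        self.conn: Any = None

    @property
    def is_connected(self) -> bool:
        return self.conn is not None

    def connect(self) -> None:
        self.conn = sqlite3.connect(self.source)

    def close(self) -> None:
        # Safe to call more than once.
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self) -> "ManagedDriver":
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Returning None lets any exception from the with-body propagate.
        self.close()


with ManagedDriver(":memory:") as driver:
    inside = driver.is_connected
    value = driver.conn.execute("SELECT 1").fetchone()[0]
after = driver.is_connected
```

The connection is released even when the body of the `with` block raises, which is the case that manual `close()` calls tend to miss.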