Claude-skill-registry core-api-reference
Use when implementing pgdbm database operations - provides complete AsyncDatabaseManager and DatabaseConfig API with all methods and parameters
install
source · Clone the upstream repo:
```shell
git clone https://github.com/majiayu000/claude-skill-registry
```
Claude Code · Install into ~/.claude/skills/:
```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/core-api-reference" ~/.claude/skills/majiayu000-claude-skill-registry-core-api-reference && rm -rf "$T"
```
manifest: skills/data/core-api-reference/SKILL.md
pgdbm Core API Reference
Overview
Complete API reference for AsyncDatabaseManager, DatabaseConfig, and TransactionManager.
All signatures, parameters, return types, and usage examples. No documentation lookup needed.
AsyncDatabaseManager
Initialization
```python
# Pattern 1: Create own pool
AsyncDatabaseManager(config: DatabaseConfig)

# Pattern 2: Use external pool
AsyncDatabaseManager(
    pool: asyncpg.Pool,
    schema: Optional[str] = None
)
```
Rules:
- Cannot provide both `config` and `pool`
- `schema` is only valid with an external pool
- Must call `connect()` if using config
- Never call `connect()` if using an external pool
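The rules above can be sketched as a small validation helper (an illustrative sketch with hypothetical names, not pgdbm's actual constructor code):

```python
from typing import Any, Optional

def validate_manager_init(config: Optional[Any] = None,
                          pool: Optional[Any] = None,
                          schema: Optional[str] = None) -> str:
    """Mimic the mutual-exclusion rules of AsyncDatabaseManager's two init patterns."""
    if config is not None and pool is not None:
        raise ValueError("Provide either config or pool, not both")
    if config is None and pool is None:
        raise ValueError("Provide config or pool")
    if schema is not None and pool is None:
        raise ValueError("schema is only valid with an external pool")
    # Config-based managers must call connect(); pool-based managers must not
    return "call connect()" if config is not None else "do not call connect()"
```

The same decision applies symmetrically to `disconnect()`: the manager that owns the pool is the one responsible for its lifecycle.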
Connection Lifecycle
```python
# Create shared pool (class method)
pool = await AsyncDatabaseManager.create_shared_pool(config: DatabaseConfig) -> asyncpg.Pool

# Connect (only for config-based init)
await db.connect() -> None
# Raises PoolError if using external pool

# Disconnect (only for config-based init)
await db.disconnect() -> None
# Does nothing if using external pool
```
Query Methods
All methods automatically apply `{{tables.}}` template substitution.
```python
# Execute without return
await db.execute(
    query: str,
    *args: Any,
    timeout: Optional[float] = None
) -> str
# Returns: asyncpg status string like "INSERT 0 1"

# Execute and return generated ID
await db.execute_and_return_id(
    query: str,
    *args: Any
) -> Any
# Automatically appends RETURNING id if not present
# Returns: The id value

# Fetch single value
await db.fetch_value(
    query: str,
    *args: Any,
    column: int = 0,
    timeout: Optional[float] = None
) -> Any
# Returns: Single value from result (or None)

# Fetch single row
await db.fetch_one(
    query: str,
    *args: Any,
    timeout: Optional[float] = None
) -> Optional[dict[str, Any]]
# Returns: Dictionary of column->value (or None if no results)

# Fetch all rows
await db.fetch_all(
    query: str,
    *args: Any,
    timeout: Optional[float] = None
) -> list[dict[str, Any]]
# Returns: List of dictionaries

# Batch execute (multiple parameter sets)
await db.executemany(
    query: str,
    args_list: list[tuple]
) -> None
# Executes same query with different parameter sets
# More efficient than looping execute()
```
Examples:
```python
# execute_and_return_id - Common for inserts
user_id = await db.execute_and_return_id(
    "INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
    "alice@example.com", "Alice"
)
# Automatically becomes: ... RETURNING id

# fetch_value with column parameter
email = await db.fetch_value(
    "SELECT email, name FROM {{tables.users}} WHERE id = $1",
    user_id,
    column=0  # Get first column (email)
)

# executemany for batch inserts
users = [
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
    ("charlie@example.com", "Charlie"),
]
await db.executemany(
    "INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
    users
)
```
Bulk Operations
```python
# Copy records (MUCH faster than INSERT for bulk data)
await db.copy_records_to_table(
    table_name: str,
    records: list[tuple],
    columns: Optional[list[str]] = None
) -> int
# Uses PostgreSQL COPY command
# Returns: Number of records copied

# Example
records = [
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
]
count = await db.copy_records_to_table(
    "users",  # Don't use {{tables.}} here - just the table name
    records=records,
    columns=["email", "name"]
)
# Returns: 2
```
Pydantic Integration
```python
from pydantic import BaseModel

class User(BaseModel):
    id: int
    email: str
    name: str

# Fetch single row as model
user = await db.fetch_as_model(
    User,
    query: str,
    *args: Any,
    timeout: Optional[float] = None
) -> Optional[User]

# Fetch all rows as models
users = await db.fetch_all_as_model(
    User,
    query: str,
    *args: Any,
    timeout: Optional[float] = None
) -> list[User]

# Example
user = await db.fetch_as_model(
    User,
    "SELECT * FROM {{tables.users}} WHERE id = $1",
    user_id
)
# Returns: User(id=1, email="alice@example.com", name="Alice")
```
Schema Operations
```python
# Check if a table exists
exists = await db.table_exists(table_name: str) -> bool

# Examples
exists = await db.table_exists("users")               # Check in current schema
exists = await db.table_exists("other_schema.users")  # Check in specific schema
```
Transaction Management
```python
# Create transaction context
async with db.transaction() as tx:
    # tx has the same API as db (execute, fetch_one, fetch_all, etc.)
    user_id = await tx.fetch_value(
        "INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
        email
    )
    await tx.execute(
        "INSERT INTO {{tables.profiles}} (user_id) VALUES ($1)",
        user_id
    )
# Auto-commits on success, rolls back on exception

# Nested transactions (savepoints)
async with db.transaction() as tx:
    await tx.execute("INSERT INTO {{tables.users}} ...")
    async with tx.transaction() as nested:
        await nested.execute("UPDATE {{tables.users}} ...")
        # Nested transaction uses SAVEPOINT
```
Monitoring and Performance
```python
# Get pool statistics
stats = await db.get_pool_stats() -> dict[str, Any]
# Returns: {
#     "status": "connected",
#     "min_size": 10,
#     "max_size": 50,
#     "size": 15,        # Current total connections
#     "free_size": 10,   # Idle connections
#     "used_size": 5,    # Active connections
#     "database": "myapp",
#     "schema": "myschema",
#     "pid": 12345,
#     "version": "PostgreSQL 15.3"
# }

# Add prepared statement (performance optimization)
db.add_prepared_statement(
    name: str,
    query: str
) -> None
# Prepared statements are created on all connections in the pool
# Improves performance for frequently-used queries
```
Advanced Operations
```python
# Acquire a connection directly (advanced)
async with db.acquire() as conn:
    # conn is a raw asyncpg connection
    # Use for operations not covered by AsyncDatabaseManager
    await conn.execute("...")
```
DatabaseConfig
Complete Parameter Reference
```python
from pgdbm import DatabaseConfig

config = DatabaseConfig(
    # Connection (either connection_string OR individual params)
    connection_string: Optional[str] = None,  # e.g., "postgresql://user:pass@host/db"
    host: str = "localhost",
    port: int = 5432,
    database: str = "postgres",
    user: str = "postgres",
    password: Optional[str] = None,
    schema: Optional[str] = None,  # Alias: schema_name

    # Connection pool
    min_connections: int = 10,
    max_connections: int = 20,
    max_queries: int = 50000,  # Queries per connection before recycling
    max_inactive_connection_lifetime: float = 300.0,  # Seconds
    command_timeout: float = 60.0,  # Default query timeout (seconds)

    # Connection initialization
    server_settings: Optional[dict[str, str]] = None,  # PostgreSQL settings
    init_commands: Optional[list[str]] = None,  # Run on each connection

    # TLS/SSL configuration
    ssl_enabled: bool = False,
    ssl_mode: Optional[str] = None,  # 'require', 'verify-ca', 'verify-full'
    ssl_ca_file: Optional[str] = None,  # Path to CA certificate
    ssl_cert_file: Optional[str] = None,  # Path to client certificate
    ssl_key_file: Optional[str] = None,  # Path to client key
    ssl_key_password: Optional[str] = None,  # Key password if encrypted

    # Server-side timeouts (milliseconds, None to disable)
    statement_timeout_ms: Optional[int] = 60000,  # Abort long queries
    idle_in_transaction_session_timeout_ms: Optional[int] = 60000,  # Abort idle transactions
    lock_timeout_ms: Optional[int] = 5000,  # Abort lock waits

    # Retry configuration
    retry_attempts: int = 3,
    retry_delay: float = 1.0,  # Initial delay (seconds)
    retry_backoff: float = 2.0,  # Exponential backoff multiplier
    retry_max_delay: float = 30.0,  # Maximum delay (seconds)
)
```
Common Configurations
Development:
```python
config = DatabaseConfig(
    connection_string="postgresql://localhost/myapp_dev",
    min_connections=2,
    max_connections=10,
)
```
Production with TLS:
```python
config = DatabaseConfig(
    connection_string="postgresql://db.example.com/myapp",
    min_connections=20,
    max_connections=100,
    ssl_enabled=True,
    ssl_mode="verify-full",
    ssl_ca_file="/etc/ssl/certs/ca.pem",
    statement_timeout_ms=30000,  # 30 second timeout
    lock_timeout_ms=5000,        # 5 second lock timeout
)
```
Custom initialization:
```python
config = DatabaseConfig(
    connection_string="postgresql://localhost/myapp",
    init_commands=[
        "SET timezone TO 'UTC'",
        "SET statement_timeout TO '30s'",
    ],
    server_settings={
        "jit": "off",  # Disable JIT compilation
        "application_name": "myapp",
    },
)
```
TransactionManager
Same API as AsyncDatabaseManager but within transaction context:
```python
async with db.transaction() as tx:
    # All methods available
    await tx.execute(query, *args, timeout=None) -> str
    await tx.executemany(query, args_list) -> None
    await tx.fetch_one(query, *args, timeout=None) -> Optional[dict]
    await tx.fetch_all(query, *args, timeout=None) -> list[dict]
    await tx.fetch_value(query, *args, column=0, timeout=None) -> Any

    # Nested transactions (savepoints)
    async with tx.transaction() as nested_tx:
        ...

    # Access underlying connection
    conn = tx.connection  # Property, not method
```
Complete Method Summary
AsyncDatabaseManager - All Methods
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
| `execute` | query, *args, timeout | str | No results needed |
| `execute_and_return_id` | query, *args | Any | INSERT with auto RETURNING id |
| `executemany` | query, args_list | None | Batch execute same query |
| `fetch_value` | query, *args, column, timeout | Any | Single value |
| `fetch_one` | query, *args, timeout | dict \| None | Single row |
| `fetch_all` | query, *args, timeout | list[dict] | Multiple rows |
| `fetch_as_model` | model, query, *args, timeout | Model \| None | Single row as Pydantic |
| `fetch_all_as_model` | model, query, *args, timeout | list[Model] | Rows as Pydantic |
| `copy_records_to_table` | table, records, columns | int | Bulk COPY (fast) |
| `table_exists` | table_name | bool | Schema checking |
| `transaction` | - | TransactionManager | Transaction context |
| `get_pool_stats` | - | dict | Pool monitoring |
| `add_prepared_statement` | name, query | None | Performance optimization |
| `acquire` | - | Connection | Advanced: raw connection |
| `connect` | - | None | Initialize pool (config-based only) |
| `disconnect` | - | None | Close pool (config-based only) |
| `create_shared_pool` | config | asyncpg.Pool | Class method: create shared pool |
Compatibility aliases:
- `fetch_val(...)` → `fetch_value(...)`
- `execute_many(...)` → `executemany(...)`
TransactionManager - All Methods
| Method | Parameters | Returns |
|---|---|---|
| `execute` | query, *args, timeout | str |
| `executemany` | query, args_list | None |
| `fetch_value` | query, *args, column, timeout | Any |
| `fetch_one` | query, *args, timeout | dict \| None |
| `fetch_all` | query, *args, timeout | list[dict] |
| `transaction` | - | TransactionManager (nested) |
| `connection` | - | Connection (property) |
Note: TransactionManager does NOT have:
- execute_and_return_id
- copy_records_to_table
- fetch_as_model
- table_exists
- Pool management methods
Use regular fetch_value for IDs within transactions.
Template Syntax
All query methods support template substitution:
```python
# Available templates
# {{tables.tablename}}  ->  "schema".tablename  (or tablename if no schema)
# {{schema}}            ->  "schema"            (or empty)

# Example
query = "SELECT * FROM {{tables.users}} WHERE created_at > $1"

# With schema="myapp"
# Becomes: SELECT * FROM "myapp".users WHERE created_at > $1

# Without schema
# Becomes: SELECT * FROM users WHERE created_at > $1
```
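The substitution can be approximated in a few lines of plain Python. This is a simplified sketch of the behavior described above, not pgdbm's actual implementation (it ignores quoting edge cases):

```python
import re
from typing import Optional

def render_template(query: str, schema: Optional[str] = None) -> str:
    """Approximate pgdbm's {{tables.name}} / {{schema}} substitution."""
    def table_ref(match: re.Match) -> str:
        name = match.group(1)
        # Qualify with the quoted schema if one is set, else use the bare name
        return f'"{schema}".{name}' if schema else name

    query = re.sub(r"\{\{tables\.(\w+)\}\}", table_ref, query)
    query = query.replace("{{schema}}", f'"{schema}"' if schema else "")
    return query

q = "SELECT * FROM {{tables.users}} WHERE created_at > $1"
print(render_template(q, schema="myapp"))  # SELECT * FROM "myapp".users WHERE created_at > $1
print(render_template(q))                  # SELECT * FROM users WHERE created_at > $1
```

Because substitution happens per manager, the same query string can run against different schemas depending on which manager executes it.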
Usage Examples
Basic Queries
```python
# Insert and get ID
user_id = await db.execute_and_return_id(
    "INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
    "alice@example.com", "Alice"
)

# Fetch single value
count = await db.fetch_value(
    "SELECT COUNT(*) FROM {{tables.users}}"
)

# Fetch with specific column
email = await db.fetch_value(
    "SELECT email, name FROM {{tables.users}} WHERE id = $1",
    user_id,
    column=0  # Get email (first column)
)

# Fetch one row
user = await db.fetch_one(
    "SELECT * FROM {{tables.users}} WHERE id = $1",
    user_id
)
# user = {"id": 1, "email": "...", "name": "..."}

# Fetch all rows
users = await db.fetch_all(
    "SELECT * FROM {{tables.users}} WHERE is_active = $1",
    True
)
# users = [{"id": 1, ...}, {"id": 2, ...}]

# Execute without results
await db.execute(
    "DELETE FROM {{tables.users}} WHERE id = $1",
    user_id
)

# Check table exists
if await db.table_exists("users"):
    print("Users table exists")
```
Batch Operations
```python
# executemany - same query, different params
users = [
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
    ("charlie@example.com", "Charlie"),
]
await db.executemany(
    "INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
    users
)

# copy_records_to_table - fastest for bulk data
records = [
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
    # ... thousands more
]
count = await db.copy_records_to_table(
    "users",  # Just the table name (template applied internally)
    records=records,
    columns=["email", "name"]
)
# Much faster than executemany for >1000 rows
```
Pydantic Models
```python
from pydantic import BaseModel

class User(BaseModel):
    id: int
    email: str
    name: str
    is_active: bool = True

# Fetch as model
user = await db.fetch_as_model(
    User,
    "SELECT * FROM {{tables.users}} WHERE id = $1",
    user_id
)
# user is a User instance (typed)

# Fetch all as models
users = await db.fetch_all_as_model(
    User,
    "SELECT * FROM {{tables.users}} WHERE is_active = $1",
    True
)
# users is list[User] (typed)
```
Transactions
```python
# Basic transaction
async with db.transaction() as tx:
    user_id = await tx.fetch_value(
        "INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
        email
    )
    await tx.execute(
        "INSERT INTO {{tables.profiles}} (user_id, bio) VALUES ($1, $2)",
        user_id, "Bio text"
    )
# Commits on success, rolls back on exception

# Nested transaction (savepoint)
async with db.transaction() as tx:
    await tx.execute("INSERT INTO {{tables.users}} ...")
    try:
        async with tx.transaction() as nested:
            await nested.execute("UPDATE {{tables.users}} SET risky_field = $1", value)
            # This can roll back without affecting the outer transaction
    except Exception:
        # Nested rolled back, outer transaction continues
        pass
```
Monitoring
```python
# Get pool statistics
stats = await db.get_pool_stats()
print(f"Total connections: {stats['size']}")
print(f"Active: {stats['used_size']}")
print(f"Idle: {stats['free_size']}")
print(f"Usage: {stats['used_size'] / stats['size']:.1%}")

# Monitor pool health
usage = stats['used_size'] / stats['size']
if usage > 0.8:
    logger.warning(f"High pool usage: {usage:.1%}")
```
Prepared Statements
```python
# Add a frequently-used query as a prepared statement
db.add_prepared_statement(
    "get_user_by_email",
    "SELECT * FROM {{tables.users}} WHERE email = $1"
)
# Prepared statements are created on all pool connections
# Improves performance for queries executed repeatedly
```
DatabaseConfig Complete Reference
Connection Parameters
```python
# Use connection_string (recommended)
config = DatabaseConfig(
    connection_string="postgresql://user:pass@host:port/database"
)

# OR use individual parameters
config = DatabaseConfig(
    host="localhost",
    port=5432,
    database="myapp",
    user="postgres",
    password="secret",
    schema="myschema",  # Optional schema
)
```
Pool Configuration
```python
config = DatabaseConfig(
    connection_string="...",
    # Pool sizing
    min_connections=10,  # Minimum idle connections
    max_connections=50,  # Maximum total connections
    # Connection lifecycle
    max_queries=50000,  # Queries before recycling connection
    max_inactive_connection_lifetime=300.0,  # Seconds before closing idle
    command_timeout=60.0,  # Default query timeout (seconds)
)
```
SSL/TLS Configuration
```python
config = DatabaseConfig(
    connection_string="postgresql://db.example.com/myapp",
    # Enable SSL
    ssl_enabled=True,
    ssl_mode="verify-full",  # 'require', 'verify-ca', 'verify-full'
    # Certificate files
    ssl_ca_file="/etc/ssl/certs/ca.pem",
    ssl_cert_file="/etc/ssl/certs/client.crt",  # For mutual TLS
    ssl_key_file="/etc/ssl/private/client.key",
    ssl_key_password="keypass",  # If key is encrypted
)
```
SSL Modes:
- `require`: Encrypt connection (don't verify certificate)
- `verify-ca`: Verify certificate is signed by a trusted CA
- `verify-full`: Verify certificate AND hostname match
Server-Side Timeouts
Prevent runaway queries and stuck transactions:
```python
config = DatabaseConfig(
    connection_string="...",
    # Timeouts in milliseconds (None to disable)
    statement_timeout_ms=30000,  # Abort queries >30 seconds
    idle_in_transaction_session_timeout_ms=60000,  # Abort idle transactions >1 minute
    lock_timeout_ms=5000,  # Abort lock waits >5 seconds
)
```
Default values:
- `statement_timeout_ms`: 60000 (60 seconds)
- `idle_in_transaction_session_timeout_ms`: 60000
- `lock_timeout_ms`: 5000

Set to `None` to disable.
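For example, disabling all three server-side timeouts (a config fragment using only the parameters documented above):

```python
from pgdbm import DatabaseConfig

config = DatabaseConfig(
    connection_string="postgresql://localhost/myapp",
    statement_timeout_ms=None,                    # No statement timeout
    idle_in_transaction_session_timeout_ms=None,  # No idle-transaction timeout
    lock_timeout_ms=None,                         # No lock timeout
)
```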
Connection Initialization
```python
config = DatabaseConfig(
    connection_string="...",
    # Custom server settings
    server_settings={
        "jit": "off",  # Disable JIT (prevents latency spikes)
        "application_name": "myapp",
        "timezone": "UTC",
    },
    # Commands run on each new connection
    init_commands=[
        "SET timezone TO 'UTC'",
        "SET work_mem TO '256MB'",
    ],
)
```
Retry Configuration
```python
config = DatabaseConfig(
    connection_string="...",
    # Connection retry settings
    retry_attempts=3,      # Number of retries
    retry_delay=1.0,       # Initial delay (seconds)
    retry_backoff=2.0,     # Exponential backoff multiplier
    retry_max_delay=30.0,  # Maximum delay between retries
)
```
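Assuming the conventional reading of these parameters (delay multiplied by the backoff factor each attempt, capped at `retry_max_delay`), the wait schedule works out as below. This is a sketch of the arithmetic, not pgdbm's internal retry loop:

```python
def retry_delays(attempts: int = 3, delay: float = 1.0,
                 backoff: float = 2.0, max_delay: float = 30.0) -> list[float]:
    """Delay before each retry attempt under capped exponential backoff."""
    return [min(delay * backoff**i, max_delay) for i in range(attempts)]

print(retry_delays())            # [1.0, 2.0, 4.0]
print(retry_delays(attempts=6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

With the defaults, a failed connection attempt is retried after roughly 1, 2, and 4 seconds before giving up.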
Related Skills
- For patterns: pgdbm:using-pgdbm, pgdbm:choosing-pattern
- For migrations: pgdbm:migrations-api-reference
- For testing: pgdbm:testing-database-code