Tony db
Design DuckDB schemas, write SQL queries, and perform analytical data operations using the DuckDB Python library. Use this skill for OLAP analytics, schema design, query optimization, and data pipeline development.
install
source · Clone the upstream repo
git clone https://github.com/jaydeland/Tony
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jaydeland/Tony "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/db" ~/.claude/skills/jaydeland-tony-db && rm -rf "$T"
manifest: .claude/skills/db/skill.md
DuckDB Skill
Purpose: Design DuckDB schemas, write SQL queries, and perform analytical data operations
When to use: When user needs to work with DuckDB databases, design schemas, query data, or perform OLAP analytics
Quick Reference
Connect to DuckDB
```python
import duckdb

# In-memory database (default, fast for analytics)
con = duckdb.connect()

# Persistent database file
con = duckdb.connect("analytics.db")

# Read-only connection
con = duckdb.connect("analytics.db", read_only=True)

# With configuration
con = duckdb.connect("analytics.db", config={
    "threads": 4,
    "memory_limit": "2GB"
})
```
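For persistent database files it is worth closing the connection when you are done so pending writes are flushed; newer versions of the Python client also accept the context-manager form below (a sketch, assuming a reasonably recent duckdb release).

```python
import duckdb

# Explicit close flushes pending writes to the database file
con = duckdb.connect("analytics.db")
con.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER)")
con.close()

# Context-manager form closes the connection automatically
with duckdb.connect("analytics.db") as con:
    con.execute("INSERT INTO t VALUES (1)")
```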
Execute SQL Queries
```python
# Basic execution
con.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")

# Parameterized (safe from SQL injection)
con.execute("INSERT INTO users VALUES (?, ?)", [1, 'Alice'])

# Named parameters
con.execute("SELECT * FROM users WHERE id = $id", {"id": 1})

# Fetch results
result = con.execute("SELECT * FROM users").fetchall()
df = con.execute("SELECT * FROM users").fetchdf()  # as DataFrame
```
Module-Level Convenience (No Explicit Connection)
```python
import duckdb

duckdb.execute("CREATE TABLE data AS SELECT * FROM 'data.csv'")
result = duckdb.sql("SELECT * FROM data").fetchdf()
```
Schema Design Patterns
Analytical Schema Principles
DuckDB uses columnar storage and is built for analytical workloads. Design schemas with this in mind:
- Denormalize for reads - Analytical queries scan many rows; join cost matters more than storage
- Use appropriate data types - Smaller types = better compression = faster scans (see the sketch after this list)
- Time-series first - Timestamp columns enable efficient time-range filtering and pruning
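As a concrete illustration of the type-sizing principle above, the sketch below narrows each column to the smallest type that fits; the table and type names are illustrative assumptions, not part of any real schema.

```python
import duckdb

con = duckdb.connect()

# Enum instead of free-form VARCHAR: a constrained, cheap-to-compare value set
con.execute("CREATE TYPE status_t AS ENUM ('pending', 'shipped', 'delivered')")

con.execute("""
    CREATE TABLE shipments (
        shipment_id  BIGINT,     -- needs the full 64-bit range
        warehouse_id SMALLINT,   -- a few hundred warehouses fit in 2 bytes
        status       status_t,   -- enum instead of repeated strings
        event_ts     TIMESTAMP   -- raw timestamps support time-range filters
    )
""")
```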
Common Schema Templates
Event/Transaction Log Schema
```sql
CREATE TABLE events (
    event_id UUID PRIMARY KEY,
    event_type VARCHAR NOT NULL,
    user_id UUID,
    timestamp TIMESTAMP NOT NULL,
    metadata JSON,
    amount DECIMAL(12,2),
    created_at TIMESTAMP DEFAULT current_timestamp
);

-- Optimize for time-range queries
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_events_user ON events(user_id);
```
Time-Series Schema
```sql
CREATE TABLE metrics (
    ts TIMESTAMP,
    metric_name VARCHAR,
    value DOUBLE,
    tags STRUCT(key VARCHAR, value VARCHAR)
);
```
Star Schema (Fact + Dimensions)
```sql
-- Fact table (large; numeric measures plus dimension keys)
CREATE TABLE sales_facts (
    sale_id BIGINT PRIMARY KEY,
    date_id INTEGER,
    product_id INTEGER,
    customer_id INTEGER,
    store_id INTEGER,
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(12,2)
);

-- Dimension tables (smaller, descriptive)
CREATE TABLE dim_products (
    product_id INTEGER PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    subcategory VARCHAR
);

CREATE TABLE dim_customers (
    customer_id INTEGER PRIMARY KEY,
    customer_name VARCHAR,
    segment VARCHAR,
    region VARCHAR
);
```
Query Patterns
Aggregation Patterns
```sql
-- Moving average (7-row rolling window)
SELECT
    date,
    value,
    AVG(value) OVER (
        ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS rolling_7d_avg
FROM metrics;

-- Cumulative sum
SELECT
    date,
    amount,
    SUM(amount) OVER (ORDER BY date) AS cumulative_total
FROM transactions;

-- Percentile ranking
SELECT
    category,
    amount,
    PERCENT_RANK() OVER (PARTITION BY category ORDER BY amount) AS pct_rank
FROM sales;
```
Time-Series Analysis
```sql
-- Resample to a different granularity
SELECT
    date_trunc('week', timestamp) AS week,
    COUNT(*) AS event_count
FROM events
GROUP BY 1
ORDER BY 1;

-- Gap detection (find missing time periods)
WITH time_series AS (
    SELECT generate_series AS ts
    FROM generate_series('2024-01-01'::TIMESTAMP, '2024-01-31'::TIMESTAMP, INTERVAL '1 day')
),
events_per_day AS (
    SELECT date_trunc('day', timestamp) AS day, COUNT(*) AS cnt
    FROM events
    GROUP BY 1
)
SELECT t.ts, COALESCE(e.cnt, 0) AS event_count
FROM time_series t
LEFT JOIN events_per_day e ON t.ts = e.day;
```
Window Functions
```sql
-- Row numbers and ranks within groups
SELECT
    department,
    employee_name,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS salary_rank,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_salary_rank
FROM employees;

-- First/last value per group (all window functions, so no GROUP BY is needed)
SELECT DISTINCT
    customer_id,
    MAX(created_at) OVER (PARTITION BY customer_id) AS last_purchase,
    FIRST_VALUE(product_name) OVER (PARTITION BY customer_id ORDER BY created_at) AS first_product
FROM orders;
```
Performance Tips
- Use Parquet for large datasets - DuckDB queries Parquet files in place, pushing filters and column projections into the scan

```python
con.execute("SELECT * FROM 'data.parquet' WHERE date > '2024-01-01'")
```

- Filter early - push predicates down so joins operate on fewer rows

```sql
-- Good: filter large_table before joining
SELECT *
FROM (SELECT * FROM large_table WHERE date = '2024-01-01') lt
JOIN small_table USING (id);
```

- Batch inserts - use executemany() for bulk inserts

```python
data = [(i, f'value_{i}') for i in range(10000)]
con.executemany("INSERT INTO t VALUES (?, ?)", data)
```

- Use appropriate data types - prefer the smallest type that fits (e.g. INTEGER over BIGINT); note that DuckDB accepts a VARCHAR(n) length but does not enforce it
Pandas Integration
```python
import pandas as pd
import duckdb

con = duckdb.connect()

# Create DataFrame
df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

# Query the DataFrame with SQL
con.register("my_table", df)
result = con.execute("SELECT * FROM my_table WHERE a > 1").fetchdf()

# Or use the sql() relational API and chain methods
result = con.sql("SELECT * FROM df WHERE a > 1").order("a DESC").fetchdf()
```
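Registering a DataFrame creates a view over the live Python object; to materialize a stable copy inside DuckDB (and persist it when the connection points at a database file), a plain CREATE TABLE AS works. A minimal sketch; the table name persisted is illustrative.

```python
# Materialize the registered DataFrame as a regular DuckDB table
con.execute("CREATE TABLE persisted AS SELECT * FROM my_table")
```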
Example: Complete Analytics Pipeline
```python
import duckdb
import pandas as pd

con = duckdb.connect("analytics.db")

# Create schema
con.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id BIGINT PRIMARY KEY,
        customer_id INTEGER,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status VARCHAR
    )
""")

# Load from CSV
con.execute("COPY orders FROM 'orders.csv' (AUTO_DETECT TRUE)")

# Analytics query
result = con.execute("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        COUNT(*) AS order_count,
        SUM(total_amount) AS revenue,
        AVG(total_amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY 1
    ORDER BY 1
""").fetchdf()

print(result)
```
Common Data Types
| DuckDB Type | Python Type | Use Case |
|---|---|---|
| INTEGER | int | Whole numbers |
| BIGINT | int | Large integers |
| DOUBLE | float | Decimal numbers |
| DECIMAL(p,s) | Decimal | Precise decimals (p=precision, s=scale) |
| VARCHAR | str | Text strings |
| BOOLEAN | bool | True/False |
| DATE | datetime.date | Calendar dates |
| TIMESTAMP | datetime.datetime | Date + time |
| JSON | str/dict | JSON data |
| ARRAY | list | Variable-length arrays |
| STRUCT | dict | Fixed-type key-value pairs |
| MAP | dict | Key-value pairs |
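One way to check these mappings is to round-trip values through a table: bound Python objects are converted to the declared DuckDB types, and fetchall() returns native Python objects again. A minimal sketch; the typed_example table is illustrative.

```python
import datetime
from decimal import Decimal
import duckdb

con = duckdb.connect()
con.execute("""
    CREATE TABLE typed_example (
        id INTEGER,
        label VARCHAR,
        price DECIMAL(10,2),
        active BOOLEAN,
        day DATE,
        created_at TIMESTAMP
    )
""")

con.execute(
    "INSERT INTO typed_example VALUES (?, ?, ?, ?, ?, ?)",
    [1, 'widget', Decimal('9.99'), True,
     datetime.date(2024, 1, 1), datetime.datetime(2024, 1, 1, 12, 0)]
)

# Returns native Python objects: int, str, Decimal, bool, date, datetime
print(con.execute("SELECT * FROM typed_example").fetchall())
```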
Constraints
```sql
-- Primary key (implicit NOT NULL + UNIQUE)
CREATE TABLE t (id INTEGER PRIMARY KEY, name VARCHAR);

-- Foreign key
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id)
);

-- Check constraints
CREATE TABLE products (
    price DECIMAL(10,2) CHECK (price > 0),
    quantity INTEGER CHECK (quantity >= 0)
);

-- Unique constraint
CREATE TABLE users (email VARCHAR UNIQUE);

-- Not null
CREATE TABLE t (name VARCHAR NOT NULL);
```
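Constraint violations surface as exceptions in the Python client; catching the generic duckdb.Error base class works across versions (more specific classes such as ConstraintException also exist in recent releases). A minimal sketch:

```python
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE users (email VARCHAR UNIQUE)")
con.execute("INSERT INTO users VALUES ('a@example.com')")

try:
    # Second insert violates the UNIQUE constraint on email
    con.execute("INSERT INTO users VALUES ('a@example.com')")
except duckdb.Error as exc:
    print(f"Insert rejected: {exc}")
```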
Import/Export
```python
# Import CSV
con.execute("COPY table_name FROM 'file.csv' (AUTO_DETECT TRUE)")
con.execute("COPY table_name FROM 'file.csv' (HEADER TRUE, DELIMITER ',')")

# Export to CSV
con.execute("COPY (SELECT * FROM table_name) TO 'output.csv' (HEADER TRUE)")

# Import/Export Parquet (faster for large data)
con.execute("COPY table_name FROM 'data.parquet'")
con.execute("COPY table_name TO 'data.parquet'")

# Direct reading (no import needed)
result = con.execute("SELECT * FROM 'large_file.parquet'").fetchdf()
```
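Glob patterns extend direct reading to whole directories of files, and the relational helpers expose the same scans without writing SQL strings. A short sketch; the logs/ paths are illustrative.

```python
import duckdb

con = duckdb.connect()

# Query every Parquet file matching the glob in a single scan
total = con.execute("SELECT COUNT(*) FROM 'logs/*.parquet'").fetchone()[0]

# Equivalent scan through the relational API
rel = duckdb.read_parquet("logs/*.parquet")
print(rel.limit(5).df())
```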