Tony db

Design DuckDB schemas, write SQL queries, and perform analytical data operations using the DuckDB Python library. Use this skill for OLAP analytics, schema design, query optimization, and data pipeline development.

install
source · Clone the upstream repo
git clone https://github.com/jaydeland/Tony
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jaydeland/Tony "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/db" ~/.claude/skills/jaydeland-tony-db && rm -rf "$T"
manifest: .claude/skills/db/skill.md

DuckDB Skill

Purpose: Design DuckDB schemas, write SQL queries, and perform analytical data operations

When to use: When the user needs to work with DuckDB databases, design schemas, query data, or perform OLAP analytics


Quick Reference

Connect to DuckDB

import duckdb

# In-memory database (default, fast for analytics)
con = duckdb.connect()

# Persistent database file
con = duckdb.connect("analytics.db")

# Read-only connection
con = duckdb.connect("analytics.db", read_only=True)

# With configuration
con = duckdb.connect("analytics.db", config={
    "threads": 4,
    "memory_limit": "2GB"
})

Execute SQL Queries

# Basic execution
con.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")

# Parameterized (safe from SQL injection)
con.execute("INSERT INTO users VALUES (?, ?)", [1, 'Alice'])

# Named parameters
con.execute("SELECT * FROM users WHERE id = $id", {"id": 1})

# Fetch results
result = con.execute("SELECT * FROM users").fetchall()
df = con.execute("SELECT * FROM users").fetchdf()  # as DataFrame

Module-Level Convenience (No Explicit Connection)

import duckdb

duckdb.execute("CREATE TABLE data AS SELECT * FROM 'data.csv'")
result = duckdb.sql("SELECT * FROM data").fetchdf()

Schema Design Patterns

Analytical Schema Principles

DuckDB excels at columnar storage for analytical workloads. Design schemas with this in mind:

  1. Denormalize for reads - Analytical queries scan many rows; join cost matters more than storage
  2. Use appropriate data types - Smaller types = better compression = faster scans
  3. Time-series first - Keep a timestamp column and load data roughly in time order, so DuckDB's automatic min-max indexes can skip row groups on time-range filters

Common Schema Templates

Event/Transaction Log Schema

CREATE TABLE events (
    event_id UUID PRIMARY KEY,
    event_type VARCHAR NOT NULL,
    user_id UUID,
    timestamp TIMESTAMP NOT NULL,
    metadata JSON,
    amount DECIMAL(12,2),
    created_at TIMESTAMP DEFAULT current_timestamp
);

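-- Optional indexes: these help highly selective filters (one user, a narrow time window);
-- broad range scans rely on DuckDB's automatic min-max indexes instead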
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_events_user ON events(user_id);

Time-Series Schema

CREATE TABLE metrics (
    ts TIMESTAMP,
    metric_name VARCHAR,
    value DOUBLE,
    tags STRUCT(key VARCHAR, value VARCHAR)
);

Star Schema (Fact + Dimensions)

-- Fact table (large, one row per sale with numeric measures)
CREATE TABLE sales_facts (
    sale_id BIGINT PRIMARY KEY,
    date_id INTEGER,
    product_id INTEGER,
    customer_id INTEGER,
    store_id INTEGER,
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(12,2)
);

-- Dimension tables (smaller, descriptive)
CREATE TABLE dim_products (
    product_id INTEGER PRIMARY KEY,
    product_name VARCHAR,
    category VARCHAR,
    subcategory VARCHAR
);

CREATE TABLE dim_customers (
    customer_id INTEGER PRIMARY KEY,
    customer_name VARCHAR,
    segment VARCHAR,
    region VARCHAR
);

Query Patterns

Aggregation Patterns

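-- Moving average over the current row and the 6 before it
-- (a 7-day average only when there is exactly one row per date)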
SELECT
    date,
    value,
    AVG(value) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS rolling_7d_avg
FROM metrics;

-- Cumulative sum
SELECT
    date,
    amount,
    SUM(amount) OVER (ORDER BY date) AS cumulative_total
FROM transactions;

-- Percentile ranking
SELECT
    category,
    amount,
    PERCENT_RANK() OVER (PARTITION BY category ORDER BY amount) AS pct_rank
FROM sales;

Time-Series Analysis

-- Resample to different granularity
SELECT
    date_trunc('week', timestamp) AS week,
    COUNT(*) AS event_count
FROM events
GROUP BY 1
ORDER BY 1;

-- Gap detection (find missing time periods)
WITH time_series AS (
    SELECT generate_series AS ts
    FROM generate_series('2024-01-01'::TIMESTAMP, '2024-01-31'::TIMESTAMP, INTERVAL '1 day')
),
events_per_day AS (
    SELECT date_trunc('day', timestamp) AS day, COUNT(*) AS cnt
    FROM events
    GROUP BY 1
)
SELECT t.ts, COALESCE(e.cnt, 0) AS event_count
FROM time_series t
LEFT JOIN events_per_day e ON t.ts = e.day;

Window Functions

-- Row numbers and ranks within groups
-- ROW_NUMBER is always unique; RANK leaves gaps after ties; DENSE_RANK does not
SELECT
    department,
    employee_name,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS salary_rank,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS salary_dense_rank
FROM employees;

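-- First/last value in group
-- Both columns are window functions, so no GROUP BY is needed;
-- DISTINCT collapses the result to one row per customer
SELECT DISTINCT
    customer_id,
    MAX(created_at) OVER (PARTITION BY customer_id) AS last_purchase,
    FIRST_VALUE(product_name) OVER (PARTITION BY customer_id ORDER BY created_at) AS first_product
FROM orders;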

Performance Tips

  1. Use Parquet for large datasets - DuckDB queries Parquet files in place and reads only the columns and row groups a query needs

    con.execute("SELECT * FROM 'data.parquet' WHERE date > '2024-01-01'")
    
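  2. Filter early - Apply selective predicates before joining (DuckDB's optimizer also pushes filters down where it can)

    -- Good: filter the large table before the join
    SELECT *
    FROM (SELECT * FROM large_table WHERE date = '2024-01-01') AS l
    JOIN small_table USING (id);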
    
  3. Batch inserts - Use executemany() instead of row-by-row execute() calls for small batches; for large loads, prefer COPY or a single INSERT ... SELECT

    data = [(i, f'value_{i}') for i in range(10000)]
    con.executemany("INSERT INTO t VALUES (?, ?)", data)
    
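  4. Use appropriate data types - Prefer typed columns (DATE, TIMESTAMP, INTEGER) over VARCHAR, and INTEGER over BIGINT unless needed; DuckDB ignores the length in VARCHAR(n), so declaring it saves neither space nor time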


Pandas Integration

import pandas as pd
import duckdb

con = duckdb.connect()

# Create DataFrame
df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

# Query DataFrame with SQL
con.register("my_table", df)
result = con.execute("SELECT * FROM my_table WHERE a > 1").fetchdf()

# Or reference the local DataFrame variable by name and chain relational methods
result = con.sql("SELECT * FROM df WHERE a > 1").order("a DESC").fetchdf()

Example: Complete Analytics Pipeline

import duckdb
import pandas as pd

con = duckdb.connect("analytics.db")

# Create schema
con.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id BIGINT PRIMARY KEY,
        customer_id INTEGER,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status VARCHAR
    )
""")

# Load from CSV
con.execute("COPY orders FROM 'orders.csv' (AUTO_DETECT TRUE)")

# Analytics query
result = con.execute("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        COUNT(*) AS order_count,
        SUM(total_amount) AS revenue,
        AVG(total_amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY 1
    ORDER BY 1
""").fetchdf()

print(result)

Common Data Types

DuckDB Type     Python Type          Use Case
INTEGER         int                  Whole numbers
BIGINT          int                  Large integers
DOUBLE          float                Floating-point numbers
DECIMAL(p,s)    Decimal              Precise decimals (p=precision, s=scale)
VARCHAR         str                  Text strings
BOOLEAN         bool                 True/False
DATE            datetime.date        Calendar dates
TIMESTAMP       datetime.datetime    Date + time
JSON            str/dict             JSON data
LIST            list                 Variable-length lists (DuckDB's ARRAY type is fixed-length)
STRUCT          dict                 Fixed set of named, typed fields
MAP             dict                 Key-value pairs

Constraints

-- Primary key (implicit NOT NULL + UNIQUE)
CREATE TABLE t (id INTEGER PRIMARY KEY, name VARCHAR);

-- Foreign key
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id)
);

-- Check constraint
CREATE TABLE products (
    price DECIMAL(10,2) CHECK (price > 0),
    quantity INTEGER CHECK (quantity >= 0)
);

-- Unique constraint
CREATE TABLE users (email VARCHAR UNIQUE);

-- Not null
CREATE TABLE t (name VARCHAR NOT NULL);

Import/Export

# Import CSV
con.execute("COPY table_name FROM 'file.csv' (AUTO_DETECT TRUE)")
con.execute("COPY table_name FROM 'file.csv' (HEADER TRUE, DELIMITER ',')")

# Export to CSV
con.execute("COPY (SELECT * FROM table_name) TO 'output.csv' (HEADER TRUE)")

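# Import/Export Parquet (faster for large data); FORMAT is stated explicitly
# rather than inferred from the file extension
con.execute("COPY table_name FROM 'data.parquet' (FORMAT PARQUET)")
con.execute("COPY table_name TO 'data.parquet' (FORMAT PARQUET)")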

# Direct reading (no import needed)
result = con.execute("SELECT * FROM 'large_file.parquet'").fetchdf()