# OHLCV Processing — Market Data Preparation

Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging.

To install the skill locally:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/NeverSight/learn-skills.dev "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/skills-md/agiprolabs/claude-trading-skills/ohlcv-processing" ~/.claude/skills/neversight-learn-skills-dev-ohlcv-processing && rm -rf "$T"
```
Clean, consistent OHLCV data is the foundation of every trading analysis. Garbage in, garbage out — a single anomalous candle can trigger false signals, corrupt indicator calculations, and produce misleading backtest results. This skill covers the full data preparation pipeline: validation, cleaning, resampling, normalization, and multi-source merging.
**Why this matters:** Crypto OHLCV data is messier than traditional markets. 24/7 trading means no official close, DEX aggregators disagree on prices, low-liquidity tokens produce impossible candles, and API outages create gaps. Every analysis workflow should start with this pipeline.
## Quick Start

### 1. Install Dependencies

```bash
uv pip install pandas numpy httpx
```
### 2. Standard OHLCV DataFrame Format
All processing functions expect this canonical format:
```python
import pandas as pd

# Canonical OHLCV DataFrame
# - DatetimeIndex in UTC
# - Columns: open, high, low, close, volume (lowercase)
# - Sorted ascending by timestamp
# - No duplicate timestamps
df = pd.DataFrame({
    "open": [1.10, 1.12, 1.11],
    "high": [1.15, 1.14, 1.13],
    "low": [1.08, 1.10, 1.09],
    "close": [1.12, 1.11, 1.12],
    "volume": [50000, 48000, 52000],
}, index=pd.to_datetime([
    "2025-01-01 00:00:00",
    "2025-01-01 00:01:00",
    "2025-01-01 00:02:00",
], utc=True))
df.index.name = "timestamp"
```
### 3. Full Processing Pipeline
```python
import pandas as pd
import numpy as np

def process_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Run the complete OHLCV processing pipeline."""
    df = standardize_columns(df)
    df = validate_ohlcv(df)
    df = handle_gaps(df, method="ffill")
    df = flag_anomalies(df)
    return df
```
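A quick usage sketch, assuming the Quick Start frame from step 2 and the helper functions defined in the sections below are in scope:

```python
clean = process_ohlcv(df)
print(clean[["close", "volume", "anomaly_any"]])
print(f"{int(clean['anomaly_any'].sum())} anomalous bars flagged")
```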
## Data Validation

### Column Checks
```python
REQUIRED_COLUMNS = {"open", "high", "low", "close", "volume"}

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names to the lowercase standard."""
    df.columns = df.columns.str.lower().str.strip()
    # Common renames
    rename_map = {"vol": "volume", "v": "volume", "o": "open",
                  "h": "high", "l": "low", "c": "close"}
    df = df.rename(columns=rename_map)
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df[["open", "high", "low", "close", "volume"]]
```
### Structural Validation
```python
def validate_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Validate OHLCV structural integrity."""
    # Ensure DatetimeIndex in UTC
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index, utc=True)
    if df.index.tz is None:
        df.index = df.index.tz_localize("UTC")
    # Sort and deduplicate
    df = df.sort_index()
    dupes = df.index.duplicated(keep="last")
    if dupes.any():
        print(f"Warning: Removed {dupes.sum()} duplicate timestamps")
        df = df[~dupes]
    # Type enforcement
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    return df
```
### Impossible Candle Detection
```python
def find_impossible_candles(df: pd.DataFrame) -> pd.DataFrame:
    """Find candles that violate OHLC constraints."""
    issues = pd.DataFrame(index=df.index)
    issues["high_lt_low"] = df["high"] < df["low"]
    issues["high_lt_open"] = df["high"] < df["open"]
    issues["high_lt_close"] = df["high"] < df["close"]
    issues["low_gt_open"] = df["low"] > df["open"]
    issues["low_gt_close"] = df["low"] > df["close"]
    issues["negative_price"] = (df[["open", "high", "low", "close"]] < 0).any(axis=1)
    issues["negative_volume"] = df["volume"] < 0
    issues["any_issue"] = issues.any(axis=1)
    return issues[issues["any_issue"]]
```
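As an illustration (values invented for the example), corrupting one bar of the Quick Start frame is enough to trip the checks:

```python
bad = df.copy()
bad.loc[bad.index[1], "high"] = 1.00   # below that bar's low of 1.10
print(find_impossible_candles(bad))
# Flags the second bar via high_lt_low, high_lt_open, and high_lt_close
```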
## Gap Handling
Crypto trades 24/7, but gaps still occur from API outages, low liquidity, or aggregator downtime.
### Detect Gaps
```python
def detect_gaps(df: pd.DataFrame, expected_freq: str = "1min") -> pd.DatetimeIndex:
    """Find missing timestamps based on the expected frequency."""
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(),
        freq=expected_freq, tz="UTC"
    )
    missing = full_index.difference(df.index)
    return missing
```
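Missing timestamps usually cluster. A small helper (hypothetical, not part of the skill's scripts) that groups them into contiguous runs makes it easier to pick `max_gap` for the fill step below:

```python
def gap_runs(missing: pd.DatetimeIndex, freq: str = "1min") -> list[tuple]:
    """Group missing timestamps into (start, end, n_bars) runs."""
    if len(missing) == 0:
        return []
    step = pd.Timedelta(freq)
    runs = []
    start = prev = missing[0]
    for ts in missing[1:]:
        if ts - prev > step:  # run broken: close out the current one
            runs.append((start, prev, int((prev - start) / step) + 1))
            start = ts
        prev = ts
    runs.append((start, prev, int((prev - start) / step) + 1))
    return runs
```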
### Fill Gaps
```python
def handle_gaps(
    df: pd.DataFrame,
    freq: str = "1min",
    method: str = "ffill",
    max_gap: int = 5,
) -> pd.DataFrame:
    """Fill gaps in OHLCV data.

    Args:
        df: OHLCV DataFrame with DatetimeIndex.
        freq: Expected bar frequency.
        method: 'ffill' (forward fill) or 'interpolate'.
        max_gap: Maximum consecutive bars to fill. Larger gaps are left as NaN.
    """
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq=freq, tz="UTC"
    )
    df = df.reindex(full_index)
    df.index.name = "timestamp"

    # Mark which bars were filled
    df["is_filled"] = df["close"].isna()

    price_cols = ["open", "high", "low", "close"]
    if method == "ffill":
        # Forward fill OHLC (flat candle), zero volume
        df[price_cols] = df[price_cols].ffill(limit=max_gap)
    elif method == "interpolate":
        df[price_cols] = df[price_cols].interpolate(method="time", limit=max_gap)
    # Zero-fill volume only where prices were filled, so gaps beyond
    # max_gap stay NaN across all columns
    df["volume"] = df["volume"].fillna(0).mask(df["close"].isna())
    return df
```
## Anomaly Detection

See `references/data_quality.md` for the complete anomaly taxonomy.
### Price Spike Detection
```python
def detect_price_spikes(
    df: pd.DataFrame, window: int = 20, threshold: float = 3.0
) -> pd.Series:
    """Flag bars where the return exceeds threshold * rolling std."""
    returns = df["close"].pct_change()
    rolling_std = returns.rolling(window, min_periods=5).std()
    spike = returns.abs() > (threshold * rolling_std)
    return spike.fillna(False)
```
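A synthetic check of the detector (illustrative only): inject a one-bar +10% jump into otherwise quiet prices and confirm it gets flagged:

```python
import numpy as np

rng = pd.date_range("2025-01-01", periods=60, freq="1min", tz="UTC")
prices = pd.Series(100.0, index=rng) + np.random.default_rng(0).normal(0, 0.05, 60).cumsum()
prices.iloc[45] *= 1.10  # one-bar +10% spike
demo = pd.DataFrame({"open": prices, "high": prices, "low": prices,
                     "close": prices, "volume": 1000.0}, index=rng)
spikes = detect_price_spikes(demo)
print(spikes[spikes].index)  # flags the spike bar (and the snap-back bar after it)
```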
### Zero Volume Detection
```python
def detect_zero_volume(df: pd.DataFrame, min_volume: float = 0) -> pd.Series:
    """Flag bars with zero or below-minimum volume."""
    return df["volume"] <= min_volume
```
### Composite Anomaly Flagging
```python
def flag_anomalies(df: pd.DataFrame) -> pd.DataFrame:
    """Add anomaly flag columns to the DataFrame."""
    df["anomaly_spike"] = detect_price_spikes(df)
    df["anomaly_zero_vol"] = detect_zero_volume(df)
    impossible = find_impossible_candles(df)
    df["anomaly_impossible"] = False
    if not impossible.empty:
        df.loc[impossible.index, "anomaly_impossible"] = True
    df["anomaly_any"] = (
        df["anomaly_spike"] | df["anomaly_zero_vol"] | df["anomaly_impossible"]
    )
    return df
```
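After flagging, a per-type count (a usage sketch that works on any processed frame) shows where quality problems concentrate:

```python
flagged = flag_anomalies(df.copy())
print(flagged.filter(like="anomaly_").sum())  # flagged bars per anomaly type
```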
## Resampling

See `references/resampling_guide.md` for detailed guidance.

### Standard Resample
```python
OHLCV_RESAMPLE_RULES = {
    "open": "first",
    "high": "max",
    "low": "min",
    "close": "last",
    "volume": "sum",
}

def resample_ohlcv(df: pd.DataFrame, target_freq: str) -> pd.DataFrame:
    """Resample OHLCV to a coarser timeframe.

    Args:
        df: OHLCV DataFrame (must be finer than target_freq).
        target_freq: Pandas frequency string ('5min', '15min', '1h', '4h', '1D').

    Returns:
        Resampled OHLCV DataFrame with no NaN rows.
    """
    ohlcv_cols = ["open", "high", "low", "close", "volume"]
    resampled = df[ohlcv_cols].resample(target_freq).agg(OHLCV_RESAMPLE_RULES)
    return resampled.dropna(subset=["close"])
```
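A sanity check against the Quick Start frame: its three 1-minute bars collapse into one 5-minute candle with the expected aggregates:

```python
print(resample_ohlcv(df, "5min"))
# open=1.10 (first), high=1.15 (max), low=1.08 (min),
# close=1.12 (last), volume=150000 (sum)
```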
### Common Timeframe Ladder
```python
TIMEFRAME_LADDER = ["1min", "5min", "15min", "1h", "4h", "1D"]

def resample_ladder(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Resample 1-minute data to all standard timeframes."""
    results = {"1min": df.copy()}
    for tf in TIMEFRAME_LADDER[1:]:
        results[tf] = resample_ohlcv(df, tf)
    return results
```
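Usage sketch (`df_1min` is assumed to be a 1-minute OHLCV frame):

```python
ladder = resample_ladder(df_1min)
for tf, frame in ladder.items():
    print(f"{tf:>5}: {len(frame)} bars")
```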
### VWAP Calculation
```python
def compute_vwap(df: pd.DataFrame) -> pd.Series:
    """Compute cumulative VWAP over the DataFrame."""
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    cum_vol = df["volume"].cumsum()
    cum_tp_vol = (typical_price * df["volume"]).cumsum()
    return cum_tp_vol / cum_vol
```
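`compute_vwap` accumulates over the entire frame. A session-anchored variant that resets at each UTC day boundary (a common alternative, sketched here as an assumption rather than part of the skill's reference implementation) just adds a groupby:

```python
def compute_daily_vwap(df: pd.DataFrame) -> pd.Series:
    """VWAP that resets at each UTC day boundary."""
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    day = df.index.normalize()  # midnight UTC for each bar's date
    cum_tp_vol = (typical_price * df["volume"]).groupby(day).cumsum()
    cum_vol = df["volume"].groupby(day).cumsum()
    return cum_tp_vol / cum_vol
```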
## Normalization
```python
import numpy as np

def normalize_prices(
    df: pd.DataFrame, method: str = "returns"
) -> pd.DataFrame:
    """Normalize OHLCV price columns.

    Methods:
        'returns'     — Percentage returns (close-to-close).
        'log_returns' — Log returns.
        'minmax'      — Min-max scale to [0, 1].
        'zscore'      — Z-score normalization.
    """
    price_cols = ["open", "high", "low", "close"]
    result = df.copy()
    if method == "returns":
        for col in price_cols:
            result[f"{col}_ret"] = result[col].pct_change()
    elif method == "log_returns":
        for col in price_cols:
            result[f"{col}_logret"] = np.log(result[col] / result[col].shift(1))
    elif method == "minmax":
        for col in price_cols:
            cmin, cmax = result[col].min(), result[col].max()
            result[f"{col}_norm"] = (result[col] - cmin) / (cmax - cmin)
    elif method == "zscore":
        for col in price_cols:
            result[f"{col}_z"] = (
                (result[col] - result[col].mean()) / result[col].std()
            )
    return result
```
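One caution: 'minmax' and 'zscore' as written use full-sample statistics, so applying them in a backtest leaks future information into past bars. A trailing-window variant (a sketch, not part of the skill) avoids that:

```python
def rolling_zscore(s: pd.Series, window: int = 100) -> pd.Series:
    """Z-score each value using only trailing data (no lookahead)."""
    mean = s.rolling(window, min_periods=20).mean()
    std = s.rolling(window, min_periods=20).std()
    return (s - mean) / std
```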
## Multi-Source Merging
When combining data from multiple sources (e.g., Birdeye + DexScreener), timestamps may not align and prices may differ because each source aggregates a different set of DEXs.
```python
import numpy as np
import pandas as pd

def merge_ohlcv_sources(
    primary: pd.DataFrame,
    secondary: pd.DataFrame,
    tolerance: str = "30s",
) -> pd.DataFrame:
    """Merge two OHLCV sources, preferring the higher-volume source per bar.

    Args:
        primary: First OHLCV source.
        secondary: Second OHLCV source.
        tolerance: Maximum time difference for alignment.
    """
    merged = pd.merge_asof(
        primary.sort_index(),
        secondary.sort_index(),
        left_index=True,
        right_index=True,
        tolerance=pd.Timedelta(tolerance),
        suffixes=("_pri", "_sec"),
    )
    # Use higher-volume source per bar
    use_secondary = merged["volume_sec"] > merged["volume_pri"]
    for col in ["open", "high", "low", "close", "volume"]:
        merged[col] = np.where(
            use_secondary, merged[f"{col}_sec"], merged[f"{col}_pri"]
        )
    merged["source"] = np.where(use_secondary, "secondary", "primary")
    return merged[["open", "high", "low", "close", "volume", "source"]]
```
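Before trusting a merge, it is worth measuring how far the sources disagree. An illustrative check (the bundled `scripts/merge_sources.py` does its own discrepancy reporting; this sketch assumes `primary` and `secondary` frames are in scope):

```python
aligned = pd.merge_asof(
    primary.sort_index(), secondary.sort_index(),
    left_index=True, right_index=True,
    tolerance=pd.Timedelta("30s"), suffixes=("_pri", "_sec"),
)
disagreement = ((aligned["close_sec"] - aligned["close_pri"])
                / aligned["close_pri"]).abs()
print(f"median close disagreement: {disagreement.median():.4%}")
print(f"bars more than 1% apart: {int((disagreement > 0.01).sum())}")
```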
## Timezone Handling

**Standard:** Always store and process in UTC. Convert only for display.
```python
def ensure_utc(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure the DatetimeIndex is UTC."""
    if df.index.tz is None:
        df.index = df.index.tz_localize("UTC")
    elif str(df.index.tz) != "UTC":
        df.index = df.index.tz_convert("UTC")
    return df
```
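Conversion then happens only at the display edge, e.g. (zone name chosen for illustration):

```python
display_df = df.tz_convert("America/New_York")  # display only; keep UTC for processing
```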
## Data Quality Report
```python
def quality_report(df: pd.DataFrame) -> dict:
    """Generate a data quality summary."""
    total = len(df)
    return {
        "total_bars": total,
        "date_range": f"{df.index.min()} → {df.index.max()}",
        "missing_values": int(df[["open", "high", "low", "close"]].isna().sum().sum()),
        "zero_volume_bars": int((df["volume"] == 0).sum()),
        "impossible_candles": int((df["high"] < df["low"]).sum()),
        "duplicate_timestamps": int(df.index.duplicated().sum()),
        "negative_prices": int((df[["open", "high", "low", "close"]] < 0).any(axis=1).sum()),
        "completeness_pct": round((1 - df["close"].isna().mean()) * 100, 2),
    }
```
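A quick way to print the report:

```python
report = quality_report(df)
for key, value in report.items():
    print(f"{key:22} {value}")
```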
## Files

### References

- `references/data_quality.md` — Anomaly types, detection methods, correction strategies, crypto-specific data issues
- `references/resampling_guide.md` — Resample rules, timeframe use cases, partial bar handling, VWAP resampling, multi-timeframe alignment

### Scripts

- `scripts/process_ohlcv.py` — Full processing pipeline: validate, clean, resample, normalize with anomaly reporting (run with `--demo` for synthetic data)
- `scripts/merge_sources.py` — Multi-source OHLCV merging with conflict resolution and discrepancy reporting (run with `--demo`)