Claude-skill-registry Data Freshness and Latency
Monitoring and optimizing how quickly data flows through pipelines and ensuring it meets timeliness requirements.
Install
Source: clone the upstream repo
```bash
git clone https://github.com/majiayu000/claude-skill-registry
```
Claude Code: install into ~/.claude/skills/
```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/freshness-latency" ~/.claude/skills/majiayu000-claude-skill-registry-data-freshness-and-latency && rm -rf "$T"
```
Manifest: skills/data/freshness-latency/SKILL.md
Data Freshness and Latency
Overview
Data Freshness measures how current the data is (age), while Latency measures how long it takes for data to flow through the pipeline (processing time). Both are critical for real-time analytics, operational dashboards, and time-sensitive decision-making.
Core Principle: "Stale data leads to stale decisions. Monitor freshness, optimize latency."
1. Freshness vs. Latency
| Metric | Definition | Example | Measurement |
|---|---|---|---|
| Freshness | How old is the data? | Data is 5 minutes old | `NOW() - MAX(event_time)` |
| Latency | How long does processing take? | Pipeline takes 2 minutes | `processed_at - event_time` |
Example
```text
Event occurs:               10:00:00
Event arrives in pipeline:  10:00:05  (5 s ingestion latency)
Processing completes:       10:02:05  (2 min processing latency)
Data queried by user:       10:05:00

Freshness at query time:    5 minutes (10:05:00 - 10:00:00)
Total latency:              2 minutes 5 seconds (10:02:05 - 10:00:00)
```
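A small Python sketch of the same arithmetic, using `datetime` subtraction. The date is arbitrary, and processing is taken to complete at 10:02:05, i.e. 2 minutes after ingestion:

```python
from datetime import datetime

# Timeline of one event (date chosen arbitrarily)
event_time = datetime(2024, 1, 1, 10, 0, 0)      # event occurs
ingested_time = datetime(2024, 1, 1, 10, 0, 5)   # arrives in pipeline
processed_time = datetime(2024, 1, 1, 10, 2, 5)  # processing completes
query_time = datetime(2024, 1, 1, 10, 5, 0)      # user queries the data

ingestion_latency = ingested_time - event_time       # 5 seconds
processing_latency = processed_time - ingested_time  # 2 minutes
total_latency = processed_time - event_time          # 2 minutes 5 seconds
freshness_at_query = query_time - event_time         # 5 minutes

print(f"ingestion={ingestion_latency} processing={processing_latency} "
      f"total={total_latency} freshness={freshness_at_query}")
```

Freshness keeps growing after processing finishes (it depends on when the data is read), while latency is fixed once the event has been processed.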
2. Freshness Requirements by Use Case
| Use Case | Freshness SLO | Acceptable Latency | Example |
|---|---|---|---|
| Real-time fraud detection | < 1 second | < 100ms | Credit card transaction scoring |
| Live dashboards | < 1 minute | < 10 seconds | Website analytics |
| Operational metrics | < 5 minutes | < 1 minute | Server health monitoring |
| Business intelligence | < 1 hour | < 15 minutes | Sales reports |
| Data warehouse | < 24 hours | < 4 hours | Historical analysis |
| Compliance reporting | < 7 days | Days | Annual audits |
3. Measuring Data Freshness
SQL Freshness Check
```sql
-- Check freshness of latest record
SELECT
    MAX(created_at) AS latest_record,
    NOW() AS checked_at,
    EXTRACT(EPOCH FROM (NOW() - MAX(created_at))) / 60 AS age_minutes
FROM events;

-- Alert if data is stale (> 10 minutes old); an empty table counts as stale
SELECT
    CASE
        WHEN MAX(created_at) IS NULL
          OR MAX(created_at) < NOW() - INTERVAL '10 minutes' THEN 'STALE'
        ELSE 'FRESH'
    END AS freshness_status
FROM events;
```
Python Freshness Monitoring
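```python
import pandas as pd


def check_freshness(df: pd.DataFrame, timestamp_col: str, max_age_minutes: int = 10) -> dict:
    """Check whether the newest row in `df` is recent enough."""
    latest_timestamp = pd.to_datetime(df[timestamp_col]).max()
    if pd.isna(latest_timestamp):  # empty frame or all-null column: treat as stale
        return {'is_fresh': False, 'latest_timestamp': None,
                'age_minutes': float('inf'), 'threshold_minutes': max_age_minutes}

    # Match the column's timezone (None for naive timestamps, compared with local time)
    now = pd.Timestamp.now(tz=latest_timestamp.tz)
    age_minutes = (now - latest_timestamp).total_seconds() / 60

    return {
        'is_fresh': age_minutes <= max_age_minutes,
        'latest_timestamp': latest_timestamp,
        'age_minutes': age_minutes,
        'threshold_minutes': max_age_minutes,
    }


# Usage (`alert` stands in for your notification hook)
result = check_freshness(df, 'event_time', max_age_minutes=10)
if not result['is_fresh']:
    alert(f"Data is stale: {result['age_minutes']:.1f} minutes old")
```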
dbt Freshness Tests
```yaml
# models/sources.yml
version: 2

sources:
  - name: production
    database: analytics
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: events
        loaded_at_field: created_at
        freshness:
          warn_after: {count: 10, period: minute}
          error_after: {count: 30, period: minute}
```
4. Latency Measurement
End-to-End Pipeline Latency
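```python
from datetime import datetime

from prometheus_client import Histogram

# Histogram of end-to-end latency; its buckets let you estimate P50/P95/P99
latency_metrics = Histogram(
    'event_end_to_end_latency_seconds',
    'Time from event occurrence to availability',
)


def measure_pipeline_latency(event_id: str) -> dict:
    """Measure latency from event occurrence to availability."""
    # get_event_timestamp / get_processed_timestamp are your own lookups (placeholders)
    event_time: datetime = get_event_timestamp(event_id)
    processed_time: datetime = get_processed_timestamp(event_id)

    latency = (processed_time - event_time).total_seconds()
    latency_metrics.observe(latency)

    return {
        'event_id': event_id,
        'event_time': event_time,
        'processed_time': processed_time,
        'latency_seconds': latency,
    }
```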
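Tail latencies (P95/P99) usually matter more than averages. With a Prometheus histogram they are typically estimated at query time (for example with PromQL's `histogram_quantile`), but for a batch of latency samples held in memory the standard library is enough. A minimal sketch; `latency_percentiles` is a hypothetical helper:

```python
import statistics
from typing import Dict, Sequence


def latency_percentiles(samples_seconds: Sequence[float]) -> Dict[str, float]:
    """Return P50/P95/P99 for a sequence of observed latencies (in seconds)."""
    if len(samples_seconds) < 2:
        raise ValueError("need at least two samples to estimate percentiles")
    # n=100 yields the 99 cut points P1..P99; index k-1 holds the k-th percentile
    cuts = statistics.quantiles(samples_seconds, n=100)
    return {'p50': cuts[49], 'p95': cuts[94], 'p99': cuts[98]}


# Usage: latencies collected from measure_pipeline_latency()
print(latency_percentiles([float(s) for s in range(1, 101)]))
```

`statistics.quantiles` interpolates between samples (the default "exclusive" method), so small sample sizes give rough tail estimates.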
Per-Stage Latency Tracking
```python
import time


class PipelineStage:
    """Context manager that times one pipeline stage and reports it."""

    def __init__(self, name: str):
        self.name = name
        self.latency_seconds = None

    def __enter__(self):
        # perf_counter is monotonic, so wall-clock adjustments can't skew durations
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.latency_seconds = time.perf_counter() - self._start
        # log_metric is your monitoring client's emit function (placeholder)
        log_metric(f'pipeline.{self.name}.latency', self.latency_seconds)
        return False  # don't swallow exceptions raised inside the stage


# Usage
with PipelineStage('ingestion'):
    ingest_data()
with PipelineStage('transformation'):
    transform_data()
with PipelineStage('loading'):
    load_data()
```
Prometheus Metrics
```python
from prometheus_client import Histogram

# Define latency histogram
pipeline_latency = Histogram(
    'pipeline_latency_seconds',
    'Time taken for data to flow through pipeline',
    ['stage', 'source'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300],  # seconds
)

# Record latency
with pipeline_latency.labels(stage='transform', source='kafka').time():
    transform_data()
```
5. Freshness Monitoring and Alerting
Automated Freshness Checks
```python
import time

import schedule


def monitor_freshness():
    """Check each table's freshness and alert on stale ones."""
    tables = ['events', 'users', 'orders']
    for table in tables:
        # check_table_freshness and alert are your own helpers (placeholders)
        freshness = check_table_freshness(table)
        if not freshness['is_fresh']:
            alert(
                severity='warning',
                message=f"Table {table} is stale: {freshness['age_minutes']} minutes old",
                threshold=freshness['threshold_minutes'],
            )


# Run every 5 minutes
schedule.every(5).minutes.do(monitor_freshness)

while True:
    schedule.run_pending()
    time.sleep(60)
```
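The scheduler leaves `check_table_freshness` undefined. One possible implementation, sketched against SQLite so it runs anywhere; it assumes timestamps are stored as ISO-8601 text (UTC if no offset is given) and takes the connection explicitly, so the call in `monitor_freshness` would need to pass one. The function and its parameters are hypothetical:

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def check_table_freshness(conn: sqlite3.Connection, table: str,
                          timestamp_col: str = 'created_at',
                          max_age_minutes: float = 10,
                          now: Optional[datetime] = None) -> dict:
    """Freshness of `table`, assuming ISO-8601 timestamps stored as text."""
    # Identifiers can't be bound as parameters: only pass trusted table/column names.
    # MAX() on uniformly formatted ISO-8601 strings sorts chronologically.
    row = conn.execute(f'SELECT MAX({timestamp_col}) FROM {table}').fetchone()
    now = now or datetime.now(timezone.utc)
    if row[0] is None:  # empty table: treat as stale
        return {'is_fresh': False, 'age_minutes': None, 'threshold_minutes': max_age_minutes}

    latest = datetime.fromisoformat(row[0])
    if latest.tzinfo is None:  # assume naive values are UTC
        latest = latest.replace(tzinfo=timezone.utc)
    age_minutes = (now - latest).total_seconds() / 60
    return {
        'is_fresh': age_minutes <= max_age_minutes,
        'age_minutes': age_minutes,
        'threshold_minutes': max_age_minutes,
    }
```

The optional `now` argument exists so the check can be tested deterministically; in production it defaults to the current UTC time.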
Watermark Tracking
```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Track the high-water mark (last processed timestamp) for streaming data.

    `db` is a placeholder for a database handle whose execute() returns a
    cursor-like object; adapt the %s paramstyle to your driver.
    """

    def __init__(self, table_name: str):
        self.table_name = table_name  # trusted name: it is interpolated into SQL
        self.watermark = self.load_watermark()

    def load_watermark(self) -> datetime:
        """Load the last processed timestamp (datetime.min if nothing processed yet)."""
        result = db.execute(
            f"SELECT MAX(processed_at) FROM {self.table_name}_watermark"
        ).fetchone()
        return result[0] if result[0] else datetime.min

    def update_watermark(self, timestamp: datetime):
        """Update the watermark after processing."""
        db.execute(
            f"INSERT INTO {self.table_name}_watermark (processed_at) VALUES (%s)",
            (timestamp,),
        )
        self.watermark = timestamp

    def get_lag(self) -> timedelta:
        """Lag between the watermark and the current time."""
        return datetime.now() - self.watermark

    def is_lagging(self, threshold_minutes: int = 10) -> bool:
        """Check whether processing is lagging behind."""
        lag_minutes = self.get_lag().total_seconds() / 60
        return lag_minutes > threshold_minutes
```
6. Improving Freshness
Change Data Capture (CDC)
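```python
# Debezium CDC example: instead of batch ETL every hour, stream changes in real time
import json

from kafka import KafkaConsumer  # kafka-python

consumer = KafkaConsumer(
    'dbserver1.inventory.customers',
    bootstrap_servers=['localhost:9092'],
    # Debezium emits a null-value tombstone after each delete; keep it as None
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m is not None else None,
)

# insert_to_warehouse / update_warehouse / delete_from_warehouse are your own loaders
for message in consumer:
    if message.value is None:  # tombstone: nothing to apply
        continue
    # With the JSON converter's schemas enabled, the change event sits under 'payload'
    change_event = message.value.get('payload', message.value)
    op = change_event['op']
    if op in ('c', 'r'):  # create, or snapshot read
        insert_to_warehouse(change_event['after'])
    elif op == 'u':  # update
        update_warehouse(change_event['after'])
    elif op == 'd':  # delete
        delete_from_warehouse(change_event['before'])
```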
Incremental Updates
```sql
-- Instead of a full table refresh:
-- DELETE FROM target_table;
-- INSERT INTO target_table SELECT * FROM source_table;

-- Use an incremental update (PostgreSQL upsert)
INSERT INTO target_table
SELECT * FROM source_table
-- COALESCE so the first run (empty target, MAX() is NULL) still loads everything
WHERE updated_at > COALESCE((SELECT MAX(updated_at) FROM target_table), '-infinity')
ON CONFLICT (id) DO UPDATE SET
    column1 = EXCLUDED.column1,
    updated_at = EXCLUDED.updated_at;
```
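SQLite (3.24 and later) supports the same `ON CONFLICT ... DO UPDATE` upsert, which makes the high-water-mark pattern easy to try locally. A minimal sketch, assuming ISO-8601 text timestamps (which compare correctly as strings) and the illustrative `source_table`/`target_table` schema below; SQLite also needs the `SELECT` in an `INSERT ... SELECT ... ON CONFLICT` to carry a `WHERE` clause, which this query already has:

```python
import sqlite3

# Load only rows newer than the target's high-water mark, upserting on `id`.
# COALESCE(..., '') makes the very first run (empty target) load everything.
INCREMENTAL_UPSERT = """
INSERT INTO target_table (id, column1, updated_at)
SELECT id, column1, updated_at
FROM source_table
WHERE updated_at > COALESCE((SELECT MAX(updated_at) FROM target_table), '')
ON CONFLICT (id) DO UPDATE SET
    column1 = excluded.column1,
    updated_at = excluded.updated_at
"""


def incremental_load(conn: sqlite3.Connection) -> None:
    with conn:  # commit on success, roll back on error
        conn.execute(INCREMENTAL_UPSERT)


# Usage
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE source_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
INSERT INTO source_table VALUES (1, 'a', '2024-01-01T00:00'), (2, 'b', '2024-01-01T00:05');
""")
incremental_load(conn)  # first run copies both rows
conn.execute("UPDATE source_table SET column1 = 'a2', updated_at = '2024-01-01T00:10' WHERE id = 1")
incremental_load(conn)  # second run only picks up id 1
print(conn.execute('SELECT * FROM target_table ORDER BY id').fetchall())
```

The strict `>` comparison assumes rows are never committed with an `updated_at` older than the current high-water mark; late-arriving writes would need an overlap window.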
Parallel Processing
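```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd


def process_partition(partition_df: pd.DataFrame):
    """Transform and load one partition (transform/load_to_warehouse are your own steps)."""
    transformed = transform(partition_df)
    load_to_warehouse(transformed)


# Split rows into 10 roughly equal partitions by position
partitions = [large_df.iloc[idx] for idx in np.array_split(np.arange(len(large_df)), 10)]

# Threads suit I/O-bound work such as warehouse loads; CPU-heavy transforms may need processes
with ThreadPoolExecutor(max_workers=10) as executor:
    # list() consumes the results so exceptions raised in workers surface here
    list(executor.map(process_partition, partitions))
```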
7. Trade-offs
Freshness vs. Cost
Real-time streaming (< 1 min freshness):
- Cost: $$$$ (Kafka, Flink, dedicated infrastructure)
- Use when: Fraud detection, live dashboards

Micro-batch (5-15 min freshness):
- Cost: $$ (Spark Streaming, scheduled jobs)
- Use when: Operational metrics, near-real-time analytics

Batch (hourly/daily freshness):
- Cost: $ (Airflow, cron jobs)
- Use when: Reporting, historical analysis
Freshness vs. Completeness
```python
import time


# Trade-off: wait for all data vs. process what we have
def process_with_timeout(timeout_seconds: int = 300, poll_interval_seconds: float = 5.0):
    """Collect data until it is complete or the deadline passes, then process it."""
    # fetch_data, is_complete, process and alert are placeholders for your own logic
    deadline = time.monotonic() + timeout_seconds
    data_buffer = []

    while time.monotonic() < deadline:
        data_buffer.extend(fetch_data())
        if is_complete(data_buffer):
            break  # got all data
        time.sleep(poll_interval_seconds)  # avoid hammering the source

    # Process what we have, even if incomplete
    if data_buffer:
        process(data_buffer)
    else:
        alert("No data received within timeout")
```
8. Freshness SLAs and SLOs
Defining SLOs
```yaml
# data_freshness_slos.yml
services:
  - name: user_events
    freshness_slo:
      target: 95        # 95% of data should be fresh
      threshold: 5      # within 5 minutes
      measurement_window: 1h
  - name: order_analytics
    freshness_slo:
      target: 99
      threshold: 15     # within 15 minutes
      measurement_window: 24h
```
Measuring SLO Compliance
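The query compares each row's load time with its creation time. Counting rows created in the last `threshold_minutes` instead would measure how recent the rows are, and would fail any window longer than the threshold even for a perfectly healthy pipeline.

```python
def calculate_freshness_slo(table_name: str, threshold_minutes: int,
                            window_hours: int = 1, target_percent: float = 95):
    """Calculate the % of recent rows that landed within the freshness threshold.

    Assumes each row records `created_at` (event time) and `loaded_at`
    (when it became queryable); `db` is a placeholder database handle.
    """
    query = f"""
        SELECT
            COUNT(*) FILTER (
                WHERE loaded_at - created_at <= INTERVAL '1 minute' * %s
            )::FLOAT / NULLIF(COUNT(*), 0) * 100 AS freshness_percent
        FROM {table_name}
        WHERE created_at > NOW() - INTERVAL '1 hour' * %s
    """
    result = db.execute(query, (threshold_minutes, window_hours)).fetchone()
    freshness_percent = result[0]  # None when the window has no rows

    return {
        'table': table_name,
        'freshness_percent': freshness_percent,
        'threshold_minutes': threshold_minutes,
        'meets_slo': freshness_percent is not None and freshness_percent >= target_percent,
    }
```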
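The same compliance calculation in plain Python, for cases where the (creation, load) timestamp pairs are already in hand, such as in tests or in a job that reads them from logs. A minimal sketch; `freshness_slo_compliance` is hypothetical, and it takes the target from the SLO definition rather than hard-coding 95%:

```python
from datetime import datetime, timedelta
from typing import Iterable, Tuple


def freshness_slo_compliance(records: Iterable[Tuple[datetime, datetime]],
                             threshold: timedelta,
                             target_percent: float) -> dict:
    """Share of records that became queryable within `threshold` of creation.

    Each record is a (created_at, loaded_at) pair from the measurement window.
    """
    lags = [loaded_at - created_at for created_at, loaded_at in records]
    if not lags:
        # No data in the window: report no percentage rather than a misleading 100%
        return {'freshness_percent': None, 'meets_slo': False}
    on_time = sum(1 for lag in lags if lag <= threshold)
    percent = on_time * 100 / len(lags)
    return {'freshness_percent': percent, 'meets_slo': percent >= target_percent}


# Usage: user_events SLO (95% within 5 minutes)
base = datetime(2024, 1, 1, 12, 0)
records = [(base, base + timedelta(minutes=1))] * 19 + [(base, base + timedelta(minutes=10))]
print(freshness_slo_compliance(records, timedelta(minutes=5), target_percent=95))
```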
9. Tools for Freshness Monitoring
Monte Carlo Freshness Checks
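```yaml
# Monte Carlo monitors freshness automatically; this config is illustrative,
# so check Monte Carlo's monitors-as-code docs for the exact schema
monitors:
  - type: freshness
    table: production.events
    field: created_at
    threshold: 10 minutes
    alert:
      - slack: "#data-alerts"   # quoted: an unquoted # starts a YAML comment
      - pagerduty: data-team
```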
Custom Grafana Dashboard
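Prometheus query for freshness (assumes a gauge `event_timestamp` holding the Unix time of each table's newest row):
```promql
time() - max by (table) (event_timestamp)
```
Alert rule, in the Prometheus 2.x rule-file format:
```yaml
groups:
  - name: data-freshness
    rules:
      - alert: DataStale
        expr: time() - max by (table) (event_timestamp) > 600  # 10 minutes
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data is stale in {{ $labels.table }}"
          description: "Latest data is {{ $value }}s old"
```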
10. Handling Stale Data
Fallback to Cached Data
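```python
import logging

logger = logging.getLogger(__name__)


class DataUnavailableError(Exception):
    """Raised when neither fresh nor cached data can be served."""


def get_data_with_fallback(cache_ttl_minutes: int = 60):
    """Get fresh data, or fall back to the last good copy in the cache."""
    # fetch_from_warehouse and cache (get / set with a ttl) are placeholders
    fresh_data = fetch_from_warehouse()
    freshness = check_freshness(fresh_data, 'updated_at', max_age_minutes=10)

    if freshness['is_fresh']:
        # Refresh the cache with the latest good copy
        cache.set('latest_data', fresh_data, ttl=cache_ttl_minutes * 60)
        return fresh_data

    # Warehouse data is stale: fall back to cache.
    # Use `is not None`: the truth value of a DataFrame is ambiguous and raises.
    cached_data = cache.get('latest_data')
    if cached_data is not None:
        logger.warning("Using cached data (warehouse data is stale)")
        return cached_data
    raise DataUnavailableError("No fresh or cached data available")
```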
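The fallback assumes a `cache` object with `get(key)` and `set(key, value, ttl=seconds)`, typically backed by a shared store such as Redis. For local testing, a hypothetical in-process stand-in with that interface could look like this (`SimpleTTLCache` is illustrative, not a library class); the injectable clock makes expiry testable:

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class SimpleTTLCache:
    """Tiny in-process cache with per-entry expiry (not thread-safe)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, ttl: float) -> None:
        # Store the absolute expiry time alongside the value
        self._entries[key] = (self._clock() + ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: behave like a miss
            return None
        return value


# Usage
cache = SimpleTTLCache()
cache.set('latest_data', {'rows': 3}, ttl=60 * 60)
print(cache.get('latest_data'))
```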
Display Staleness to Users
```python
def get_dashboard_data():
    """Get data together with a freshness indicator for the UI."""
    data = fetch_data()
    freshness = check_freshness(data, 'event_time')

    return {
        'data': data,
        'metadata': {
            'last_updated': freshness['latest_timestamp'],
            'age_minutes': freshness['age_minutes'],
            'is_fresh': freshness['is_fresh'],
            'warning': (
                f"Data is {freshness['age_minutes']:.0f} minutes old"
                if not freshness['is_fresh'] else None
            ),
        },
    }
```
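Raw minute counts read awkwardly once data is hours or days old. A hypothetical `format_age` helper could turn `age_minutes` into a coarser label for the UI; the thresholds and wording here are assumptions:

```python
def format_age(age_minutes: float) -> str:
    """Render a data age as a short human-readable label (floors to whole units)."""
    minutes = int(age_minutes)
    if minutes < 1:
        return "Updated just now"
    if minutes < 60:
        return f"Updated {minutes} minute{'s' if minutes != 1 else ''} ago"
    hours = minutes // 60
    if hours < 24:
        return f"Updated {hours} hour{'s' if hours != 1 else ''} ago"
    days = hours // 24
    return f"Updated {days} day{'s' if days != 1 else ''} ago"


# Usage
print(format_age(125))
```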
11. Real Freshness Issues
Case Study: The Stale Dashboard
- Problem: Executive dashboard showing yesterday's revenue
- Root Cause: ETL job failed at 2 AM, no alerting on freshness
- Impact: Wrong business decisions made based on stale data
- Solution: Added freshness monitoring with PagerDuty alerts
- Prevention: Implemented SLO tracking and automated freshness tests
Case Study: The Slow Pipeline
- Problem: Real-time fraud detection taking 5 minutes (SLO: < 1 second)
- Root Cause: Single-threaded processing, no partitioning
- Solution: Implemented Kafka partitioning and parallel consumers
- Result: Latency reduced from 5 minutes to 200ms
12. Data Freshness Checklist
- SLOs Defined: Do we have freshness SLOs for each critical table?
- Monitoring: Are we continuously monitoring freshness?
- Alerting: Do we get alerted when data goes stale?
- Latency Tracking: Are we measuring P50/P95/P99 latencies?
- Optimization: Have we optimized for our freshness requirements?
- Fallbacks: Do we have fallback strategies for stale data?
- User Communication: Do we show data age to end users?
- SLO Compliance: Are we meeting our freshness SLOs > 95% of the time?
Related Skills
- 43-data-reliability/data-quality-monitoring
- 43-data-reliability/data-contracts
- 42-cost-engineering/infra-sizing