Agents python-observability
Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
git clone https://github.com/wshobson/agents
T=$(mktemp -d) && git clone --depth=1 https://github.com/wshobson/agents "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/python-development/skills/python-observability" ~/.claude/skills/wshobson-agents-python-observability && rm -rf "$T"
plugins/python-development/skills/python-observability/SKILL.mdPython Observability
Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.
When to Use This Skill
- Adding structured logging to applications
- Implementing metrics collection with Prometheus
- Setting up distributed tracing across services
- Propagating correlation IDs through request chains
- Debugging production issues
- Building observability dashboards
Core Concepts
1. Structured Logging
Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.
2. The Four Golden Signals
Track latency, traffic, errors, and saturation for every service boundary.
3. Correlation IDs
Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.
4. Bounded Cardinality
Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.
Quick Start
import structlog structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), ], ) logger = structlog.get_logger() logger.info("Request processed", user_id="123", duration_ms=45)
Fundamental Patterns
Pattern 1: Structured Logging with Structlog
Configure structlog for JSON output with consistent fields.
import logging import structlog def configure_logging(log_level: str = "INFO") -> None: """Configure structured logging for the application.""" structlog.configure( processors=[ structlog.contextvars.merge_contextvars, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer(), ], wrapper_class=structlog.make_filtering_bound_logger( getattr(logging, log_level.upper()) ), context_class=dict, logger_factory=structlog.PrintLoggerFactory(), cache_logger_on_first_use=True, ) # Initialize at application startup configure_logging("INFO") logger = structlog.get_logger()
Pattern 2: Consistent Log Fields
Every log entry should include standard fields for filtering and correlation.
import structlog from contextvars import ContextVar # Store correlation ID in context correlation_id: ContextVar[str] = ContextVar("correlation_id", default="") logger = structlog.get_logger() def process_request(request: Request) -> Response: """Process request with structured logging.""" logger.info( "Request received", correlation_id=correlation_id.get(), method=request.method, path=request.path, user_id=request.user_id, ) try: result = handle_request(request) logger.info( "Request completed", correlation_id=correlation_id.get(), status_code=200, duration_ms=elapsed, ) return result except Exception as e: logger.error( "Request failed", correlation_id=correlation_id.get(), error_type=type(e).__name__, error_message=str(e), ) raise
Pattern 3: Semantic Log Levels
Use log levels consistently across the application.
| Level | Purpose | Examples |
|---|---|---|
| Development diagnostics | Variable values, internal state |
| Request lifecycle, operations | Request start/end, job completion |
| Recoverable anomalies | Retry attempts, fallback used |
| Failures needing attention | Exceptions, service unavailable |
# DEBUG: Detailed internal information logger.debug("Cache lookup", key=cache_key, hit=cache_hit) # INFO: Normal operational events logger.info("Order created", order_id=order.id, total=order.total) # WARNING: Abnormal but handled situations logger.warning( "Rate limit approaching", current_rate=950, limit=1000, reset_seconds=30, ) # ERROR: Failures requiring investigation logger.error( "Payment processing failed", order_id=order.id, error=str(e), payment_provider="stripe", )
Never log expected behavior at
ERROR. A user entering a wrong password is INFO, not ERROR.
Pattern 4: Correlation ID Propagation
Generate a unique ID at ingress and thread it through all operations.
from contextvars import ContextVar import uuid import structlog correlation_id: ContextVar[str] = ContextVar("correlation_id", default="") def set_correlation_id(cid: str | None = None) -> str: """Set correlation ID for current context.""" cid = cid or str(uuid.uuid4()) correlation_id.set(cid) structlog.contextvars.bind_contextvars(correlation_id=cid) return cid # FastAPI middleware example from fastapi import Request async def correlation_middleware(request: Request, call_next): """Middleware to set and propagate correlation ID.""" # Use incoming header or generate new cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4()) set_correlation_id(cid) response = await call_next(request) response.headers["X-Correlation-ID"] = cid return response
Propagate to outbound requests:
import httpx async def call_downstream_service(endpoint: str, data: dict) -> dict: """Call downstream service with correlation ID.""" async with httpx.AsyncClient() as client: response = await client.post( endpoint, json=data, headers={"X-Correlation-ID": correlation_id.get()}, ) return response.json()
Advanced Patterns
Pattern 5: The Four Golden Signals with Prometheus
Track these metrics for every service boundary:
from prometheus_client import Counter, Histogram, Gauge # Latency: How long requests take REQUEST_LATENCY = Histogram( "http_request_duration_seconds", "Request latency in seconds", ["method", "endpoint", "status"], buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], ) # Traffic: Request rate REQUEST_COUNT = Counter( "http_requests_total", "Total HTTP requests", ["method", "endpoint", "status"], ) # Errors: Error rate ERROR_COUNT = Counter( "http_errors_total", "Total HTTP errors", ["method", "endpoint", "error_type"], ) # Saturation: Resource utilization DB_POOL_USAGE = Gauge( "db_connection_pool_used", "Number of database connections in use", )
Instrument your endpoints:
import time from functools import wraps def track_request(func): """Decorator to track request metrics.""" @wraps(func) async def wrapper(request: Request, *args, **kwargs): method = request.method endpoint = request.url.path start = time.perf_counter() try: response = await func(request, *args, **kwargs) status = str(response.status_code) return response except Exception as e: status = "500" ERROR_COUNT.labels( method=method, endpoint=endpoint, error_type=type(e).__name__, ).inc() raise finally: duration = time.perf_counter() - start REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc() REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration) return wrapper
Pattern 6: Bounded Cardinality
Avoid labels with unbounded values to prevent metric explosion.
# BAD: User ID has potentially millions of values REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this! # GOOD: Bounded values only REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200") # If you need per-user metrics, use a different approach: # - Log the user_id and query logs # - Use a separate analytics system # - Bucket users by type/tier REQUEST_COUNT.labels( method="GET", endpoint="/users", user_tier="premium", # Bounded set of values )
Pattern 7: Timed Operations with Context Manager
Create a reusable timing context manager for operations.
from contextlib import contextmanager import time import structlog logger = structlog.get_logger() @contextmanager def timed_operation(name: str, **extra_fields): """Context manager for timing and logging operations.""" start = time.perf_counter() logger.debug("Operation started", operation=name, **extra_fields) try: yield except Exception as e: elapsed_ms = (time.perf_counter() - start) * 1000 logger.error( "Operation failed", operation=name, duration_ms=round(elapsed_ms, 2), error=str(e), **extra_fields, ) raise else: elapsed_ms = (time.perf_counter() - start) * 1000 logger.info( "Operation completed", operation=name, duration_ms=round(elapsed_ms, 2), **extra_fields, ) # Usage with timed_operation("fetch_user_orders", user_id=user.id): orders = await order_repository.get_by_user(user.id)
Pattern 8: OpenTelemetry Tracing
Set up distributed tracing with OpenTelemetry.
Note: OpenTelemetry is actively evolving. Check the official Python documentation for the latest API patterns and best practices.
from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter def configure_tracing(service_name: str, otlp_endpoint: str) -> None: """Configure OpenTelemetry tracing.""" provider = TracerProvider() processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint)) provider.add_span_processor(processor) trace.set_tracer_provider(provider) tracer = trace.get_tracer(__name__) async def process_order(order_id: str) -> Order: """Process order with tracing.""" with tracer.start_as_current_span("process_order") as span: span.set_attribute("order.id", order_id) with tracer.start_as_current_span("validate_order"): validate_order(order_id) with tracer.start_as_current_span("charge_payment"): charge_payment(order_id) with tracer.start_as_current_span("send_confirmation"): send_confirmation(order_id) return order
Best Practices Summary
- Use structured logging - JSON logs with consistent fields
- Propagate correlation IDs - Thread through all requests and logs
- Track the four golden signals - Latency, traffic, errors, saturation
- Bound label cardinality - Never use unbounded values as metric labels
- Log at appropriate levels - Don't cry wolf with ERROR
- Include context - User ID, request ID, operation name in logs
- Use context managers - Consistent timing and error handling
- Separate concerns - Observability code shouldn't pollute business logic
- Test your observability - Verify logs and metrics in integration tests
- Set up alerts - Metrics are useless without alerting