# claude-skill-registry-data · python-performance

Use when Python code runs slowly, needs profiling, requires async/await patterns, or needs concurrent execution. Covers profiling tools, optimization patterns, and asyncio; measure before optimizing. (plugin:python@dot-claude)

## Install

**source** · Clone the upstream repo

```shell
git clone https://github.com/majiayu000/claude-skill-registry-data
```

**Claude Code** · Install into `~/.claude/skills/`

```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/man" ~/.claude/skills/majiayu000-claude-skill-registry-data-python-performance && rm -rf "$T"
```

manifest: `data/man/SKILL.md`

## Source content
# Python Performance & Concurrency

Profiling, optimization, and async patterns for Python.
## Before Writing Code

- Read `references/pythonic-style.md` for style conventions
- Check Python version: 3.13+ enables free-threaded concurrency options
- Profile before optimizing; never guess at bottlenecks
## Reference Files

| Topic | When to Load | File |
|---|---|---|
| Pythonic style | Before generating code | `references/pythonic-style.md` |
| Semaphores, locks, producers/consumers | Advanced async | |
| Database, memory, NumPy optimization | Deep optimization | |
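One of the advanced-async topics listed above can be sketched briefly: a minimal `asyncio.Semaphore` example that bounds how many fetches run concurrently. The URLs are placeholders and the network call is simulated with `asyncio.sleep`.

```python
import asyncio

async def fetch(url: str, sem: asyncio.Semaphore) -> str:
    async with sem:  # at most 3 coroutines pass this point at once
        await asyncio.sleep(0.01)  # stands in for a real network call
        return url

async def main() -> list[str]:
    sem = asyncio.Semaphore(3)
    urls = [f"https://example.com/{i}" for i in range(10)]
    # gather preserves input order even though completion order may differ
    return await asyncio.gather(*(fetch(u, sem) for u in urls))

results = asyncio.run(main())
print(len(results))
```

Bounding concurrency this way protects rate-limited APIs and connection pools without giving up the throughput of concurrent I/O.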
## Profiling First

### cProfile (CPU)

```shell
python -m cProfile -o output.prof script.py
python -m pstats output.prof  # Interactive analysis
```

```python
import cProfile
import pstats

with cProfile.Profile() as pr:
    main()

stats = pstats.Stats(pr)
stats.sort_stats("cumulative").print_stats(10)
```
### line_profiler (Line-by-Line)

```shell
uv add line-profiler
kernprof -l -v script.py  # Requires @profile decorator
```
### py-spy (Production)

```shell
py-spy record -o profile.svg -- python script.py
py-spy top --pid 12345  # Live profiling
```
### memory_profiler

```shell
uv add memory-profiler
python -m memory_profiler script.py  # Requires @profile decorator
```
## Concurrency Pattern Selection

| Workload | Solution |
|---|---|
| I/O-bound (network, disk) | `asyncio` / threads |
| CPU-bound (GIL Python) | `multiprocessing` |
| CPU-bound (nogil 3.13+) | threads |
| Mixed | async + `ProcessPoolExecutor` |
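To illustrate the nogil row: on a free-threaded 3.13+ build, the same `ThreadPoolExecutor` code runs CPU-bound tasks in parallel, while on a standard GIL build it serializes them, so always benchmark on your target build. This sketch checks which build it is running on via `sys._is_gil_enabled` (3.13+ only; older versions fall back to assuming the GIL).

```python
import sys
from concurrent.futures import ThreadPoolExecutor

def cpu_task(n: int) -> int:
    return sum(i * i for i in range(n))

# sys._is_gil_enabled() only exists on 3.13+; assume the GIL elsewhere.
gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
print("GIL enabled:", gil_enabled)

with ThreadPoolExecutor(max_workers=4) as pool:
    # map preserves argument order in its results
    results = list(pool.map(cpu_task, [100_000] * 4))
print(results[0])
```

The code is identical either way; only the parallelism of the CPU work changes with the build.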
## Async/Await Patterns

### Basic Async

```python
import asyncio

async def fetch_data(url: str) -> dict:
    await asyncio.sleep(1)  # Simulates I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())
```
### Concurrent Execution

```python
async def fetch_all(urls: list[str]) -> list[dict]:
    # Concurrent I/O eliminates sequential waiting: 10 URLs at 100ms each
    # complete in ~100ms total, not 1000ms. Critical for API aggregation.
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)
```
### Error Handling

```python
async def safe_fetch(url: str) -> dict | None:
    try:
        return await fetch_data(url)
    except Exception as e:
        print(f"Error: {e}")
        return None

async def fetch_with_errors(urls: list[str]):
    results = await asyncio.gather(
        *[safe_fetch(url) for url in urls],
        return_exceptions=True,  # Don't fail on the first error
    )
    return [r for r in results if r and not isinstance(r, Exception)]
```
### Timeouts

```python
try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("Operation timed out")
```
### Async Pitfalls

```python
# WRONG: Forgetting await
result = async_function()  # Returns a coroutine, doesn't execute!

# WRONG: Blocking the event loop
import time

async def bad():
    time.sleep(1)  # Blocks everything!

# CORRECT
async def good():
    await asyncio.sleep(1)  # Non-blocking

# WRONG: Calling async from sync
def sync_func():
    result = await async_func()  # SyntaxError!

# CORRECT
def sync_func():
    result = asyncio.run(async_func())
```
## Optimization Patterns

### Data Structures

```python
# Sets use hash tables for O(1) membership testing. For collections
# checked repeatedly or exceeding ~10 items, the overhead of hashing
# pays off vs linear scanning.
if item in items_list:  # O(n) - scans every element
    ...
if item in items_set:  # O(1) - hash lookup
    ...

# String immutability in Python forces reallocation on every +=.
# For N concatenations, this creates O(n^2) total copying.
# join() pre-calculates the final size and allocates once.
result = ""
for s in strings:
    result += s  # Creates N intermediate string objects

result = "".join(strings)  # Single allocation, O(n) total
```
### List Comprehensions

```python
# List comprehensions bypass method dispatch overhead (no .append lookup)
# and CPython pre-sizes the result list. For simple transforms, prefer
# comprehensions unless the loop body requires complex logic or side effects.
result = []
for i in range(n):
    result.append(i**2)  # Method lookup + call per iteration

result = [i**2 for i in range(n)]  # Optimized bytecode path
```
### Generators for Memory

```python
list_data = [i for i in range(1_000_000)]  # ~8MB in memory
gen_data = (i for i in range(1_000_000))  # ~100 bytes (lazy evaluation)

# Generators yield values on demand, preventing memory exhaustion when
# processing data larger than available RAM. Essential for log parsing,
# ETL pipelines, or any single-pass data processing.
def process_large_file(path):
    with open(path) as f:
        for line in f:  # The file iterator is itself lazy
            yield process(line)
```
### Caching

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    # Cached: subsequent calls with the same n return instantly
    return sum(i**2 for i in range(n))

# Clear the cache if needed
expensive_computation.cache_clear()
```
### Local Variables

```python
# Python's LEGB resolution requires dictionary lookups for global/nonlocal
# variables, but uses direct array indexing for locals. In tight loops with
# 10k+ iterations, this overhead compounds significantly.
GLOBAL = 100

def slow():
    for i in range(10000):
        x = GLOBAL * i  # Dict lookup each iteration

def fast():
    local = 100  # Bound once at function entry
    for i in range(10000):
        x = local * i  # Direct array index
```
## Multiprocessing (CPU-Bound)

```python
import multiprocessing as mp

def cpu_task(n: int) -> int:
    return sum(i**2 for i in range(n))

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        results = pool.map(cpu_task, [1_000_000] * 4)
```
## Async + Sync Integration

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    return sum(i**2 for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound, 1_000_000)
    print(result)

asyncio.run(main())
```
## Testing Async Code

```python
import asyncio

import pytest

@pytest.mark.asyncio
async def test_fetch():
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_timeout():
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(), timeout=0.1)
```
## Benchmarking

```python
import timeit

# Quick benchmark
elapsed = timeit.timeit(lambda: my_function(), number=1000)
print(f"{elapsed:.4f}s for 1000 runs")

# pytest-benchmark (uv add --dev pytest-benchmark)
def test_performance(benchmark):
    result = benchmark(my_function)
    assert result is not None
```
## Workflow Integration

| Task | Skill |
|---|---|
| Writing async tests | |
| Root cause analysis | |
| Before claiming done | |
## Best Practices

- Profile before optimizing: find real bottlenecks
- Use async for I/O: network, disk, databases
- Use multiprocessing for CPU: heavy computation
- Consider nogil (3.13+): threading for CPU-bound work
- Use appropriate data structures: set/dict for lookups
- Cache expensive operations: `lru_cache`
- Use generators for large data iteration
- Avoid blocking in async: no `time.sleep()`
- Handle cancellation: catch `CancelledError`
- Benchmark changes: prove the improvement
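The cancellation bullet above can be sketched as follows; `worker` is a hypothetical task that cleans up when cancelled and re-raises `CancelledError` so the cancellation still propagates to its caller.

```python
import asyncio

cleanup_ran = False

async def worker():
    global cleanup_ran
    try:
        await asyncio.sleep(10)  # stands in for long-running work
    except asyncio.CancelledError:
        cleanup_ran = True  # release locks, close files, etc.
        raise  # re-raise so the caller sees the cancellation

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)  # let the worker start waiting
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled, cleanup ran:", cleanup_ran)

asyncio.run(main())
```

Swallowing `CancelledError` without re-raising is a common bug: it makes tasks appear to complete normally when they were in fact cancelled.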