Claude-skill-registry async-python

Python async/await patterns for concurrent programming. Covers asyncio, aiohttp, async databases, task groups, and performance optimization.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/async-python" ~/.claude/skills/majiayu000-claude-skill-registry-async-python && rm -rf "$T"
manifest: skills/data/async-python/SKILL.md
source content

Async Python Skill

Master asynchronous Python programming for high-performance concurrent applications.

Asyncio Fundamentals

Basic Async/Await

import asyncio

async def fetch_data(url: str) -> dict:
    """Pretend to fetch *url*: wait one second, then return a stub payload."""
    simulated_latency = 1
    await asyncio.sleep(simulated_latency)  # stand-in for real network I/O
    payload = {"url": url, "data": "..."}
    return payload

async def main():
    """Fetch two URLs one after the other, then again concurrently.

    Returns the list produced by the concurrent ``asyncio.gather`` call.
    """
    # Sequential (slow): the second request starts only after the first ends.
    first = await fetch_data("https://api1.com")
    second = await fetch_data("https://api2.com")

    # Concurrent (fast): both requests are in flight at the same time.
    endpoints = ("https://api1.com", "https://api2.com")
    return await asyncio.gather(*(fetch_data(u) for u in endpoints))

# Run the event loop only when executed as a script, so that importing
# this module does not start main() as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Task Groups (Python 3.11+)

async def main():
    """Run both fetches inside a TaskGroup and return their results as a tuple."""
    async with asyncio.TaskGroup() as group:
        pending = [
            group.create_task(fetch_data(url))
            for url in ("https://api1.com", "https://api2.com")
        ]

    # Leaving the ``async with`` block waits for every task in the group.
    first, second = pending
    return first.result(), second.result()

HTTP Requests with aiohttp

Client Session

import aiohttp
import asyncio

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    """GET *url* through *session* and return the decoded JSON body.

    ``raise_for_status()`` is called on the response before the body is read.
    """
    async with session.get(url) as resp:
        resp.raise_for_status()
        payload = await resp.json()
    return payload

async def main():
    """Fetch three endpoints concurrently and print one summary line per URL."""
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
    ]

    # One shared session, so connections are pooled across all requests.
    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(
            *(fetch_json(session, url) for url in urls),
            return_exceptions=True,
        )

        # With return_exceptions=True a failure shows up as an exception
        # object in the result list instead of being raised.
        for url, outcome in zip(urls, outcomes):
            if not isinstance(outcome, Exception):
                print(f"Got {len(outcome)} items from {url}")
                continue
            print(f"Error fetching {url}: {outcome}")

# Start the event loop only when executed as a script, so that importing
# this module does not trigger network requests as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

With Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    wait=wait_exponential(multiplier=1, min=1, max=10),
    stop=stop_after_attempt(3),
)
async def fetch_with_retry(session: aiohttp.ClientSession, url: str) -> dict:
    """GET *url* with a 30-second total timeout and return the decoded JSON.

    The ``retry`` decorator is configured with ``stop_after_attempt(3)`` and
    ``wait_exponential(multiplier=1, min=1, max=10)``.
    """
    request_timeout = aiohttp.ClientTimeout(total=30)
    async with session.get(url, timeout=request_timeout) as resp:
        resp.raise_for_status()
        body = await resp.json()
    return body

Async Database Access

With asyncpg (PostgreSQL)

import asyncpg

async def main():
    """Demonstrate asyncpg pool usage: a single-row fetch, a multi-row fetch
    and a two-statement transaction.

    ``user_id``, ``sender_id`` and ``receiver_id`` are not defined in this
    snippet and must be supplied by the surrounding code.
    """
    # Entering the pool as an async context manager guarantees it is closed
    # even when one of the queries below raises (the previous trailing
    # ``await pool.close()`` was skipped on any exception).
    async with asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5,
        max_size=20,
    ) as pool:
        async with pool.acquire() as conn:
            # Single query
            user = await conn.fetchrow(
                "SELECT * FROM users WHERE id = $1", user_id
            )

            # Multiple rows
            users = await conn.fetch(
                "SELECT * FROM users WHERE active = $1", True
            )

            # Transaction: both updates run inside one ``conn.transaction()``
            # block.
            async with conn.transaction():
                await conn.execute(
                    "UPDATE users SET balance = balance - $1 WHERE id = $2",
                    100, sender_id
                )
                await conn.execute(
                    "UPDATE users SET balance = balance + $1 WHERE id = $2",
                    100, receiver_id
                )

With SQLAlchemy Async

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

# Module-level async engine; the URL selects the asyncpg driver.
# echo=True turns on SQLAlchemy's SQL statement logging.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=True,
)

# Session factory bound to the engine; calling it produces AsyncSession objects.
# NOTE(review): expire_on_commit=False presumably keeps loaded attributes
# usable after commit without a refresh — confirm against the SQLAlchemy docs.
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_users():
    """Return every ``User`` whose ``active`` column compares equal to True."""
    active_users = select(User).where(User.active == True)
    async with AsyncSessionLocal() as session:
        rows = await session.execute(active_users)
        return rows.scalars().all()

Concurrency Patterns

Semaphore for Rate Limiting

async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
    """Fetch every URL in *urls* with at most *max_concurrent* requests in flight.

    Args:
        urls: URLs to GET.
        max_concurrent: Upper bound on simultaneously running requests,
            enforced with an ``asyncio.Semaphore``.

    Returns:
        A list with the text body of each response, in the order of *urls*.
        An empty *urls* yields an empty list without opening a session.
    """
    if not urls:
        return []

    semaphore = asyncio.Semaphore(max_concurrent)

    # One session for the whole batch so connections are pooled and reused,
    # rather than creating and tearing down a session for every URL.
    async with aiohttp.ClientSession() as session:

        async def fetch_one(url: str):
            # The semaphore caps how many requests run at the same time.
            async with semaphore:
                async with session.get(url) as response:
                    return await response.text()

        return await asyncio.gather(*(fetch_one(url) for url in urls))

Producer-Consumer Queue

async def producer(queue: asyncio.Queue, items: list):
    """Enqueue every element of *items*, followed by a single ``None``."""
    # The trailing None is the completion signal for the queue's reader.
    for entry in (*items, None):
        await queue.put(entry)

async def consumer(queue: asyncio.Queue, worker_id: int):
    """Process items from *queue* until a ``None`` sentinel is received.

    Args:
        queue: Queue to read from.
        worker_id: Identifier of this worker; not used by the body.

    Every item taken from the queue is acknowledged with ``task_done()``,
    including when ``process()`` raises. Without that, a failing item would
    leave the queue's unfinished-task count above zero and ``queue.join()``
    would never return.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                # Sentinel: stop this worker (the finally still acknowledges it).
                break
            # Process item
            await process(item)
        finally:
            queue.task_done()

async def main():
    """Run one producer and five consumer tasks over a bounded queue.

    ``items`` is not defined in this snippet and must be supplied by the
    surrounding code.
    """
    queue = asyncio.Queue(maxsize=100)

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(5)
    ]

    # Start producer
    await producer(queue, items)

    # Wait for queue to be processed
    await queue.join()

    # Cancel consumers that are still waiting on the (now empty) queue
    for c in consumers:
        c.cancel()

    # Wait until every consumer task has actually finished; with
    # return_exceptions=True the CancelledError of each cancelled task is
    # collected instead of being raised here.
    await asyncio.gather(*consumers, return_exceptions=True)

Timeout Handling

async def with_timeout():
    """Await ``slow_operation()`` for at most five seconds.

    Returns its result, or ``None`` (after printing a notice) on timeout.
    """
    try:
        return await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None

FastAPI Integration

from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the asyncpg pool before the app starts and close it afterwards.

    The pool is stored on ``app.state.pool``.
    """
    # Startup
    app.state.pool = await asyncpg.create_pool(DATABASE_URL)
    try:
        yield
    finally:
        # Shutdown: the finally block also runs when an exception or a
        # cancellation is thrown in at the yield, so the pool is not leaked.
        await app.state.pool.close()

# Application instance; the `lifespan` context manager is passed as its lifespan handler.
app = FastAPI(lifespan=lifespan)

async def get_db():
    """Yield one connection acquired from ``app.state.pool``.

    The ``async with`` block is exited once the generator is resumed or closed.
    """
    pool = app.state.pool
    async with pool.acquire() as connection:
        yield connection

@app.get("/users/{user_id}")
async def get_user(user_id: int, db = Depends(get_db)):
    """Return the ``users`` row whose ``id`` equals *user_id*."""
    query = "SELECT * FROM users WHERE id = $1"
    return await db.fetchrow(query, user_id)

Performance Tips

# 1. Use uvloop for faster event loop
# Installs uvloop's event loop policy; do this before the event loop is
# created (e.g. before calling asyncio.run) so the new loop is a uvloop loop.
# NOTE(review): recent uvloop releases document `uvloop.run(main())` as the
# preferred entry point — confirm against the uvloop version in use.
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 2. Batch database operations
async def insert_many(items: list[dict]):
    """Insert all *items* with a single ``executemany`` call.

    Each item is read through its ``"name"`` and ``"value"`` keys.
    """
    statement = "INSERT INTO items (name, value) VALUES ($1, $2)"
    async with pool.acquire() as conn:
        rows = [(item["name"], item["value"]) for item in items]
        await conn.executemany(statement, rows)

# 3. Stream large responses
async def stream_response():
    """Yield the body of the response for ``url`` piece by piece.

    Pieces are produced by ``response.content.iter_chunked(8192)``;
    ``session`` and ``url`` come from the enclosing scope.
    """
    chunk_size = 8192
    async with session.get(url) as resp:
        body = resp.content
        async for piece in body.iter_chunked(chunk_size):
            yield piece

Anti-Patterns

❌ Blocking I/O in async code (`time.sleep`, sync HTTP)
❌ Creating new sessions per request
❌ Not closing resources properly
❌ Ignoring exceptions in gather
❌ Running CPU-bound work in the event loop

✅ Use async versions (`asyncio.sleep`, `aiohttp`)
✅ Reuse connection pools
✅ Use context managers
✅ Handle exceptions: `return_exceptions=True`
✅ Use `run_in_executor` for CPU work

# CPU-bound work
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
    None,  # Default executor
    cpu_intensive_function,
    arg1, arg2
)