Claude-skill-registry async-patterns

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/async-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-async-patterns && rm -rf "$T"
manifest: skills/data/async-patterns/SKILL.md
source content

Async/Await Patterns with Comprehensive Error Handling

Core Principles

Async programming in Python 3.13 enables high-performance I/O-bound operations. All async code in Vibekit MUST include comprehensive error handling and proper resource cleanup.

Basic Async Functions

Every async function must handle errors properly:

import asyncio
from typing import Any

# ✅ REQUIRED: Comprehensive error handling in async functions
async def fetch_data(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """
    Fetch data from URL with timeout and error handling.

    Raises:
        TimeoutError: If request exceeds timeout
        ConnectionError: If network request fails
        ValueError: If response is not valid JSON
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                response.raise_for_status()  # Raises on 4xx/5xx
                return await response.json()
    except asyncio.TimeoutError:
        raise TimeoutError(f"Request to {url} timed out after {timeout}s")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")
    except ValueError as e:
        raise ValueError(f"Invalid JSON response from {url}: {e}")
    # Let other exceptions propagate

# ❌ FORBIDDEN: Async function without error handling
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()  # No timeout, no error handling!

Concurrent Execution with asyncio.gather()

Run multiple async operations concurrently:

import asyncio
from typing import Any

# ✅ REQUIRED: Use asyncio.gather() for concurrent operations
async def fetch_multiple_urls(urls: list[str]) -> list[dict[str, Any]]:
    """
    Fetch multiple URLs concurrently.

    Returns list of results in same order as input URLs.
    Failed requests return None instead of raising.
    """
    tasks = [fetch_data(url) for url in urls]

    # return_exceptions=True prevents one failure from cancelling all tasks
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results and exceptions
    processed: list[dict[str, Any]] = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error fetching {urls[i]}: {result}")
            processed.append(None)
        else:
            processed.append(result)

    return processed

# Alternative: Fail-fast (any error cancels all)
async def fetch_multiple_strict(urls: list[str]) -> list[dict[str, Any]]:
    """Fetch URLs concurrently, raise on first error."""
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)  # return_exceptions=False (default)

Task Management with create_task()

Control task lifecycle explicitly:

import asyncio
from typing import Set

# ✅ REQUIRED: Track tasks and handle cleanup
class BackgroundTaskManager:
    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        """Create task and track it."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)  # Auto-remove when done
        return task

    async def shutdown(self) -> None:
        """Cancel all pending tasks."""
        for task in self.tasks:
            task.cancel()

        # Wait for all cancellations to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()

# Usage
async def main():
    manager = BackgroundTaskManager()

    # Start background tasks
    task1 = manager.create_task(long_running_operation())
    task2 = manager.create_task(periodic_cleanup())

    try:
        # Do main work
        await some_main_task()
    finally:
        # Clean shutdown
        await manager.shutdown()

Async Context Managers

Ensure proper resource cleanup in async code:

import asyncio
from typing import AsyncIterator

# ✅ REQUIRED: Use async context managers for resources
class AsyncDatabaseConnection:
    """Async context manager for database connections."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.conn = None

    async def __aenter__(self):
        """Establish connection on enter."""
        try:
            self.conn = await asyncio.wait_for(
                connect_to_database(self.connection_string),
                timeout=5.0
            )
            return self.conn
        except asyncio.TimeoutError:
            raise TimeoutError("Database connection timeout")
        except Exception as e:
            raise ConnectionError(f"Failed to connect: {e}")

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection on exit, even if exception occurred."""
        if self.conn is not None:
            try:
                await asyncio.wait_for(self.conn.close(), timeout=5.0)
            except Exception as e:
                print(f"Error closing connection: {e}")
        return False  # Don't suppress exceptions

# Usage
async def query_database():
    async with AsyncDatabaseConnection("postgresql://...") as conn:
        result = await conn.execute("SELECT * FROM users")
        return result

Timeouts and Cancellation

Always set timeouts for async operations:

import asyncio
from typing import Any

# ✅ REQUIRED: Use timeouts for all external I/O
async def fetch_with_timeout(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """Fetch data with overall timeout."""
    try:
        return await asyncio.wait_for(
            fetch_data(url),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation timed out after {timeout}s")

# ✅ REQUIRED: Handle cancellation gracefully
async def long_running_task():
    """Task that handles cancellation properly."""
    try:
        for i in range(100):
            await asyncio.sleep(1)
            print(f"Step {i}")
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation

# ❌ FORBIDDEN: Async operation without timeout
async def bad_fetch():
    return await fetch_data("https://slow-api.com")  # No timeout!

Structured Concurrency with TaskGroup (Python 3.11+)

Use TaskGroup for managing related tasks:

import asyncio

# ✅ REQUIRED: Use TaskGroup for structured concurrency
async def fetch_all_data() -> list[dict]:
    """Fetch data from multiple sources using TaskGroup."""
    results = []

    async with asyncio.TaskGroup() as tg:
        # All tasks started within the group
        task1 = tg.create_task(fetch_data("https://api1.example.com"))
        task2 = tg.create_task(fetch_data("https://api2.example.com"))
        task3 = tg.create_task(fetch_data("https://api3.example.com"))

    # At this point, all tasks have completed or an exception was raised
    results = [task1.result(), task2.result(), task3.result()]
    return results

# TaskGroup automatically:
# - Waits for all tasks to complete
# - Cancels remaining tasks if one raises an exception
# - Raises the first exception that occurred

Async Generators

Create async iterators for streaming data:

import asyncio
from typing import AsyncIterator

# ✅ REQUIRED: Async generators for streaming data
async def fetch_paginated_data(api_url: str) -> AsyncIterator[dict]:
    """Fetch paginated data, yielding each item."""
    page = 1
    while True:
        try:
            response = await fetch_data(f"{api_url}?page={page}")
            items = response.get("items", [])

            if not items:
                break

            for item in items:
                yield item

            page += 1
            await asyncio.sleep(0.1)  # Rate limiting
        except Exception as e:
            print(f"Error fetching page {page}: {e}")
            break

# Usage
async def process_all_items():
    async for item in fetch_paginated_data("https://api.example.com/items"):
        await process_item(item)

Anti-Patterns to Avoid

No Error Handling in Async Functions

# BAD: Silent failures
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# GOOD: Explicit error handling
async def good_fetch(url: str) -> dict:
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")

No Timeout on Async Operations

# BAD: Can hang forever
result = await fetch_data(url)

# GOOD: Always use timeout
result = await asyncio.wait_for(fetch_data(url), timeout=10.0)

Not Re-raising CancelledError

# BAD: Suppresses cancellation
async def bad_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # Missing: raise

# GOOD: Re-raise to propagate cancellation
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled, cleaning up...")
        # Cleanup
        raise  # Re-raise

Using Blocking Code in Async Functions

# BAD: Blocks the event loop
async def bad_async():
    time.sleep(10)  # Blocking!
    return "done"

# GOOD: Use async sleep
async def good_async():
    await asyncio.sleep(10)  # Non-blocking
    return "done"

When to Use This Skill

Activate this skill when:

  • Implementing async API clients or server handlers
  • Creating concurrent data fetching operations
  • Building async context managers
  • Implementing retry logic for async operations
  • Working with async generators or streaming data
  • Managing background tasks with proper lifecycle

Related Resources

For additional patterns, see: