# claude-skill-registry · async-patterns

## Install

**Source** · Clone the upstream repo:

```shell
git clone https://github.com/majiayu000/claude-skill-registry
```

**Claude Code** · Install into `~/.claude/skills/`:

```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/async-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-async-patterns && rm -rf "$T"
```

Manifest: `skills/data/async-patterns/SKILL.md`
# Async/Await Patterns with Comprehensive Error Handling

## Core Principles

Async programming in Python 3.13 enables high-performance handling of I/O-bound operations. All async code in Vibekit MUST include comprehensive error handling and proper resource cleanup.
## Basic Async Functions

Every async function must handle errors properly:

```python
import asyncio
from typing import Any

import aiohttp  # Missing from the original snippet; required below

# ✅ REQUIRED: Comprehensive error handling in async functions
async def fetch_data(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """
    Fetch data from URL with timeout and error handling.

    Raises:
        TimeoutError: If request exceeds timeout
        ConnectionError: If network request fails
        ValueError: If response is not valid JSON
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                response.raise_for_status()  # Raises on 4xx/5xx
                return await response.json()
    except asyncio.TimeoutError:
        raise TimeoutError(f"Request to {url} timed out after {timeout}s")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")
    except ValueError as e:
        raise ValueError(f"Invalid JSON response from {url}: {e}")
    # Let other exceptions propagate


# ❌ FORBIDDEN: Async function without error handling
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()  # No timeout, no error handling!
```
## Concurrent Execution with asyncio.gather()

Run multiple async operations concurrently:

```python
import asyncio
from typing import Any

# ✅ REQUIRED: Use asyncio.gather() for concurrent operations
async def fetch_multiple_urls(urls: list[str]) -> list[dict[str, Any] | None]:
    """
    Fetch multiple URLs concurrently.

    Returns list of results in same order as input URLs.
    Failed requests return None instead of raising.
    """
    tasks = [fetch_data(url) for url in urls]

    # return_exceptions=True prevents one failure from cancelling all tasks
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results and exceptions
    processed: list[dict[str, Any] | None] = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error fetching {urls[i]}: {result}")
            processed.append(None)
        else:
            processed.append(result)
    return processed


# Alternative: Fail-fast (raises on first error; note that gather() does
# NOT cancel the remaining tasks — they keep running in the background)
async def fetch_multiple_strict(urls: list[str]) -> list[dict[str, Any]]:
    """Fetch URLs concurrently, raise on first error."""
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)  # return_exceptions=False (default)
```
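A common companion to the gather pattern is bounding concurrency with `asyncio.Semaphore`, so a large URL list doesn't open hundreds of connections at once. The sketch below is self-contained for illustration: `fetch_data` is a stub standing in for the real fetch function, and `fetch_bounded` is a hypothetical helper name.

```python
import asyncio
from typing import Any


async def fetch_data(url: str) -> dict[str, Any]:
    # Stub standing in for a real HTTP fetch, so the sketch runs on its own
    await asyncio.sleep(0.01)
    return {"url": url}


async def fetch_bounded(urls: list[str], limit: int = 5) -> list[dict[str, Any]]:
    """Fetch URLs concurrently, but with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(url: str) -> dict[str, Any]:
        async with semaphore:  # Waits here when `limit` fetches are already running
            return await fetch_data(url)

    return await asyncio.gather(*(bounded(u) for u in urls))
```

Results still come back in input order, exactly as with plain `gather()`; the semaphore only throttles how many coroutines run at the same time.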
## Task Management with create_task()

Control task lifecycle explicitly:

```python
import asyncio

# ✅ REQUIRED: Track tasks and handle cleanup
class BackgroundTaskManager:
    def __init__(self) -> None:
        self.tasks: set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        """Create task and track it."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)  # Auto-remove when done
        return task

    async def shutdown(self) -> None:
        """Cancel all pending tasks."""
        for task in self.tasks:
            task.cancel()
        # Wait for all cancellations to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()


# Usage
async def main():
    manager = BackgroundTaskManager()

    # Start background tasks
    task1 = manager.create_task(long_running_operation())
    task2 = manager.create_task(periodic_cleanup())

    try:
        # Do main work
        await some_main_task()
    finally:
        # Clean shutdown
        await manager.shutdown()
```
## Async Context Managers

Ensure proper resource cleanup in async code:

```python
import asyncio

# ✅ REQUIRED: Use async context managers for resources
class AsyncDatabaseConnection:
    """Async context manager for database connections."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.conn = None

    async def __aenter__(self):
        """Establish connection on enter."""
        try:
            self.conn = await asyncio.wait_for(
                connect_to_database(self.connection_string),
                timeout=5.0,
            )
            return self.conn
        except asyncio.TimeoutError:
            raise TimeoutError("Database connection timeout")
        except Exception as e:
            raise ConnectionError(f"Failed to connect: {e}")

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection on exit, even if exception occurred."""
        if self.conn is not None:
            try:
                await asyncio.wait_for(self.conn.close(), timeout=5.0)
            except Exception as e:
                print(f"Error closing connection: {e}")
        return False  # Don't suppress exceptions


# Usage
async def query_database():
    async with AsyncDatabaseConnection("postgresql://...") as conn:
        result = await conn.execute("SELECT * FROM users")
        return result
```
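For simpler resources, the class-based protocol above can be replaced with `contextlib.asynccontextmanager`, which turns an async generator into an async context manager. This sketch uses a `FakeConnection` stand-in (not part of the original skill) so it is runnable without a database:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeConnection:
    """Stand-in connection object so the sketch is self-contained."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def database_connection(connection_string: str) -> AsyncIterator[FakeConnection]:
    # A real implementation would await an actual connect call here,
    # wrapped in asyncio.wait_for() as shown in the class-based version
    conn = FakeConnection()
    try:
        yield conn
    finally:
        # Runs on normal exit and when the body raises, guaranteeing cleanup
        await conn.close()
```

Code before the `yield` plays the role of `__aenter__`; the `finally` block plays the role of `__aexit__`.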
## Timeouts and Cancellation

Always set timeouts for async operations:

```python
import asyncio
from typing import Any

# ✅ REQUIRED: Use timeouts for all external I/O
async def fetch_with_timeout(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """Fetch data with overall timeout."""
    try:
        return await asyncio.wait_for(
            fetch_data(url),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation timed out after {timeout}s")


# ✅ REQUIRED: Handle cancellation gracefully
async def long_running_task():
    """Task that handles cancellation properly."""
    try:
        for i in range(100):
            await asyncio.sleep(1)
            print(f"Step {i}")
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation


# ❌ FORBIDDEN: Async operation without timeout
async def bad_fetch():
    return await fetch_data("https://slow-api.com")  # No timeout!
```
## Structured Concurrency with TaskGroup (Python 3.11+)

Use TaskGroup for managing related tasks:

```python
import asyncio

# ✅ REQUIRED: Use TaskGroup for structured concurrency
async def fetch_all_data() -> list[dict]:
    """Fetch data from multiple sources using TaskGroup."""
    async with asyncio.TaskGroup() as tg:
        # All tasks started within the group
        task1 = tg.create_task(fetch_data("https://api1.example.com"))
        task2 = tg.create_task(fetch_data("https://api2.example.com"))
        task3 = tg.create_task(fetch_data("https://api3.example.com"))

    # At this point, all tasks have completed or an exception was raised
    return [task1.result(), task2.result(), task3.result()]

# TaskGroup automatically:
# - Waits for all tasks to complete
# - Cancels remaining tasks if one raises an exception
# - Raises the failures wrapped in an ExceptionGroup
```
## Async Generators

Create async iterators for streaming data:

```python
import asyncio
from typing import AsyncIterator

# ✅ REQUIRED: Async generators for streaming data
async def fetch_paginated_data(api_url: str) -> AsyncIterator[dict]:
    """Fetch paginated data, yielding each item."""
    page = 1
    while True:
        try:
            response = await fetch_data(f"{api_url}?page={page}")
            items = response.get("items", [])
            if not items:
                break
            for item in items:
                yield item
            page += 1
            await asyncio.sleep(0.1)  # Rate limiting
        except Exception as e:
            print(f"Error fetching page {page}: {e}")
            break


# Usage
async def process_all_items():
    async for item in fetch_paginated_data("https://api.example.com/items"):
        await process_item(item)
```
## Anti-Patterns to Avoid

### No Error Handling in Async Functions

```python
# BAD: Silent failures
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


# GOOD: Explicit error handling
async def good_fetch(url: str) -> dict:
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")
```
### No Timeout on Async Operations

```python
# BAD: Can hang forever
result = await fetch_data(url)

# GOOD: Always use timeout
result = await asyncio.wait_for(fetch_data(url), timeout=10.0)
```
### Not Re-raising CancelledError

```python
# BAD: Suppresses cancellation
async def bad_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # Missing: raise


# GOOD: Re-raise to propagate cancellation
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled, cleaning up...")
        # Cleanup
        raise  # Re-raise
```
### Using Blocking Code in Async Functions

```python
# BAD: Blocks the event loop
async def bad_async():
    time.sleep(10)  # Blocking!
    return "done"


# GOOD: Use async sleep
async def good_async():
    await asyncio.sleep(10)  # Non-blocking
    return "done"
```
## When to Use This Skill

Activate this skill when:
- Implementing async API clients or server handlers
- Creating concurrent data fetching operations
- Building async context managers
- Implementing retry logic for async operations
- Working with async generators or streaming data
- Managing background tasks with proper lifecycle
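The retry-logic use case above has no example elsewhere in this skill, so here is a hedged sketch of async retry with exponential backoff. The helper name `retry_async` and its parameters are illustrative, not part of the original skill:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    retries: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Retry an async operation on ConnectionError with exponential backoff."""
    for attempt in range(retries):
        try:
            return await op()
        except ConnectionError:
            if attempt == retries - 1:
                raise  # Out of attempts: propagate the last error
            # Back off 1x, 2x, 4x, ... the base delay between attempts
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")  # Loop always returns or raises
```

Only retry errors that are plausibly transient (here `ConnectionError`); retrying on a bare `Exception` would also repeat bugs like `ValueError` from malformed responses.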
## Related Resources

For additional patterns, see:

- Python asyncio Documentation: https://docs.python.org/3/library/asyncio.html
- Python 3.13 Patterns: see the `python-3-13-patterns` skill