install
source · Clone the upstream repo
git clone https://github.com/plurigrid/asi
Claude Code · Install into ~/.claude/skills/
# Shallow-clone the repo into a fresh temporary directory, copy skills/mcp-fastpath
# to ~/.claude/skills/plurigrid-asi-mcp-fastpath, then delete the temporary clone.
# The steps are chained with &&, so the temporary directory is only removed when
# every earlier step succeeded.
T=$(mktemp -d) && git clone --depth=1 https://github.com/plurigrid/asi "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/mcp-fastpath" ~/.claude/skills/plurigrid-asi-mcp-fastpath && rm -rf "$T"
manifest:
skills/mcp-fastpath/SKILL.md
source content
MCP Fastpath Skill
Addresses: mcp-agent #603 (workflow wrapper timeouts), #564 (BrokenResourceError)
Problem Statement
The MCPApp workflow wrappers in mcp-agent introduce:
- Timeout issues with long-running tools
- BrokenResourceError in stdio transport
- Complexity overhead for simple tool registrations
Solution: Use FastMCP directly for lightweight, reliable tool serving.
FastMCP Direct Pattern
from mcp.server.fastmcp import FastMCP
from mcp.types import Tool, TextContent
import asyncio

# Initialize without workflow wrappers
mcp = FastMCP("asi-tools")


@mcp.tool()
async def sheaf_laplacian(
    node_features: list[list[float]],
    edge_index: list[list[int]],
    stalk_dim: int = 4,
) -> dict:
    """
    Compute sheaf Laplacian for distributed coordination.

    Every restriction map used here is the identity, so the assembled matrix
    is the graph Laplacian with each entry expanded to a ``stalk_dim`` x
    ``stalk_dim`` identity block. Each graph eigenvalue therefore appears
    ``stalk_dim`` times, and the first ``stalk_dim`` eigenvalues are always
    zero. The spectral gap is read at index ``stalk_dim`` (the first
    eigenvalue past that guaranteed kernel); reading index 1 would always
    give ~0 whenever ``stalk_dim >= 2``.

    Args:
        node_features: Node feature matrix [n_nodes, feature_dim]. Only the
            number of rows is used.
        edge_index: Edge connectivity [2, n_edges] (sources, then targets).
        stalk_dim: Dimension of stalks over edges.

    Returns:
        Dictionary with the Laplacian ``eigenvalues`` (ascending), the
        ``spectral_gap`` and the ``coordination_score`` (1 / (1 + gap)).

    Raises:
        ValueError: If ``stalk_dim`` < 1, ``edge_index`` does not hold two
            equally long rows, or an edge references a node outside
            ``range(n_nodes)``.
    """
    import numpy as np

    if stalk_dim < 1:
        raise ValueError(f"stalk_dim must be >= 1, got {stalk_dim}")
    if len(edge_index) < 2 or len(edge_index[0]) != len(edge_index[1]):
        raise ValueError(
            "edge_index must contain a source row and a target row of equal length"
        )

    n_nodes = len(node_features)
    sources, targets = edge_index[0], edge_index[1]
    for node in (*sources, *targets):
        # An out-of-range index would select an empty slice below and fail
        # with an opaque broadcasting error, so reject it up front.
        if not 0 <= node < n_nodes:
            raise ValueError(
                f"edge_index references node {node}, expected 0 <= node < {n_nodes}"
            )

    size = n_nodes * stalk_dim
    if size == 0:
        # No nodes: nothing to decompose.
        return {"eigenvalues": [], "spectral_gap": 0.0, "coordination_score": 1.0}

    # Build sheaf Laplacian (simplified)
    laplacian = np.zeros((size, size))

    # Identity restriction maps (can be learned). They are the same for every
    # edge here, so they are built once outside the loop.
    F_src = np.eye(stalk_dim)
    F_tgt = np.eye(stalk_dim)

    for src, tgt in zip(sources, targets):
        src_block = slice(src * stalk_dim, (src + 1) * stalk_dim)
        tgt_block = slice(tgt * stalk_dim, (tgt + 1) * stalk_dim)

        # Add this edge's contribution to the Laplacian
        laplacian[src_block, src_block] += F_src.T @ F_src
        laplacian[tgt_block, tgt_block] += F_tgt.T @ F_tgt
        laplacian[src_block, tgt_block] -= F_src.T @ F_tgt
        laplacian[tgt_block, src_block] -= F_tgt.T @ F_src

    eigenvalues = np.linalg.eigvalsh(laplacian)

    if len(eigenvalues) > stalk_dim:
        # Clamp tiny negative values caused by floating-point round-off.
        spectral_gap = max(float(eigenvalues[stalk_dim]), 0.0)
        coordination_score = float(1.0 / (1.0 + spectral_gap))
    else:
        # Single node: the whole spectrum is the kernel.
        spectral_gap = 0.0
        coordination_score = 1.0

    return {
        "eigenvalues": eigenvalues.tolist(),
        "spectral_gap": spectral_gap,
        "coordination_score": coordination_score,
    }


@mcp.tool()
async def gf3_trit_sum(values: list[int]) -> dict:
    """
    Compute GF(3) trit sum and verify conservation.

    Args:
        values: List of balanced ternary values {-1, 0, 1}. Membership in
            that set is not checked; any integers are summed.

    Returns:
        Sum, remainder (in {0, 1, 2}), balanced remainder (in {-1, 0, 1}),
        conservation status, and the adjustment that would make the sum
        divisible by 3.
    """
    total = sum(values)
    remainder = total % 3

    # Convert to balanced representation: residue 2 becomes -1.
    balanced_remainder = remainder if remainder <= 1 else remainder - 3

    return {
        "sum": total,
        "remainder": remainder,
        "balanced_remainder": balanced_remainder,
        "conserved": remainder == 0,
        # Negating the balanced remainder is already 0 when the sum is conserved.
        "adjustment_needed": -balanced_remainder,
    }


@mcp.tool()
async def operadic_compose(
    diagrams: list[dict],
    composition_order: list[int],
) -> dict:
    """
    Compose diagrams operadically (nested substitution).

    Diagrams are chained sequentially in the given order; each step requires
    the running result's 'cod' to equal the next diagram's 'dom'.

    Args:
        diagrams: List of diagram specs with 'dom', 'cod', 'boxes'
        composition_order: Order of composition (indices into diagrams)

    Returns:
        Composed diagram specification, or ``{"error": ...}`` when the input
        is empty, an index is out of range, or two diagrams do not compose.
    """
    if not diagrams or not composition_order:
        return {"error": "Empty input"}

    # Report bad indices the same way as every other failure instead of
    # letting an IndexError escape. Negative indices keep Python semantics.
    n_diagrams = len(diagrams)
    for idx in composition_order:
        if not -n_diagrams <= idx < n_diagrams:
            return {"error": f"Diagram index out of range: {idx}"}

    # Start with first diagram
    result = diagrams[composition_order[0]].copy()

    for idx in composition_order[1:]:
        next_diag = diagrams[idx]

        # Check composability: result.cod == next_diag.dom
        if result.get('cod') != next_diag.get('dom'):
            return {"error": f"Type mismatch: {result.get('cod')} != {next_diag.get('dom')}"}

        # Sequential composition. `.get` is used throughout so a diagram that
        # lacks a key yields None rather than raising KeyError.
        result = {
            'dom': result.get('dom'),
            'cod': next_diag.get('cod'),
            'boxes': result.get('boxes', []) + next_diag.get('boxes', []),
            'depth': max(result.get('depth', 1), next_diag.get('depth', 1)) + 1,
        }

    return result


# Run without workflow wrapper overhead
if __name__ == "__main__":
    mcp.run()
Stdio Transport Hardening
Addresses mcp-agent #564 (BrokenResourceError in the stdio transport):
import asyncio
import signal
import sys
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP
from mcp.server.stdio import stdio_server


@asynccontextmanager
async def robust_stdio_server(mcp: FastMCP):
    """Stdio server with graceful shutdown and error recovery.

    Installs SIGINT/SIGTERM handlers that set a shutdown flag, opens the
    stdio transport, and yields ``(read_iterator, write_stream)``. The read
    iterator stops once the shutdown flag is set and logs read errors to
    stderr instead of propagating them.

    ``BrokenPipeError`` is suppressed silently, and any other ``Exception``
    raised by the transport or by the body of the ``async with`` block is
    logged to stderr and suppressed. The previous signal handlers are
    restored on exit.

    Args:
        mcp: The FastMCP application. It is not used inside this context
            manager; the caller drives it with the yielded streams.
    """
    shutdown_event = asyncio.Event()

    def signal_handler(signum, frame):
        shutdown_event.set()

    # Register signal handlers, remembering the previous ones so they can be
    # put back when the context exits.
    previous_handlers = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        previous_handlers[sig] = signal.signal(sig, signal_handler)

    try:
        async with stdio_server() as (read_stream, write_stream):
            # Wrap streams with error handling
            async def safe_read():
                try:
                    async for message in read_stream:
                        if shutdown_event.is_set():
                            break
                        yield message
                except Exception as e:
                    if not shutdown_event.is_set():
                        print(f"Read error (recovering): {e}", file=sys.stderr)

            yield safe_read(), write_stream

    except BrokenPipeError:
        pass  # Expected on clean shutdown
    except Exception as e:
        print(f"Stdio transport error: {e}", file=sys.stderr)
    finally:
        shutdown_event.set()
        for sig, handler in previous_handlers.items():
            # signal.signal() returns None for handlers that were not
            # installed from Python; those cannot be re-installed.
            if handler is not None:
                signal.signal(sig, handler)


async def run_robust_mcp(mcp: FastMCP):
    """Run MCP server with robust stdio handling."""
    async with robust_stdio_server(mcp) as (read_stream, write_stream):
        # NOTE(review): `handle_streams` is not defined anywhere in this
        # document and may not be part of FastMCP's public API. Confirm
        # against the installed mcp SDK, and check that it accepts the async
        # generator yielded above in place of the raw read stream.
        await mcp.handle_streams(read_stream, write_stream)
Timeout-Free Long Operations
Pattern for long-running tools:
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent
import asyncio

mcp = FastMCP("long-running-tools")


@mcp.tool()
async def train_sheaf_nn(
    dataset: str,
    epochs: int = 100,
    report_interval: int = 10,
) -> dict:
    """
    Train sheaf neural network with progress checkpoints.

    The training step is simulated. The coroutine awaits once per epoch so
    the event loop stays responsive, and a checkpoint branch is reached every
    ``report_interval`` epochs. That branch is a placeholder: no progress is
    emitted yet.

    Args:
        dataset: Dataset identifier. Not used by the simulated training loop.
        epochs: Number of epochs to run. Zero or a negative value runs none.
        report_interval: Epochs between progress checkpoints. Zero or a
            negative value disables checkpoints.

    Returns:
        Dictionary with ``status``, ``final_loss`` (``None`` when no epoch
        ran) and the requested ``epochs``.
    """
    results = {"epochs_completed": 0, "losses": []}

    for epoch in range(epochs):
        # Simulate training step
        await asyncio.sleep(0.1)  # Yield to event loop

        loss = 1.0 / (epoch + 1)  # Dummy loss
        results["losses"].append(loss)
        results["epochs_completed"] = epoch + 1

        # Progress checkpoint (allows client to poll). The interval is
        # guarded so report_interval=0 cannot raise ZeroDivisionError.
        if report_interval > 0 and (epoch + 1) % report_interval == 0:
            # In practice, store to shared state or emit progress event
            pass

    return {
        "status": "completed",
        # With epochs <= 0 there is no loss to report.
        "final_loss": results["losses"][-1] if results["losses"] else None,
        "epochs": epochs,
    }


@mcp.tool()
async def batch_process_with_progress(
    items: list[str],
    batch_size: int = 10,
) -> dict:
    """Process items in batches with yielding.

    Args:
        items: Items to process.
        batch_size: Number of items handled between yields to the event loop.

    Returns:
        Dictionary with ``processed_count`` and the first ten processed
        ``items`` as a sample.

    Raises:
        ValueError: If ``batch_size`` is less than 1. A negative step would
            otherwise skip every item without any error.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    processed = []

    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]

        # Process batch
        processed.extend(f"processed:{item}" for item in batch)

        # Yield to event loop between batches
        await asyncio.sleep(0)

    return {
        "processed_count": len(processed),
        "items": processed[:10],  # Return sample
    }
Memory Graph Schema Flexibility
Addresses MCP servers #3144:
from pydantic import BaseModel, ConfigDict, Field
from typing import Any, Optional


class FlexibleEntity(BaseModel):
    """Entity with extensible properties for memory graph."""

    model_config = ConfigDict(extra='allow')  # Allow additional properties

    id: str
    type: str
    name: str
    observations: list[str] = Field(default_factory=list)

    # Additional properties stored dynamically
    def get_extra(self, key: str, default: Any = None) -> Any:
        """Return the extra (undeclared) property ``key``, or ``default``.

        Only extra properties are consulted. Declared fields and model
        methods are never returned, which a plain ``getattr`` lookup would
        have exposed.
        """
        extras = self.model_extra or {}
        return extras.get(key, default)


class FlexibleRelation(BaseModel):
    """Relation with extensible properties."""

    model_config = ConfigDict(extra='allow')

    source: str
    target: str
    relation_type: str


@mcp.tool()
async def add_entity_flexible(
    id: str,
    type: str,
    name: str,
    observations: Optional[list[str]] = None,
    **extra_properties: Any,
) -> dict:
    """Add entity with arbitrary additional properties.

    Args:
        id: Entity identifier.
        type: Entity type.
        name: Entity name.
        observations: Observations attached to the entity. Omitted or
            ``None`` means no observations.
        **extra_properties: Any further keyword arguments, stored on the
            entity as extra properties.

    Returns:
        Dictionary with ``status`` and the dumped ``entity``.
    """
    # NOTE(review): confirm how FastMCP exposes a **kwargs parameter in the
    # generated tool schema; extra properties may need an explicit dict
    # parameter to be reachable by clients.
    entity = FlexibleEntity(
        id=id,
        type=type,
        name=name,
        # A fresh list per call avoids sharing one mutable default.
        observations=[] if observations is None else observations,
        **extra_properties,
    )

    # Store in memory graph...

    return {
        "status": "created",
        "entity": entity.model_dump(),
    }
Integration with ASI Skills
# Compose multiple ASI skills via MCP fastpath
@mcp.tool()
async def asi_pipeline(
    input_graph: dict,
    operations: list[str],
) -> dict:
    """
    Run ASI skill pipeline on graph data.

    Operations can include:
    - "sheaf_laplacian": Compute coordination metric
    - "gf3_verify": Check GF(3) conservation
    - "topological_features": Extract TDA features (not implemented here;
      currently reported as an unknown operation)

    Args:
        input_graph: Graph data. "node_features" is read by both implemented
            operations and "edge_index" by "sheaf_laplacian".
        operations: Operation names, run in order.

    Returns:
        Dictionary with the original ``input`` and one ``steps`` entry per
        operation holding its name and result. Unrecognised operations get
        an ``{"error": ...}`` result instead of stopping the pipeline.
    """
    import zlib  # Local import so this snippet needs no extra module-level imports.

    result = {"input": input_graph, "steps": []}

    for op in operations:
        if op == "sheaf_laplacian":
            step_result = await sheaf_laplacian(
                input_graph["node_features"],
                input_graph["edge_index"],
            )
        elif op == "gf3_verify":
            # Map each node to a trit in {-1, 0, 1}. The builtin hash() of a
            # str is salted per process, so CRC32 is used to keep results
            # reproducible across runs.
            trits = [
                zlib.crc32(str(n).encode("utf-8")) % 3 - 1
                for n in input_graph["node_features"]
            ]
            step_result = await gf3_trit_sum(trits)
        else:
            step_result = {"error": f"Unknown operation: {op}"}

        result["steps"].append({"operation": op, "result": step_result})

    return result
Links
Commands
just mcp-fastpath-serve   # Start FastMCP server
just mcp-fastpath-test    # Test tool endpoints
just mcp-sheaf-tool       # Run sheaf Laplacian tool
just mcp-gf3-verify       # Verify GF(3) conservation
GF(3) Category: PLUS (Generation) | Lightweight MCP tool serving