Claude-skill-registry building-mcp-servers
Use when building MCP servers in TypeScript, Python, or C#; when implementing tools, resources, or prompts; when configuring Streamable HTTP transport; when migrating from SSE; when adding OAuth authentication; when seeing MCP protocol errors
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/building-mcp-servers" ~/.claude/skills/majiayu000-claude-skill-registry-building-mcp-servers-8ab66c && rm -rf "$T"
skills/data/building-mcp-servers/SKILL.mdBuilding MCP Servers
Reference guide for Model Context Protocol server development (January 2026). Covers TypeScript, Python, and C# implementations with Streamable HTTP transport.
Quick Reference
| Language | Package | Version | Transport |
|---|---|---|---|
| TypeScript | | 1.25.1 | |
| Python | | 1.25.0 | |
| C# | | 0.6.0-preview | |
Transport Status
| Transport | Status | Use Case |
|---|---|---|
| stdio | Supported | Local/CLI (Claude Desktop, Cursor) |
| Streamable HTTP | Recommended | Remote servers, production |
| SSE | Deprecated | Legacy only |
Streamable HTTP Transport
Single endpoint replaces dual SSE endpoints. Supports stateful sessions or stateless (serverless) mode.
Protocol Flow
Client Server |------ POST /mcp ---------------->| (JSON-RPC messages) |<----- JSON or SSE response ------| |------ GET /mcp ----------------->| (Optional: server-initiated) |<----- SSE stream ----------------|
Required Headers
POST /mcp HTTP/1.1 Content-Type: application/json Accept: application/json, text/event-stream Mcp-Session-Id: <session-id> # After initialization
TypeScript Implementation
Installation
npm install @modelcontextprotocol/sdk zod express npm install -D @types/express
Basic Server
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { z } from "zod"; import express from "express"; const app = express(); app.use(express.json()); const server = new McpServer({ name: "my-server", version: "1.0.0" }); // Tool server.registerTool("add", { description: "Add two numbers", inputSchema: { a: z.number(), b: z.number() } }, async ({ a, b }) => ({ content: [{ type: "text", text: String(a + b) }] })); // Resource server.registerResource( "config", "config://app", { description: "App configuration" }, async (uri) => ({ contents: [{ uri: uri.href, text: JSON.stringify({ env: "prod" }) }] }) ); // Prompt server.registerPrompt("review", { description: "Code review", argsSchema: { code: z.string() } }, ({ code }) => ({ messages: [{ role: "user", content: { type: "text", text: `Review:\n${code}` } }] })); // Transport const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => crypto.randomUUID() }); await server.connect(transport); app.all("/mcp", async (req, res) => { await transport.handleRequest(req, res); }); app.listen(3000);
Note: Top-level
requires Node.js with ES modules (awaitin package.json or"type": "module"extension)..mjs
Stateless Mode (Serverless)
const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: undefined // Disables sessions });
Python Implementation
Installation
pip install "mcp[cli]"
Basic Server (FastMCP)
from mcp.server.fastmcp import FastMCP, Context from typing import List mcp = FastMCP("my-server") # Tool @mcp.tool() def add(a: int, b: int) -> int: """Add two numbers.""" return a + b # Async tool with progress @mcp.tool() async def process_files(files: List[str], ctx: Context) -> str: """Process files with progress.""" for i, f in enumerate(files): await ctx.report_progress(i + 1, len(files)) return f"Processed {len(files)} files" # Resource @mcp.resource("config://app") def get_config() -> dict: """App configuration.""" return {"env": "prod", "version": "1.0.0"} # Dynamic resource @mcp.resource("users://{user_id}/profile") def get_user(user_id: str) -> dict: """Get user profile.""" return {"id": user_id, "name": f"User {user_id}"} # Prompt @mcp.prompt() def code_review(code: str, language: str = "python") -> str: """Code review prompt.""" return f"Review this {language} code:\n\n```{language}\n{code}\n```" if __name__ == "__main__": # Streamable HTTP mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp") # Or stdio: mcp.run()
FastAPI Integration
from fastapi import FastAPI from mcp.server.fastmcp import FastMCP api = FastAPI() mcp = FastMCP("api-tools") @mcp.tool() def query(sql: str) -> dict: return {"result": "data"} api.mount("/mcp", mcp.streamable_http_app()) # Run: uvicorn app:api --port 8000
C# Implementation
Installation
dotnet add package ModelContextProtocol.AspNetCore --prerelease
Basic Server
using System.ComponentModel; using Microsoft.Extensions.DependencyInjection; using ModelContextProtocol.Server; var builder = WebApplication.CreateBuilder(args); builder.Services .AddMcpServer() .WithHttpTransport() .WithToolsFromAssembly() .WithPromptsFromAssembly() .WithResourcesFromAssembly(); var app = builder.Build(); app.MapMcp(); // Maps /, /sse, /messages app.Run(); // Tools [McpServerToolType] public static class MyTools { [McpServerTool, Description("Add two numbers")] public static int Add( [Description("First number")] int a, [Description("Second number")] int b) => a + b; [McpServerTool, Description("Get weather")] public static async Task<string> GetWeather( HttpClient http, // Injected from DI [Description("City name")] string city, CancellationToken ct) { var data = await http.GetStringAsync($"https://api.weather.example/{city}", ct); return data; } } // Prompts [McpServerPromptType] public static class MyPrompts { [McpServerPrompt, Description("Code review prompt")] public static ChatMessage CodeReview( [Description("Code to review")] string code) => new(ChatRole.User, $"Review this code:\n\n{code}"); } // Resources [McpServerResourceType] public static class MyResources { [McpServerResource(Name = "config://app"), Description("App config")] public static string Config() => """{"env": "production"}"""; [McpServerResource(UriTemplate = "docs://{topic}")] public static string GetDoc([Description("Topic")] string topic) => $"Documentation for {topic}"; }
Stdio Transport (Local)
var builder = Host.CreateApplicationBuilder(args); builder.Logging.AddConsole(o => o.LogToStandardErrorThreshold = LogLevel.Trace); builder.Services .AddMcpServer() .WithStdioServerTransport() .WithToolsFromAssembly(); await builder.Build().RunAsync();
Authentication (OAuth 2.1)
MCP servers are OAuth Resource Servers (not Authorization Servers). Key requirements:
- PKCE mandatory (S256 method)
- Resource Indicators (RFC 8707) required
- Bearer tokens in Authorization header
- Audience validation on every request
Discovery Endpoints
GET /.well-known/oauth-protected-resource # Server metadata GET /.well-known/oauth-authorization-server # Auth server metadata
Server Response on 401
HTTP/1.1 401 Unauthorized WWW-Authenticate: Bearer realm="mcp", resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource"
Token Validation (Python)
import jwt from jwt import PyJWKClient class TokenValidator: def __init__(self, jwks_uri: str, audience: str): self.jwks = PyJWKClient(jwks_uri) self.audience = audience def validate(self, token: str) -> dict: key = self.jwks.get_signing_key_from_jwt(token) return jwt.decode( token, key.key, algorithms=["RS256"], audience=self.audience, options={"require": ["exp", "aud", "iss"]} )
Protected Resource Metadata Response
{ "resource": "https://mcp.example.com", "authorization_servers": ["https://auth.example.com"], "scopes_supported": ["read", "write", "admin"], "bearer_methods_supported": ["header"] }
OAuth Middleware Integration (Python/Starlette)
from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import JSONResponse from mcp.server.fastmcp import FastMCP class BearerAuthMiddleware(BaseHTTPMiddleware): def __init__(self, app, validator: TokenValidator): super().__init__(app) self.validator = validator async def dispatch(self, request, call_next): auth = request.headers.get("Authorization", "") if not auth.startswith("Bearer "): return JSONResponse( {"error": "unauthorized"}, status_code=401, headers={"WWW-Authenticate": 'Bearer realm="mcp"'} ) try: token = auth.replace("Bearer ", "") request.state.auth = self.validator.validate(token) except Exception: return JSONResponse({"error": "invalid_token"}, status_code=401) return await call_next(request) # Setup mcp = FastMCP("secure-server") validator = TokenValidator( jwks_uri="https://auth.example.com/.well-known/jwks.json", audience="https://mcp.example.com" ) app = Starlette( routes=[Mount("/mcp", app=mcp.streamable_http_app())], middleware=[Middleware(BearerAuthMiddleware, validator=validator)] )
OAuth Middleware Integration (TypeScript/Express)
import jwt from "jsonwebtoken"; import jwksClient from "jwks-rsa"; const client = jwksClient({ jwksUri: "https://auth.example.com/.well-known/jwks.json" }); const authMiddleware = async (req, res, next) => { const auth = req.headers.authorization; if (!auth?.startsWith("Bearer ")) { return res.status(401).json({ error: "unauthorized" }); } try { const token = auth.slice(7); const decoded = jwt.decode(token, { complete: true }); const key = await client.getSigningKey(decoded.header.kid); req.auth = jwt.verify(token, key.getPublicKey(), { audience: "https://mcp.example.com", algorithms: ["RS256"] }); next(); } catch (err) { res.status(401).json({ error: "invalid_token" }); } }; app.use("/mcp", authMiddleware);
Authenticated Client Request
const transport = new StreamableHTTPClientTransport( new URL("https://mcp.example.com/mcp"), { requestInit: { headers: { Authorization: `Bearer ${accessToken}` } } } );
Security Anti-Patterns
| Anti-Pattern | Fix |
|---|---|
| Token passthrough to upstream APIs | Use separate tokens for upstream calls |
| Missing audience validation | Always validate claim |
| Tokens in URLs | Use Authorization header only |
Scope Enforcement in Tools
Check scopes before executing sensitive operations:
TypeScript:
const authMiddleware = async (req, res, next) => { // ... token validation ... req.auth = { sub: decoded.sub, scopes: decoded.scope?.split(" ") || [] }; next(); }; server.registerTool("delete_user", { /* ... */ }, async ({ userId }, { meta }) => { const scopes = meta?.auth?.scopes || []; if (!scopes.includes("admin:write")) { return { isError: true, content: [{ type: "text", text: "Insufficient scope" }] }; } // ... perform deletion ... });
Python: Use HTTP middleware to validate, then check in tools:
# With Starlette middleware (see OAuth Middleware Integration above) # Store validated claims in request.state, then check in tool: @mcp.tool() def delete_user(user_id: str) -> str: """Delete user - requires admin:write scope.""" # Scope enforcement happens at HTTP layer via middleware # Tool assumes request already passed auth checks return f"Deleted user {user_id}"
Note: For advanced middleware with per-tool auth context, use
package (fastmcp) which providespip install fastmcpclass andMiddleware. The officialContext.get_state()package provides FastMCP but with simpler middleware options.mcp
Passing Auth Context to Tool Handlers
TypeScript: Store auth on transport or use a request-scoped context:
// In middleware: attach to request req.auth = { sub: decoded.sub, email: decoded.email }; // Pass to tool via server context or closure const sessions = new Map(); app.all("/mcp", async (req, res) => { const transport = new StreamableHTTPServerTransport({ /* ... */ }); sessions.set(transport.sessionId, { auth: req.auth }); // Tools access via sessions.get(sessionId) });
Python (with
package): Use middleware and context state:fastmcp
# pip install fastmcp from fastmcp import FastMCP, Context from fastmcp.server.middleware import Middleware, MiddlewareContext mcp = FastMCP("my-server") class AuthMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): # Extract and validate token, then store in context context.fastmcp_context.set_state("user_id", "user_123") context.fastmcp_context.set_state("scopes", ["read", "write"]) return await call_next() mcp.add_middleware(AuthMiddleware()) @mcp.tool async def get_my_profile(ctx: Context) -> dict: user_id = ctx.get_state("user_id") # Set by middleware return {"user_id": user_id, "profile": "..."}
Token Refresh Handling
Access tokens are short-lived (typically 1 hour). Strategies:
- Client-side refresh: Clients refresh tokens before expiration and reconnect
- Proactive server refresh: Background task refreshes tokens expiring soon
- On-demand refresh: Return 401, client refreshes and retries
Recommended pattern: Use short-lived access tokens (5-60 min) with refresh tokens. On 401:
// Client retry logic async function callWithRefresh(tool: string, args: object) { try { return await client.callTool(tool, args); } catch (err) { if (err.status === 401) { accessToken = await refreshAccessToken(refreshToken); transport.updateHeaders({ Authorization: `Bearer ${accessToken}` }); return await client.callTool(tool, args); } throw err; } }
Migration: SSE to Streamable HTTP
Server Changes
// OLD (SSE) - Two endpoints app.get("/sse", async (req, res) => { const transport = new SSEServerTransport("/sse/messages", res); await server.connect(transport); }); app.post("/sse/messages", async (req, res) => { /* ... */ }); // NEW (Streamable HTTP) - Single endpoint app.all("/mcp", async (req, res) => { const transport = new StreamableHTTPServerTransport(); await server.connect(transport); await transport.handleRequest(req, res); });
Client Changes
// OLD import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; const transport = new SSEClientTransport(new URL("http://localhost:3000/sse")); // NEW import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));
Backward-Compatible Server
// Support both transports during migration app.all("/mcp", /* new Streamable HTTP handler */); app.get("/sse", /* legacy SSE handler */); app.post("/sse/messages", /* legacy message handler */);
Client Configuration
Claude Desktop / Claude Code (stdio)
{ "mcpServers": { "my-server": { "command": "node", "args": ["/path/to/server.js"], "env": { "API_KEY": "secret" } } } }
Remote Server (Streamable HTTP)
{ "mcpServers": { "remote": { "type": "streamable-http", "url": "https://mcp.example.com/mcp" } } }
Error Handling
Return tool errors (not protocol errors) for model self-correction:
TypeScript:
server.registerTool("query", { /* ... */ }, async ({ sql }) => { if (sql.includes("DROP")) { return { isError: true, content: [{ type: "text", text: "Destructive queries not allowed" }] }; } // ... });
Python: Raise exceptions in tools - they're caught and returned as errors:
@mcp.tool() def query(sql: str) -> dict: if "DROP" in sql.upper(): raise ValueError("Destructive queries not allowed") return {"result": "data"}
Common Mistakes
| Mistake | Fix |
|---|---|
| Using SSE for new servers | Use Streamable HTTP |
| Writing to stdout in stdio servers | Use stderr for logs |
| Missing session ID after init | Always include header |
| Not validating OAuth audience | Validate token matches your server |
| Forwarding client tokens upstream | Use separate credentials for upstream APIs |
Official Resources
SDKs
- TypeScript: https://github.com/modelcontextprotocol/typescript-sdk
- Python: https://github.com/modelcontextprotocol/python-sdk
- C#: https://github.com/modelcontextprotocol/csharp-sdk
Documentation
- Specification: https://modelcontextprotocol.io/specification/2025-11-25
- Transports: https://modelcontextprotocol.io/specification/2025-11-25/basic/transports
- Authorization: https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization
Testing
npx @modelcontextprotocol/inspector node path/to/server.js