Marketplace mcp-server
Generic MCP (Model Context Protocol) server development patterns. Provides reusable architecture and best practices for building MCP servers that expose any domain-specific operations as tools for AI agents. Framework-agnostic implementation supporting async operations, error handling, and enterprise-grade features.
git clone https://github.com/aiskillstore/marketplace
T=$(mktemp -d) && git clone --depth=1 https://github.com/aiskillstore/marketplace "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/azeem-2/mcp-server" ~/.claude/skills/aiskillstore-marketplace-mcp-server && rm -rf "$T"
skills/azeem-2/mcp-server/SKILL.md — Generic MCP Server Development
This skill provides comprehensive patterns and reusable code for building MCP (Model Context Protocol) servers that can expose any domain operations as tools for AI agents. Follows 2025 best practices for performance, security, and maintainability.
When to Use This Skill
Use this skill when you need to:
- Build an MCP server for any domain (not just todos)
- Expose database operations as MCP tools
- Create AI-agent accessible APIs
- Implement async MCP tool handlers
- Add proper error handling and validation
- Support rate limiting and caching
- Build enterprise-grade MCP servers
- Integrate with multiple storage backends
1. Generic MCP Server Architecture
# mcp_server/core.py
#!/usr/bin/env python3
"""
Generic MCP Server Base Architecture.

Provides reusable patterns for any MCP server implementation:
configuration, per-user rate limiting, response caching, schema-based
argument validation, and a middleware-style tool-execution pipeline.
"""
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Sequence, Union, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from enum import Enum

import redis.asyncio as redis
from mcp.server import Server, NotificationOptions, stdio
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource, Tool, TextContent, ImageContent, EmbeddedResource,
    LoggingLevel, CallToolRequest, EmptyResult, ListResourcesRequest,
    ListToolsRequest, ReadResourceRequest, GetPromptRequest,
    ListPromptsRequest,
)
from pydantic import BaseModel, Field, validator
import aiofiles
import yaml
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("mcp_server")


class ServerConfig(BaseModel):
    """MCP server configuration (extra fields are allowed and preserved)."""

    name: str
    version: str = "1.0.0"
    description: str
    debug: bool = False
    redis_url: Optional[str] = None   # when set, enables rate limiting + caching
    rate_limit_requests: int = 100    # requests allowed per window
    rate_limit_window: int = 60       # window length, seconds
    cache_ttl: int = 300              # cache entry lifetime, seconds
    max_retries: int = 3
    timeout: int = 30

    class Config:
        extra = "allow"


@dataclass
class RequestContext:
    """Per-call request context extracted from tool arguments."""

    user_id: str
    session_id: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.utcnow)


class RateLimiter:
    """Redis-backed fixed-window rate limiter, keyed by an arbitrary string."""

    def __init__(self, redis_url: str, requests: int, window: int):
        self.redis_url = redis_url
        self.requests = requests
        self.window = window
        self._redis = None

    async def _get_redis(self):
        # BUGFIX: redis.asyncio.from_url() is synchronous (connections are
        # created lazily), so it must NOT be awaited — the original
        # `await redis.from_url(...)` raised TypeError at first use.
        if not self._redis:
            self._redis = redis.from_url(self.redis_url)
        return self._redis

    async def is_allowed(self, key: str) -> bool:
        """Return True while the caller is within its request budget."""
        r = await self._get_redis()
        current = await r.incr(f"rate_limit:{key}")
        if current == 1:
            # First hit in this window: start the expiry clock.
            await r.expire(f"rate_limit:{key}", self.window)
        return current <= self.requests

    async def get_remaining(self, key: str) -> int:
        """Return the number of requests left in the current window."""
        r = await self._get_redis()
        current = await r.get(f"rate_limit:{key}")
        return max(0, self.requests - int(current or 0))


class CacheManager:
    """Redis-backed response cache for idempotent tool calls."""

    def __init__(self, redis_url: str, ttl: int = 300):
        self.redis_url = redis_url
        self.ttl = ttl
        self._redis = None

    async def _get_redis(self):
        # BUGFIX: see RateLimiter._get_redis — from_url() must not be awaited.
        if not self._redis:
            self._redis = redis.from_url(self.redis_url)
        return self._redis

    def _make_key(self, tool_name: str, args: Dict[str, Any]) -> str:
        """Derive a deterministic cache key from tool name + arguments."""
        import hashlib

        # md5 is used only for key derivation here, not for security.
        args_str = json.dumps(args, sort_keys=True)
        return f"cache:{tool_name}:{hashlib.md5(args_str.encode()).hexdigest()}"

    async def get(self, tool_name: str, args: Dict[str, Any]) -> Optional[Any]:
        """Return the cached result, or None on a cache miss."""
        r = await self._get_redis()
        key = self._make_key(tool_name, args)
        result = await r.get(key)
        return json.loads(result) if result else None

    async def set(self, tool_name: str, args: Dict[str, Any], value: Any):
        """Cache *value* under the derived key with the configured TTL."""
        r = await self._get_redis()
        key = self._make_key(tool_name, args)
        await r.setex(key, self.ttl, json.dumps(value))


class BaseMCPServer:
    """Base MCP server that routes tool calls through rate limiting,
    caching, validation and structured error reporting."""

    def __init__(self, config: ServerConfig):
        self.config = config
        self.server = Server(config.name)
        # Maps tool name -> {"handler": coroutine, "schema": dict}.
        # (Annotation fixed: the original said Dict[str, Callable] but
        # register_tool stores dicts.)
        self.tools: Dict[str, Dict[str, Any]] = {}
        self.rate_limiter: Optional[RateLimiter] = None
        self.cache: Optional[CacheManager] = None

        # Optional Redis-backed middleware.
        if config.redis_url:
            self.rate_limiter = RateLimiter(
                config.redis_url,
                config.rate_limit_requests,
                config.rate_limit_window,
            )
            self.cache = CacheManager(config.redis_url, config.cache_ttl)

        # Register handlers
        self._register_handlers()
        logger.info(f"MCP Server '{config.name}' initialized")

    def _register_handlers(self):
        """Register list-tools / call-tool handlers with the MCP server."""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """Return list of available tools."""
            return await self.list_tools()

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            """Handle tool call with rate limiting and caching."""
            return await self.call_tool(name, arguments)

    def register_tool(self, name: str, handler: Callable, schema: Dict[str, Any]):
        """Register *handler* under *name* with its JSON schema."""
        self.tools[name] = {"handler": handler, "schema": schema}
        logger.info(f"Registered tool: {name}")

    async def list_tools(self) -> List[Tool]:
        """Return Tool descriptors for every registered tool."""
        tools = []
        for name, tool_info in self.tools.items():
            tools.append(Tool(
                name=name,
                description=tool_info["schema"].get("description", ""),
                inputSchema=tool_info["schema"].get("inputSchema", {}),
            ))
        return tools

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Execute a tool call through the full middleware pipeline.

        Order: context extraction -> rate limit -> cache lookup ->
        validation -> execution -> cache write -> logging.  Errors are
        returned as structured JSON payloads, never raised to the transport.
        """
        start_time = datetime.utcnow()
        try:
            # Pops the _user_id/_session_id/_metadata keys so they do not
            # leak into validation or the cache key (mutates `arguments`).
            context = self._extract_context(arguments)

            # Rate limiting check
            if self.rate_limiter:
                rate_key = f"{context.user_id}:{name}"
                if not await self.rate_limiter.is_allowed(rate_key):
                    return [TextContent(
                        type="text",
                        text=json.dumps({
                            "status": "error",
                            "error": "Rate limit exceeded",
                            "remaining": await self.rate_limiter.get_remaining(rate_key),
                        }),
                    )]

            # Check cache
            if self.cache and self._is_cacheable(name):
                cached_result = await self.cache.get(name, arguments)
                # BUGFIX: compare against None — a cached falsy value
                # (empty list, 0, "") is still a valid cache hit.
                if cached_result is not None:
                    logger.info(f"Cache hit for tool: {name}")
                    return [TextContent(
                        type="text",
                        text=json.dumps(cached_result),
                    )]

            # Validate tool exists
            if name not in self.tools:
                raise ValueError(f"Unknown tool: {name}")

            # Validate arguments
            schema = self.tools[name]["schema"]
            self._validate_arguments(arguments, schema)

            # Execute tool
            handler = self.tools[name]["handler"]
            result = await self._execute_tool(handler, arguments, context)

            # Only cache successful, cacheable results.
            if self.cache and self._is_cacheable(name) and result.get("status") != "error":
                await self.cache.set(name, arguments, result)

            # Log execution
            duration = (datetime.utcnow() - start_time).total_seconds()
            logger.info(f"Tool {name} executed in {duration:.2f}s for user {context.user_id}")

            return [TextContent(
                type="text",
                text=json.dumps(result, default=str),
            )]

        except Exception as e:
            # Boundary handler: convert any failure into a structured
            # error payload for the client.
            logger.error(f"Error executing tool {name}: {str(e)}", exc_info=True)
            duration = (datetime.utcnow() - start_time).total_seconds()
            logger.error(f"Tool {name} failed after {duration:.2f}s")
            return [TextContent(
                type="text",
                text=json.dumps({
                    "status": "error",
                    "error": str(e),
                    "tool": name,
                    "timestamp": datetime.utcnow().isoformat(),
                }),
            )]

    def _extract_context(self, arguments: Dict[str, Any]) -> RequestContext:
        """Pop transport-level context keys out of *arguments* (mutates it)."""
        user_id = arguments.pop("_user_id", "anonymous")
        session_id = arguments.pop("_session_id", None)
        metadata = arguments.pop("_metadata", {})
        return RequestContext(
            user_id=user_id,
            session_id=session_id,
            metadata=metadata,
        )

    def _validate_arguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]):
        """Validate *arguments* against the tool's JSON input schema.

        Raises ValueError on a missing required field, a type mismatch,
        or a value outside an enum.  Covers only a subset of JSON
        Schema — can be extended with pydantic for full coverage.
        (Loop variables renamed so they no longer shadow the imported
        dataclasses.field.)
        """
        input_schema = schema.get("inputSchema", {})
        required = input_schema.get("required", [])
        properties = input_schema.get("properties", {})

        # Check required fields
        for field_name in required:
            if field_name not in arguments:
                raise ValueError(f"Missing required field: {field_name}")

        # Validate field types
        for field_name, value in arguments.items():
            if field_name in properties:
                field_schema = properties[field_name]
                expected_type = field_schema.get("type")
                if expected_type == "string" and not isinstance(value, str):
                    raise ValueError(f"Field {field_name} must be a string")
                elif expected_type == "integer" and not isinstance(value, int):
                    raise ValueError(f"Field {field_name} must be an integer")
                elif expected_type == "array" and not isinstance(value, list):
                    raise ValueError(f"Field {field_name} must be an array")
                # Check enum values
                if "enum" in field_schema and value not in field_schema["enum"]:
                    raise ValueError(f"Field {field_name} must be one of {field_schema['enum']}")

    def _is_cacheable(self, tool_name: str) -> bool:
        """Heuristic: tools whose name contains a read-only verb are cacheable."""
        non_mutating = ["get", "list", "search", "find", "read"]
        return any(op in tool_name.lower() for op in non_mutating)

    async def _execute_tool(self, handler: Callable, arguments: Dict[str, Any],
                            context: RequestContext) -> Dict[str, Any]:
        """Invoke *handler*, passing `context=` only if its signature accepts one."""
        try:
            import inspect

            sig = inspect.signature(handler)
            if 'context' in sig.parameters:
                result = await handler(arguments, context=context)
            else:
                result = await handler(arguments)
            return result
        except Exception as e:
            logger.error(f"Tool handler failed: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat(),
            }

    async def run(self):
        """Run the server on stdio until the transport closes."""
        logger.info(f"Starting MCP server: {self.config.name}")
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name=self.config.name,
                    server_version=self.config.version,
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )


def tool(
    name: Optional[str] = None,
    description: str = "",
    input_schema: Optional[Dict[str, Any]] = None,
):
    """Decorator that tags a coroutine as an MCP tool.

    Only stores the name/schema on the function object; actual
    registration happens later via BaseMCPServer.register_tool.
    """
    def decorator(func):
        tool_name = name or func.__name__
        schema = {
            "description": description or func.__doc__ or "",
            "inputSchema": input_schema or {},
        }
        # Store schema on function for later registration
        func._mcp_tool_schema = schema
        func._mcp_tool_name = tool_name
        return func
    return decorator
2. Database Integration Patterns
# mcp_server/database.py
"""
Generic Database Integration for MCP Servers.

Provides a common async interface (DatabaseBackend) over asyncpg,
SQLAlchemy and Motor/MongoDB, plus a generic DatabaseManager with
CRUD helpers and a URL-sniffing factory.
"""
import asyncio
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import Any, Dict, List, Optional, TypeVar, Generic, Union
from datetime import datetime
import json

from sqlalchemy import (
    create_engine, MetaData, Table, Column, Integer, String, DateTime,
    Text, Boolean, select, update, delete, insert, text,
)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import NullPool
import asyncpg
import motor.motor_asyncio
from redis.asyncio import Redis

# Type variables
T = TypeVar('T')


class DatabaseBackend(ABC):
    """Abstract base for database backends."""

    @abstractmethod
    async def connect(self):
        """Establish connection."""

    @abstractmethod
    async def disconnect(self):
        """Close connection."""

    @abstractmethod
    async def execute_query(self, query: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Execute a read query and return rows as dicts."""

    @abstractmethod
    async def execute_command(self, command: str, params: Dict[str, Any] = None) -> Any:
        """Execute a write command (INSERT, UPDATE, DELETE)."""


class PostgresBackend(DatabaseBackend):
    """PostgreSQL backend using an asyncpg connection pool.

    NOTE(review): asyncpg binds positional $1/$2 placeholders, while
    DatabaseManager generates SQL with named :key placeholders, and
    `*params.values()` relies on dict insertion order — this pairing
    will not bind correctly.  Prefer SQLAlchemyBackend for the
    generated SQL, or translate :key -> $n before use; confirm before
    shipping this backend.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60,
        )

    async def disconnect(self):
        if self.pool:
            await self.pool.close()

    async def execute_query(self, query: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params.values() if params else [])
            return [dict(row) for row in rows]

    async def execute_command(self, command: str, params: Dict[str, Any] = None) -> Any:
        async with self.pool.acquire() as conn:
            return await conn.execute(command, *params.values() if params else [])


class SQLAlchemyBackend(DatabaseBackend):
    """SQLAlchemy backend for multiple databases (async by default)."""

    def __init__(self, database_url: str, async_mode: bool = True):
        self.database_url = database_url
        self.async_mode = async_mode
        self.engine = None
        self.session_factory = None

    async def connect(self):
        if self.async_mode:
            self.engine = create_async_engine(
                self.database_url,
                pool_pre_ping=True,   # detect stale pooled connections
                pool_recycle=300,
                echo=False,
            )
            self.session_factory = async_sessionmaker(
                self.engine,
                class_=AsyncSession,
                expire_on_commit=False,
            )
        else:
            self.engine = create_engine(
                self.database_url,
                pool_pre_ping=True,
                pool_recycle=300,
                echo=False,
            )
            self.session_factory = sessionmaker(
                bind=self.engine,
                expire_on_commit=False,
            )

    async def disconnect(self):
        # NOTE(review): sync Engine.dispose() is not awaitable — only
        # valid in async_mode; see the sync-mode caveat on get_session.
        if self.engine:
            await self.engine.dispose()

    @asynccontextmanager
    async def get_session(self):
        """Yield a session, committing on success and rolling back on error.

        NOTE(review): the sync branch still enters the factory with
        `async with`, which a sync Session does not support — sync mode
        needs a separate code path; confirm before using async_mode=False.
        """
        async with self.session_factory() as session:
            try:
                yield session
                if self.async_mode:
                    await session.commit()
                else:
                    session.commit()
            except Exception:
                if self.async_mode:
                    await session.rollback()
                else:
                    session.rollback()
                raise
            finally:
                if self.async_mode:
                    await session.close()
                else:
                    session.close()

    async def execute_query(self, query: Any, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Execute a raw-SQL string or a SQLAlchemy selectable."""
        async with self.get_session() as session:
            if isinstance(query, str):
                # BUGFIX: SQLAlchemy 2.0 rejects plain strings — raw SQL
                # must be wrapped in text().
                result = await session.execute(text(query), params or {})
                rows = result.fetchall()
                return [dict(row._mapping) for row in rows]
            else:
                # SQLAlchemy ORM/Core query
                result = await session.execute(query)
                rows = result.fetchall()
                return [dict(row._mapping) for row in rows]

    async def execute_command(self, command: Any, params: Dict[str, Any] = None) -> Any:
        """Execute a raw-SQL string or SQLAlchemy DML statement."""
        async with self.get_session() as session:
            if isinstance(command, str):
                # BUGFIX: wrap raw SQL in text() for SQLAlchemy 2.0.
                result = await session.execute(text(command), params or {})
                # get_session also commits on exit; the explicit commit
                # is redundant but harmless.
                await session.commit()
                return result
            else:
                await session.execute(command, params or {})
                await session.commit()
                return None


class MongoBackend(DatabaseBackend):
    """MongoDB backend using motor.

    Note the positional slots differ from the SQL backends: here
    execute_query takes (collection, filter-dict) and execute_command
    takes (operation, collection, data).
    """

    def __init__(self, connection_string: str, database_name: str):
        self.connection_string = connection_string
        self.database_name = database_name
        self.client = None
        self.db = None

    async def connect(self):
        self.client = motor.motor_asyncio.AsyncIOMotorClient(self.connection_string)
        self.db = self.client[self.database_name]

    async def disconnect(self):
        if self.client:
            self.client.close()

    async def execute_query(self, collection: str, query: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Run a find() and return documents with _id stringified."""
        cursor = self.db[collection].find(query or {})
        results = []
        async for document in cursor:
            # Convert ObjectId to string so results are JSON-serializable.
            if '_id' in document:
                document['_id'] = str(document['_id'])
            results.append(document)
        return results

    async def execute_command(self, operation: str, collection: str, data: Dict[str, Any]) -> Any:
        """Dispatch insert/update/delete; "update" expects data['_filter']."""
        if operation == "insert":
            result = await self.db[collection].insert_one(data)
            return str(result.inserted_id)
        elif operation == "update":
            filter_ = data.pop("_filter")
            update_data = {"$set": data}
            result = await self.db[collection].update_one(filter_, update_data)
            return result.modified_count
        elif operation == "delete":
            result = await self.db[collection].delete_one(data)
            return result.deleted_count


class DatabaseManager(Generic[T]):
    """Generic CRUD facade over a DatabaseBackend.

    SECURITY NOTE: table/collection and column names are interpolated
    into SQL via f-strings; only pass trusted identifiers.  Values are
    bound as parameters.
    """

    def __init__(self, backend: DatabaseBackend):
        self.backend = backend
        self._connected = False

    async def connect(self):
        """Connect to database (idempotent)."""
        if not self._connected:
            await self.backend.connect()
            self._connected = True

    async def disconnect(self):
        """Disconnect from database (idempotent)."""
        if self._connected:
            await self.backend.disconnect()
            self._connected = False

    @asynccontextmanager
    async def transaction(self):
        """Database transaction context manager.

        Backends without session support just yield the backend itself
        (no transactional guarantee).
        """
        if hasattr(self.backend, 'get_session'):
            async with self.backend.get_session() as session:
                yield session
        else:
            # For backends that don't support transactions
            yield self.backend

    async def find_one(self, table_or_collection: str, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Find a single record matching *query*, or None."""
        if isinstance(self.backend, MongoBackend):
            results = await self.backend.execute_query(table_or_collection, query)
            return results[0] if results else None
        else:
            # SQL implementation
            where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
            sql = f"SELECT * FROM {table_or_collection} WHERE {where_clause} LIMIT 1"
            results = await self.backend.execute_query(sql, query)
            return results[0] if results else None

    async def find_many(
        self,
        table_or_collection: str,
        query: Dict[str, Any] = None,
        limit: int = None,
        offset: int = None,
        order_by: str = None,
    ) -> List[Dict[str, Any]]:
        """Find records with optional pagination and ordering.

        *order_by* uses SQL syntax ("<field> ASC|DESC") and is translated
        for MongoDB.
        """
        query = query or {}
        if isinstance(self.backend, MongoBackend):
            cursor = self.backend.db[table_or_collection].find(query)
            if limit:
                cursor = cursor.limit(limit)
            if offset:
                cursor = cursor.skip(offset)
            if order_by:
                # Translate "field ASC|DESC" into MongoDB sort format.
                sort_field, sort_dir = order_by.split()
                cursor = cursor.sort([(sort_field, 1 if sort_dir == 'ASC' else -1)])
            results = []
            async for document in cursor:
                if '_id' in document:
                    document['_id'] = str(document['_id'])
                results.append(document)
            return results
        else:
            # SQL implementation
            where_clause = ""
            if query:
                where_clause = "WHERE " + " AND ".join([f"{k} = :{k}" for k in query.keys()])
            sql = f"SELECT * FROM {table_or_collection} {where_clause}"
            if order_by:
                sql += f" ORDER BY {order_by}"
            if limit:
                sql += f" LIMIT {limit}"
            if offset:
                sql += f" OFFSET {offset}"
            return await self.backend.execute_query(sql, query)

    async def create(self, table_or_collection: str, data: Dict[str, Any]) -> Any:
        """Insert a record, stamping created_at/updated_at (copies *data*)."""
        data = data.copy()
        # Add timestamps
        data['created_at'] = datetime.utcnow()
        data['updated_at'] = datetime.utcnow()
        if isinstance(self.backend, MongoBackend):
            return await self.backend.execute_command("insert", table_or_collection, data)
        else:
            # SQL implementation
            columns = list(data.keys())
            placeholders = [f":{col}" for col in columns]
            sql = f"INSERT INTO {table_or_collection} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
            return await self.backend.execute_command(sql, data)

    async def update(self, table_or_collection: str, query: Dict[str, Any], data: Dict[str, Any]) -> int:
        """Update matching records; returns affected-row count (0 if unknown)."""
        data = data.copy()
        data['updated_at'] = datetime.utcnow()
        if isinstance(self.backend, MongoBackend):
            data['_filter'] = query
            return await self.backend.execute_command("update", table_or_collection, data)
        else:
            # SQL implementation
            where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
            set_clause = ", ".join([f"{k} = :update_{k}" for k in data.keys()])
            # Prefix SET params so they cannot collide with WHERE params.
            update_params = {f"update_{k}": v for k, v in data.items()}
            params = {**query, **update_params}
            sql = f"UPDATE {table_or_collection} SET {set_clause} WHERE {where_clause}"
            result = await self.backend.execute_command(sql, params)
            return result.rowcount if hasattr(result, 'rowcount') else 0

    async def delete(self, table_or_collection: str, query: Dict[str, Any]) -> int:
        """Delete matching records; returns affected-row count (0 if unknown)."""
        if isinstance(self.backend, MongoBackend):
            return await self.backend.execute_command("delete", table_or_collection, query)
        else:
            # SQL implementation
            where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
            sql = f"DELETE FROM {table_or_collection} WHERE {where_clause}"
            result = await self.backend.execute_command(sql, query)
            return result.rowcount if hasattr(result, 'rowcount') else 0

    async def count(self, table_or_collection: str, query: Dict[str, Any] = None) -> int:
        """Count records matching *query* (all records when omitted)."""
        query = query or {}
        if isinstance(self.backend, MongoBackend):
            return await self.backend.db[table_or_collection].count_documents(query)
        else:
            # SQL implementation
            where_clause = ""
            if query:
                where_clause = "WHERE " + " AND ".join([f"{k} = :{k}" for k in query.keys()])
            sql = f"SELECT COUNT(*) as count FROM {table_or_collection} {where_clause}"
            results = await self.backend.execute_query(sql, query)
            return results[0]['count'] if results else 0


# Factory function for creating database managers
def create_database_manager(database_url: str, backend_type: str = "auto") -> DatabaseManager:
    """Create a DatabaseManager, inferring the backend from the URL scheme
    when backend_type == "auto"."""
    if backend_type == "auto":
        if database_url.startswith("postgresql+asyncpg://"):
            backend = SQLAlchemyBackend(database_url, async_mode=True)
        elif database_url.startswith("mongodb://"):
            import re

            # Pull the database name out of the URL path.
            match = re.match(r'mongodb://[^/]+/([^?]*)', database_url)
            db_name = match.group(1) if match else "default"
            backend = MongoBackend(database_url, db_name)
        elif database_url.startswith("postgresql://"):
            backend = PostgresBackend(database_url)
        else:
            backend = SQLAlchemyBackend(database_url, async_mode=True)
    else:
        if backend_type == "postgres":
            backend = PostgresBackend(database_url)
        elif backend_type == "mongodb":
            db_name = database_url.split("/")[-1].split("?")[0]
            backend = MongoBackend(database_url, db_name)
        elif backend_type == "sqlalchemy":
            backend = SQLAlchemyBackend(database_url)
        else:
            raise ValueError(f"Unknown backend type: {backend_type}")
    return DatabaseManager(backend)
3. Tool Implementation Patterns
# mcp_server/tools.py
"""
Generic MCP Tool Implementation Patterns.

CRUD and bulk-operation base tools that wrap a DatabaseManager and
return structured status dicts instead of raising to the caller.
"""
from typing import Any, Dict, List, Optional, Union, Callable
from datetime import datetime, timedelta
import json
import uuid
from dataclasses import dataclass, field

from .core import BaseMCPServer, tool, RequestContext
from .database import DatabaseManager


class BaseTool:
    """Base class for MCP tools."""

    def __init__(self, db_manager: DatabaseManager, cache=None):
        self.db_manager = db_manager
        self.cache = cache

    async def execute(self, args: Dict[str, Any], context: RequestContext = None) -> Dict[str, Any]:
        """Execute the tool logic (override in subclasses)."""
        raise NotImplementedError

    def _validate_permissions(self, context: RequestContext, required_permission: str = None) -> bool:
        """Validate user permissions.

        Placeholder that always grants access — override with a real
        check before relying on the *_all permission gates below.
        """
        return True


class CRUDBaseTool(BaseTool):
    """Generic create/get/list/update/delete tool for one table/collection."""

    def __init__(self, table_name: str, db_manager: DatabaseManager, schema: Dict[str, Any]):
        super().__init__(db_manager)
        self.table_name = table_name
        self.schema = schema
        # BUGFIX: rstrip('s') strips ALL trailing 's' characters
        # ("address" -> "addre"); removesuffix drops at most one plural 's'.
        self.entity_name = table_name.removesuffix('s')

    async def create(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Create an entity; returns a status dict, never raises."""
        try:
            # Validate against schema
            validated_data = self._validate_data(args, for_create=True)

            # Stamp ownership from the request context.
            if context:
                validated_data['user_id'] = context.user_id
                if context.session_id:
                    validated_data['session_id'] = context.session_id

            # Insert into database
            result = await self.db_manager.create(self.table_name, validated_data)

            return {
                "status": "created",
                "id": result,
                "entity": self.entity_name,
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "create",
            }

    async def get(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Get an entity by ID, scoped to the requesting user."""
        try:
            entity_id = args.get("id")
            if not entity_id:
                raise ValueError("Missing required field: id")

            # Add user filter for security
            query = {"id": entity_id}
            if context and not self._validate_permissions(context, "read_all"):
                query["user_id"] = context.user_id

            result = await self.db_manager.find_one(self.table_name, query)

            if not result:
                return {
                    "status": "not_found",
                    "entity": self.entity_name,
                    "id": entity_id,
                }

            return {
                "status": "success",
                "entity": self.entity_name,
                "data": self._serialize_data(result),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "get",
            }

    async def list(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """List entities with filtering and pagination."""
        try:
            # Build query from args
            query = {}
            filters = args.get("filters", {})
            limit = args.get("limit", 20)
            offset = args.get("offset", 0)
            order_by = args.get("order_by", "created_at DESC")

            # SECURITY FIX: apply caller filters first, THEN enforce user
            # scoping — previously filters could override the user_id
            # restriction and expose other users' records.
            query.update(filters)
            if context and not self._validate_permissions(context, "read_all"):
                query["user_id"] = context.user_id

            # Fetch from database
            results = await self.db_manager.find_many(
                self.table_name,
                query=query,
                limit=limit,
                offset=offset,
                order_by=order_by,
            )

            # Get total count
            total = await self.db_manager.count(self.table_name, query)

            return {
                "status": "success",
                "entity": self.entity_name,
                "data": [self._serialize_data(r) for r in results],
                "pagination": {
                    "total": total,
                    "limit": limit,
                    "offset": offset,
                    "has_more": offset + limit < total,
                },
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "list",
            }

    async def update(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Update an entity by ID (note: pops "id" from the caller's dict)."""
        try:
            entity_id = args.pop("id", None)
            if not entity_id:
                raise ValueError("Missing required field: id")

            # Validate update data
            update_data = self._validate_data(args, for_create=False)

            # Build query filter
            query = {"id": entity_id}
            if context and not self._validate_permissions(context, "update_all"):
                query["user_id"] = context.user_id

            # Update in database
            affected = await self.db_manager.update(self.table_name, query, update_data)

            if affected == 0:
                return {
                    "status": "not_found",
                    "entity": self.entity_name,
                    "id": entity_id,
                }

            return {
                "status": "updated",
                "entity": self.entity_name,
                "id": entity_id,
                "affected_rows": affected,
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "update",
            }

    async def delete(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Delete an entity by ID, scoped to the requesting user."""
        try:
            entity_id = args.get("id")
            if not entity_id:
                raise ValueError("Missing required field: id")

            # Build query filter
            query = {"id": entity_id}
            if context and not self._validate_permissions(context, "delete_all"):
                query["user_id"] = context.user_id

            # Delete from database
            affected = await self.db_manager.delete(self.table_name, query)

            if affected == 0:
                return {
                    "status": "not_found",
                    "entity": self.entity_name,
                    "id": entity_id,
                }

            return {
                "status": "deleted",
                "entity": self.entity_name,
                "id": entity_id,
                "affected_rows": affected,
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "delete",
            }

    def _validate_data(self, data: Dict[str, Any], for_create: bool = False) -> Dict[str, Any]:
        """Validate *data* against the entity schema; returns only known fields.

        Raises ValueError on missing required fields (create only), type
        mismatches, length/range violations, or values outside an enum.
        (Loop variables renamed so they no longer shadow the imported
        dataclasses.field.)
        """
        validated = {}
        schema_fields = self.schema.get("properties", {})
        required_fields = self.schema.get("required", [])

        # Check required fields for create
        if for_create:
            for field_name in required_fields:
                if field_name not in data:
                    raise ValueError(f"Missing required field: {field_name}")

        # Validate each field
        for field_name, value in data.items():
            if field_name not in schema_fields:
                continue  # Skip unknown fields or raise error based on strictness

            field_schema = schema_fields[field_name]
            field_type = field_schema.get("type")

            # Type validation
            if field_type == "string":
                if not isinstance(value, str):
                    raise ValueError(f"Field {field_name} must be a string")
                # Check min/max length
                if "minLength" in field_schema and len(value) < field_schema["minLength"]:
                    raise ValueError(f"Field {field_name} is too short")
                if "maxLength" in field_schema and len(value) > field_schema["maxLength"]:
                    raise ValueError(f"Field {field_name} is too long")
            elif field_type == "integer":
                if not isinstance(value, int):
                    raise ValueError(f"Field {field_name} must be an integer")
                # Check min/max value
                if "minimum" in field_schema and value < field_schema["minimum"]:
                    raise ValueError(f"Field {field_name} is too small")
                if "maximum" in field_schema and value > field_schema["maximum"]:
                    raise ValueError(f"Field {field_name} is too large")
            elif field_type == "array":
                if not isinstance(value, list):
                    raise ValueError(f"Field {field_name} must be an array")

            # Check enum values
            if "enum" in field_schema and value not in field_schema["enum"]:
                raise ValueError(f"Field {field_name} must be one of {field_schema['enum']}")

            validated[field_name] = value

        return validated

    def _serialize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Serialize a record for JSON output (datetimes -> ISO strings)."""
        serialized = data.copy()
        for key, value in serialized.items():
            if isinstance(value, datetime):
                serialized[key] = value.isoformat()
            elif isinstance(value, dict):
                # Fall back to str() for dicts that are not JSON-serializable.
                try:
                    json.dumps(value)
                except TypeError:
                    serialized[key] = str(value)
        return serialized


class BulkOperationTool(BaseTool):
    """Bulk create/update/delete operations for one table/collection."""

    def __init__(self, table_name: str, db_manager: DatabaseManager, schema: Dict[str, Any]):
        super().__init__(db_manager)
        self.table_name = table_name
        self.schema = schema
        # BUGFIX: removesuffix instead of rstrip — see CRUDBaseTool.
        self.entity_name = table_name.removesuffix('s')
        # Reuse one CRUD tool for per-item validation instead of
        # constructing a new one on every _validate_item call.
        self._crud_tool = CRUDBaseTool(table_name, db_manager, schema)

    async def bulk_create(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Validate then insert args["items"]; all-or-nothing validation,
        item-by-item insertion."""
        try:
            items = args.get("items", [])
            if not items:
                raise ValueError("No items provided for bulk create")

            # Validate all items before inserting any.
            validated_items = []
            for item in items:
                validated = self._validate_item(item)
                if context:
                    validated["user_id"] = context.user_id
                validated_items.append(validated)

            # Insert all items
            results = []
            for item in validated_items:
                result = await self.db_manager.create(self.table_name, item)
                results.append(result)

            return {
                "status": "created",
                "entity": self.entity_name,
                "count": len(results),
                "ids": results,
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "bulk_create",
            }

    async def bulk_update(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Apply args["updates"] ([{"id": ..., "data": {...}}]) one by one."""
        try:
            updates = args.get("updates", [])
            if not updates:
                raise ValueError("No updates provided")

            total_affected = 0
            for update in updates:
                entity_id = update.get("id")
                update_data = update.get("data", {})

                # Entries without an id are silently skipped.
                if not entity_id:
                    continue

                # Build query
                query = {"id": entity_id}
                if context:
                    query["user_id"] = context.user_id

                # Update
                affected = await self.db_manager.update(self.table_name, query, update_data)
                total_affected += affected

            return {
                "status": "updated",
                "entity": self.entity_name,
                "affected_rows": total_affected,
                "updates_processed": len(updates),
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "bulk_update",
            }

    async def bulk_delete(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
        """Delete the entities listed in args["ids"], one by one."""
        try:
            ids = args.get("ids", [])
            if not ids:
                raise ValueError("No IDs provided for bulk delete")

            total_affected = 0
            for entity_id in ids:
                # Build query
                query = {"id": entity_id}
                if context:
                    query["user_id"] = context.user_id

                # Delete
                affected = await self.db_manager.delete(self.table_name, query)
                total_affected += affected

            return {
                "status": "deleted",
                "entity": self.entity_name,
                "affected_rows": total_affected,
                "ids_processed": len(ids),
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "entity": self.entity_name,
                "operation": "bulk_delete",
            }

    def _validate_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a single item using the shared CRUD schema rules."""
        return self._crud_tool._validate_data(item, for_create=True)
4. Example: Building a Generic Task Management MCP Server
# examples/task_mcp_server.py
"""
Example: Task Management MCP Server built from the generic patterns.

Combines the generic building blocks (BaseMCPServer, DatabaseManager,
CRUDBaseTool, BulkOperationTool) into a concrete task-management server.
"""
import os
from typing import Any, Dict

from mcp_server.core import BaseMCPServer, ServerConfig, tool
from mcp_server.database import create_database_manager
from mcp_server.tools import BulkOperationTool, CRUDBaseTool

# Server configuration.  ServerConfig declares `extra = "allow"` in its
# inner Config, which is what lets the backend-specific `database_url`
# field pass pydantic validation here.
config = ServerConfig(
    name="task-manager",
    version="1.0.0",
    description="Generic task management MCP server",
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
    database_url=os.getenv(
        "DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/tasks"
    ),
)

# JSON Schema for the task entity; shared by the CRUD and bulk tools.
TASK_SCHEMA = {
    "type": "object",
    "properties": {
        "title": {
            "type": "string",
            "minLength": 1,
            "maxLength": 200,
            "description": "Task title",
        },
        "description": {
            "type": "string",
            "maxLength": 1000,
            "description": "Task description",
        },
        "priority": {
            "type": "string",
            "enum": ["low", "medium", "high"],
            "default": "medium",
            "description": "Task priority",
        },
        "status": {
            "type": "string",
            "enum": ["todo", "in_progress", "completed"],
            "default": "todo",
            "description": "Task status",
        },
        "due_date": {
            "type": "string",
            "format": "date-time",
            "description": "Optional due date",
        },
        "tags": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Task tags",
        },
    },
    "required": ["title"],
}


class TaskMCPServer(BaseMCPServer):
    """Task Management MCP Server.

    Wires the generic CRUD and bulk tools to a "tasks" table and registers
    them, plus a backend-aware full-text search tool, with the MCP server.
    """

    def __init__(self, config: ServerConfig):
        super().__init__(config)

        # Initialize database access for the configured backend.
        self.db_manager = create_database_manager(config.database_url)

        # Generic tools bound to the "tasks" table and shared schema.
        self.task_tool = CRUDBaseTool("tasks", self.db_manager, TASK_SCHEMA)
        self.bulk_tool = BulkOperationTool("tasks", self.db_manager, TASK_SCHEMA)

        # Expose the tools over MCP.
        self._register_task_tools()

    def _register_task_tools(self):
        """Register all task-related tools with their input schemas."""
        # Create task
        self.register_tool(
            "create_task",
            self.task_tool.create,
            {
                "description": "Create a new task",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string", "description": "Task title"},
                        "description": {
                            "type": "string",
                            "description": "Optional description",
                        },
                        "priority": {
                            "type": "string",
                            "enum": ["low", "medium", "high"],
                        },
                        "due_date": {"type": "string", "format": "date-time"},
                        "tags": {"type": "array", "items": {"type": "string"}},
                    },
                    "required": ["title"],
                },
            },
        )

        # Get task
        self.register_tool(
            "get_task",
            self.task_tool.get,
            {
                "description": "Get a task by ID",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "integer", "description": "Task ID"}
                    },
                    "required": ["id"],
                },
            },
        )

        # List tasks
        self.register_tool(
            "list_tasks",
            self.task_tool.list,
            {
                "description": "List tasks with optional filtering",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "filters": {
                            "type": "object",
                            "description": "Filter criteria",
                        },
                        "limit": {
                            "type": "integer",
                            "minimum": 1,
                            "maximum": 100,
                            "default": 20,
                        },
                        "offset": {"type": "integer", "minimum": 0, "default": 0},
                        "order_by": {
                            "type": "string",
                            "description": "Order by field (e.g., 'created_at DESC')",
                        },
                    },
                },
            },
        )

        # Update task
        self.register_tool(
            "update_task",
            self.task_tool.update,
            {
                "description": "Update a task",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "integer", "description": "Task ID"},
                        "title": {"type": "string", "description": "New title"},
                        "description": {
                            "type": "string",
                            "description": "New description",
                        },
                        "priority": {
                            "type": "string",
                            "enum": ["low", "medium", "high"],
                        },
                        "status": {
                            "type": "string",
                            "enum": ["todo", "in_progress", "completed"],
                        },
                        "due_date": {"type": "string", "format": "date-time"},
                        "tags": {"type": "array", "items": {"type": "string"}},
                    },
                    "required": ["id"],
                },
            },
        )

        # Delete task
        self.register_tool(
            "delete_task",
            self.task_tool.delete,
            {
                "description": "Delete a task",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "integer", "description": "Task ID"}
                    },
                    "required": ["id"],
                },
            },
        )

        # Bulk create
        self.register_tool(
            "bulk_create_tasks",
            self.bulk_tool.bulk_create,
            {
                "description": "Create multiple tasks at once",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "items": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "title": {"type": "string"},
                                    "description": {"type": "string"},
                                    "priority": {
                                        "type": "string",
                                        "enum": ["low", "medium", "high"],
                                    },
                                    "tags": {
                                        "type": "array",
                                        "items": {"type": "string"},
                                    },
                                },
                                "required": ["title"],
                            },
                        }
                    },
                    "required": ["items"],
                },
            },
        )

        # Search tasks
        self.register_tool(
            "search_tasks",
            self._search_tasks,
            {
                "description": "Search tasks by text query",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Search query"},
                        "limit": {
                            "type": "integer",
                            "minimum": 1,
                            "maximum": 50,
                            "default": 20,
                        },
                    },
                    "required": ["query"],
                },
            },
        )

    async def _search_tasks(self, args: Dict[str, Any], context) -> Dict[str, Any]:
        """Search tasks by free-text query.

        Dispatches to MongoDB ``$text`` search or PostgreSQL full-text
        search depending on the active database backend.  Returns a result
        envelope dict with status/data/count, or an error envelope.
        """
        try:
            query = args.get("query", "")
            limit = args.get("limit", 20)

            # Detect the backend by class name instead of isinstance():
            # the original referenced `MongoBackend` without importing it,
            # which raised NameError whenever this branch was reached.
            backend_name = type(self.db_manager.backend).__name__
            if backend_name == "MongoBackend":
                # MongoDB text search (presumably requires a text index on
                # the searched fields — confirm against the deployment).
                search_query = {"$text": {"$search": query}}
                if context and not self.task_tool._validate_permissions(
                    context, "read_all"
                ):
                    # Scope results to the caller unless they may read all.
                    search_query["user_id"] = context.user_id
                results = await self.db_manager.find_many(
                    "tasks", search_query, limit=limit
                )
            else:
                # PostgreSQL full-text search over title + description.
                sql = """
                    SELECT * FROM tasks
                    WHERE to_tsvector('english', title || ' ' || COALESCE(description, ''))
                          @@ plainto_tsquery('english', :query)
                """
                params = {"query": query}
                if context and not self.task_tool._validate_permissions(
                    context, "read_all"
                ):
                    sql += " AND user_id = :user_id"
                    params["user_id"] = context.user_id
                # Coerce to int before interpolating so nothing but a number
                # can reach the SQL text (the input schema already bounds it).
                sql += f" LIMIT {int(limit)}"
                results = await self.db_manager.execute_query(sql, params)

            return {
                "status": "success",
                "entity": "task",
                "data": [self.task_tool._serialize_data(r) for r in results],
                "query": query,
                "count": len(results),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "operation": "search_tasks",
            }


# Main execution
async def main():
    """Start the Task MCP Server."""
    server = TaskMCPServer(config)
    await server.run()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
5. Testing Patterns
# tests/test_mcp_server.py
"""
Generic MCP Server Testing Patterns.

Provides an in-memory mock database manager plus unit and integration
tests for the generic CRUD tool and the base MCP server.
"""
import asyncio
import json  # was missing in the original; test_full_crud_workflow uses json.loads
import pytest
from typing import Any, Dict, List
from unittest.mock import Mock, AsyncMock
from mcp_server.core import BaseMCPServer, ServerConfig, RequestContext
from mcp_server.database import DatabaseManager, create_database_manager
from mcp_server.tools import CRUDBaseTool


class MockDatabaseManager:
    """In-memory stand-in for DatabaseManager.

    Stores rows in a dict keyed by "table:id" and mimics the async
    create/find/update/delete interface the tools expect.
    """

    def __init__(self):
        self.data = {}       # "table:id" -> row dict
        self.next_id = 1     # auto-increment primary key

    async def create(self, table: str, data: Dict[str, Any]) -> int:
        """Insert a row and return its generated integer id."""
        entity_id = self.next_id
        # Copy so the caller's dict is not mutated by the insert.
        row = dict(data)
        row["id"] = entity_id
        self.data[f"{table}:{entity_id}"] = row
        self.next_id += 1
        return entity_id

    def _matches(self, row: Dict[str, Any], query: Dict[str, Any]) -> bool:
        """Return True if every query key/value equals the row's value."""
        return all(row.get(k) == v for k, v in query.items())

    async def find_one(self, table: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """Return the first row in `table` matching `query`, or None."""
        for key, row in self.data.items():
            table_name, _ = key.split(":")
            if table_name == table and self._matches(row, query):
                return row
        return None

    async def find_many(
        self, table: str, query: Dict[str, Any], limit: int = None
    ) -> List[Dict[str, Any]]:
        """Return all rows in `table` matching `query`, up to `limit`."""
        results = []
        for key, row in self.data.items():
            table_name, _ = key.split(":")
            if table_name == table and self._matches(row, query):
                results.append(row)
                if limit and len(results) >= limit:
                    break
        return results

    async def update(
        self, table: str, query: Dict[str, Any], data: Dict[str, Any]
    ) -> int:
        """Apply `data` to every matching row; return affected count."""
        count = 0
        for key, row in self.data.items():
            table_name, _ = key.split(":")
            if table_name == table and self._matches(row, query):
                row.update(data)
                count += 1
        return count

    async def delete(self, table: str, query: Dict[str, Any]) -> int:
        """Delete every matching row; return deleted count."""
        # Collect first: deleting while iterating a dict is an error.
        to_delete = [
            key
            for key, row in self.data.items()
            if key.split(":")[0] == table and self._matches(row, query)
        ]
        for key in to_delete:
            del self.data[key]
        return len(to_delete)


@pytest.fixture
def mock_db():
    """Fresh in-memory mock database per test.

    NOTE: this must be a *sync* fixture.  The original declared it
    `async def` under plain `@pytest.fixture`, which hands dependent
    fixtures a coroutine object instead of the manager instance.
    """
    return MockDatabaseManager()


@pytest.fixture
def test_schema():
    """Test entity schema."""
    return {
        "type": "object",
        "properties": {
            "name": {"type": "string", "minLength": 1},
            "value": {"type": "integer", "minimum": 0},
            "status": {"type": "string", "enum": ["active", "inactive"]},
            "tags": {"type": "array", "items": {"type": "string"}},
        },
        "required": ["name"],
    }


@pytest.fixture
def crud_tool(mock_db, test_schema):
    """CRUD tool bound to the mock database and test schema."""
    return CRUDBaseTool("test_entities", mock_db, test_schema)


@pytest.fixture
def user_context():
    """Request context for a test user."""
    return RequestContext(user_id="test_user", session_id="test_session")


class TestCRUDTool:
    """Test CRUD tool operations."""

    @pytest.mark.asyncio
    async def test_create_entity(self, crud_tool, user_context):
        """Test entity creation."""
        args = {
            "name": "Test Entity",
            "value": 100,
            "status": "active",
            "tags": ["test", "example"],
        }
        result = await crud_tool.create(args, user_context)
        assert result["status"] == "created"
        assert "id" in result
        assert result["entity"] == "test_entity"
        assert "timestamp" in result

    @pytest.mark.asyncio
    async def test_create_missing_required(self, crud_tool, user_context):
        """Test creation with missing required field."""
        args = {"value": 100}  # Missing 'name'
        result = await crud_tool.create(args, user_context)
        assert result["status"] == "error"
        assert "Missing required field" in result["error"]

    @pytest.mark.asyncio
    async def test_get_entity(self, crud_tool, user_context):
        """Test getting an entity."""
        # First create an entity
        create_args = {"name": "Test Get"}
        create_result = await crud_tool.create(create_args, user_context)
        entity_id = create_result["id"]

        # Get the entity
        get_args = {"id": entity_id}
        result = await crud_tool.get(get_args, user_context)
        assert result["status"] == "success"
        assert result["data"]["name"] == "Test Get"
        assert result["data"]["id"] == entity_id

    @pytest.mark.asyncio
    async def test_get_not_found(self, crud_tool, user_context):
        """Test getting non-existent entity."""
        args = {"id": 99999}
        result = await crud_tool.get(args, user_context)
        assert result["status"] == "not_found"

    @pytest.mark.asyncio
    async def test_list_entities(self, crud_tool, user_context):
        """Test listing entities."""
        # Create a few entities
        for i in range(3):
            args = {"name": f"Entity {i}"}
            await crud_tool.create(args, user_context)

        # List entities
        result = await crud_tool.list({}, user_context)
        assert result["status"] == "success"
        assert len(result["data"]) == 3
        assert "pagination" in result
        assert result["pagination"]["total"] == 3

    @pytest.mark.asyncio
    async def test_list_with_filters(self, crud_tool, user_context):
        """Test listing with filters."""
        # Create entities with different statuses
        await crud_tool.create({"name": "Active 1", "status": "active"}, user_context)
        await crud_tool.create({"name": "Inactive 1", "status": "inactive"}, user_context)
        await crud_tool.create({"name": "Active 2", "status": "active"}, user_context)

        # Filter by status
        result = await crud_tool.list(
            {"filters": {"status": "active"}}, user_context
        )
        assert result["status"] == "success"
        assert all(entity["status"] == "active" for entity in result["data"])

    @pytest.mark.asyncio
    async def test_update_entity(self, crud_tool, user_context):
        """Test updating an entity."""
        # Create entity
        create_result = await crud_tool.create({"name": "Original"}, user_context)
        entity_id = create_result["id"]

        # Update entity
        update_args = {"id": entity_id, "name": "Updated", "value": 200}
        result = await crud_tool.update(update_args, user_context)
        assert result["status"] == "updated"
        assert result["affected_rows"] == 1

    @pytest.mark.asyncio
    async def test_delete_entity(self, crud_tool, user_context):
        """Test deleting an entity."""
        # Create entity
        create_result = await crud_tool.create({"name": "To Delete"}, user_context)
        entity_id = create_result["id"]

        # Delete entity
        delete_args = {"id": entity_id}
        result = await crud_tool.delete(delete_args, user_context)
        assert result["status"] == "deleted"
        assert result["affected_rows"] == 1

        # Verify deletion
        get_result = await crud_tool.get({"id": entity_id}, user_context)
        assert get_result["status"] == "not_found"


class TestMCPServer:
    """Test MCP Server functionality."""

    @pytest.mark.asyncio
    async def test_server_initialization(self):
        """Test server initialization."""
        config = ServerConfig(name="test-server", description="Test MCP Server")
        server = BaseMCPServer(config)
        assert server.config.name == "test-server"
        assert server.server.name == "test-server"
        assert len(server.tools) == 0

    @pytest.mark.asyncio
    async def test_tool_registration(self):
        """Test tool registration."""
        config = ServerConfig(name="test-server", description="Test")
        server = BaseMCPServer(config)

        # Register a test tool
        async def test_tool(args: Dict[str, Any]) -> Dict[str, Any]:
            return {"status": "success", "data": args}

        schema = {
            "description": "Test tool",
            "inputSchema": {
                "type": "object",
                "properties": {"message": {"type": "string"}},
            },
        }
        server.register_tool("test_tool", test_tool, schema)
        assert "test_tool" in server.tools
        assert server.tools["test_tool"]["schema"] == schema

    @pytest.mark.asyncio
    async def test_rate_limiting(self):
        """Test rate limiting functionality."""
        # This would require mocking Redis or using a test instance
        # Implementation would depend on your rate limiting strategy
        pass

    @pytest.mark.asyncio
    async def test_caching(self):
        """Test caching functionality."""
        # This would require mocking Redis or using a test instance
        # Implementation would depend on your caching strategy
        pass


# Integration test example
class TestMCPServerIntegration:
    """Integration tests for MCP Server."""

    @pytest.mark.asyncio
    async def test_full_crud_workflow(self):
        """Test full CRUD workflow through MCP interface."""
        # Create mock server
        config = ServerConfig(
            name="test-integration", description="Integration test server"
        )

        # Use in-memory database
        mock_db = MockDatabaseManager()

        # Create CRUD tool
        test_schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "value": {"type": "integer"},
            },
            "required": ["name"],
        }
        crud_tool = CRUDBaseTool("items", mock_db, test_schema)

        # Register tools
        server = BaseMCPServer(config)
        server.register_tool(
            "create_item",
            crud_tool.create,
            {
                "description": "Create item",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "value": {"type": "integer"},
                    },
                    "required": ["name"],
                },
            },
        )

        # Test create
        result = await server.call_tool(
            "create_item", {"name": "Test Item", "value": 123}
        )
        response = json.loads(result[0].text)
        assert response["status"] == "created"

        # Test get (would need get_item tool)
        # Test list
        # Test update
        # Test delete
This generic MCP server skill provides a reusable foundation for building any MCP server, not just for todos. It includes:
- Core Server Architecture - Base class with rate limiting, caching, and error handling
- Database Integration - Support for PostgreSQL, MongoDB, and SQLAlchemy with async operations
- Tool Patterns - Generic CRUD and bulk operation patterns that work with any entity
- Example Implementation - Shows how to build a task management server using the generic patterns
- Testing Framework - Comprehensive testing patterns and mocks
★ Insight ─────────────────────────────────────
The key architectural pattern here is the separation of concerns between:
- The MCP protocol handling (BaseMCPServer)
- The data access layer (DatabaseManager with multiple backends)
- The business logic layer (CRUDBaseTool and BulkOperationTool)
- The specific implementation (TaskMCPServer combining the components)
This makes the system highly reusable and maintainable. Any developer can quickly build a new MCP server by defining their entity schema and combining the generic tools.
─────────────────────────────────────────────────