Claude-skill-registry ai-engineer-expert
Expert-level AI implementation, deployment, LLM integration, and production AI systems
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/ai-engineer-expert" ~/.claude/skills/majiayu000-claude-skill-registry-ai-engineer-expert && rm -rf "$T"
manifest:
skills/data/ai-engineer-expert/SKILL.md
AI Engineer Expert
Expert guidance on implementing AI systems, integrating LLMs, engineering prompts, and deploying production AI applications.
Core Concepts
AI Engineering
- LLM integration and orchestration
- Prompt engineering and optimization
- RAG (Retrieval-Augmented Generation)
- Vector databases and embeddings (see the embedding sketch after this list)
- Fine-tuning and adaptation
- AI agent systems
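
To make the embeddings item concrete, here is a minimal sketch (not part of the original skill) that embeds two strings with the OpenAI embeddings API and compares them with cosine similarity; the model choice and helper names are illustrative.

```python
# A minimal sketch of computing embeddings and comparing them with cosine
# similarity, assuming the OpenAI embeddings endpoint and numpy are available.
import numpy as np
from openai import OpenAI

client = OpenAI()

def embed(texts: list[str]) -> np.ndarray:
    """Embed a batch of texts (model name is an illustrative choice)."""
    response = client.embeddings.create(model="text-embedding-3-small", input=texts)
    return np.array([item.embedding for item in response.data])

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two embedding vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

vectors = embed(["How do I reset my password?", "Password reset instructions"])
print(cosine_similarity(vectors[0], vectors[1]))
```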
Production AI
- Model deployment strategies
- API design for AI services
- Rate limiting and cost control (see the cost-tracking sketch after this list)
- Error handling and fallbacks
- Monitoring and logging
- Security and safety
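
As a sketch of the cost-control item, the snippet below (not from the original skill) records per-request token usage and an estimated cost; the prices are placeholders, and the usage fields assume the token counts returned on an OpenAI chat completion response.

```python
# A minimal sketch of per-request token and cost tracking. The prices are
# placeholders -- substitute your provider's current rates.
from dataclasses import dataclass, field

@dataclass
class UsageTracker:
    prompt_price_per_1k: float       # placeholder: USD per 1K prompt tokens
    completion_price_per_1k: float   # placeholder: USD per 1K completion tokens
    total_cost: float = 0.0
    records: list = field(default_factory=list)

    def record(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Accumulate token counts and estimated cost for one call."""
        cost = (prompt_tokens / 1000) * self.prompt_price_per_1k \
             + (completion_tokens / 1000) * self.completion_price_per_1k
        self.total_cost += cost
        self.records.append({
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "cost": cost,
        })
        return cost

# Usage: after each call, response.usage carries the token counts, e.g.
# tracker = UsageTracker(prompt_price_per_1k=0.01, completion_price_per_1k=0.03)
# tracker.record(response.model, response.usage.prompt_tokens,
#                response.usage.completion_tokens)
```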
LLM Patterns
- Chain-of-thought prompting
- Few-shot learning
- System/user message design
- Function calling and tools
- Streaming responses
- Context window management (see the trimming sketch after this list)
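
For context window management, here is a minimal sketch (not from the original skill) that trims a conversation to a token budget using tiktoken; the budget value and helper names are illustrative.

```python
# A minimal sketch of context window management, assuming tiktoken for token
# counting. Older messages are dropped until the conversation fits a budget.
import tiktoken

def count_tokens(text: str, model: str = "gpt-4-turbo") -> int:
    """Count tokens with the target model's tokenizer, falling back to cl100k_base."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

def trim_history(messages: list[dict], max_tokens: int = 6000) -> list[dict]:
    """Keep the system message plus the most recent turns that fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    kept, used = [], sum(count_tokens(m["content"]) for m in system)
    for message in reversed(rest):  # walk from newest to oldest
        tokens = count_tokens(message["content"])
        if used + tokens > max_tokens:
            break
        kept.append(message)
        used += tokens
    return system + list(reversed(kept))
```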
LLM Integration
```python
from typing import Dict, List

from openai import AsyncOpenAI
from anthropic import AsyncAnthropic


class LLMClient:
    """Unified LLM client with fallback"""

    def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
        self.openai_client = AsyncOpenAI()
        self.anthropic_client = AsyncAnthropic()
        self.primary = primary
        self.fallback = fallback

    async def chat_completion(self,
                              messages: List[Dict],
                              model: str = "gpt-4-turbo",
                              temperature: float = 0.7,
                              max_tokens: int = 1000) -> str:
        """Chat completion with fallback"""
        try:
            if self.primary == "openai":
                response = await self.openai_client.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                return response.choices[0].message.content
        except Exception as e:
            print(f"Primary provider failed: {e}, trying fallback")
            if self.fallback == "anthropic":
                # Note: Anthropic expects system prompts via the `system`
                # parameter, not as a "system" role message.
                response = await self.anthropic_client.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                return response.content[0].text

    async def chat_completion_streaming(self,
                                        messages: List[Dict],
                                        model: str = "gpt-4-turbo"):
        """Streaming chat completion"""
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def function_calling(self, messages: List[Dict], tools: List[Dict]) -> Dict:
        """Function calling with tools"""
        response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        message = response.choices[0].message
        if message.tool_calls:
            return {
                "type": "function_call",
                "function": message.tool_calls[0].function.name,
                "arguments": message.tool_calls[0].function.arguments
            }
        return {
            "type": "message",
            "content": message.content
        }
```
RAG Implementation
```python
from typing import Dict, List

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA


class RAGSystem:
    """Retrieval-Augmented Generation system"""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None
        self.persist_directory = persist_directory
        self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

    def ingest_documents(self, documents: List[str]):
        """Ingest and index documents"""
        # Split documents into overlapping chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.create_documents(documents)

        # Create the vector store from the chunks
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )

    def query(self, question: str, k: int = 4) -> Dict:
        """Query with RAG"""
        if not self.vectorstore:
            raise ValueError("No documents ingested")

        # Retrieve the k most relevant chunks
        retriever = self.vectorstore.as_retriever(search_kwargs={"k": k})

        # QA chain that stuffs retrieved chunks into the prompt
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True
        )

        # Get answer
        result = qa_chain.invoke({"query": question})
        return {
            "answer": result["result"],
            "sources": [doc.page_content for doc in result["source_documents"]]
        }

    def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
        """Similarity search in the vector database"""
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "score": score,
                "metadata": doc.metadata
            }
            for doc, score in results
        ]
```
Prompt Engineering
```python
from typing import Dict, List


class PromptTemplate:
    """Advanced prompt templates"""

    @staticmethod
    def chain_of_thought(question: str) -> str:
        """Chain-of-thought prompting"""
        return f"""Let's solve this step by step:

Question: {question}

Please think through this problem carefully:
1. First, identify what we need to find
2. Then, break down the problem into smaller steps
3. Solve each step
4. Finally, combine the results

Your step-by-step solution:"""

    @staticmethod
    def few_shot(task: str, examples: List[Dict], query: str) -> str:
        """Few-shot learning prompt"""
        examples_text = "\n\n".join([
            f"Input: {ex['input']}\nOutput: {ex['output']}"
            for ex in examples
        ])
        return f"""Task: {task}

Here are some examples:

{examples_text}

Now, please solve this:
Input: {query}
Output:"""

    @staticmethod
    def system_message(role: str, constraints: List[str], format_instructions: str) -> str:
        """System message template"""
        constraints_text = "\n".join([f"- {c}" for c in constraints])
        return f"""You are a {role}.

Constraints:
{constraints_text}

Output Format:
{format_instructions}

Remember to follow these guidelines strictly."""
```
AI Agent System
```python
import json
from typing import Any, Callable, Dict, List


class Tool:
    """Tool that agents can use"""

    def __init__(self, name: str, description: str, function: Callable, parameters: Dict):
        self.name = name
        self.description = description
        self.function = function
        self.parameters = parameters  # JSON Schema describing the arguments

    def to_openai_function(self) -> Dict:
        """Convert to OpenAI function format"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters
            }
        }


class AIAgent:
    """AI agent with tools"""

    def __init__(self, llm_client: LLMClient, tools: List[Tool]):
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in tools}
        self.conversation_history = []

    async def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Run agent with tool use"""
        self.conversation_history.append({
            "role": "user",
            "content": user_input
        })

        for _ in range(max_iterations):
            # Get LLM response with function calling
            response = await self.llm.function_calling(
                messages=self.conversation_history,
                tools=[tool.to_openai_function() for tool in self.tools.values()]
            )

            if response["type"] == "message":
                # Agent produced a final answer
                return response["content"]

            # Execute the requested tool
            tool_name = response["function"]
            arguments = json.loads(response["arguments"])
            tool_result = await self.execute_tool(tool_name, arguments)

            # Feed the tool result back into the conversation
            self.conversation_history.append({
                "role": "function",
                "name": tool_name,
                "content": str(tool_result)
            })

        return "Max iterations reached"

    async def execute_tool(self, tool_name: str, arguments: Dict) -> Any:
        """Execute a tool"""
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        tool = self.tools[tool_name]
        return await tool.function(**arguments)
```
Production Deployment
```python
import time
from typing import Dict, List

from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from circuitbreaker import circuit

app = FastAPI()


class ChatRequest(BaseModel):
    messages: List[Dict]
    model: str = "gpt-4-turbo"
    stream: bool = False


class RateLimiter:
    """Sliding-window rate limiter for the API"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}

    async def check_limit(self, user_id: str) -> bool:
        """Check if user is within rate limit"""
        now = time.time()
        if user_id not in self.requests:
            self.requests[user_id] = []

        # Drop requests that fall outside the window
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id]
            if now - req_time < self.window_seconds
        ]

        if len(self.requests[user_id]) >= self.max_requests:
            return False

        self.requests[user_id].append(now)
        return True


async def get_user_id(x_user_id: str = Header(...)) -> str:
    """Placeholder auth dependency: resolve the caller from a request header"""
    return x_user_id


rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
llm_client = LLMClient()


@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm(messages: List[Dict]) -> str:
    """LLM call wrapped in a circuit breaker"""
    return await llm_client.chat_completion(messages)


@app.post("/chat")
async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)):
    """Chat endpoint with rate limiting"""
    # Check rate limit
    if not await rate_limiter.check_limit(user_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

    try:
        if request.stream:
            async def generate():
                async for chunk in llm_client.chat_completion_streaming(request.messages):
                    yield chunk
            return StreamingResponse(generate(), media_type="text/event-stream")
        response = await call_llm(request.messages)
        return {"response": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
Best Practices
LLM Integration
- Implement fallback providers
- Use streaming for better UX
- Cache responses where appropriate (see the caching sketch after this list)
- Handle rate limits gracefully
- Monitor token usage and costs
- Version prompts and track changes
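
As a sketch of the caching practice, the snippet below (not from the original skill) keys an in-process cache on a hash of the request payload; reuse is only safe for deterministic calls, and a shared store such as Redis would be the production analogue.

```python
# A minimal sketch of response caching keyed on the request payload, assuming
# deterministic (temperature=0) calls where reuse is safe.
import hashlib
import json

class ResponseCache:
    def __init__(self):
        self._store: dict[str, str] = {}

    @staticmethod
    def _key(model: str, messages: list[dict]) -> str:
        """Stable hash of model + messages used as the cache key."""
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model: str, messages: list[dict]) -> str | None:
        return self._store.get(self._key(model, messages))

    def set(self, model: str, messages: list[dict], response: str) -> None:
        self._store[self._key(model, messages)] = response

# Usage around LLMClient.chat_completion:
# cached = cache.get(model, messages)
# if cached is None:
#     cached = await llm_client.chat_completion(messages, model=model)
#     cache.set(model, messages, cached)
```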
Production Systems
- Implement circuit breakers
- Add comprehensive logging
- Monitor latency and errors
- Use rate limiting
- Implement retry logic with backoff (see the backoff sketch after this list)
- Test edge cases thoroughly
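
For the backoff practice, here is a minimal sketch (not from the original skill) of retrying an async call with exponential backoff and jitter; the attempt count and delays are illustrative defaults.

```python
# A minimal sketch of retry with exponential backoff and jitter for transient
# LLM API failures.
import asyncio
import random

async def with_retries(coro_factory, max_attempts: int = 4, base_delay: float = 1.0):
    """Call an async factory, retrying on failure with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter: ~1s, ~2s, ~4s, ...
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)

# Usage with the LLMClient defined above:
# result = await with_retries(lambda: llm_client.chat_completion(messages))
```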
Security
- Validate and sanitize inputs
- Implement authentication/authorization
- Never expose API keys in logs
- Use environment variables for secrets
- Implement content filtering
- Monitor for prompt injection (see the heuristic check after this list)
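
As a sketch of input validation and injection monitoring, the snippet below (not from the original skill) applies a length limit, strips control characters, and flags a few suspicious phrases; the patterns and limit are illustrative, and heuristics like these cannot catch every attack.

```python
# A minimal, heuristic sketch of input validation and prompt-injection
# screening. The phrases and the length limit are illustrative, not exhaustive.
import re

MAX_INPUT_CHARS = 8000  # illustrative limit
SUSPICIOUS_PATTERNS = [
    r"ignore (all )?(previous|prior) instructions",
    r"system prompt",
    r"you are now",
]

def validate_user_input(text: str) -> str:
    """Reject oversized input, strip control characters, flag suspicious phrases."""
    if len(text) > MAX_INPUT_CHARS:
        raise ValueError("Input too long")

    # Strip non-printable control characters
    cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)

    for pattern in SUSPICIOUS_PATTERNS:
        if re.search(pattern, cleaned, flags=re.IGNORECASE):
            # Surface for logging/monitoring rather than silently passing through
            raise ValueError("Potential prompt injection detected")
    return cleaned
```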
Anti-Patterns
❌ No error handling or fallbacks
❌ Exposing raw LLM outputs without validation
❌ No rate limiting or cost controls
❌ Storing API keys in code
❌ No monitoring or logging
❌ Ignoring token limits
❌ No testing of prompts
Resources
- OpenAI API: https://platform.openai.com/docs
- Anthropic Claude: https://docs.anthropic.com/
- LangChain: https://python.langchain.com/
- LlamaIndex: https://www.llamaindex.ai/
- Weights & Biases Prompts: https://wandb.ai/site/prompts