# claude-skill-registry / llm-pipeline

Pydantic-AI agents, RAG, and embeddings for Pulse Radar knowledge extraction.

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Or install just this skill into `~/.claude/skills`:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/llm-pipeline" ~/.claude/skills/majiayu000-claude-skill-registry-llm-pipeline && rm -rf "$T"
```
`skills/data/llm-pipeline/SKILL.md`

# LLM Pipeline Skill
<overview>
Pulse Radar uses an LLM pipeline to transform raw Telegram messages into structured knowledge. Core philosophy: individual messages are noise; batched extraction reveals patterns.
</overview>

<entity-hierarchy>
```
TOPICS (categories)
└─ ATOMS (knowledge units: problem/solution/decision/insight...)
   └─ MESSAGES (raw data, hidden layer)
```
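As a rough illustration, the hierarchy maps onto Pydantic models like the sketch below. The field names are assumptions for illustration, not the actual Pulse Radar schema; `KnowledgeExtractionOutput` itself is the type the extraction flow below passes as `output_type`.

```python
from pydantic import BaseModel


class Atom(BaseModel):
    """One knowledge unit; fields are illustrative, not the real schema."""
    type: str  # problem | solution | decision | insight | ...
    text: str
    source_message_ids: list[int]  # MESSAGES stay a hidden layer behind the atom


class Topic(BaseModel):
    """A category grouping related atoms."""
    title: str
    atoms: list[Atom]


class KnowledgeExtractionOutput(BaseModel):
    """Assumed shape of the structured output used in the extraction flow."""
    topics: list[Topic]
```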
</entity-hierarchy>

<extraction-flow>
```python
# 1. Message arrives via Telegram webhook
await save_telegram_message(message)  # triggers TaskIQ

# 2. Scoring (AI Judge, not heuristics - ADR-003)
score = await importance_scorer.score(message)
# classification: SIGNAL (>0.6) / NOISE (<0.3)

# 3. Auto-trigger extraction when threshold met
if unprocessed_count >= 10:  # ai_config.message_threshold
    await extract_knowledge_from_messages_task.kiq()

# 4. KnowledgeOrchestrator runs Pydantic AI agent
agent = Agent(
    model=model,
    system_prompt=get_extraction_prompt("uk"),
    output_type=KnowledgeExtractionOutput,  # CRITICAL: structured output
    output_retries=5,
)
result = await agent.run(messages_content)

# 5. Save to DB + embed
await save_topics_and_atoms(result.output)
await embed_atoms_batch_task.kiq(atom_ids)
```
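The scoring step treats the LLM itself as the judge (ADR-003). A minimal sketch of what such a scorer can look like, assuming a Pydantic AI agent and the thresholds above; the model name and the real `importance_scorer` internals are assumptions:

```python
from pydantic import BaseModel, Field
from pydantic_ai import Agent


class ImportanceScore(BaseModel):
    score: float = Field(ge=0.0, le=1.0)
    reason: str


# Hypothetical AI-judge scorer in the spirit of ADR-003:
# the model, not a keyword heuristic, decides what is signal.
scorer_agent = Agent(
    "openai:gpt-4o-mini",  # assumed model; Pulse Radar builds it per provider
    output_type=ImportanceScore,
    system_prompt="Rate the importance of this chat message from 0.0 to 1.0.",
)


async def classify(message_text: str) -> str:
    result = await scorer_agent.run(message_text)
    s = result.output.score
    if s > 0.6:
        return "SIGNAL"
    if s < 0.3:
        return "NOISE"
    return "UNCLASSIFIED"  # assumption: mid-range handling is not documented
```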
</extraction-flow>

<agent-creation>
```python
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
# Provider import paths may vary by pydantic-ai version:
from pydantic_ai.providers.ollama import OllamaProvider
from pydantic_ai.providers.openai import OpenAIProvider

# Provider-specific model creation
if provider.type == "ollama":
    model = OpenAIChatModel(
        model_name=agent_config.model_name,
        provider=OllamaProvider(base_url=provider.base_url),
    )
elif provider.type == "openai":
    model = OpenAIChatModel(
        model_name=agent_config.model_name,
        provider=OpenAIProvider(api_key=api_key),
    )

# Agent with structured output
agent = Agent(
    model=model,
    output_type=MyPydanticModel,  # Forces JSON schema
    system_prompt="...",
    output_retries=5,
)
```
</agent-creation>

<prompt-guidelines>
1. **JSON-only output** — explicitly state "respond with ONLY JSON"
2. **Schema in prompt** — include the exact JSON structure expected
3. **Language enforcement** — "ALL fields MUST be in Ukrainian"
4. **Retry on language mismatch** — use `get_strengthened_prompt()`
5. **No markdown** — models often wrap JSON in ```json blocks
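Put together, a prompt following these rules can look like the sketch below. The wording and the `get_strengthened_prompt()` body are assumptions for illustration, not the literal Pulse Radar prompts:

```python
# Illustrative extraction prompt following the five guidelines above
# (assumed wording, not the actual Pulse Radar prompt).
EXTRACTION_PROMPT = """You are a knowledge extractor.
Respond with ONLY JSON. No markdown, no code fences, no commentary.

Expected structure:
{"topics": [{"title": "...", "atoms": [{"type": "problem|solution|decision|insight", "text": "..."}]}]}

ALL fields MUST be in Ukrainian."""


def get_strengthened_prompt(base_prompt: str) -> str:
    """Assumed retry helper: restate the language rule more forcefully
    after the model answers in the wrong language."""
    return (
        base_prompt
        + "\n\nIMPORTANT: your previous answer used the wrong language. "
        "Every field MUST be written in Ukrainian."
    )
```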
</prompt-guidelines>

<embedding-service>
```python
# OpenAI: 1536 dimensions (text-embedding-3-small)
# Ollama: 1024 dimensions (mxbai-embed-large) → padded to 1536
await embedding_service.generate_embedding(text)
await embedding_service.embed_messages_batch(session, ids, batch_size=10)
```
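Both providers share one 1536-wide pgvector column, so the shorter Ollama vectors are padded. A minimal sketch, assuming zero-fill (Pulse Radar's actual padding strategy may differ):

```python
TARGET_DIM = 1536  # pgvector column width, sized for OpenAI embeddings


def pad_embedding(vector: list[float], target_dim: int = TARGET_DIM) -> list[float]:
    """Zero-pad a shorter vector (e.g. 1024-dim mxbai-embed-large) to the
    column width. Zero-filling leaves dot products and norms unchanged,
    so cosine similarity between padded vectors is preserved."""
    if len(vector) >= target_dim:
        return vector[:target_dim]
    return vector + [0.0] * (target_dim - len(vector))
```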
</embedding-service>

<rag-context>
```python
# SemanticSearchService uses pgvector cosine similarity
similar_atoms = await search_service.search_atoms(
    query_embedding=embedding,
    limit=5,
    threshold=0.65,  # ai_config.semantic_search
)

# RAGContextBuilder assembles context for the LLM
context = await rag_builder.build_context(
    query=user_query,
    similar_atoms=similar_atoms,
    related_messages=messages,
)
```
</rag-context>

<context-strategies>
## RAG vs CAG
| Strategy | Data Type | Pulse Radar Use |
|---|---|---|
| RAG | Dynamic (messages, atoms) | Semantic search, history retrieval |
| CAG | Static (project config) | Keywords, glossary, components preloaded |
Hybrid: project context (CAG) + similar atoms (RAG) = best extraction quality. See @references/rag.md for a detailed comparison.
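A sketch of how the hybrid assembly can compose, reusing the services shown above; `project_config.render_glossary()` is a hypothetical stand-in for the preloaded CAG side:

```python
async def build_hybrid_context(user_query: str) -> str:
    """Hypothetical CAG + RAG composition, following the table above."""
    # CAG: static project config preloaded at startup (keywords, glossary, components)
    static_context = project_config.render_glossary()

    # RAG: dynamic retrieval over embedded atoms via pgvector
    embedding = await embedding_service.generate_embedding(user_query)
    similar_atoms = await search_service.search_atoms(
        query_embedding=embedding, limit=5, threshold=0.65
    )

    atoms_text = "\n".join(atom.text for atom in similar_atoms)
    return f"{static_context}\n\nRelevant knowledge:\n{atoms_text}"
```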
</context-strategies>

<adrs>
- **ADR-003:** AI Importance Scoring — LLM Judge vs Heuristics (LLM chosen)
- **ADR-006:** Pydantic AI vs LangChain — Hexagonal architecture (Pydantic AI chosen)
</adrs>

<references>
- @references/architecture.md — Hexagonal LLM domain structure
- @references/pydantic-ai.md — Agent configuration, streaming
- @references/rag.md — RAG & CAG context strategies
</references>