Claude-skill-registry genie-integration
Integrate Databricks Genie rooms as tools in agent workflows. Use when integrating Genie spaces with AI agents, querying Genie rooms programmatically via SDK or MCP, managing Genie conversations and polling, handling Genie API responses and errors, or building tool-calling agents that use Genie as a data source. Covers SDK patterns, MCP tool integration, conversation management, error handling, and performance optimization for Genie-based agent tools.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/genie-integration" ~/.claude/skills/majiayu000-claude-skill-registry-genie-integration && rm -rf "$T"
skills/data/genie-integration/SKILL.md
Genie Integration Patterns
Integrate Databricks Genie rooms as powerful tools in your agent workflows using SDK or MCP approaches.
Core Concepts
What is Databricks Genie?
Genie is a conversational BI interface that:
- Translates natural language to SQL
- Executes queries against your data
- Returns formatted results and visualizations
- Maintains conversation context
Key advantage: Existing Genie rooms become instant agent tools without rebuilding data pipelines.
Integration Approaches
SDK Integration:
- Direct control via Databricks Python SDK
- Manual conversation management
- Flexible polling strategies
- Best for custom workflows
MCP Tools:
- Pre-configured tool interfaces
- Simplified API surface
- Built-in polling logic
- Best for standard patterns
Problem-Solution Patterns
Problem 1: Genie Query Timeouts
Symptoms:
- Queries hang indefinitely
- Agent gets stuck waiting for response
- No error handling for slow queries
Root causes:
- Insufficient polling timeout
- Complex SQL generated by Genie
- Large dataset queries
- No backoff strategy
Solution:
    import time

    from databricks.sdk import WorkspaceClient
    from langchain.tools import tool


    def status_name(message) -> str:
        """Return the message status as a plain string (the SDK may return an enum)."""
        status = message.status
        return getattr(status, "value", status)


    @tool
    def query_genie_with_timeout(question: str, space_id: str, max_attempts: int = 30) -> str:
        """Query Genie with proper timeout handling"""
        w = WorkspaceClient()
        try:
            # Start conversation
            response = w.genie.start_conversation(space_id=space_id, content=question)
            conversation_id = response.conversation_id
            message_id = response.message_id

            # Poll with capped exponential backoff
            wait_time = 2
            for attempt in range(max_attempts):
                message = w.genie.get_message(
                    space_id=space_id,
                    conversation_id=conversation_id,
                    message_id=message_id,
                )
                status = status_name(message)
                if status == "COMPLETED":
                    return extract_response(message)
                elif status in ["FAILED", "CANCELLED"]:
                    return f"Query failed: {status}. Try simplifying your question."

                # Backoff: 2s, 2s, 4s, 4s, 8s, 8s, then 10s per attempt
                time.sleep(wait_time)
                if attempt % 2 == 1:  # Double wait time every 2 attempts
                    wait_time = min(wait_time * 2, 10)  # Cap at 10 seconds

            return "Query timeout. The query may be too complex or the dataset too large."
        except Exception as e:
            return f"Error: {str(e)}"


    def extract_response(message) -> str:
        """Extract formatted response from Genie message"""
        response_text = ""
        if message.attachments:
            for attachment in message.attachments:
                if getattr(attachment, "text", None):
                    response_text += attachment.text.content + "\n"
                elif getattr(attachment, "query", None):
                    # Skip missing descriptions instead of appending the string "None"
                    description = getattr(attachment.query, "description", None)
                    if description:
                        response_text += f"{description}\n"
        return response_text or message.content or "No response available"
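To see how long the polling loop above can block, the backoff schedule can be computed on its own. This is a small sketch; `backoff_schedule` is a hypothetical helper that mirrors the loop's doubling rule (start at 2 seconds, double after every second attempt, cap at 10 seconds) and is not part of the Databricks SDK.

```python
def backoff_schedule(max_attempts: int = 30, start: float = 2.0, cap: float = 10.0) -> list[float]:
    """Return the sleep duration used after each polling attempt."""
    waits = []
    wait_time = start
    for attempt in range(max_attempts):
        waits.append(wait_time)
        if attempt % 2 == 1:  # double after every second attempt
            wait_time = min(wait_time * 2, cap)
    return waits


schedule = backoff_schedule()
print(schedule[:8])   # [2.0, 2.0, 4.0, 4.0, 8.0, 8.0, 10.0, 10.0]
print(sum(schedule))  # 268.0
```

With the default of 30 attempts the loop can sleep for up to 268 seconds in total, not counting API latency, so `max_attempts` is worth tuning to the latency your agent can tolerate.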
Problem 2: Verbose Genie Responses Confuse Agent
Symptoms:
- Agent overwhelmed by SQL query text
- Long table outputs exceed token limits
- Agent can't synthesize due to noise
Root causes:
- Not filtering Genie response attachments
- Including raw SQL in tool output
- No summarization of large results
Solution:
    @tool
    def query_genie_concise(question: str, space_id: str) -> str:
        """Query Genie and return concise, agent-friendly response"""
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()

        # Get raw response; poll_for_completion stands for the polling loop from Problem 1
        response = w.genie.start_conversation(space_id=space_id, content=question)
        message = poll_for_completion(w, space_id, response.conversation_id, response.message_id)

        # Extract ONLY the natural language summary
        attachments = message.attachments or []

        # Prioritize text summaries over raw SQL: scan every attachment for text first
        for attachment in attachments:
            if getattr(attachment, "text", None):
                # Return first text attachment (usually the summary)
                return attachment.text.content

        # Otherwise fall back to the query description, truncated if too long
        for attachment in attachments:
            query = getattr(attachment, "query", None)
            desc = getattr(query, "description", None) if query else None
            if desc:
                if len(desc) > 500:
                    return desc[:500] + "... [truncated for brevity]"
                return desc

        return message.content or "No clear response from Genie"
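The `poll_for_completion` helper used above is not defined in this document. One possible shape for it is sketched here, assuming it should return the message once it reaches a terminal status and raise `TimeoutError` otherwise. The signature, the `sleep` parameter, and the `StubGenie` stand-in are all assumptions made for illustration. Only `get_message` corresponds to a call used elsewhere in this document.

```python
import time
from types import SimpleNamespace
from typing import Any, Callable


def poll_for_completion(
    client: Any,
    space_id: str,
    conversation_id: str,
    message_id: str,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll until the message reaches a terminal status, then return it."""
    terminal = {"COMPLETED", "FAILED", "CANCELLED"}
    wait_time = 2
    for attempt in range(max_attempts):
        message = client.genie.get_message(
            space_id=space_id,
            conversation_id=conversation_id,
            message_id=message_id,
        )
        # The status may be an enum or a plain string; normalise to a string.
        status = getattr(message.status, "value", message.status)
        if status in terminal:
            return message
        sleep(wait_time)
        if attempt % 2 == 1:
            wait_time = min(wait_time * 2, 10)
    raise TimeoutError(f"Genie message {message_id} did not finish in {max_attempts} attempts")


class StubGenie:
    """Test double that replays a fixed list of statuses (not a real SDK class)."""

    def __init__(self, statuses):
        self.statuses = list(statuses)

    def get_message(self, space_id, conversation_id, message_id):
        return SimpleNamespace(status=self.statuses.pop(0))


slept = []  # record sleeps instead of actually waiting
stub_client = SimpleNamespace(genie=StubGenie(["EXECUTING_QUERY", "COMPLETED"]))
done = poll_for_completion(stub_client, "space", "conv", "msg", sleep=slept.append)
print(done.status, slept)  # COMPLETED [2]
```

Passing `sleep` as a parameter keeps the helper testable without real delays. Production callers can rely on the default `time.sleep`.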
Best practice: Return summaries, not raw data. Let Genie do the summarization.
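When tabular results do have to reach the agent, capping the number of rows keeps the output within token limits. The helper below is a sketch; `summarize_rows` and its parameters are hypothetical, and it assumes the column names and rows have already been obtained from the query result by some other means.

```python
def summarize_rows(columns: list[str], rows: list[list], max_rows: int = 5) -> str:
    """Render at most max_rows rows as text and note how many were omitted."""
    lines = [" | ".join(columns)]
    for row in rows[:max_rows]:
        lines.append(" | ".join(str(value) for value in row))
    omitted = len(rows) - max_rows
    if omitted > 0:
        lines.append(f"... [{omitted} more rows omitted]")
    return "\n".join(lines)


print(summarize_rows(["product", "units"], [["A", 10], ["B", 7], ["C", 3]], max_rows=2))
```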
Problem 3: Losing Conversation Context
Symptoms:
- Each query starts a fresh conversation
- Agent can't do follow-up questions
- "What about last month?" fails
Root causes:
- Not tracking conversation IDs
- Creating new conversation for each query
- No conversation state management
Solution:
    from databricks.sdk import WorkspaceClient


    class GenieConversationManager:
        """Manage ongoing conversations with Genie rooms"""

        def __init__(self):
            self.workspace_client = WorkspaceClient()
            self.conversations = {}  # space_id -> conversation_id

        # Not decorated with @tool: the decorator would treat `self` as a tool argument.
        # Expose this method through a module-level tool function instead.
        def query_genie_contextual(self, question: str, space_id: str) -> str:
            """
            Query Genie while maintaining conversation context.
            Automatically continues existing conversations or starts new ones.
            """
            # Check if we have an active conversation for this space
            conversation_id = self.conversations.get(space_id)

            if conversation_id:
                # Continue existing conversation
                response = self.workspace_client.genie.create_message(
                    space_id=space_id,
                    conversation_id=conversation_id,
                    content=question,
                )
            else:
                # Start new conversation
                response = self.workspace_client.genie.start_conversation(
                    space_id=space_id,
                    content=question,
                )
                # Save conversation ID
                self.conversations[space_id] = response.conversation_id

            # Poll and return (using patterns from above)
            message = poll_for_completion(
                self.workspace_client,
                space_id,
                response.conversation_id,
                response.message_id,
            )
            return extract_response(message)

        def reset_conversation(self, space_id: str):
            """Start fresh conversation for a space"""
            if space_id in self.conversations:
                del self.conversations[space_id]
Usage in agent:
    # Initialize once for agent lifecycle
    genie_manager = GenieConversationManager()


    @tool
    def query_customer_behavior(question: str) -> str:
        """Query customer behavior Genie room"""
        return genie_manager.query_genie_contextual(
            question=question,
            space_id="01f09cdbacf01b5fa7ff7c237365502c",
        )
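The start-or-continue decision can be checked without a workspace. The sketch below uses a stand-in object in place of `w.genie`; `StubGenie` and `send` are illustrative names, and the stub only records which call was made.

```python
from types import SimpleNamespace


class StubGenie:
    """Stand-in for w.genie that records which calls were made (test double only)."""

    def __init__(self):
        self.calls = []

    def start_conversation(self, space_id, content):
        self.calls.append("start_conversation")
        return SimpleNamespace(conversation_id="conv-1", message_id="msg-1")

    def create_message(self, space_id, conversation_id, content):
        self.calls.append("create_message")
        return SimpleNamespace(conversation_id=conversation_id, message_id="msg-2")


def send(genie, conversations: dict, space_id: str, question: str):
    """Start a conversation on first use, then continue it on later calls."""
    conversation_id = conversations.get(space_id)
    if conversation_id:
        return genie.create_message(
            space_id=space_id, conversation_id=conversation_id, content=question
        )
    response = genie.start_conversation(space_id=space_id, content=question)
    conversations[space_id] = response.conversation_id
    return response


genie = StubGenie()
conversations = {}
send(genie, conversations, "space-a", "What were sales this month?")
send(genie, conversations, "space-a", "What about last month?")
print(genie.calls)  # ['start_conversation', 'create_message']
```

The second question reuses the stored conversation ID, which is what lets a follow-up such as "What about last month?" resolve against the earlier question.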
Problem 4: Genie Space ID Management
Symptoms:
- Hard-coded space IDs scattered in code
- Errors when space IDs change
- Difficulty managing multiple environments
Root causes:
- No centralized configuration
- Space IDs embedded in tool definitions
- No environment-aware setup
Solution:
    # config.py
    import os
    from dataclasses import dataclass
    from typing import Dict, Optional


    @dataclass
    class GenieSpaceConfig:
        space_id: str
        name: str
        description: str


    class GenieConfig:
        """Centralized Genie space configuration"""

        def __init__(self, environment: Optional[str] = None):
            self.environment = environment or os.getenv("DATABRICKS_ENV", "production")
            self.spaces = self._load_spaces()

        def _load_spaces(self) -> Dict[str, GenieSpaceConfig]:
            """Load space configurations per environment"""
            # Production spaces
            if self.environment == "production":
                return {
                    "customer_behavior": GenieSpaceConfig(
                        space_id="01f09cdbacf01b5fa7ff7c237365502c",
                        name="Customer Behavior Analysis",
                        description="Customer trends and preferences",
                    ),
                    "inventory": GenieSpaceConfig(
                        space_id="02a10defbcg02c6ga8gg8d348476613d",
                        name="Real-Time Inventory",
                        description="Stock levels and turnover",
                    ),
                }
            # Development spaces
            elif self.environment == "development":
                return {
                    "customer_behavior": GenieSpaceConfig(
                        space_id="dev_customer_space_id",
                        name="Customer Behavior (Dev)",
                        description="Dev customer data",
                    ),
                    "inventory": GenieSpaceConfig(
                        space_id="dev_inventory_space_id",
                        name="Inventory (Dev)",
                        description="Dev inventory data",
                    ),
                }
            raise ValueError(f"Unknown environment: {self.environment}")

        def get_space(self, name: str) -> GenieSpaceConfig:
            """Get space configuration by name"""
            if name not in self.spaces:
                raise ValueError(f"Unknown Genie space: {name}")
            return self.spaces[name]


    # Usage in tools
    config = GenieConfig()


    @tool
    def query_customer_behavior(question: str) -> str:
        """Query customer behavior"""
        space = config.get_space("customer_behavior")
        # query_genie is a LangChain tool (see Quick Reference), so call it via .invoke
        return query_genie.invoke({"question": question, "space_id": space.space_id})
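An alternative to keeping space IDs in source is to read them from the environment, so each deployment supplies its own mapping. This is a sketch under stated assumptions: the variable name `GENIE_SPACES_JSON` and the `load_space_ids` helper are hypothetical, and the mapping is assumed to be a JSON object of name to space ID.

```python
import json
import os


def load_space_ids(env_var: str = "GENIE_SPACES_JSON") -> dict[str, str]:
    """Read a {name: space_id} mapping from a JSON-encoded environment variable."""
    raw = os.getenv(env_var)
    if not raw:
        raise ValueError(f"Environment variable {env_var} is not set")
    spaces = json.loads(raw)
    if not isinstance(spaces, dict):
        raise ValueError(f"{env_var} must contain a JSON object")
    return {str(name): str(space_id) for name, space_id in spaces.items()}


# Example: in a real deployment the variable is set outside the process.
os.environ["GENIE_SPACES_JSON"] = json.dumps({"customer_behavior": "dev_customer_space_id"})
spaces = load_space_ids()
print(spaces["customer_behavior"])  # dev_customer_space_id
```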
SDK Integration Pattern
Complete SDK-Based Tool
    import time
    from typing import Optional

    from databricks.sdk import WorkspaceClient
    from langchain.tools import tool


    @tool
    def query_genie_sdk(
        question: str,
        space_id: str,
        conversation_id: Optional[str] = None,
        max_attempts: int = 30,
    ) -> str:
        """
        Query Databricks Genie room using SDK.

        Args:
            question: Natural language question
            space_id: Genie space ID
            conversation_id: Optional conversation ID to continue conversation
            max_attempts: Max polling attempts

        Returns:
            Genie's response as string
        """
        w = WorkspaceClient()
        try:
            # Start or continue conversation
            if conversation_id:
                response = w.genie.create_message(
                    space_id=space_id,
                    conversation_id=conversation_id,
                    content=question,
                )
            else:
                response = w.genie.start_conversation(space_id=space_id, content=question)

            conv_id = response.conversation_id
            msg_id = response.message_id

            # Poll for completion with capped exponential backoff
            wait_time = 2
            for attempt in range(max_attempts):
                message = w.genie.get_message(
                    space_id=space_id,
                    conversation_id=conv_id,
                    message_id=msg_id,
                )

                # Check status (it may be an enum, so compare its string value)
                status = getattr(message.status, "value", message.status)
                if status == "COMPLETED":
                    # Extract response
                    result = ""
                    for attachment in message.attachments or []:
                        if getattr(attachment, "text", None):
                            result += attachment.text.content + "\n"
                        elif getattr(attachment, "query", None):
                            description = getattr(attachment.query, "description", None)
                            if description:
                                result += description + "\n"
                    return result.strip() or message.content or "No response"
                elif status in ["FAILED", "CANCELLED"]:
                    return f"Query failed: {status}"

                # Wait with backoff
                time.sleep(wait_time)
                if attempt % 2 == 1:
                    wait_time = min(wait_time * 2, 10)

            # The elapsed time depends on max_attempts and the backoff, so report attempts
            return f"Query timeout after {max_attempts} polling attempts"
        except Exception as e:
            return f"Error querying Genie: {str(e)}"
MCP Tool Integration Pattern
Understanding MCP Tools
MCP (Model Context Protocol) tools provide a higher-level interface:
    # MCP tools are pre-configured for specific Genie spaces
    # They handle polling and conversation management automatically
    # Available in environment as:
    #   - query_space_<SPACE_ID>
    #   - poll_response_<SPACE_ID>

    # Example: using an existing MCP tool
    # Note: query_space is not a method of the SDK's w.genie client. The tool is
    # invoked through the MCP client your agent framework provides; `mcp_client`
    # and its `call_tool` method below are placeholders for that client.
    SPACE_ID = "01f09cdbacf01b5fa7ff7c237365502c"

    # Call Genie via MCP
    result = mcp_client.call_tool(
        f"query_space_{SPACE_ID}",
        {
            "query": "What products are trending?",
            # "conversation_id": "...",  # Optional: continue conversation
        },
    )
When to Use MCP vs SDK
Use MCP tools when:
- Standard query/response pattern
- Don't need custom polling logic
- Want simpler code
- MCP tools available for your spaces
Use SDK when:
- Need custom timeout handling
- Complex conversation management
- Custom response parsing
- MCP tools not configured
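The availability criterion in the lists above can be expressed as a small check. This is a sketch; `choose_backend` is a hypothetical helper, and it assumes the set of tool names exposed to the agent is already known and follows the `query_space_<SPACE_ID>` naming described earlier.

```python
def choose_backend(space_id: str, available_tools: set[str]) -> str:
    """Return "mcp" when the query tool for this space is available, else "sdk"."""
    return "mcp" if f"query_space_{space_id}" in available_tools else "sdk"


tools = {"query_space_abc123", "poll_response_abc123"}
print(choose_backend("abc123", tools))  # mcp
print(choose_backend("zzz999", tools))  # sdk
```

The other criteria (custom timeouts, custom parsing) remain judgment calls and are not captured by this check.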
Performance Optimization
Pattern 1: Caching Genie Responses
    from datetime import datetime, timedelta


    class CachedGenieQuery:
        """Cache Genie responses with TTL"""

        def __init__(self, ttl_minutes: int = 15):
            self.cache = {}  # cache_key -> (result, timestamp)
            self.ttl = timedelta(minutes=ttl_minutes)

        def query(self, question: str, space_id: str) -> str:
            """Query with caching"""
            cache_key = f"{space_id}:{question}"

            # Check cache
            if cache_key in self.cache:
                result, timestamp = self.cache[cache_key]
                if datetime.now() - timestamp < self.ttl:
                    return result

            # Query Genie (query_genie_sdk is a LangChain tool, so call it via .invoke)
            result = query_genie_sdk.invoke({"question": question, "space_id": space_id})

            # Cache result
            self.cache[cache_key] = (result, datetime.now())
            return result
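Keying the cache on the raw question means that differences in case or spacing produce cache misses. A possible refinement is to normalize the question before building the key. The sketch below assumes that lowercasing and collapsing whitespace never changes the meaning of a question in your domain; `cache_key` is a hypothetical helper.

```python
def cache_key(space_id: str, question: str) -> str:
    """Build a cache key that ignores case and redundant whitespace in the question."""
    normalized = " ".join(question.lower().split())
    return f"{space_id}:{normalized}"


print(cache_key("space_a", "  What  products are TRENDING? "))
# space_a:what products are trending?
```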
Pattern 2: Parallel Genie Queries
    import concurrent.futures


    def query_multiple_genie_spaces(questions: list[tuple[str, str]]) -> list[str]:
        """
        Query multiple Genie spaces in parallel.

        Args:
            questions: List of (question, space_id) tuples

        Returns:
            List of responses in same order
        """

        def query_one(question_space):
            question, space_id = question_space
            # query_genie_sdk is a LangChain tool, so call it via .invoke
            return query_genie_sdk.invoke({"question": question, "space_id": space_id})

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            # executor.map returns results in input order
            results = list(executor.map(query_one, questions))
        return results


    # Usage in agent
    questions = [
        ("What products are trending?", "customer_space_id"),
        ("Which locations have high turnover?", "inventory_space_id"),
    ]
    results = query_multiple_genie_spaces(questions)
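With `executor.map`, an exception raised by one query surfaces when results are collected and the rest of the batch is lost. If the query function can raise, each call can be wrapped so that failures turn into error strings. In this sketch `query_all` and `fake_query` are illustrative names, with `fake_query` standing in for a real Genie call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def query_all(
    questions: list[tuple[str, str]],
    query_fn: Callable[[str, str], str],
    max_workers: int = 3,
) -> list[str]:
    """Run query_fn for each (question, space_id) pair; failures become error strings."""

    def safe_query(pair: tuple[str, str]) -> str:
        question, space_id = pair
        try:
            return query_fn(question, space_id)
        except Exception as exc:  # isolate failures so one bad query does not abort the batch
            return f"Error querying {space_id}: {exc}"

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order
        return list(executor.map(safe_query, questions))


def fake_query(question: str, space_id: str) -> str:
    """Stand-in for a Genie query; fails for one specific space."""
    if space_id == "bad_space":
        raise RuntimeError("space not found")
    return f"{space_id}: answer to {question!r}"


results = query_all([("Q1", "space_a"), ("Q2", "bad_space")], fake_query)
print(results)
# ["space_a: answer to 'Q1'", 'Error querying bad_space: space not found']
```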
Error Handling Best Practices
Comprehensive Error Strategy
    @tool
    def query_genie_robust(question: str, space_id: str) -> str:
        """Query Genie with comprehensive error handling"""
        from databricks.sdk import WorkspaceClient
        from databricks.sdk.errors import DatabricksError

        w = WorkspaceClient()
        try:
            response = w.genie.start_conversation(space_id=space_id, content=question)
            # poll_for_completion and extract_response follow the patterns from Problem 1
            message = poll_for_completion(
                w,
                space_id,
                response.conversation_id,
                response.message_id,
            )

            # The status may be an enum, so compare its string value
            status = getattr(message.status, "value", message.status)
            if status == "COMPLETED":
                return extract_response(message)
            elif status == "FAILED":
                # Provide actionable error message
                return (
                    "Query failed. This could be due to:\n"
                    "- Invalid SQL generated\n"
                    "- Data source unavailable\n"
                    "- Permissions issue\n"
                    "Try rephrasing your question or check Genie room configuration."
                )
            else:
                return f"Unexpected status: {status}"
        except DatabricksError as e:
            if "not found" in str(e).lower():
                return "Genie space not found. Check space ID configuration."
            elif "permission" in str(e).lower():
                return "Permission denied. Ensure agent has access to Genie space."
            else:
                return f"Databricks API error: {str(e)}"
        except TimeoutError:
            return "Query timeout. Try a simpler question or check data volume."
        except Exception as e:
            return f"Unexpected error: {str(e)}"
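The message-mapping part of this strategy can be pulled into a pure function, which makes it testable without a workspace. The sketch below is illustrative: `describe_error` is a hypothetical helper, and it matches on exception text in the same way as the tool above rather than on specific SDK exception classes.

```python
def describe_error(exc: Exception) -> str:
    """Map an exception to an actionable message for the agent."""
    if isinstance(exc, TimeoutError):
        return "Query timeout. Try a simpler question or check data volume."
    text = str(exc).lower()
    if "not found" in text:
        return "Genie space not found. Check space ID configuration."
    if "permission" in text:
        return "Permission denied. Ensure agent has access to Genie space."
    return f"Unexpected error: {exc}"


print(describe_error(RuntimeError("Space 123 not found")))
# Genie space not found. Check space ID configuration.
```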
Testing Genie Integration
Unit Test Pattern
    def test_genie_tool():
        """Test Genie tool in isolation"""
        # Test 1: Simple query (query_genie_sdk is a LangChain tool, so call it via .invoke)
        result = query_genie_sdk.invoke({
            "question": "How many customers do we have?",
            "space_id": "01f09cdbacf01b5fa7ff7c237365502c",
        })
        assert result, "Should return non-empty result"
        assert "error" not in result.lower(), "Should not contain error"

        # Test 2: Invalid space ID
        result = query_genie_sdk.invoke({"question": "test", "space_id": "invalid_id"})
        assert "not found" in result.lower() or "error" in result.lower()

        # Test 3: Conversation continuity
        # (Test that conversation_id parameter works)
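The test above needs a live workspace. Response parsing can also be tested offline with stand-in objects. In this sketch the extraction logic from Problem 1 is repeated so the block runs on its own, and `SimpleNamespace` objects imitate the attributes the parser reads; the message contents are made-up fixture values.

```python
from types import SimpleNamespace


def extract_response(message) -> str:
    """Same extraction logic as in Problem 1, repeated so this block is self-contained."""
    response_text = ""
    for attachment in message.attachments or []:
        if getattr(attachment, "text", None):
            response_text += attachment.text.content + "\n"
        elif getattr(attachment, "query", None):
            description = getattr(attachment.query, "description", None)
            if description:
                response_text += f"{description}\n"
    return response_text or message.content or "No response available"


def test_extract_response_prefers_attachments():
    message = SimpleNamespace(
        content="How many customers do we have?",
        attachments=[
            SimpleNamespace(text=SimpleNamespace(content="Fixture summary text."), query=None),
            SimpleNamespace(text=None, query=SimpleNamespace(description="Count of customers")),
        ],
    )
    assert extract_response(message) == "Fixture summary text.\nCount of customers\n"


def test_extract_response_falls_back_to_content():
    message = SimpleNamespace(content="fallback text", attachments=None)
    assert extract_response(message) == "fallback text"


# Run directly; under pytest the test_ functions are collected automatically.
test_extract_response_prefers_attachments()
test_extract_response_falls_back_to_content()
```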
Integration Test Pattern
    def test_genie_in_agent():
        """Test Genie tool within agent workflow"""
        from your_agent import GenieAgent

        agent = GenieAgent()

        # Test single-tool query
        result = agent.query("What products are trending?")
        assert "customer_behavior" in result['intermediate_steps'][0][0].tool

        # Test multi-tool query
        result = agent.query("Trending products at risk of overstock?")
        tools_called = [step[0].tool for step in result['intermediate_steps']]
        assert "customer_behavior" in tools_called
        assert "inventory" in tools_called
Quick Reference
Minimum Viable Genie Tool (SDK)
    import time

    from databricks.sdk import WorkspaceClient
    from langchain.tools import tool


    @tool
    def query_genie(question: str, space_id: str) -> str:
        """Query Genie room"""
        w = WorkspaceClient()
        resp = w.genie.start_conversation(space_id=space_id, content=question)
        for _ in range(30):
            msg = w.genie.get_message(space_id, resp.conversation_id, resp.message_id)
            status = getattr(msg.status, "value", msg.status)  # status may be an enum
            if status == "COMPLETED":
                # Return the first text attachment, falling back to the message content
                for attachment in msg.attachments or []:
                    if getattr(attachment, "text", None):
                        return attachment.text.content
                return msg.content or "No response"
            elif status in ["FAILED", "CANCELLED"]:
                return f"Failed: {status}"
            time.sleep(2)
        return "Timeout"
Related Skills
- mosaic-ai-agent: Design agents that use Genie tools
- agent-mlops: Deploy Genie-powered agents to production