Claude-skill-registry databricks-python-sdk
Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/databricks-python-sdk" ~/.claude/skills/majiayu000-claude-skill-registry-databricks-python-sdk && rm -rf "$T"
skills/data/databricks-python-sdk/SKILL.mdDatabricks Development Guide
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
SDK Documentation: https://databricks-sdk-py.readthedocs.io/en/latest/ GitHub Repository: https://github.com/databricks/databricks-sdk-py
Environment Setup
- Use existing virtual environment at
or use.venv
to create oneuv - For Spark operations:
uv pip install databricks-connect - For SDK operations:
uv pip install databricks-sdk - Databricks CLI version should be 0.278.0 or higher
Configuration
- Default profile name:
DEFAULT - Config file:
~/.databrickscfg - Environment variables:
,DATABRICKS_HOSTDATABRICKS_TOKEN
Databricks Connect (Spark Operations)
Use
databricks-connect for running Spark code locally against a Databricks cluster.
from databricks.connect import DatabricksSession # Auto-detects 'DEFAULT' profile from ~/.databrickscfg spark = DatabricksSession.builder.getOrCreate() # With explicit profile spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate() # Use spark as normal df = spark.sql("SELECT * FROM catalog.schema.table") df.show()
IMPORTANT: Do NOT set
.master("local[*]") - this will cause issues with Databricks Connect.
Direct REST API Access
For operations not yet in SDK or overly complex via SDK, use direct REST API:
from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Direct API call using authenticated client response = w.api_client.do( method="GET", path="/api/2.0/clusters/list" ) # POST with body response = w.api_client.do( method="POST", path="/api/2.0/jobs/run-now", body={"job_id": 123} )
When to use: Prefer SDK methods when available. Use
api_client.do for:
- New API endpoints not yet in SDK
- Complex operations where SDK abstraction is problematic
- Debugging/testing raw API responses
Databricks CLI
# Check version (should be >= 0.278.0) databricks --version # Use specific profile databricks --profile MY_PROFILE clusters list # Common commands databricks clusters list databricks jobs list databricks workspace ls /Users/me
SDK Documentation Architecture
The SDK documentation follows a predictable URL pattern:
Base: https://databricks-sdk-py.readthedocs.io/en/latest/ Workspace APIs: /workspace/{category}/{service}.html Account APIs: /account/{category}/{service}.html Authentication: /authentication.html DBUtils: /dbutils.html
Workspace API Categories
| Category | Services |
|---|---|
| clusters, cluster_policies, command_execution, instance_pools, libraries |
| catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| jobs |
| warehouses, statement_execution, queries, alerts, dashboards |
| serving_endpoints |
| vector_search_indexes, vector_search_endpoints |
| pipelines |
| repos, secrets, workspace, git_credentials |
| files, dbfs |
| experiments, model_registry |
Authentication
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
Environment Variables
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com DATABRICKS_TOKEN=dapi... # Personal Access Token
Code Patterns
# Auto-detect credentials from environment from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Explicit token auth w = WorkspaceClient( host="https://your-workspace.cloud.databricks.com", token="dapi..." ) # Azure Service Principal w = WorkspaceClient( host="https://adb-xxx.azuredatabricks.net", azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...", azure_tenant_id="tenant-id", azure_client_id="client-id", azure_client_secret="secret" ) # Use a named profile from ~/.databrickscfg w = WorkspaceClient(profile="MY_PROFILE")
Core API Reference
Clusters API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
# List all clusters for cluster in w.clusters.list(): print(f"{cluster.cluster_name}: {cluster.state}") # Get cluster details cluster = w.clusters.get(cluster_id="0123-456789-abcdef") # Create a cluster (returns Wait object) wait = w.clusters.create( cluster_name="my-cluster", spark_version=w.clusters.select_spark_version(latest=True), node_type_id=w.clusters.select_node_type(local_disk=True), num_workers=2 ) cluster = wait.result() # Wait for cluster to be running # Or use create_and_wait for blocking call cluster = w.clusters.create_and_wait( cluster_name="my-cluster", spark_version="14.3.x-scala2.12", node_type_id="i3.xlarge", num_workers=2, timeout=timedelta(minutes=30) ) # Start/stop/delete w.clusters.start(cluster_id="...").result() w.clusters.stop(cluster_id="...") w.clusters.delete(cluster_id="...")
Jobs API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
from databricks.sdk.service.jobs import Task, NotebookTask # List jobs for job in w.jobs.list(): print(f"{job.job_id}: {job.settings.name}") # Create a job created = w.jobs.create( name="my-job", tasks=[ Task( task_key="main", notebook_task=NotebookTask(notebook_path="/Users/me/notebook"), existing_cluster_id="0123-456789-abcdef" ) ] ) # Run a job now run = w.jobs.run_now_and_wait(job_id=created.job_id) print(f"Run completed: {run.state.result_state}") # Get run output output = w.jobs.get_run_output(run_id=run.run_id)
SQL Statement Execution
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
# Execute SQL query response = w.statement_execution.execute_statement( warehouse_id="abc123", statement="SELECT * FROM catalog.schema.table LIMIT 10", wait_timeout="30s" ) # Check status and get results if response.status.state == StatementState.SUCCEEDED: for row in response.result.data_array: print(row) # For large results, fetch chunks chunk = w.statement_execution.get_statement_result_chunk_n( statement_id=response.statement_id, chunk_index=0 )
SQL Warehouses
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
# List warehouses for wh in w.warehouses.list(): print(f"{wh.name}: {wh.state}") # Get warehouse warehouse = w.warehouses.get(id="abc123") # Create warehouse created = w.warehouses.create_and_wait( name="my-warehouse", cluster_size="Small", max_num_clusters=1, auto_stop_mins=15 ) # Start/stop w.warehouses.start(id="abc123").result() w.warehouses.stop(id="abc123").result()
Unity Catalog - Tables
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
# List tables in a schema for table in w.tables.list(catalog_name="main", schema_name="default"): print(f"{table.full_name}: {table.table_type}") # Get table info table = w.tables.get(full_name="main.default.my_table") print(f"Columns: {[c.name for c in table.columns]}") # Check if table exists exists = w.tables.exists(full_name="main.default.my_table")
Unity Catalog - Catalogs & Schemas
Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
# List catalogs for catalog in w.catalogs.list(): print(catalog.name) # Create catalog w.catalogs.create(name="my_catalog", comment="Description") # List schemas for schema in w.schemas.list(catalog_name="main"): print(schema.name) # Create schema w.schemas.create(name="my_schema", catalog_name="main")
Volumes
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
from databricks.sdk.service.catalog import VolumeType # List volumes for vol in w.volumes.list(catalog_name="main", schema_name="default"): print(f"{vol.full_name}: {vol.volume_type}") # Create managed volume w.volumes.create( catalog_name="main", schema_name="default", name="my_volume", volume_type=VolumeType.MANAGED ) # Read volume info vol = w.volumes.read(name="main.default.my_volume")
Files API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
# Upload file to volume w.files.upload( file_path="/Volumes/main/default/my_volume/data.csv", contents=open("local_file.csv", "rb") ) # Download file with w.files.download(file_path="/Volumes/main/default/my_volume/data.csv") as f: content = f.read() # List directory contents for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"): print(f"{entry.name}: {entry.is_directory}") # Upload/download with progress (parallel) w.files.upload_from( file_path="/Volumes/main/default/my_volume/large.parquet", source_path="/local/path/large.parquet", use_parallel=True ) w.files.download_to( file_path="/Volumes/main/default/my_volume/large.parquet", destination="/local/output/", use_parallel=True )
Serving Endpoints (Model Serving)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
# List endpoints for ep in w.serving_endpoints.list(): print(f"{ep.name}: {ep.state}") # Get endpoint endpoint = w.serving_endpoints.get(name="my-endpoint") # Query endpoint response = w.serving_endpoints.query( name="my-endpoint", inputs={"prompt": "Hello, world!"} ) # For chat/completions endpoints response = w.serving_endpoints.query( name="my-chat-endpoint", messages=[{"role": "user", "content": "Hello!"}] ) # Get OpenAI-compatible client openai_client = w.serving_endpoints.get_open_ai_client()
Vector Search
Doc (Indexes): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html Doc (Endpoints): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html
# List vector search indexes for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"): print(idx.name) # Query index results = w.vector_search_indexes.query_index( index_name="main.default.my_index", columns=["id", "text", "embedding"], query_text="search query", num_results=10 ) for doc in results.result.data_array: print(doc)
Pipelines (Delta Live Tables)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
# List pipelines for pipeline in w.pipelines.list_pipelines(): print(f"{pipeline.name}: {pipeline.state}") # Get pipeline pipeline = w.pipelines.get(pipeline_id="abc123") # Start pipeline update w.pipelines.start_update(pipeline_id="abc123") # Stop pipeline w.pipelines.stop_and_wait(pipeline_id="abc123")
Secrets
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
# List secret scopes for scope in w.secrets.list_scopes(): print(scope.name) # Create scope w.secrets.create_scope(scope="my-scope") # Put secret w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123") # Get secret (returns GetSecretResponse with value) secret = w.secrets.get_secret(scope="my-scope", key="api-key") # List secrets in scope (metadata only, not values) for s in w.secrets.list_secrets(scope="my-scope"): print(s.key)
DBUtils
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html
# Access dbutils through WorkspaceClient dbutils = w.dbutils # File system operations files = dbutils.fs.ls("/") dbutils.fs.cp("dbfs:/source", "dbfs:/dest") dbutils.fs.rm("dbfs:/path", recurse=True) # Secrets (same as w.secrets but dbutils interface) value = dbutils.secrets.get(scope="my-scope", key="my-key")
Common Patterns
CRITICAL: Async Applications (FastAPI, etc.)
The Databricks SDK is fully synchronous. All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with
asyncio.to_thread() to avoid blocking the event loop.
import asyncio from databricks.sdk import WorkspaceClient w = WorkspaceClient() # WRONG - blocks the event loop async def get_clusters_bad(): return list(w.clusters.list()) # BLOCKS! # CORRECT - runs in thread pool async def get_clusters_good(): return await asyncio.to_thread(lambda: list(w.clusters.list())) # CORRECT - for simple calls async def get_cluster(cluster_id: str): return await asyncio.to_thread(w.clusters.get, cluster_id) # CORRECT - FastAPI endpoint from fastapi import FastAPI app = FastAPI() @app.get("/clusters") async def list_clusters(): clusters = await asyncio.to_thread(lambda: list(w.clusters.list())) return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters] @app.post("/query") async def run_query(sql: str, warehouse_id: str): # Wrap the blocking SDK call response = await asyncio.to_thread( w.statement_execution.execute_statement, statement=sql, warehouse_id=warehouse_id, wait_timeout="30s" ) return response.result.data_array
Note:
WorkspaceClient().config.host is NOT a network call - it just reads config. No need to wrap property access.
Wait for Long-Running Operations
from datetime import timedelta # Pattern 1: Use *_and_wait methods cluster = w.clusters.create_and_wait( cluster_name="test", spark_version="14.3.x-scala2.12", node_type_id="i3.xlarge", num_workers=2, timeout=timedelta(minutes=30) ) # Pattern 2: Use Wait object wait = w.clusters.create(...) cluster = wait.result() # Blocks until ready # Pattern 3: Manual polling with callback def progress(cluster): print(f"State: {cluster.state}") cluster = w.clusters.wait_get_cluster_running( cluster_id="...", timeout=timedelta(minutes=30), callback=progress )
Pagination
# All list methods return iterators that handle pagination automatically for job in w.jobs.list(): # Fetches all pages print(job.settings.name) # For manual control from databricks.sdk.service.jobs import ListJobsRequest response = w.jobs.list(limit=10) for job in response: print(job)
Error Handling
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists try: cluster = w.clusters.get(cluster_id="invalid-id") except NotFound: print("Cluster not found") except PermissionDenied: print("Access denied")
When Uncertain
If I'm unsure about a method, I should:
-
Check the documentation URL pattern:
https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html
-
Common categories:
- Clusters:
/workspace/compute/clusters.html - Jobs:
/workspace/jobs/jobs.html - Tables:
/workspace/catalog/tables.html - Warehouses:
/workspace/sql/warehouses.html - Serving:
/workspace/serving/serving_endpoints.html
- Clusters:
-
Fetch and verify before providing guidance on parameters or return types.