Claude-skill-registry delta-sharing
Delta Sharing configuration, monitoring, and recipient management for secure cross-organization data sharing.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/delta-sharing" ~/.claude/skills/majiayu000-claude-skill-registry-delta-sharing && rm -rf "$T"
manifest:
skills/data/delta-sharing/SKILL.mdsource content
Delta Sharing Skill
Overview
Delta Sharing is an open protocol for secure data sharing across organizations. This skill covers share configuration, recipient management, access control, and usage monitoring.
Key Benefits:
- Secure cross-organization sharing
- No data duplication
- Real-time data access
- Centralized access control
- Usage tracking and auditing
- Open standard (works with any platform)
When to Use This Skill
Use Delta Sharing when you need to:
- Share data with external partners
- Enable cross-workspace data access
- Distribute data products to consumers
- Implement multi-tenant data access
- Track data consumption patterns
- Maintain centralized governance
Core Concepts
1. Share Configuration
Create and Configure Shares:
from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Create share share = w.shares.create( name="partner_analytics_share", comment="Analytics data for external partners" ) # Add tables to share w.shares.update( name="partner_analytics_share", updates=[ { "action": "ADD", "data_object": { "name": "catalog.schema.customer_metrics", "data_object_type": "TABLE", "comment": "Aggregated customer metrics" } }, { "action": "ADD", "data_object": { "name": "catalog.schema.product_sales", "data_object_type": "TABLE" } } ] )
2. Recipient Management
Create and Manage Recipients:
# Create recipient recipient = w.recipients.create( name="acme_corp", comment="ACME Corporation partner", authentication_type="TOKEN" ) # Get activation URL for recipient activation_url = recipient.activation_url print(f"Share this URL with recipient: {activation_url}") # Grant access to share w.grants.update( securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "add": ["SELECT"] }] )
3. Access Control
Manage Permissions:
# List current permissions permissions = w.grants.get_effective( securable_type="SHARE", full_name="partner_analytics_share" ) # Revoke access w.grants.update( securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "remove": ["SELECT"] }] ) # Grant access to specific tables only w.grants.update( securable_type="TABLE", full_name="catalog.schema.customer_metrics", securable_type="SHARE", securable_name="partner_analytics_share", changes=[{ "principal": "acme_corp", "add": ["SELECT"] }] )
4. Usage Monitoring
Track Share Usage:
def monitor_share_usage(spark, share_name: str, days: int = 7): """Monitor Delta Sharing usage.""" query = f""" SELECT date_trunc('day', request_time) as date, recipient_name, table_name, COUNT(*) as request_count, SUM(rows_returned) as total_rows, SUM(bytes_transferred) as total_bytes FROM system.access.audit WHERE share_name = '{share_name}' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY 1, 2, 3 ORDER BY 1 DESC, 4 DESC """ return spark.sql(query) # Get usage metrics usage_df = monitor_share_usage(spark, "partner_analytics_share") usage_df.show()
Implementation Patterns
Pattern 1: Multi-Tenant Sharing
Separate Shares per Tenant:
class MultiTenantSharingManager: """Manage multi-tenant data sharing.""" def __init__(self, workspace_client: WorkspaceClient): self.client = workspace_client def setup_tenant_share( self, tenant_name: str, tables: List[str], tenant_email: str ): """Set up complete sharing for a tenant.""" share_name = f"{tenant_name}_share" # Create share self.client.shares.create( name=share_name, comment=f"Data share for {tenant_name}" ) # Add tables for table in tables: self.client.shares.update( name=share_name, updates=[{ "action": "ADD", "data_object": { "name": table, "data_object_type": "TABLE" } }] ) # Create recipient recipient = self.client.recipients.create( name=tenant_name, comment=f"Recipient for {tenant_name}", authentication_type="TOKEN" ) # Grant access self.client.grants.update( securable_type="SHARE", securable_name=share_name, changes=[{ "principal": tenant_name, "add": ["SELECT"] }] ) return { "share_name": share_name, "recipient": tenant_name, "activation_url": recipient.activation_url, "tables": tables }
Pattern 2: Row-Level Security
Implement RLS for Shared Data:
import dlt from pyspark.sql.functions import * @dlt.table( name="customer_data_with_rls", comment="Customer data with row-level security" ) def customer_data_with_tenant_filter(): """ Apply row-level security for multi-tenant sharing. Each recipient sees only their tenant's data. """ return ( dlt.read("silver_customer_data") .withColumn("tenant_id", col("customer_id").substr(1, 3)) .select( "customer_id", "tenant_id", "customer_name", "email", "created_date" ) ) # Share with partition filtering def create_tenant_specific_share( tenant_id: str, source_table: str, share_name: str ): """ Create share with automatic tenant filtering. Uses partition pruning for efficient filtering. """ w = WorkspaceClient() # Create view with tenant filter view_name = f"{source_table}_{tenant_id}_view" spark.sql(f""" CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {source_table} WHERE tenant_id = '{tenant_id}' """) # Share the view w.shares.create(name=share_name) w.shares.update( name=share_name, updates=[{ "action": "ADD", "data_object": { "name": view_name, "data_object_type": "VIEW" } }] )
Pattern 3: Usage Analytics
Comprehensive Usage Tracking:
class SharingAnalytics: """Analyze Delta Sharing usage patterns.""" def __init__(self, spark): self.spark = spark def get_most_accessed_tables(self, days: int = 30): """Find most accessed shared tables.""" return self.spark.sql(f""" SELECT table_name, COUNT(DISTINCT recipient_name) as unique_recipients, COUNT(*) as total_requests, SUM(rows_returned) as total_rows, AVG(query_duration_ms) as avg_duration_ms FROM system.access.audit WHERE event_type = 'deltaSharing' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY table_name ORDER BY total_requests DESC LIMIT 20 """) def get_recipient_activity(self, recipient_name: str, days: int = 7): """Detailed activity for specific recipient.""" return self.spark.sql(f""" SELECT date_trunc('hour', request_time) as hour, table_name, COUNT(*) as requests, SUM(rows_returned) as rows, SUM(bytes_transferred) / 1024 / 1024 as mb_transferred FROM system.access.audit WHERE recipient_name = '{recipient_name}' AND request_time >= current_date() - INTERVAL {days} DAYS GROUP BY 1, 2 ORDER BY 1 DESC """) def detect_anomalies(self, share_name: str): """Detect unusual access patterns.""" return self.spark.sql(f""" WITH daily_stats AS ( SELECT date_trunc('day', request_time) as date, recipient_name, COUNT(*) as daily_requests FROM system.access.audit WHERE share_name = '{share_name}' AND request_time >= current_date() - INTERVAL 30 DAYS GROUP BY 1, 2 ), stats AS ( SELECT recipient_name, AVG(daily_requests) as avg_requests, STDDEV(daily_requests) as stddev_requests FROM daily_stats GROUP BY recipient_name ) SELECT d.date, d.recipient_name, d.daily_requests, s.avg_requests, s.stddev_requests, CASE WHEN d.daily_requests > s.avg_requests + 2 * s.stddev_requests THEN 'ANOMALY_HIGH' WHEN d.daily_requests < s.avg_requests - 2 * s.stddev_requests THEN 'ANOMALY_LOW' ELSE 'NORMAL' END as status FROM daily_stats d JOIN stats s ON d.recipient_name = s.recipient_name WHERE date >= current_date() - INTERVAL 7 DAYS ORDER BY d.date DESC, d.daily_requests DESC """)
Pattern 4: Automated Share Management
CI/CD for Share Configuration:
""" Automated share management from configuration. """ import yaml from typing import Dict, List, Any class ShareConfigManager: """Manage shares from YAML configuration.""" def __init__(self, workspace_client: WorkspaceClient): self.client = workspace_client def load_config(self, config_path: str) -> Dict[str, Any]: """Load share configuration from YAML.""" with open(config_path, 'r') as f: return yaml.safe_load(f) def apply_config(self, config: Dict[str, Any]): """Apply share configuration.""" for share_config in config.get('shares', []): self._create_or_update_share(share_config) for recipient_config in config.get('recipients', []): self._create_or_update_recipient(recipient_config) for grant_config in config.get('grants', []): self._apply_grants(grant_config) def _create_or_update_share(self, config: Dict[str, Any]): """Create or update a share.""" try: existing = self.client.shares.get(name=config['name']) print(f"Share {config['name']} already exists") except: self.client.shares.create( name=config['name'], comment=config.get('comment', '') ) print(f"Created share: {config['name']}") # Add tables for table in config.get('tables', []): self.client.shares.update( name=config['name'], updates=[{ "action": "ADD", "data_object": { "name": table, "data_object_type": "TABLE" } }] )
Example Configuration (shares.yaml):
shares: - name: partner_analytics_share comment: Analytics data for partners tables: - catalog.analytics.customer_metrics - catalog.analytics.product_performance - catalog.analytics.sales_summary recipients: - name: acme_corp comment: ACME Corporation authentication_type: TOKEN email: data-team@acme.com - name: globex_inc comment: Globex Inc authentication_type: TOKEN email: analytics@globex.com grants: - share: partner_analytics_share recipient: acme_corp permissions: [SELECT] - share: partner_analytics_share recipient: globex_inc permissions: [SELECT]
Best Practices
1. Security
- Use token-based authentication
- Implement row-level security when needed
- Regular audit of access permissions
- Monitor for unusual access patterns
- Rotate recipient tokens periodically
2. Performance
- Share aggregated data when possible
- Use partition pruning for large tables
- Monitor query performance
- Implement caching strategies
- Limit result set sizes
3. Governance
- Document all shares and recipients
- Maintain share configuration in version control
- Regular access reviews
- Clear data ownership
- Defined SLAs for shared data
4. Monitoring
# Set up alerts for anomalous usage def check_usage_threshold(spark, share_name: str, threshold_gb: float = 100): """Alert if daily data transfer exceeds threshold.""" query = f""" SELECT SUM(bytes_transferred) / 1024 / 1024 / 1024 as gb_transferred FROM system.access.audit WHERE share_name = '{share_name}' AND date_trunc('day', request_time) = current_date() """ result = spark.sql(query).first() gb_transferred = result['gb_transferred'] or 0 if gb_transferred > threshold_gb: send_alert( f"Share {share_name} transferred {gb_transferred:.2f}GB today, " f"exceeding threshold of {threshold_gb}GB" )
Common Pitfalls to Avoid
Don't:
- Share raw PII without review
- Neglect access auditing
- Skip documentation
- Ignore usage monitoring
- Hard-code credentials
Do:
- Implement data masking for sensitive fields
- Regular access audits
- Document all shares
- Monitor usage patterns
- Use secure credential management
Complete Examples
See
/examples/ directory for:
: Complete external sharing setupexternal_data_sharing.py
: Multi-tenant implementationmulti_tenant_sharing.py
Related Skills
: Share data productsdata-products
: Ensure shared data qualitydata-quality
: Share gold layer tablesmedallion-architecture
: Automate share managementcicd-workflows