# Skillshub databricks-data-handling

## Install

**Source** · Clone the upstream repo:

```bash
git clone https://github.com/ComeOnOliver/skillshub
```

**Claude Code** · Install into `~/.claude/skills/`:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/ComeOnOliver/skillshub "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/jeremylongshore/claude-code-plugins-plus-skills/databricks-data-handling" ~/.claude/skills/comeonoliver-skillshub-databricks-data-handling && rm -rf "$T"
```

Manifest: `skills/jeremylongshore/claude-code-plugins-plus-skills/databricks-data-handling/SKILL.md`
# Databricks Data Handling

## Overview
Implement GDPR compliance, PII masking, data retention, and row-level security in Delta Lake with Unity Catalog. Covers data classification tagging, right-to-deletion workflows, automated retention enforcement, column-level masking functions, and subject access request (SAR) reporting.
## Prerequisites
- Unity Catalog enabled
- Understanding of data classification requirements (GDPR, CCPA, HIPAA)
- Admin access for tags and masking functions
## Instructions

### Step 1: Classify and Tag Data
Use Unity Catalog tags to classify tables and columns for automated compliance enforcement.
```sql
-- Tag tables with classification and retention
ALTER TABLE prod_catalog.silver.customers
  SET TAGS ('data_classification' = 'PII', 'retention_days' = '730');
ALTER TABLE prod_catalog.silver.orders
  SET TAGS ('data_classification' = 'CONFIDENTIAL', 'retention_days' = '365');
ALTER TABLE prod_catalog.gold.metrics
  SET TAGS ('data_classification' = 'INTERNAL', 'retention_days' = '1825');

-- Tag PII columns
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN email SET TAGS ('pii_type' = 'email');
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN phone SET TAGS ('pii_type' = 'phone');
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN full_name SET TAGS ('pii_type' = 'name');
```
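To confirm the tags are in place before later steps depend on them, you can query the same `information_schema.table_tags` view that Steps 2 and 3 read from; a minimal check:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# List the classification and retention tags applied above.
spark.sql("""
    SELECT table_schema, table_name, tag_name, tag_value
    FROM prod_catalog.information_schema.table_tags
    WHERE tag_name IN ('data_classification', 'retention_days')
    ORDER BY table_schema, table_name
""").show(truncate=False)
```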
### Step 2: GDPR Right-to-Deletion
Delete all user data across PII-tagged tables with audit logging.
```python
from datetime import datetime

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


class GDPRHandler:
    """Handle GDPR deletion requests across all PII-tagged tables."""

    def __init__(self, catalog: str):
        self.catalog = catalog

    def find_pii_tables(self) -> list[str]:
        """Find all tables tagged as PII."""
        result = spark.sql(f"""
            SELECT table_catalog, table_schema, table_name
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'data_classification' AND tag_value = 'PII'
        """).collect()
        return [f"{r.table_catalog}.{r.table_schema}.{r.table_name}" for r in result]

    def process_deletion(self, user_id: str, request_id: str, dry_run: bool = True) -> dict:
        """Delete user data from all PII tables. Returns audit record."""
        pii_tables = self.find_pii_tables()
        audit = {
            "request_id": request_id,
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "dry_run": dry_run,
            "tables_processed": [],
        }
        for table in pii_tables:
            # Check if table has a user_id-like column
            cols = [c.name for c in spark.table(table).schema]
            user_col = next(
                (c for c in cols if c in ("user_id", "customer_id", "account_id")), None
            )
            if not user_col:
                continue
            count = spark.sql(
                f"SELECT COUNT(*) AS cnt FROM {table} WHERE {user_col} = '{user_id}'"
            ).first().cnt
            if count > 0 and not dry_run:
                spark.sql(f"DELETE FROM {table} WHERE {user_col} = '{user_id}'")
            audit["tables_processed"].append({
                "table": table,
                "column": user_col,
                "rows_affected": count,
                "action": "DELETED" if not dry_run else "WOULD_DELETE",
            })
        # Log audit record
        if not dry_run:
            spark.createDataFrame([audit]).write.mode("append").saveAsTable(
                f"{self.catalog}.compliance.gdpr_audit_log"
            )
        return audit


# Usage
gdpr = GDPRHandler("prod_catalog")

# Always dry-run first
report = gdpr.process_deletion("user-12345", "GDPR-2024-001", dry_run=True)
for t in report["tables_processed"]:
    print(f"  {t['table']}: {t['rows_affected']} rows {t['action']}")
```
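`process_deletion` appends its audit record to `{catalog}.compliance.gdpr_audit_log`. `saveAsTable` creates the table on the first non-dry-run append, but the `compliance` schema must already exist; a one-time setup sketch (the `compliance-officers` group is hypothetical):

```python
# One-time setup: the audit table is created by the first real append,
# but the schema that holds it must exist beforehand.
spark.sql("CREATE SCHEMA IF NOT EXISTS prod_catalog.compliance")

# Optional: let compliance staff read the audit trail.
# 'compliance-officers' is a placeholder group name.
spark.sql("GRANT SELECT ON SCHEMA prod_catalog.compliance TO `compliance-officers`")
```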
### Step 3: Automated Data Retention

```python
class RetentionEnforcer:
    """Delete data older than retention policy set via table tags."""

    def __init__(self, catalog: str):
        self.catalog = catalog

    def enforce(self, dry_run: bool = True) -> list[dict]:
        """Process all tables with retention_days tag."""
        tagged = spark.sql(f"""
            SELECT table_catalog, table_schema, table_name, tag_value AS retention_days
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'retention_days'
        """).collect()
        results = []
        for row in tagged:
            table = f"{row.table_catalog}.{row.table_schema}.{row.table_name}"
            retention_days = int(row.retention_days)
            # Find date column (prefer created_at, event_date, order_date)
            cols = [c.name for c in spark.table(table).schema]
            date_col = next(
                (c for c in cols if c in ("created_at", "event_date", "order_date", "timestamp")),
                None,
            )
            if not date_col:
                continue
            expired = spark.sql(f"""
                SELECT COUNT(*) AS cnt FROM {table}
                WHERE {date_col} < current_timestamp() - INTERVAL {retention_days} DAYS
            """).first().cnt
            if expired > 0 and not dry_run:
                spark.sql(f"""
                    DELETE FROM {table}
                    WHERE {date_col} < current_timestamp() - INTERVAL {retention_days} DAYS
                """)
                # Clean up deleted files
                spark.sql(f"VACUUM {table} RETAIN 168 HOURS")
            results.append({
                "table": table,
                "retention_days": retention_days,
                "expired_rows": expired,
                "action": "DELETED" if not dry_run else "WOULD_DELETE",
            })
        return results


# Schedule as a daily Databricks job
enforcer = RetentionEnforcer("prod_catalog")
for r in enforcer.enforce(dry_run=True):
    print(f"  {r['table']}: {r['expired_rows']} rows > {r['retention_days']} days {r['action']}")
```
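Before scheduling the destructive run, a guardrail helps: refuse to delete if the dry run would purge far more rows than expected. A minimal sketch; the threshold is an assumption to tune to your data volumes:

```python
def enforce_with_guardrail(enforcer: RetentionEnforcer, max_rows_per_table: int = 1_000_000) -> list[dict]:
    """Dry-run first; abort the real run if any table's purge looks anomalous."""
    preview = enforcer.enforce(dry_run=True)
    oversized = [r for r in preview if r["expired_rows"] > max_rows_per_table]
    if oversized:
        names = ", ".join(r["table"] for r in oversized)
        raise RuntimeError(f"Refusing to delete: unexpectedly large purge in {names}")
    # Preview looked sane; perform the actual deletion.
    return enforcer.enforce(dry_run=False)


# results = enforce_with_guardrail(RetentionEnforcer("prod_catalog"))
```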
### Step 4: Column-Level PII Masking

```sql
-- Create masking functions for different PII types
CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_email(val STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
          CONCAT(LEFT(val, 1), '***@', SUBSTRING_INDEX(val, '@', -1)));

CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_phone(val STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
          CONCAT('***-***-', RIGHT(val, 4)));

CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_name(val STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
          CONCAT(LEFT(val, 1), REPEAT('*', LENGTH(val) - 1)));

-- Apply masks to columns
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN email SET MASK prod_catalog.compliance.mask_email;
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN phone SET MASK prod_catalog.compliance.mask_phone;
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN full_name SET MASK prod_catalog.compliance.mask_name;

-- Test: non-privileged users see masked data
-- email: j***@company.com
-- phone: ***-***-1234
-- name:  J****
```
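A quick way to sanity-check the masks is to read the governed columns as a user who is *not* in `pii-readers` and confirm the masked shapes shown in the comments above:

```python
# Run as a user outside the 'pii-readers' group; the mask functions
# rewrite the values on read, so plain SELECTs return masked data.
spark.sql("""
    SELECT email, phone, full_name
    FROM prod_catalog.silver.customers
    LIMIT 5
""").show(truncate=False)  # expect j***@company.com, ***-***-1234, J****
```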
### Step 5: Row-Level Security

```sql
-- Restrict data access by department/region
CREATE OR REPLACE FUNCTION prod_catalog.compliance.region_filter(region STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admins'), true,
          region IN (SELECT allowed_region
                     FROM prod_catalog.compliance.user_region_access
                     WHERE user_email = current_user()));

ALTER TABLE prod_catalog.gold.sales
  SET ROW FILTER prod_catalog.compliance.region_filter ON (region);

-- Analysts only see data for their assigned regions
```
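The filter function assumes a `user_region_access` lookup table with `user_email` and `allowed_region` columns. A setup sketch (the sample rows are hypothetical):

```python
# The row filter evaluates this lookup as a subquery on every scan of
# the filtered table, so keep it small (see Error Handling below).
spark.sql("""
    CREATE TABLE IF NOT EXISTS prod_catalog.compliance.user_region_access (
        user_email STRING,
        allowed_region STRING
    )
""")
spark.sql("""
    INSERT INTO prod_catalog.compliance.user_region_access VALUES
        ('analyst@company.com', 'EMEA'),
        ('analyst@company.com', 'APAC')
""")
```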
### Step 6: Subject Access Request (SAR)

```python
def generate_sar_report(catalog: str, user_id: str) -> dict:
    """Generate a GDPR Subject Access Request report."""
    gdpr = GDPRHandler(catalog)
    pii_tables = gdpr.find_pii_tables()
    report = {
        "user_id": user_id,
        "generated_at": datetime.utcnow().isoformat(),
        "data": {},
    }
    for table in pii_tables:
        cols = [c.name for c in spark.table(table).schema]
        user_col = next((c for c in cols if c in ("user_id", "customer_id")), None)
        if not user_col:
            continue
        rows = spark.sql(f"SELECT * FROM {table} WHERE {user_col} = '{user_id}'").toPandas()
        if not rows.empty:
            report["data"][table] = rows.to_dict(orient="records")
    return report


# Generate and export
sar = generate_sar_report("prod_catalog", "user-12345")
print(f"Found data in {len(sar['data'])} tables")
```
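The report is a plain dict, so it can be handed to the requester as JSON; a small export sketch (the output path is a placeholder):

```python
import json

# pandas rows may contain non-JSON-native types (e.g. timestamps),
# so fall back to str() for anything json can't serialize.
with open("/tmp/sar_user-12345.json", "w") as f:
    json.dump(sar, f, indent=2, default=str)
```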
## Output
- Data classification tags on tables and PII columns
- GDPR deletion workflow with dry-run and audit logging
- Automated retention enforcement via tagged policies
- Column masking functions for email, phone, name
- Row-level security restricting access by region/department
- SAR report generation for compliance requests
## Error Handling

| Error | Cause | Solution |
|---|---|---|
| `VACUUM` fails | Retention below 7 days | Use `RETAIN 168 HOURS` or more (the Delta default minimum) |
| `DELETE` times out | Very large table | Partition deletes across multiple runs (see the sketch below) |
| Mask function error | Column type mismatch | Ensure mask function signature matches column type |
| Missing user ID column | Non-standard schema | Maintain a table-to-user-column mapping |
| Row filter performance | Complex subquery | Materialize user permissions as a small lookup table |
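For the timeout case, one way to partition the work is to bound each `DELETE` by a date window, so every run commits a small, retryable chunk. A minimal sketch, reusing the `spark` session from the steps above and assuming a date/timestamp column like the ones Step 3 looks for; the 30-day window size is a placeholder:

```python
from datetime import date, datetime, timedelta


def delete_in_windows(table: str, date_col: str, cutoff: date, window_days: int = 30) -> None:
    """Purge rows older than `cutoff` in bounded windows so each DELETE stays small."""
    # Find the oldest date still present so we know where to start.
    oldest = spark.sql(f"SELECT MIN({date_col}) AS d FROM {table}").first().d
    if oldest is None:
        return  # nothing to purge
    start = oldest.date() if isinstance(oldest, datetime) else oldest
    while start < cutoff:
        end = min(start + timedelta(days=window_days), cutoff)
        # Each window commits independently; a timeout costs one window, not the run.
        spark.sql(
            f"DELETE FROM {table} "
            f"WHERE {date_col} >= '{start}' AND {date_col} < '{end}'"
        )
        start = end
```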
## Examples

### Quick Compliance Check
```sql
-- Find all PII-tagged tables and their masking status
SELECT
  t.table_name,
  t.tag_value AS classification,
  COUNT(c.column_name) AS masked_columns
FROM prod_catalog.information_schema.table_tags t
LEFT JOIN prod_catalog.information_schema.column_tags c
  ON t.table_name = c.table_name AND c.tag_name = 'pii_type'
WHERE t.tag_name = 'data_classification' AND t.tag_value = 'PII'
GROUP BY t.table_name, t.tag_value;
```
## Resources

### Next Steps

For enterprise RBAC, see `databricks-enterprise-rbac`.