Claude-skill-registry langchain-data-handling
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/langchain-data-handling" ~/.claude/skills/majiayu000-claude-skill-registry-langchain-data-handling && rm -rf "$T"
manifest:
skills/data/langchain-data-handling/SKILL.md · source content
LangChain Data Handling
Overview
Best practices for handling sensitive data, PII protection, and compliance in LangChain applications.
Prerequisites
- Understanding of data privacy regulations (GDPR, CCPA)
- LangChain application processing user data
- Data classification framework
Instructions
Step 1: PII Detection and Masking
```python
import re
from typing import List, Tuple
from dataclasses import dataclass


@dataclass
class PIIPattern:
    name: str
    pattern: str
    replacement: str


PII_PATTERNS = [
    PIIPattern("email", r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "[EMAIL]"),
    PIIPattern("phone", r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE]"),
    PIIPattern("ssn", r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]"),
    PIIPattern("credit_card", r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "[CREDIT_CARD]"),
    PIIPattern("ip_address", r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", "[IP_ADDRESS]"),
    PIIPattern("date_of_birth", r"\b\d{1,2}/\d{1,2}/\d{2,4}\b", "[DOB]"),
]


class PIIDetector:
    """Detect and mask PII in text."""

    def __init__(self, patterns: List[PIIPattern] = None):
        self.patterns = patterns or PII_PATTERNS

    def detect(self, text: str) -> List[Tuple[str, str, int, int]]:
        """Detect PII in text. Returns list of (type, value, start, end)."""
        findings = []
        for pattern in self.patterns:
            for match in re.finditer(pattern.pattern, text, re.IGNORECASE):
                findings.append((
                    pattern.name,
                    match.group(),
                    match.start(),
                    match.end()
                ))
        return findings

    def mask(self, text: str) -> str:
        """Mask all PII in text."""
        masked = text
        for pattern in self.patterns:
            masked = re.sub(pattern.pattern, pattern.replacement, masked, flags=re.IGNORECASE)
        return masked

    def redact(self, text: str) -> Tuple[str, dict]:
        """Redact PII and return mapping for restoration."""
        redactions = {}
        counter = {}

        def replace(match, pattern_name, replacement):
            count = counter.get(pattern_name, 0)
            counter[pattern_name] = count + 1
            key = f"{replacement[1:-1]}_{count}"
            redactions[key] = match.group()
            return f"[{key}]"

        result = text
        for pattern in self.patterns:
            result = re.sub(
                pattern.pattern,
                lambda m, p=pattern: replace(m, p.name, p.replacement),
                result,
                flags=re.IGNORECASE
            )
        return result, redactions


# Usage
detector = PIIDetector()
text = "Contact john@example.com or call 555-123-4567"
masked = detector.mask(text)  # "Contact [EMAIL] or call [PHONE]"
```
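The `redact` method is what the pre-processing pipeline in Step 2 relies on. For illustration, here is what it returns on made-up sample data: the masked text uses numbered placeholders, and the mapping lets you trace each placeholder back to the original value.

```python
# Illustrative only: sample values are fabricated for the example
redacted, redactions = detector.redact("Email john@example.com, SSN 123-45-6789")
print(redacted)    # "Email [EMAIL_0], SSN [SSN_0]"
print(redactions)  # {"EMAIL_0": "john@example.com", "SSN_0": "123-45-6789"}
```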
Step 2: Pre-processing Pipeline
```python
from langchain_core.runnables import RunnableLambda, RunnablePassthrough


def create_privacy_pipeline(chain):
    """Wrap chain with PII protection."""
    detector = PIIDetector()

    def preprocess(input_data: dict) -> dict:
        """Mask PII before sending to LLM."""
        if "input" in input_data:
            masked, redactions = detector.redact(input_data["input"])
            return {
                **input_data,
                "input": masked,
                "_redactions": redactions
            }
        return input_data

    def postprocess(output: str, redactions: dict = None) -> str:
        """Restore redacted values in output if needed."""
        # Note: Generally we DON'T restore PII in outputs
        # This is just for cases where it's required
        return output

    privacy_chain = (
        RunnableLambda(preprocess)
        | chain
    )
    return privacy_chain


# Usage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("Summarize: {input}")
chain = prompt | llm

safe_chain = create_privacy_pipeline(chain)
result = safe_chain.invoke({"input": "User john@example.com reported an issue"})
# LLM sees: "User [EMAIL_0] reported an issue"
```
Step 3: Data Retention Policies
```python
from datetime import datetime, timedelta
from typing import Optional
import hashlib


class DataRetentionManager:
    """Manage data retention for LLM interactions."""

    def __init__(self, retention_days: int = 30):
        self.retention_days = retention_days
        self.storage = {}  # Replace with actual storage

    def store_interaction(
        self,
        user_id: str,
        input_text: str,
        output_text: str,
        metadata: dict = None
    ) -> str:
        """Store interaction with retention policy."""
        interaction_id = hashlib.sha256(
            f"{user_id}{datetime.now().isoformat()}{input_text}".encode()
        ).hexdigest()[:16]

        # Mask PII before storage
        detector = PIIDetector()

        self.storage[interaction_id] = {
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest(),
            "input_masked": detector.mask(input_text),
            "output_masked": detector.mask(output_text),
            "created_at": datetime.now().isoformat(),
            "expires_at": (datetime.now() + timedelta(days=self.retention_days)).isoformat(),
            "metadata": metadata or {}
        }
        return interaction_id

    def cleanup_expired(self) -> int:
        """Remove expired interactions."""
        now = datetime.now()
        expired = [
            k for k, v in self.storage.items()
            if datetime.fromisoformat(v["expires_at"]) < now
        ]
        for key in expired:
            del self.storage[key]
        return len(expired)

    def delete_user_data(self, user_id: str) -> int:
        """GDPR right to erasure - delete all user data."""
        user_hash = hashlib.sha256(user_id.encode()).hexdigest()
        to_delete = [
            k for k, v in self.storage.items()
            if v["user_id_hash"] == user_hash
        ]
        for key in to_delete:
            del self.storage[key]
        return len(to_delete)
```
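A brief usage sketch of the manager above. The user id, texts, and metadata are placeholders, and the in-memory dict stands in for whatever persistence layer you actually use.

```python
# Illustrative usage of DataRetentionManager (in-memory storage, fake identifiers)
retention = DataRetentionManager(retention_days=30)

interaction_id = retention.store_interaction(
    user_id="user-42",  # hypothetical user identifier
    input_text="My email is john@example.com",
    output_text="Thanks, we'll follow up shortly.",
    metadata={"channel": "support-chat"}
)

# Run on a schedule to enforce the retention window
removed = retention.cleanup_expired()

# Handle a GDPR erasure request for one user
erased = retention.delete_user_data("user-42")
```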
Step 4: Consent Management
```python
from enum import Enum
from typing import Optional
from pydantic import BaseModel
from datetime import datetime


class ConsentType(str, Enum):
    LLM_PROCESSING = "llm_processing"
    DATA_RETENTION = "data_retention"
    ANALYTICS = "analytics"
    TRAINING = "training"  # For fine-tuning


class UserConsent(BaseModel):
    user_id: str
    consents: dict[ConsentType, bool]
    updated_at: datetime
    ip_address: Optional[str] = None


class ConsentManager:
    """Manage user consent for data processing."""

    def __init__(self):
        self.consents = {}

    def set_consent(self, user_consent: UserConsent) -> None:
        self.consents[user_consent.user_id] = user_consent

    def check_consent(self, user_id: str, consent_type: ConsentType) -> bool:
        """Check if user has given consent."""
        if user_id not in self.consents:
            return False
        return self.consents[user_id].consents.get(consent_type, False)

    def require_consent(self, consent_type: ConsentType):
        """Decorator to require consent before processing."""
        def decorator(func):
            async def wrapper(user_id: str, *args, **kwargs):
                if not self.check_consent(user_id, consent_type):
                    raise PermissionError(
                        f"User {user_id} has not consented to {consent_type.value}"
                    )
                return await func(user_id, *args, **kwargs)
            return wrapper
        return decorator


# Usage
consent_manager = ConsentManager()


@consent_manager.require_consent(ConsentType.LLM_PROCESSING)
async def process_with_llm(user_id: str, input_text: str):
    return await chain.ainvoke({"input": input_text})
```
Step 5: Audit Logging
```python
import json
import hashlib
from datetime import datetime
from typing import Any

from langchain_core.callbacks import BaseCallbackHandler


class AuditLogger:
    """Audit log for data access and processing."""

    def __init__(self, log_file: str = "audit.jsonl"):
        self.log_file = log_file

    def log(
        self,
        action: str,
        user_id: str,
        resource: str,
        details: dict = None,
        outcome: str = "success"
    ) -> None:
        """Log an audit event."""
        event = {
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest(),
            "resource": resource,
            "outcome": outcome,
            "details": details or {}
        }
        with open(self.log_file, "a") as f:
            f.write(json.dumps(event) + "\n")

    def log_llm_call(
        self,
        user_id: str,
        model: str,
        prompt_tokens: int,
        has_pii: bool
    ) -> None:
        """Log LLM API call."""
        self.log(
            action="llm_call",
            user_id=user_id,
            resource=f"model/{model}",
            details={
                "prompt_tokens": prompt_tokens,
                "pii_detected": has_pii
            }
        )


# Callback for automatic audit logging
class AuditCallback(BaseCallbackHandler):
    def __init__(self, audit_logger: AuditLogger, user_id: str):
        self.audit_logger = audit_logger
        self.user_id = user_id

    def on_llm_end(self, response, **kwargs) -> None:
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        self.audit_logger.log_llm_call(
            user_id=self.user_id,
            model=response.llm_output.get("model_name", "unknown") if response.llm_output else "unknown",
            prompt_tokens=usage.get("prompt_tokens", 0),
            has_pii=False  # Set based on detection
        )
```
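To wire the callback in, pass it through the `callbacks` entry of the runnable config at invocation time. A minimal sketch, reusing `safe_chain` from Step 2; the log file name and user id are placeholders.

```python
# Minimal wiring sketch: attach the audit callback per request
audit_logger = AuditLogger(log_file="audit.jsonl")

result = safe_chain.invoke(
    {"input": "User john@example.com reported an issue"},
    config={"callbacks": [AuditCallback(audit_logger, user_id="user-42")]},
)
```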
Data Handling Checklist
- PII detection and masking implemented
- Data retention policies defined
- Consent management in place
- Audit logging enabled
- Right to erasure (GDPR) supported
- Data minimization practiced
- Encryption at rest and in transit (see the sketch after this list)
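Steps 1-5 cover most of the checklist, but encryption at rest is not shown there. Below is a minimal sketch that encrypts an interaction record before persisting it, using symmetric encryption from the `cryptography` package; the choice of library, the record contents, and the key handling are assumptions here, and real deployments should load keys from a secrets manager.

```python
# Sketch: encrypt a stored interaction record (assumes the `cryptography` package;
# key management is deliberately simplified for illustration)
import json
from cryptography.fernet import Fernet

key = Fernet.generate_key()  # in practice, load this from a secrets manager
fernet = Fernet(key)

record = {"input_masked": "User [EMAIL] reported an issue", "created_at": "2024-01-01T00:00:00"}
ciphertext = fernet.encrypt(json.dumps(record).encode())

# ...later, when the record is read back
plaintext = json.loads(fernet.decrypt(ciphertext).decode())
```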
Next Steps
Use langchain-security-basics for additional security measures.