Claude-skill-registry input-validation
Validate construction data inputs before processing: cost estimates, schedules, BIM data, field reports. Catch errors early with domain-specific rules.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/input-validation" ~/.claude/skills/majiayu000-claude-skill-registry-input-validation && rm -rf "$T"
manifest:
skills/data/input-validation/SKILL.md · source content
Input Validation for Construction Data
Overview
Validate incoming construction data before processing to catch errors early. Domain-specific validation rules for estimates, schedules, BIM exports, and field data.
Validation Framework
Core Validator Class
from dataclasses import dataclass, field
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
import re
from datetime import datetime


class ValidationSeverity(Enum):
    """Severity level attached to every validation finding."""

    ERROR = "error"      # Must fix, blocks processing
    WARNING = "warning"  # Should review, allows processing
    INFO = "info"        # FYI, no action needed


# Marker printed in front of each issue by ValidationResult.to_report().
_SEVERITY_ICONS = {
    ValidationSeverity.ERROR: "❌",
    ValidationSeverity.WARNING: "⚠️",
    ValidationSeverity.INFO: "ℹ️",
}


@dataclass
class ValidationIssue:
    """One finding produced by a validator.

    Attributes are documented inline; ``value`` and ``suggestion`` are
    optional extras and default to ``None``.
    """

    field: str                        # Name/path of the offending input field
    message: str                      # Human-readable description of the problem
    severity: ValidationSeverity
    value: Any = None                 # Offending value, when the caller supplies it
    suggestion: Optional[str] = None  # Hint on how to fix the problem


@dataclass
class ValidationResult:
    """Accumulates the issues found while validating one input.

    ``is_valid`` starts at whatever the caller passes and is flipped to
    ``False`` by :meth:`add_error`; warnings and infos never change it.
    """

    is_valid: bool
    issues: List[ValidationIssue] = field(default_factory=list)

    def add_error(self, field: str, message: str, value: Any = None,
                  suggestion: Optional[str] = None) -> None:
        """Record a blocking issue and mark the result as invalid."""
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.ERROR, value, suggestion)
        )
        self.is_valid = False

    def add_warning(self, field: str, message: str, value: Any = None,
                    suggestion: Optional[str] = None) -> None:
        """Record a non-blocking issue; ``is_valid`` is left untouched."""
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.WARNING, value, suggestion)
        )

    def add_info(self, field: str, message: str, value: Any = None,
                 suggestion: Optional[str] = None) -> None:
        """Record an informational note; ``is_valid`` is left untouched.

        ``suggestion`` is optional so existing three-argument callers keep
        working, while info notes can now carry a hint like errors/warnings.
        """
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.INFO, value, suggestion)
        )

    @property
    def errors(self) -> List[ValidationIssue]:
        """All recorded issues with ERROR severity, in insertion order."""
        return [i for i in self.issues if i.severity == ValidationSeverity.ERROR]

    @property
    def warnings(self) -> List[ValidationIssue]:
        """All recorded issues with WARNING severity, in insertion order."""
        return [i for i in self.issues if i.severity == ValidationSeverity.WARNING]

    def to_report(self) -> str:
        """Render the result as a plain-text, newline-joined report."""
        lines = ["VALIDATION REPORT", "=" * 50]
        lines.append(f"Status: {'PASSED' if self.is_valid else 'FAILED'}")
        lines.append(f"Errors: {len(self.errors)}, Warnings: {len(self.warnings)}")
        lines.append("")

        for issue in self.issues:
            # Any severity other than ERROR/WARNING is shown with the info icon.
            icon = _SEVERITY_ICONS.get(issue.severity, "ℹ️")
            lines.append(f"{icon} [{issue.field}] {issue.message}")
            if issue.suggestion:
                lines.append(f" Suggestion: {issue.suggestion}")

        return "\n".join(lines)
Cost Estimate Validation
class CostEstimateValidator:
    """Validate cost estimate inputs.

    Checks required fields, per-line-item sanity (negative or non-numeric
    values, missing descriptions, CSI code format, zero amounts), agreement
    between the declared total and the sum of line items, and - when a
    gross area is supplied - whether each line's $/SF falls near the typical
    range for its CSI division.

    Malformed values (``None``, strings, NaN, wrong container types) are
    reported as issues instead of raising, so ``validate`` can be run on
    untrusted input.
    """

    # Typical cost ranges per CSI division ($/SF)
    TYPICAL_RANGES = {
        '03': (15, 45),   # Concrete
        '04': (8, 25),    # Masonry
        '05': (12, 35),   # Metals
        '06': (5, 20),    # Wood/Plastics
        '07': (8, 30),    # Thermal/Moisture
        '08': (15, 50),   # Openings
        '09': (10, 40),   # Finishes
        '22': (8, 25),    # Plumbing
        '23': (12, 40),   # HVAC
        '26': (10, 35),   # Electrical
    }

    # Three two-digit groups, each separating whitespace character optional.
    _CSI_CODE_RE = re.compile(r'^\d{2}\s?\d{2}\s?\d{2}$')

    def validate(self, estimate_data: Dict[str, Any]) -> ValidationResult:
        """Run every estimate check and return the collected result."""
        result = ValidationResult(is_valid=True)

        # Required fields
        self._validate_required_fields(estimate_data, result)

        # Line item validation
        if 'line_items' in estimate_data:
            self._validate_line_items(estimate_data['line_items'], result)

        # Total validation
        self._validate_totals(estimate_data, result)

        # Cost range validation
        if 'gross_area' in estimate_data:
            self._validate_cost_ranges(estimate_data, result)

        return result

    @staticmethod
    def _to_number(value: Any) -> Optional[float]:
        """Return ``value`` as a finite float, or ``None`` if it is not one.

        ``None``, booleans, strings/bytes, NaN and infinities are rejected so
        that they surface as validation issues rather than as ``TypeError``
        (or as silently-passing NaN comparisons) further down.
        """
        # Local import: math is not among the imports at the top of this file.
        import math

        if value is None or isinstance(value, (bool, str, bytes)):
            return None
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
        return number if math.isfinite(number) else None

    def _line_amount(self, item: Any) -> Optional[float]:
        """Return quantity * unit_cost for a line item, or ``None`` if unusable.

        Missing keys default to 0, matching the other line-item checks.
        """
        if not isinstance(item, dict):
            return None
        quantity = self._to_number(item.get('quantity', 0))
        unit_cost = self._to_number(item.get('unit_cost', 0))
        if quantity is None or unit_cost is None:
            return None
        return quantity * unit_cost

    def _validate_required_fields(self, data: dict, result: ValidationResult) -> None:
        """Add an error for each required key that is absent or ``None``."""
        for name in ('project_name', 'estimate_date', 'line_items', 'total'):
            if data.get(name) is None:
                result.add_error(name, f"Required field '{name}' is missing")

    def _validate_line_items(self, items: list, result: ValidationResult) -> None:
        """Check each line item's numbers, description and CSI code."""
        if items is None:
            # Already reported as a missing required field.
            return
        if not isinstance(items, (list, tuple)):
            result.add_error("line_items", "Line items must be a list", items)
            return

        for i, item in enumerate(items):
            if not isinstance(item, dict):
                result.add_error(f"line_items[{i}]",
                                 "Line item must be a mapping of fields", item)
                continue

            # Check for non-numeric and negative values
            for key, label in (('quantity', 'Quantity'), ('unit_cost', 'Unit cost')):
                raw = item.get(key, 0)
                number = self._to_number(raw)
                if number is None:
                    result.add_error(f"line_items[{i}].{key}",
                                     f"{label} must be a finite number", raw)
                elif number < 0:
                    result.add_error(f"line_items[{i}].{key}",
                                     f"{label} cannot be negative", raw)

            # Check for missing descriptions
            if not item.get('description'):
                result.add_warning(f"line_items[{i}].description",
                                   "Line item missing description")

            # Check for valid CSI code (non-string codes count as badly formatted)
            csi_code = item.get('csi_code')
            if csi_code and not (isinstance(csi_code, str)
                                 and self._CSI_CODE_RE.match(csi_code)):
                result.add_warning(f"line_items[{i}].csi_code",
                                   f"Invalid CSI code format: {csi_code}",
                                   suggestion="Use format: XX XX XX")

            # Check for zero amounts; an unusable amount was already reported above
            if self._line_amount(item) == 0:
                result.add_warning(f"line_items[{i}]", "Line item has zero amount")

    def _validate_totals(self, data: dict, result: ValidationResult) -> None:
        """Compare the declared total with the sum of line-item amounts."""
        items = data.get('line_items')
        raw_total = data.get('total')
        # Absent/None values are covered by the required-field check.
        if not isinstance(items, (list, tuple)) or raw_total is None:
            return

        declared = self._to_number(raw_total)
        if declared is None:
            result.add_error("total", "Total must be a finite number", raw_total)
            return

        amounts = [self._line_amount(item) for item in items]
        if any(amount is None for amount in amounts):
            # A sum over broken line items would only produce a bogus mismatch;
            # the broken items themselves are reported by _validate_line_items.
            return

        calculated = sum(amounts)
        variance = abs(calculated - declared)
        if variance > 0.01:
            result.add_error(
                "total",
                f"Total mismatch: calculated {calculated:.2f}, declared {declared:.2f}",
                variance,
            )

    def _validate_cost_ranges(self, data: dict, result: ValidationResult) -> None:
        """Warn when a line's $/SF is below half or above double the typical range."""
        raw_area = data['gross_area']
        gross_area = self._to_number(raw_area)
        if gross_area is None or gross_area <= 0:
            # Without a usable area every $/SF figure would be meaningless.
            result.add_warning("gross_area",
                               "Gross area must be a positive number; "
                               "cost range checks skipped",
                               raw_area)
            return

        items = data.get('line_items')
        if not isinstance(items, (list, tuple)):
            return

        for item in items:
            if not isinstance(item, dict):
                continue
            csi_code = item.get('csi_code')
            if not isinstance(csi_code, str):
                continue
            csi_div = csi_code[:2]
            bounds = self.TYPICAL_RANGES.get(csi_div)
            amount = self._line_amount(item)
            if bounds is None or amount is None:
                continue

            low, high = bounds
            cost_per_sf = amount / gross_area
            if cost_per_sf < low * 0.5 or cost_per_sf > high * 2:
                result.add_warning(
                    f"line_items[{item.get('description', 'Unknown')}]",
                    f"Cost ${cost_per_sf:.2f}/SF outside typical range ${low}-${high}/SF for Division {csi_div}",
                    cost_per_sf,
                    "Review unit costs and quantities"
                )
Schedule Validation
class ScheduleValidator:
    """Validate schedule/planning inputs.

    Checks required fields, task IDs and dates, predecessor references and
    ordering, and per-resource allocation. Malformed values are reported as
    issues instead of raising.

    NOTE(review): ``predecessors`` is assumed to be a list of task IDs and
    ``resources`` a list of dicts with ``id``/``name`` and optional
    ``allocation`` (percent) - confirm against the producing system.
    """

    def validate(self, schedule_data: Dict[str, Any]) -> ValidationResult:
        """Run every schedule check and return the collected result."""
        result = ValidationResult(is_valid=True)

        # Required fields
        self._validate_required_fields(schedule_data, result)

        # Task validation
        tasks = schedule_data.get('tasks')
        if tasks is None:
            # Absent/None tasks were reported by the required-field check.
            return result
        if not isinstance(tasks, (list, tuple)):
            result.add_error("tasks", "Tasks must be a list", tasks)
            return result

        self._validate_tasks(tasks, result)
        self._validate_dependencies(tasks, result)
        self._validate_resources(tasks, result)

        return result

    @staticmethod
    def _to_datetime(value: Any) -> datetime:
        """Convert an ISO string, datetime, or date-like object to ``datetime``.

        Raises:
            ValueError: if ``value`` cannot be interpreted as a date.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        try:
            # Date-like objects (e.g. datetime.date) become midnight of that
            # day so they can be compared with parsed ISO strings.
            return datetime(value.year, value.month, value.day)
        except (AttributeError, TypeError) as exc:
            raise ValueError(f"unsupported date value: {value!r}") from exc

    def _validate_required_fields(self, data: dict, result: ValidationResult) -> None:
        """Add an error for each required key that is absent or ``None``."""
        for name in ('project_name', 'start_date', 'tasks'):
            if data.get(name) is None:
                result.add_error(name, f"Required field '{name}' is missing")

    def _parse_task_dates(self, task: dict, index: int, result: ValidationResult):
        """Return ``(start, end)`` as datetimes, ``None`` where absent/invalid.

        Each date is parsed on its own so a malformed value is reported even
        when the task carries only one of the two dates.
        """
        parsed = []
        for key in ('start_date', 'end_date'):
            raw = task.get(key)
            moment = None
            if raw:
                try:
                    moment = self._to_datetime(raw)
                except ValueError as e:
                    result.add_error(f"tasks[{index}]", f"Invalid date format: {e}")
            parsed.append(moment)
        return parsed[0], parsed[1]

    def _validate_tasks(self, tasks: list, result: ValidationResult) -> None:
        """Check IDs, dates and durations of every task."""
        seen_ids = set()

        for i, task in enumerate(tasks):
            if not isinstance(task, dict):
                result.add_error(f"tasks[{i}]", "Task must be a mapping of fields", task)
                continue

            # Check for missing / duplicate IDs. A missing ID is reported as
            # such rather than as a duplicate of None.
            task_id = task.get('id')
            if task_id is None:
                result.add_error(f"tasks[{i}].id", "Task is missing an ID")
            elif task_id in seen_ids:
                result.add_error(f"tasks[{i}].id", f"Duplicate task ID: {task_id}")
            else:
                seen_ids.add(task_id)

            # Check dates
            start = task.get('start_date')
            end = task.get('end_date')
            start_dt, end_dt = self._parse_task_dates(task, i, result)

            if start_dt is not None and end_dt is not None:
                try:
                    if end_dt < start_dt:
                        result.add_error(f"tasks[{i}]", "End date before start date",
                                         f"{start} -> {end}")

                    # Check for unrealistic durations
                    duration = (end_dt - start_dt).days
                    if duration > 365:
                        result.add_warning(
                            f"tasks[{i}]",
                            f"Task duration exceeds 1 year ({duration} days)")
                    if duration == 0 and task.get('type') != 'milestone':
                        result.add_warning(
                            f"tasks[{i}]",
                            "Task has zero duration but is not marked as milestone")
                except TypeError as e:
                    # Naive and timezone-aware datetimes cannot be compared.
                    result.add_error(f"tasks[{i}]", f"Invalid date format: {e}")

            # Check for missing duration
            if not task.get('duration') and not (start and end):
                result.add_error(f"tasks[{i}]", "Task missing duration or start/end dates")

    def _validate_dependencies(self, tasks: list, result: ValidationResult) -> None:
        """Check that predecessors exist and finish before their successors start."""
        tasks = [t for t in tasks if isinstance(t, dict)]
        task_dict = {t.get('id'): t for t in tasks}

        for task in tasks:
            # `or []` also covers an explicit None value.
            for pred_id in task.get('predecessors') or []:
                # Check predecessor exists
                if pred_id not in task_dict:
                    result.add_error(f"tasks[{task.get('id')}].predecessors",
                                     f"Predecessor '{pred_id}' does not exist")
                    continue

                # Check for logical sequence (if dates available)
                pred = task_dict[pred_id]
                if not (pred and pred.get('end_date') and task.get('start_date')):
                    continue
                try:
                    pred_end = self._to_datetime(pred['end_date'])
                    task_start = self._to_datetime(task['start_date'])
                    starts_early = task_start < pred_end
                except (ValueError, TypeError):
                    # Unparseable dates are reported by _validate_tasks;
                    # incomparable ones cannot be sequenced here.
                    continue

                if starts_early:
                    result.add_error(
                        f"tasks[{task.get('id')}]",
                        f"Task starts before predecessor '{pred_id}' ends",
                        f"Pred ends: {pred_end}, Task starts: {task_start}"
                    )

    @staticmethod
    def _peak_allocation(assignments: list):
        """Return the highest combined allocation over time-overlapping assignments.

        Intervals are half-open: an assignment that starts exactly when
        another ends does not overlap it, mirroring the finish-to-start rule
        in ``_validate_dependencies``. Zero/negative-length intervals are
        ignored. May raise ``TypeError`` if the datetimes are not mutually
        comparable.
        """
        events = []
        for a in assignments:
            if a['end'] > a['start']:
                events.append((a['start'], 1, a['allocation']))
                events.append((a['end'], 0, -a['allocation']))
        # At equal timestamps the releases (0) sort before the acquisitions (1).
        events.sort(key=lambda event: event[:2])

        peak = running = 0
        for _, _, delta in events:
            running += delta
            peak = max(peak, running)
        return peak

    def _validate_resources(self, tasks: list, result: ValidationResult) -> None:
        """Warn when a resource's concurrent assignments exceed 100% allocation."""
        resource_usage = {}

        for task in tasks:
            if not isinstance(task, dict):
                continue
            resources = task.get('resources', [])
            start = task.get('start_date')
            end = task.get('end_date')
            if not (resources and start and end):
                continue

            try:
                start_dt = self._to_datetime(start)
                end_dt = self._to_datetime(end)
            except ValueError:
                # Reported by _validate_tasks.
                continue

            for resource in resources:
                if not isinstance(resource, dict):
                    result.add_warning(f"tasks[{task.get('id')}].resources",
                                       "Resource entry must be a mapping of fields",
                                       resource)
                    continue

                allocation = resource.get('allocation')
                if allocation is None:
                    allocation = 100
                if isinstance(allocation, bool) or not isinstance(allocation, (int, float)):
                    result.add_warning(f"tasks[{task.get('id')}].resources",
                                       "Resource allocation must be a number",
                                       allocation)
                    continue

                res_id = resource.get('id') or resource.get('name')
                resource_usage.setdefault(res_id, []).append({
                    'task': task.get('id'),
                    'start': start_dt,
                    'end': end_dt,
                    'allocation': allocation
                })

        # Check allocations
        for res_id, assignments in resource_usage.items():
            if len(assignments) < 2:
                continue
            try:
                total_allocation = self._peak_allocation(assignments)
            except TypeError:
                # Mixed naive/aware datetimes cannot be ordered; fall back to
                # the conservative sum over all assignments.
                total_allocation = sum(a['allocation'] for a in assignments)

            if total_allocation > 100:
                result.add_warning(
                    f"resource[{res_id}]",
                    f"Resource may be over-allocated ({total_allocation}%)",
                    suggestion="Check for overlapping assignments"
                )
BIM Data Validation
class BIMDataValidator:
    """Validate BIM export data (IFC, COBie, etc.).

    Checks element identity and basic attributes, property sets, and the
    spatial structure. Each section is only checked when its key is present
    in the input. Malformed values are reported as issues instead of raising.
    """

    def validate(self, bim_data: Dict[str, Any]) -> ValidationResult:
        """Run the checks for every section present in ``bim_data``."""
        result = ValidationResult(is_valid=True)

        sections = (
            ('elements', self._validate_elements),            # Check element data
            ('property_sets', self._validate_properties),     # Check property sets
            ('spatial_structure', self._validate_spatial),    # Check spatial structure
        )
        for key, check in sections:
            if key in bim_data:
                check(bim_data[key], result)

        return result

    def _validate_elements(self, elements: list, result: ValidationResult) -> None:
        """Check GUID uniqueness, required attributes and quantities per element."""
        if not isinstance(elements, (list, tuple)):
            result.add_error("elements", "Elements must be a list", elements)
            return

        guids = set()

        for i, elem in enumerate(elements):
            if not isinstance(elem, dict):
                result.add_error(f"elements[{i}]",
                                 "Element must be a mapping of fields", elem)
                continue

            # Check for unique GUIDs. A missing GUID is reported as such
            # rather than as a duplicate of None.
            guid = elem.get('guid')
            if guid is None:
                result.add_error(f"elements[{i}].guid", "Element missing GUID")
            elif guid in guids:
                result.add_error(f"elements[{i}].guid", f"Duplicate GUID: {guid}")
            else:
                guids.add(guid)

            # Check for required properties
            if not elem.get('ifc_type'):
                result.add_warning(f"elements[{i}]", "Element missing IFC type")

            if not elem.get('name'):
                result.add_warning(f"elements[{i}]", "Element missing name")

            # Check geometry
            if not elem.get('geometry') and not elem.get('location'):
                result.add_warning(f"elements[{i}]", "Element has no geometry or location")

            # Check for valid quantities; None is treated as "not provided"
            for qty_name in ('area', 'volume', 'length'):
                qty = elem.get(qty_name)
                if qty is None:
                    continue
                try:
                    negative = qty < 0
                except TypeError:
                    result.add_error(f"elements[{i}].{qty_name}",
                                     f"Non-numeric {qty_name} value", qty)
                    continue
                if negative:
                    result.add_error(f"elements[{i}].{qty_name}",
                                     f"Negative {qty_name} value", qty)

    def _validate_properties(self, property_sets: list, result: ValidationResult) -> None:
        """Warn about empty property sets and note properties with null values."""
        if not isinstance(property_sets, (list, tuple)):
            result.add_error("property_sets", "Property sets must be a list", property_sets)
            return

        for i, pset in enumerate(property_sets):
            if not isinstance(pset, dict):
                result.add_warning(f"property_sets[{i}]",
                                   "Property set must be a mapping of fields", pset)
                continue

            pset_name = pset.get('name', 'Unknown')
            properties = pset.get('properties')

            # Check for empty property sets
            if not properties:
                result.add_warning(f"property_set[{pset_name}]",
                                   "Property set has no properties")
                continue

            # Check property values; entries that are not mappings carry no
            # 'value' key to inspect and are skipped.
            for prop in properties:
                if isinstance(prop, dict) and prop.get('value') is None:
                    result.add_info(f"property_set[{pset_name}].{prop.get('name')}",
                                    "Property has null value")

    def _validate_spatial(self, spatial: dict, result: ValidationResult) -> None:
        """Warn when site, building or levels are missing from the hierarchy."""
        if not isinstance(spatial, dict):
            result.add_error("spatial_structure",
                             "Spatial structure must be a mapping of fields", spatial)
            return

        # Check for proper hierarchy
        if not spatial.get('site'):
            result.add_warning("spatial_structure", "No site defined")

        if not spatial.get('building'):
            result.add_warning("spatial_structure", "No building defined")

        # A falsy value covers both a missing key and an empty list.
        if not spatial.get('levels'):
            result.add_warning("spatial_structure", "No levels/floors defined")
Field Data Validation
class FieldDataValidator:
    """Validate field/site data inputs.

    The record's ``type`` key selects the rule set: ``daily_report``,
    ``inspection`` or ``progress``. Any other type yields a warning so that
    a typo does not silently pass with no checks run. Malformed values are
    reported as issues instead of raising.
    """

    def validate(self, field_data: Dict[str, Any]) -> ValidationResult:
        """Dispatch to the rule set matching ``field_data['type']``."""
        result = ValidationResult(is_valid=True)

        handlers = {
            'daily_report': self._validate_daily_report,  # Daily report validation
            'inspection': self._validate_inspection,      # Inspection data
            'progress': self._validate_progress,          # Progress data
        }

        data_type = field_data.get('type')
        handler = handlers.get(data_type) if isinstance(data_type, str) else None
        if handler is None:
            result.add_warning("type",
                               f"Unrecognized field data type: {data_type!r}; no checks were run",
                               data_type,
                               "Use one of: daily_report, inspection, progress")
        else:
            handler(field_data, result)

        return result

    @staticmethod
    def _to_datetime(value: Any) -> datetime:
        """Convert an ISO string, datetime, or date-like object to ``datetime``.

        Raises:
            ValueError: if ``value`` cannot be interpreted as a date.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        try:
            # Date-like objects (e.g. datetime.date) become midnight of that day.
            return datetime(value.year, value.month, value.day)
        except (AttributeError, TypeError) as exc:
            raise ValueError(f"unsupported date value: {value!r}") from exc

    def _validate_daily_report(self, data: dict, result: ValidationResult) -> None:
        """Check required keys, workforce head-count and report date."""
        for name in ('date', 'weather', 'workforce'):
            if data.get(name) is None:
                result.add_error(name, f"Daily report missing '{name}'")

        # Validate workforce
        workforce = data.get('workforce')
        if workforce is not None:
            if not isinstance(workforce, (list, tuple)):
                result.add_error("workforce", "Workforce must be a list", workforce)
            else:
                total = 0
                for i, entry in enumerate(workforce):
                    count = entry.get('count', 0) if isinstance(entry, dict) else None
                    if isinstance(count, bool) or not isinstance(count, (int, float)):
                        result.add_error(f"workforce[{i}].count",
                                         "Worker count must be a number", count)
                        continue
                    total += count

                if total == 0:
                    result.add_warning("workforce", "No workers reported on-site")
                if total > 500:
                    result.add_warning("workforce",
                                       f"Unusually high workforce count: {total}")

        # Validate date
        raw_date = data.get('date')
        if raw_date is not None:
            try:
                report_date = self._to_datetime(raw_date)
            except ValueError:
                result.add_error("date", "Invalid date format")
            else:
                # Compare in the report's own timezone (naive stays naive) so
                # aware and naive datetimes are never mixed.
                if report_date > datetime.now(report_date.tzinfo):
                    result.add_error("date", "Report date is in the future")

    def _validate_inspection(self, data: dict, result: ValidationResult) -> None:
        """Check required keys and that the result is a recognised outcome."""
        for name in ('inspection_type', 'date', 'inspector', 'result'):
            if data.get(name) is None:
                result.add_error(name, f"Inspection missing '{name}'")

        # Check result value (non-string results count as non-standard)
        valid_results = ('pass', 'fail', 'conditional', 'not_applicable')
        outcome = data.get('result')
        if outcome and not (isinstance(outcome, str) and outcome.lower() in valid_results):
            result.add_warning("result", f"Non-standard inspection result: {outcome}")

    def _validate_progress(self, data: dict, result: ValidationResult) -> None:
        """Check that percent complete is within 0-100 and has not regressed."""
        if 'percent_complete' not in data:
            return

        # Check percentage values
        pct = data['percent_complete']
        try:
            # Written as a chained comparison so NaN is rejected as well.
            in_range = 0 <= pct <= 100
        except TypeError:
            in_range = False
        if not in_range:
            result.add_error("percent_complete", f"Invalid percentage: {pct}",
                             suggestion="Must be 0-100")
            return

        # Check for regression (if previous value available)
        if 'previous_percent' in data:
            previous = data['previous_percent']
            try:
                regressed = pct < previous
            except TypeError:
                # A non-comparable previous value gives nothing to compare against.
                regressed = False
            if regressed:
                result.add_warning("percent_complete",
                                   "Progress decreased from previous report",
                                   f"{data['previous_percent']}% -> {data['percent_complete']}%")
Usage Examples
# Validate a cost estimate
concrete_item = {
    'description': 'Concrete',
    'csi_code': '03 30 00',
    'quantity': 5000,
    'unit_cost': 150,
}
steel_item = {
    'description': 'Steel',
    'csi_code': '05 12 00',
    'quantity': 200,
    'unit_cost': 2500,
}

estimate = {
    'project_name': 'Office Building',
    'estimate_date': '2026-01-15',
    'gross_area': 50000,
    'line_items': [concrete_item, steel_item],
    'total': 1250000,
}

validator = CostEstimateValidator()
result = validator.validate(estimate)
print(result.to_report())

# Validate before processing
if not result.is_valid:
    print("Fix errors before processing")
    for error in result.errors:
        print(f" - {error.field}: {error.message}")
else:
    # NOTE(review): process_estimate is not defined in this document;
    # presumably it stands in for the caller's own processing step.
    process_estimate(estimate)
Integration with DDC Pipeline
# Validate all inputs before pipeline execution
def validate_pipeline_inputs(inputs: dict) -> bool:
    """Validate every recognised entry of ``inputs`` and report failures.

    Keys with a registered validator ('estimate', 'schedule', 'bim_data',
    'field_data') are validated; any other key is ignored. For each input
    that fails, a header and the full validation report are printed.

    Returns:
        True when no validated input failed, False otherwise.
    """
    validators = {
        'estimate': CostEstimateValidator(),
        'schedule': ScheduleValidator(),
        'bim_data': BIMDataValidator(),
        'field_data': FieldDataValidator()
    }

    failed = 0
    for input_type, data in inputs.items():
        validator = validators.get(input_type)
        if validator is None:
            continue

        outcome = validator.validate(data)
        if outcome.is_valid:
            continue

        print(f"\n{input_type.upper()} VALIDATION FAILED:")
        print(outcome.to_report())
        failed += 1

    return failed == 0
Resources
- Data Quality Best Practices: Validate early, validate often
- Construction Data Standards: CSI, IFC, COBie specifications
- Error Handling: Always provide actionable suggestions