DDC_Skills_for_AI_Agents_in_Construction resource-leveler
Level and optimize construction resource allocation across project schedule. Balance labor, equipment usage, and avoid overallocation while maintaining critical path.
install
source · Clone the upstream repo
git clone https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction "$T" && mkdir -p ~/.claude/skills && cp -r "$T/2_DDC_Book/3.3-4D-BIM-CO2-Simulation/resource-leveler" ~/.claude/skills/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-resource-leveler && rm -rf "$T"
manifest:
2_DDC_Book/3.3-4D-BIM-CO2-Simulation/resource-leveler/SKILL.mdsource content
Resource Leveler for Construction
Overview
Optimize resource allocation across construction schedules. Level labor and equipment to avoid peaks, balance workload, and maintain project deadlines while reducing costs.
Business Case
Resource leveling provides:
- Cost Reduction: Avoid overtime and idle time
- Workforce Stability: Consistent crew sizes
- Equipment Optimization: Reduce rental costs
- Realistic Schedules: Achievable resource plans
Technical Implementation
from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Tuple from datetime import datetime, date, timedelta import pandas as pd import numpy as np from collections import defaultdict @dataclass class Resource: id: str name: str resource_type: str # labor, equipment, material max_units: float cost_per_unit: float unit: str # hours, days, each @dataclass class ResourceAssignment: task_id: str resource_id: str units: float start_date: date end_date: date @dataclass class Task: id: str name: str duration: int # days start_date: date end_date: date predecessors: List[str] total_float: int is_critical: bool resource_assignments: List[ResourceAssignment] = field(default_factory=list) @dataclass class LevelingResult: success: bool original_end_date: date leveled_end_date: date tasks_moved: int peak_reduction: Dict[str, float] warnings: List[str] class ConstructionResourceLeveler: """Level resources across construction schedules.""" def __init__(self): self.resources: Dict[str, Resource] = {} self.tasks: Dict[str, Task] = {} self.assignments: List[ResourceAssignment] = [] def add_resource(self, resource: Resource): """Add a resource to the pool.""" self.resources[resource.id] = resource def add_task(self, task: Task): """Add a task to the schedule.""" self.tasks[task.id] = task def add_assignment(self, assignment: ResourceAssignment): """Assign a resource to a task.""" self.assignments.append(assignment) if assignment.task_id in self.tasks: self.tasks[assignment.task_id].resource_assignments.append(assignment) def calculate_resource_usage(self, start_date: date = None, end_date: date = None) -> pd.DataFrame: """Calculate daily resource usage.""" if not self.assignments: return pd.DataFrame() # Determine date range if start_date is None: start_date = min(a.start_date for a in self.assignments) if end_date is None: end_date = max(a.end_date for a in self.assignments) # Create date range dates = pd.date_range(start_date, end_date, freq='D') # Initialize usage matrix usage = {r_id: [0.0] * len(dates) for r_id in self.resources} # Fill in usage for assignment in self.assignments: if assignment.resource_id in usage: for i, d in enumerate(dates): if assignment.start_date <= d.date() <= assignment.end_date: usage[assignment.resource_id][i] += assignment.units df = pd.DataFrame(usage, index=dates) df.index.name = 'date' return df def identify_overallocations(self) -> List[Dict]: """Identify resource overallocations.""" usage = self.calculate_resource_usage() overallocations = [] for resource_id, resource in self.resources.items(): if resource_id in usage.columns: daily_usage = usage[resource_id] over_days = daily_usage[daily_usage > resource.max_units] if len(over_days) > 0: overallocations.append({ 'resource_id': resource_id, 'resource_name': resource.name, 'max_units': resource.max_units, 'peak_usage': daily_usage.max(), 'over_by': daily_usage.max() - resource.max_units, 'days_overallocated': len(over_days), 'first_overallocation': over_days.index[0].date(), 'worst_day': daily_usage.idxmax().date() }) return overallocations def level_resources(self, method: str = 'float_priority', protect_critical_path: bool = True, max_extension: int = 30) -> LevelingResult: """Level resources to resolve overallocations.""" original_end = max(t.end_date for t in self.tasks.values()) tasks_moved = 0 warnings = [] # Get initial overallocations initial_over = self.identify_overallocations() if not initial_over: return LevelingResult( success=True, original_end_date=original_end, leveled_end_date=original_end, tasks_moved=0, peak_reduction={}, warnings=["No overallocations found"] ) # Track peak usage before usage_before = self.calculate_resource_usage() peaks_before = {r: usage_before[r].max() for r in usage_before.columns} # Leveling loop iteration = 0 max_iterations = len(self.tasks) * 2 while iteration < max_iterations: iteration += 1 overallocations = self.identify_overallocations() if not overallocations: break # Find task to move moved = False for over in overallocations: resource_id = over['resource_id'] worst_day = over['worst_day'] # Find tasks using this resource on worst day candidates = self._find_movable_tasks( resource_id, worst_day, protect_critical_path ) if candidates: # Sort by priority (lowest float first to preserve options) candidates.sort(key=lambda t: -t.total_float) task_to_move = candidates[0] # Calculate new dates new_start, new_end = self._calculate_shift( task_to_move, resource_id, max_extension ) if new_start: self._shift_task(task_to_move.id, new_start, new_end) tasks_moved += 1 moved = True break if not moved: warnings.append("Could not resolve all overallocations") break # Calculate results usage_after = self.calculate_resource_usage() peaks_after = {r: usage_after[r].max() for r in usage_after.columns} peak_reduction = {} for r in peaks_before: if r in peaks_after: reduction = (peaks_before[r] - peaks_after[r]) / peaks_before[r] * 100 peak_reduction[r] = reduction leveled_end = max(t.end_date for t in self.tasks.values()) if leveled_end > original_end + timedelta(days=max_extension): warnings.append(f"Project extended beyond max allowed ({max_extension} days)") remaining_over = self.identify_overallocations() return LevelingResult( success=len(remaining_over) == 0, original_end_date=original_end, leveled_end_date=leveled_end, tasks_moved=tasks_moved, peak_reduction=peak_reduction, warnings=warnings ) def _find_movable_tasks(self, resource_id: str, on_date: date, protect_critical: bool) -> List[Task]: """Find tasks that can be moved to reduce overallocation.""" candidates = [] for task in self.tasks.values(): # Check if task uses this resource on this date uses_resource = any( a.resource_id == resource_id and a.start_date <= on_date <= a.end_date for a in task.resource_assignments ) if not uses_resource: continue # Check if critical path protected if protect_critical and task.is_critical: continue # Check if has float if task.total_float > 0: candidates.append(task) return candidates def _calculate_shift(self, task: Task, resource_id: str, max_extension: int) -> Tuple[date, date]: """Calculate optimal shift for a task.""" resource = self.resources[resource_id] # Try shifting forward for days in range(1, min(task.total_float + 1, max_extension + 1)): new_start = task.start_date + timedelta(days=days) new_end = task.end_date + timedelta(days=days) # Check if this resolves overallocation temp_usage = self._calculate_usage_if_moved(task.id, new_start, new_end) if temp_usage.get(resource_id, 0) <= resource.max_units: return new_start, new_end return None, None def _calculate_usage_if_moved(self, task_id: str, new_start: date, new_end: date) -> Dict[str, float]: """Calculate resource usage if task were moved.""" # Simplified: calculate peak on affected dates usage = defaultdict(float) for assignment in self.assignments: if assignment.task_id == task_id: # Use new dates for d in pd.date_range(new_start, new_end): usage[assignment.resource_id] = max( usage[assignment.resource_id], assignment.units ) else: # Use existing dates for d in pd.date_range(assignment.start_date, assignment.end_date): usage[assignment.resource_id] = max( usage[assignment.resource_id], assignment.units ) return dict(usage) def _shift_task(self, task_id: str, new_start: date, new_end: date): """Shift a task to new dates.""" task = self.tasks[task_id] delta = new_start - task.start_date # Update task task.start_date = new_start task.end_date = new_end # Update assignments for assignment in self.assignments: if assignment.task_id == task_id: assignment.start_date += delta assignment.end_date += delta def optimize_crew_size(self, resource_id: str, target_utilization: float = 0.85) -> Dict: """Recommend optimal crew size for a resource.""" usage = self.calculate_resource_usage() if resource_id not in usage.columns: return None daily_usage = usage[resource_id] resource = self.resources[resource_id] # Calculate statistics peak = daily_usage.max() avg = daily_usage.mean() working_days = (daily_usage > 0).sum() # Current utilization current_util = avg / resource.max_units if resource.max_units > 0 else 0 # Optimal size for target utilization optimal_size = avg / target_utilization return { 'resource_id': resource_id, 'current_max_units': resource.max_units, 'peak_usage': peak, 'average_usage': avg, 'working_days': int(working_days), 'current_utilization': current_util, 'recommended_max_units': round(optimal_size, 1), 'potential_savings': (resource.max_units - optimal_size) * resource.cost_per_unit * working_days } def generate_histogram(self, resource_id: str) -> pd.DataFrame: """Generate resource histogram data.""" usage = self.calculate_resource_usage() if resource_id not in usage.columns: return pd.DataFrame() resource = self.resources[resource_id] df = pd.DataFrame({ 'date': usage.index, 'usage': usage[resource_id].values, 'capacity': resource.max_units, 'overallocated': usage[resource_id].values > resource.max_units }) return df def generate_report(self) -> str: """Generate resource leveling report.""" lines = ["# Resource Leveling Report", ""] lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}") lines.append(f"**Resources:** {len(self.resources)}") lines.append(f"**Tasks:** {len(self.tasks)}") lines.append("") # Overallocations overallocations = self.identify_overallocations() if overallocations: lines.append("## Overallocations Found") for over in overallocations: lines.append(f"\n### {over['resource_name']}") lines.append(f"- **Max Units:** {over['max_units']}") lines.append(f"- **Peak Usage:** {over['peak_usage']}") lines.append(f"- **Days Overallocated:** {over['days_overallocated']}") lines.append(f"- **Worst Day:** {over['worst_day']}") else: lines.append("## No Overallocations") lines.append("All resources are within capacity.") # Resource utilization lines.append("\n## Resource Utilization") for resource_id in self.resources: opt = self.optimize_crew_size(resource_id) if opt: lines.append(f"\n### {self.resources[resource_id].name}") lines.append(f"- **Utilization:** {opt['current_utilization']:.1%}") lines.append(f"- **Peak:** {opt['peak_usage']:.1f}") lines.append(f"- **Average:** {opt['average_usage']:.1f}") return "\n".join(lines)
Quick Start
from datetime import date # Initialize leveler leveler = ConstructionResourceLeveler() # Add resources leveler.add_resource(Resource( id="CARP", name="Carpenters", resource_type="labor", max_units=10, cost_per_unit=75, unit="hours" )) # Add tasks leveler.add_task(Task( id="T1", name="Frame Level 1", duration=10, start_date=date(2026, 3, 1), end_date=date(2026, 3, 14), predecessors=[], total_float=5, is_critical=False )) # Add assignments leveler.add_assignment(ResourceAssignment( task_id="T1", resource_id="CARP", units=8, start_date=date(2026, 3, 1), end_date=date(2026, 3, 14) )) # Check overallocations overallocations = leveler.identify_overallocations() for over in overallocations: print(f"{over['resource_name']}: {over['peak_usage']} vs {over['max_units']} max") # Level resources result = leveler.level_resources(protect_critical_path=True) print(f"Tasks moved: {result.tasks_moved}") print(f"End date change: {result.original_end_date} -> {result.leveled_end_date}") # Generate report print(leveler.generate_report())
Dependencies
pip install pandas numpy