DDC_Skills_for_AI_Agents_in_Construction historical-cost-analyzer
Analyze historical construction costs for benchmarking, trend analysis, and estimating calibration. Compare projects, track escalation, identify patterns.
install
source · Clone the upstream repo
git clone https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction "$T" && mkdir -p ~/.claude/skills && cp -r "$T/2_DDC_Book/3.1-Cost-Estimation/historical-cost-analyzer" ~/.claude/skills/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-historical-cost- && rm -rf "$T"
manifest:
2_DDC_Book/3.1-Cost-Estimation/historical-cost-analyzer/SKILL.md
Historical Cost Analyzer for Construction
Overview
Analyze historical construction cost data for benchmarking, escalation tracking, and estimating calibration. Compare similar projects, identify cost drivers, and improve future estimates.
Business Case
Historical cost analysis enables:
- Benchmarking: Compare current estimates to past projects
- Calibration: Improve estimating accuracy using actual data
- Trends: Track cost escalation and market changes (see the index sketch after this list)
- Risk Assessment: Identify cost drivers and overrun patterns
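The escalation tracking mentioned above reduces to a ratio of cost indices. The snippet below is a minimal sketch of that math using the sample 2020 and 2026 values hard-coded in the implementation that follows; a production workflow would substitute a published index (the implementation's comments reference RSMeans).

# Minimal sketch: compound annual escalation from two cost-index values.
# 114.8 and 160.0 are the sample 2020 and 2026 entries from the COST_INDICES
# table in the implementation below, not published index data.
index_2020, index_2026 = 114.8, 160.0
years = 2026 - 2020

total_change = (index_2026 - index_2020) / index_2020       # ~39% over six years
annual_rate = (index_2026 / index_2020) ** (1 / years) - 1  # ~5.7% per year

print(f"Total escalation 2020-2026: {total_change:.1%}")
print(f"Average annual escalation: {annual_rate:.1%}")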
Technical Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats


@dataclass
class CostBenchmark:
    metric_name: str
    value: float
    unit: str
    percentile_25: float
    percentile_50: float
    percentile_75: float
    sample_size: int
    project_types: List[str]


@dataclass
class EscalationAnalysis:
    from_year: int
    to_year: int
    annual_rate: float
    total_change: float
    category: str
    confidence: float


@dataclass
class CostDriver:
    factor: str
    impact_percentage: float
    correlation: float
    description: str


class HistoricalCostAnalyzer:
    """Analyze historical construction costs."""

    # RSMeans City Cost Indexes (sample - would be loaded from database)
    LOCATION_FACTORS = {
        'New York': 1.32, 'San Francisco': 1.28, 'Los Angeles': 1.15,
        'Chicago': 1.12, 'Houston': 0.92, 'Dallas': 0.89,
        'Phoenix': 0.93, 'Atlanta': 0.91, 'Denver': 1.02,
        'Seattle': 1.08, 'National Average': 1.00
    }

    # Historical cost indices by year
    COST_INDICES = {
        2015: 100.0, 2016: 102.1, 2017: 105.3, 2018: 109.2,
        2019: 112.5, 2020: 114.8, 2021: 121.4, 2022: 135.6,
        2023: 142.3, 2024: 148.7, 2025: 154.2, 2026: 160.0
    }

    def __init__(self, historical_data: pd.DataFrame = None):
        self.data = historical_data
        self.benchmarks: Dict[str, CostBenchmark] = {}

    def load_data(self, data: pd.DataFrame):
        """Load historical project data."""
        self.data = data.copy()

        # Normalize data
        if 'completion_year' not in self.data.columns and 'completion_date' in self.data.columns:
            self.data['completion_year'] = pd.to_datetime(self.data['completion_date']).dt.year

        # Calculate key metrics
        if 'gross_area' in self.data.columns and 'final_cost' in self.data.columns:
            self.data['cost_per_sf'] = self.data['final_cost'] / self.data['gross_area']

        if 'original_estimate' in self.data.columns and 'final_cost' in self.data.columns:
            self.data['overrun_pct'] = ((self.data['final_cost'] - self.data['original_estimate'])
                                        / self.data['original_estimate'] * 100)

    def normalize_to_year(self, costs: pd.Series, from_years: pd.Series,
                          to_year: int = 2026) -> pd.Series:
        """Normalize costs to a common year using cost indices."""
        normalized = costs.copy()
        for i, (cost, year) in enumerate(zip(costs, from_years)):
            if pd.notna(cost) and pd.notna(year):
                year = int(year)
                if year in self.COST_INDICES and to_year in self.COST_INDICES:
                    factor = self.COST_INDICES[to_year] / self.COST_INDICES[year]
                    normalized.iloc[i] = cost * factor
        return normalized

    def normalize_to_location(self, costs: pd.Series, locations: pd.Series,
                              to_location: str = 'National Average') -> pd.Series:
        """Normalize costs to a common location."""
        normalized = costs.copy()
        to_factor = self.LOCATION_FACTORS.get(to_location, 1.0)
        for i, (cost, loc) in enumerate(zip(costs, locations)):
            if pd.notna(cost) and loc in self.LOCATION_FACTORS:
                from_factor = self.LOCATION_FACTORS[loc]
                normalized.iloc[i] = cost * (to_factor / from_factor)
        return normalized

    def calculate_benchmarks(self, project_type: str = None,
                             year_range: Tuple[int, int] = None) -> Dict[str, CostBenchmark]:
        """Calculate cost benchmarks from historical data."""
        df = self.data.copy()

        # Filter by project type
        if project_type and 'project_type' in df.columns:
            df = df[df['project_type'] == project_type]

        # Filter by year range
        if year_range and 'completion_year' in df.columns:
            df = df[(df['completion_year'] >= year_range[0]) &
                    (df['completion_year'] <= year_range[1])]

        benchmarks = {}

        # Cost per SF
        if 'cost_per_sf' in df.columns:
            values = df['cost_per_sf'].dropna()
            if len(values) > 0:
                benchmarks['cost_per_sf'] = CostBenchmark(
                    metric_name='Cost per SF',
                    value=values.median(),
                    unit='$/SF',
                    percentile_25=values.quantile(0.25),
                    percentile_50=values.quantile(0.50),
                    percentile_75=values.quantile(0.75),
                    sample_size=len(values),
                    project_types=[project_type] if project_type else df['project_type'].unique().tolist()
                )

        # Overrun percentage
        if 'overrun_pct' in df.columns:
            values = df['overrun_pct'].dropna()
            if len(values) > 0:
                benchmarks['overrun_pct'] = CostBenchmark(
                    metric_name='Cost Overrun',
                    value=values.median(),
                    unit='%',
                    percentile_25=values.quantile(0.25),
                    percentile_50=values.quantile(0.50),
                    percentile_75=values.quantile(0.75),
                    sample_size=len(values),
                    project_types=[project_type] if project_type else df['project_type'].unique().tolist()
                )

        self.benchmarks.update(benchmarks)
        return benchmarks

    def calculate_escalation(self, category: str = 'overall',
                             from_year: int = 2020, to_year: int = 2026) -> EscalationAnalysis:
        """Calculate cost escalation between years."""
        if from_year in self.COST_INDICES and to_year in self.COST_INDICES:
            from_index = self.COST_INDICES[from_year]
            to_index = self.COST_INDICES[to_year]
            total_change = (to_index - from_index) / from_index
            years = to_year - from_year
            annual_rate = (to_index / from_index) ** (1 / years) - 1 if years > 0 else 0

            return EscalationAnalysis(
                from_year=from_year,
                to_year=to_year,
                annual_rate=annual_rate,
                total_change=total_change,
                category=category,
                confidence=0.95
            )
        return None

    def identify_cost_drivers(self, target_col: str = 'cost_per_sf') -> List[CostDriver]:
        """Identify factors that drive costs."""
        if self.data is None or target_col not in self.data.columns:
            return []

        drivers = []
        target = self.data[target_col].dropna()

        # Analyze numeric columns
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        exclude = [target_col, 'final_cost', 'original_estimate']

        for col in numeric_cols:
            if col not in exclude:
                valid_mask = self.data[col].notna() & self.data[target_col].notna()
                if valid_mask.sum() > 10:
                    corr, p_value = stats.pearsonr(
                        self.data.loc[valid_mask, col],
                        self.data.loc[valid_mask, target_col]
                    )
                    if abs(corr) > 0.3 and p_value < 0.05:
                        impact = corr * self.data[col].std() / target.std() * 100
                        drivers.append(CostDriver(
                            factor=col,
                            impact_percentage=abs(impact),
                            correlation=corr,
                            description=f"{'Positive' if corr > 0 else 'Negative'} correlation with {target_col}"
                        ))

        # Analyze categorical columns
        categorical_cols = self.data.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if col not in ['project_id', 'project_name']:
                groups = self.data.groupby(col)[target_col].mean()
                if len(groups) > 1:
                    variance = groups.var()
                    overall_var = target.var()
                    if variance / overall_var > 0.1:
                        drivers.append(CostDriver(
                            factor=col,
                            impact_percentage=variance / overall_var * 100,
                            correlation=0,
                            description="Categorical factor with significant cost variation"
                        ))

        return sorted(drivers, key=lambda x: -x.impact_percentage)

    def compare_to_benchmark(self, estimate: Dict, project_type: str = None) -> Dict:
        """Compare an estimate to historical benchmarks."""
        if project_type:
            self.calculate_benchmarks(project_type)

        comparison = {}

        # Cost per SF comparison
        if 'cost_per_sf' in estimate and 'cost_per_sf' in self.benchmarks:
            benchmark = self.benchmarks['cost_per_sf']
            value = estimate['cost_per_sf']
            percentile = stats.percentileofscore(
                self.data['cost_per_sf'].dropna(), value
            )
            comparison['cost_per_sf'] = {
                'estimate': value,
                'benchmark_median': benchmark.value,
                'benchmark_range': (benchmark.percentile_25, benchmark.percentile_75),
                'percentile': percentile,
                'status': 'within_range' if benchmark.percentile_25 <= value <= benchmark.percentile_75 else 'outside_range'
            }

        return comparison

    def find_similar_projects(self, criteria: Dict, n: int = 10) -> pd.DataFrame:
        """Find similar historical projects."""
        df = self.data.copy()

        # Filter by criteria
        if 'project_type' in criteria:
            df = df[df['project_type'] == criteria['project_type']]

        if 'gross_area' in criteria:
            target = criteria['gross_area']
            tolerance = criteria.get('area_tolerance', 0.3)
            df = df[(df['gross_area'] >= target * (1 - tolerance)) &
                    (df['gross_area'] <= target * (1 + tolerance))]

        if 'location' in criteria and 'location' in df.columns:
            df = df[df['location'] == criteria['location']]

        if 'year_range' in criteria:
            df = df[(df['completion_year'] >= criteria['year_range'][0]) &
                    (df['completion_year'] <= criteria['year_range'][1])]

        # Sort by similarity (simple: by area difference)
        if 'gross_area' in criteria and 'gross_area' in df.columns:
            df['similarity'] = 1 - abs(df['gross_area'] - criteria['gross_area']) / criteria['gross_area']
            df = df.sort_values('similarity', ascending=False)

        return df.head(n)

    def analyze_overrun_patterns(self) -> Dict:
        """Analyze patterns in cost overruns."""
        if 'overrun_pct' not in self.data.columns:
            return {}

        analysis = {}

        # Overall statistics
        overruns = self.data['overrun_pct'].dropna()
        analysis['overall'] = {
            'mean': overruns.mean(),
            'median': overruns.median(),
            'std': overruns.std(),
            'projects_over_budget': (overruns > 0).sum(),
            'projects_under_budget': (overruns < 0).sum(),
            'pct_over_budget': (overruns > 0).mean() * 100
        }

        # By project type
        if 'project_type' in self.data.columns:
            by_type = self.data.groupby('project_type')['overrun_pct'].agg(['mean', 'std', 'count'])
            analysis['by_type'] = by_type.to_dict('index')

        # By size category
        if 'gross_area' in self.data.columns:
            self.data['size_category'] = pd.cut(
                self.data['gross_area'],
                bins=[0, 10000, 50000, 100000, np.inf],
                labels=['Small (<10k SF)', 'Medium (10-50k SF)',
                        'Large (50-100k SF)', 'Very Large (>100k SF)']
            )
            by_size = self.data.groupby('size_category')['overrun_pct'].agg(['mean', 'std', 'count'])
            analysis['by_size'] = by_size.to_dict('index')

        return analysis

    def generate_report(self, project_type: str = None) -> str:
        """Generate comprehensive cost analysis report."""
        lines = ["# Historical Cost Analysis Report", ""]
        lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d')}")
        lines.append(f"**Projects Analyzed:** {len(self.data):,}")
        if project_type:
            lines.append(f"**Project Type:** {project_type}")
        lines.append("")

        # Benchmarks
        benchmarks = self.calculate_benchmarks(project_type)
        if benchmarks:
            lines.append("## Cost Benchmarks")
            for name, bm in benchmarks.items():
                lines.append(f"\n### {bm.metric_name}")
                lines.append(f"- **Median:** {bm.value:.2f} {bm.unit}")
                lines.append(f"- **25th Percentile:** {bm.percentile_25:.2f} {bm.unit}")
                lines.append(f"- **75th Percentile:** {bm.percentile_75:.2f} {bm.unit}")
                lines.append(f"- **Sample Size:** {bm.sample_size}")

        # Escalation
        lines.append("\n## Cost Escalation")
        esc = self.calculate_escalation(from_year=2020, to_year=2026)
        if esc:
            lines.append(f"- **Period:** {esc.from_year} to {esc.to_year}")
            lines.append(f"- **Annual Rate:** {esc.annual_rate:.1%}")
            lines.append(f"- **Total Change:** {esc.total_change:.1%}")

        # Cost Drivers
        drivers = self.identify_cost_drivers()
        if drivers:
            lines.append("\n## Key Cost Drivers")
            for driver in drivers[:5]:
                lines.append(f"- **{driver.factor}:** {driver.impact_percentage:.1f}% impact (r={driver.correlation:.2f})")

        # Overrun Analysis
        overrun_analysis = self.analyze_overrun_patterns()
        if 'overall' in overrun_analysis:
            lines.append("\n## Overrun Analysis")
            overall = overrun_analysis['overall']
            lines.append(f"- **Average Overrun:** {overall['mean']:.1f}%")
            lines.append(f"- **Projects Over Budget:** {overall['pct_over_budget']:.1f}%")

        return "\n".join(lines)
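The class ships without data; it expects a flat project table whose column names match those referenced in load_data, calculate_benchmarks, find_similar_projects, and analyze_overrun_patterns. The sketch below illustrates that expected shape with two hypothetical rows; every value is invented.

import pandas as pd

# Hypothetical two-row example of the input schema the analyzer looks for.
# Column names mirror those referenced in the class above; the values are made up.
historical = pd.DataFrame([
    {'project_id': 'P-001', 'project_name': 'Riverside Office', 'project_type': 'Office',
     'location': 'Denver', 'completion_date': '2021-06-30',
     'gross_area': 48000, 'original_estimate': 11500000, 'final_cost': 12400000},
    {'project_id': 'P-002', 'project_name': 'Lakeview Clinic', 'project_type': 'Healthcare',
     'location': 'Seattle', 'completion_date': '2023-02-15',
     'gross_area': 22000, 'original_estimate': 9800000, 'final_cost': 9500000},
])

analyzer = HistoricalCostAnalyzer()
analyzer.load_data(historical)  # derives completion_year, cost_per_sf and overrun_pct
print(analyzer.data[['project_type', 'completion_year', 'cost_per_sf', 'overrun_pct']])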
Quick Start
import pandas as pd

# Load historical data
historical = pd.read_excel("historical_projects.xlsx")

# Initialize analyzer
analyzer = HistoricalCostAnalyzer()
analyzer.load_data(historical)

# Calculate benchmarks for office buildings
benchmarks = analyzer.calculate_benchmarks(project_type='Office')
print(f"Office median cost: ${benchmarks['cost_per_sf'].value:.2f}/SF")

# Calculate escalation
escalation = analyzer.calculate_escalation(from_year=2020, to_year=2026)
print(f"Annual escalation: {escalation.annual_rate:.1%}")

# Find similar projects
similar = analyzer.find_similar_projects({
    'project_type': 'Office',
    'gross_area': 50000,
    'year_range': (2020, 2025)
})
print(f"Found {len(similar)} similar projects")

# Compare estimate to benchmark
comparison = analyzer.compare_to_benchmark({'cost_per_sf': 250}, 'Office')
print(f"Estimate percentile: {comparison['cost_per_sf']['percentile']:.0f}th")

# Generate report
report = analyzer.generate_report('Office')
print(report)
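The Quick Start does not exercise the normalization helpers. The short sketch below assumes the loaded table includes a location column and that completion_year and cost_per_sf were derived by load_data.

# Bring historical $/SF onto a common basis (2026 dollars, national-average
# location) so projects from different years and cities compare directly.
normalized = analyzer.normalize_to_year(
    analyzer.data['cost_per_sf'], analyzer.data['completion_year'], to_year=2026
)
normalized = analyzer.normalize_to_location(
    normalized, analyzer.data['location'], to_location='National Average'
)
analyzer.data['cost_per_sf_2026_national'] = normalized
print(analyzer.data[['cost_per_sf', 'cost_per_sf_2026_national']].describe())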
Dependencies
pip install pandas numpy scipy
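The Quick Start also reads an .xlsx file via pandas.read_excel, which needs an Excel engine such as openpyxl in addition to the packages above:

pip install openpyxl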