DDC_Skills_for_AI_Agents_in_Construction big-data-analysis
Analyze large-scale construction datasets. Process thousands of projects for patterns, benchmarks, and predictive insights.
install
source · Clone the upstream repo
git clone https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/datadrivenconstruction/DDC_Skills_for_AI_Agents_in_Construction "$T" && mkdir -p ~/.claude/skills && cp -r "$T/2_DDC_Book/4.4-Vector-Search-BigData/big-data-analysis" ~/.claude/skills/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-big-data-analysi && rm -rf "$T"
manifest:
2_DDC_Book/4.4-Vector-Search-BigData/big-data-analysis/SKILL.md
source content
Big Data Analysis
Business Case
Problem Statement
Large-scale data analysis challenges:
- Processing millions of records
- Cross-project benchmarking
- Pattern recognition at scale
- Memory and performance constraints
Solution
Scalable big data analysis framework for construction data using efficient data structures and parallel processing patterns.
Technical Implementation
import pandas as pd
from typing import Dict, Any, List, Optional, Callable, Iterator
from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum
import json


class AnalysisType(Enum):
    """Labels for analysis categories (not referenced by the analyzer below)."""

    BENCHMARK = "benchmark"
    TREND = "trend"
    ANOMALY = "anomaly"
    CORRELATION = "correlation"
    CLUSTERING = "clustering"
    AGGREGATION = "aggregation"


class MetricType(Enum):
    """Labels for metric names (not referenced by the analyzer below)."""

    COST_PER_SF = "cost_per_sf"
    DURATION_PER_SF = "duration_per_sf"
    PRODUCTIVITY = "productivity"
    CHANGE_ORDER_RATE = "change_order_rate"
    SAFETY_RATE = "safety_rate"
    QUALITY_SCORE = "quality_score"


@dataclass
class ProjectRecord:
    """One project row, as built by ``BigDataAnalyzer.load_from_dataframe``."""

    project_id: str
    name: str
    project_type: str
    location: str
    size_sf: float
    duration_days: int
    total_cost: float
    start_date: date
    # Derived values; the loader fills 'cost_per_sf' and 'duration_per_1000sf'
    # when size_sf is positive.
    metrics: Dict[str, float] = field(default_factory=dict)
    # Free-form extras; nothing in this module populates it.
    attributes: Dict[str, Any] = field(default_factory=dict)


@dataclass
class BenchmarkResult:
    """Descriptive statistics for one metric over one group of rows.

    ``_calculate_stats`` rounds every statistic to two decimals.
    """

    metric: str
    mean: float
    median: float
    std: float
    min_val: float
    max_val: float
    percentile_25: float
    percentile_75: float
    sample_size: int


class BigDataAnalyzer:
    """Analyze large-scale construction datasets."""

    def __init__(self, name: str = "Construction Analytics"):
        """Create an empty analyzer; load data with one of the ``load_*`` methods."""
        self.name = name
        self.projects: List[ProjectRecord] = []
        self.df: Optional[pd.DataFrame] = None
        # Keyed by group name (or 'all'), NOT by metric: results for different
        # metrics overwrite each other here.
        self.benchmarks: Dict[str, BenchmarkResult] = {}

    @staticmethod
    def _int_or_default(value: Any, default: int = 0) -> int:
        """Convert *value* to int, returning *default* for None/NaN/NA cells."""
        if value is None or pd.isna(value):
            return default
        return int(value)

    def load_from_dataframe(self, df: pd.DataFrame):
        """Load project data from DataFrame.

        Stores a copy of *df* on ``self.df`` and rebuilds ``self.projects``
        with one ``ProjectRecord`` per row. Missing columns fall back to
        ''/0, a missing ``start_date`` falls back to today's date, and a
        missing ``duration_days`` cell falls back to 0 instead of raising.
        """
        self.df = df.copy()
        self.projects = []

        for _, row in df.iterrows():
            raw_start = row.get('start_date')
            project = ProjectRecord(
                project_id=str(row.get('project_id', '')),
                name=str(row.get('name', '')),
                project_type=str(row.get('project_type', '')),
                location=str(row.get('location', '')),
                size_sf=float(row.get('size_sf', 0)),
                # int(NaN) raises ValueError, so missing cells need a guard.
                duration_days=self._int_or_default(row.get('duration_days', 0)),
                total_cost=float(row.get('total_cost', 0)),
                start_date=(
                    pd.to_datetime(raw_start).date()
                    if pd.notna(raw_start)
                    else date.today()
                ),
            )

            # Add calculated metrics (skipped for zero/negative/NaN sizes,
            # which would otherwise divide by zero or propagate NaN).
            if project.size_sf > 0:
                project.metrics['cost_per_sf'] = project.total_cost / project.size_sf
                project.metrics['duration_per_1000sf'] = (
                    project.duration_days / (project.size_sf / 1000)
                )

            self.projects.append(project)

    def load_from_parquet(self, path: str):
        """Load data from Parquet file."""
        df = pd.read_parquet(path)
        self.load_from_dataframe(df)

    def stream_process(self, file_path: str,
                       chunk_size: int = 10000,
                       processor: Optional[Callable] = None) -> Iterator[Dict[str, Any]]:
        """Process large file in chunks.

        Reads the CSV at *file_path* ``chunk_size`` rows at a time. Yields
        ``processor(chunk)`` for each chunk when a processor is given,
        otherwise a dict with the chunk's row count and column names.
        Does not touch ``self.df`` or ``self.projects``.
        """
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            if processor:
                yield processor(chunk)
            else:
                yield {'rows': len(chunk), 'columns': list(chunk.columns)}

    def calculate_benchmarks(self, metric_column: str,
                             group_by: Optional[str] = None) -> Dict[str, BenchmarkResult]:
        """Calculate benchmarks for a metric.

        Returns a dict keyed by ``str(group)`` when *group_by* names an
        existing column, otherwise a single ``'all'`` entry. Groups with no
        non-null values are omitted. Results are also merged into
        ``self.benchmarks``. Returns ``{}`` when no data is loaded.
        """
        if self.df is None or self.df.empty:
            return {}

        results: Dict[str, BenchmarkResult] = {}

        if group_by and group_by in self.df.columns:
            for group_name, group_df in self.df.groupby(group_by):
                values = group_df[metric_column].dropna()
                if len(values) > 0:
                    results[str(group_name)] = self._calculate_stats(values, metric_column)
        else:
            values = self.df[metric_column].dropna()
            if len(values) > 0:
                results['all'] = self._calculate_stats(values, metric_column)

        self.benchmarks.update(results)
        return results

    def _calculate_stats(self, values: pd.Series, metric: str) -> BenchmarkResult:
        """Calculate statistics for a series (each rounded to 2 decimals)."""
        return BenchmarkResult(
            metric=metric,
            mean=round(values.mean(), 2),
            median=round(values.median(), 2),
            std=round(values.std(), 2),
            min_val=round(values.min(), 2),
            max_val=round(values.max(), 2),
            percentile_25=round(values.quantile(0.25), 2),
            percentile_75=round(values.quantile(0.75), 2),
            sample_size=len(values)
        )

    def find_anomalies(self, metric_column: str,
                       threshold_std: float = 2.0) -> pd.DataFrame:
        """Find anomalies based on standard deviation threshold.

        Returns a copy of the rows whose metric lies more than
        ``threshold_std`` standard deviations from the column mean, with two
        added columns: ``anomaly_type`` ('high'/'low') and ``deviation``
        (z-score rounded to 2 decimals). Returns an empty DataFrame when no
        data is loaded.
        """
        if self.df is None or self.df.empty:
            return pd.DataFrame()

        values = self.df[metric_column]
        mean = values.mean()
        std = values.std()

        lower = mean - (threshold_std * std)
        upper = mean + (threshold_std * std)

        anomalies = self.df[(values < lower) | (values > upper)].copy()
        anomalies['anomaly_type'] = anomalies[metric_column].apply(
            lambda x: 'high' if x > upper else 'low'
        )
        anomalies['deviation'] = ((anomalies[metric_column] - mean) / std).round(2)

        return anomalies

    def analyze_trends(self, metric_column: str,
                       date_column: str,
                       period: str = 'Y') -> pd.DataFrame:
        """Analyze trends over time.

        Groups rows by *date_column* converted to *period* and returns one
        row per period with mean/median/count/std of the metric plus
        ``yoy_change``: the percent change of the mean versus the previous
        row (period-over-period; only year-over-year when ``period='Y'``).
        """
        if self.df is None or self.df.empty:
            return pd.DataFrame()

        df = self.df.copy()
        df[date_column] = pd.to_datetime(df[date_column])
        df['period'] = df[date_column].dt.to_period(period)

        trends = df.groupby('period').agg({
            metric_column: ['mean', 'median', 'count', 'std']
        }).round(2)

        trends.columns = ['mean', 'median', 'count', 'std']
        trends = trends.reset_index()
        trends['period'] = trends['period'].astype(str)

        # Calculate year-over-year change
        trends['yoy_change'] = trends['mean'].pct_change().round(4) * 100

        return trends

    def calculate_correlations(self, columns: List[str]) -> pd.DataFrame:
        """Calculate correlations between metrics.

        Requested columns that are absent from the data are silently ignored.
        """
        if self.df is None or self.df.empty:
            return pd.DataFrame()

        available_cols = [c for c in columns if c in self.df.columns]
        return self.df[available_cols].corr().round(3)

    def segment_analysis(self, metric_column: str,
                         segment_column: str) -> pd.DataFrame:
        """Analyze metric by segments.

        Returns one row per segment with count/mean/median/std/min/max of the
        metric and ``pct_of_total`` (share of counted rows), sorted by count
        descending.
        """
        if self.df is None or self.df.empty:
            return pd.DataFrame()

        results = self.df.groupby(segment_column).agg({
            metric_column: ['count', 'mean', 'median', 'std', 'min', 'max']
        }).round(2)

        results.columns = ['count', 'mean', 'median', 'std', 'min', 'max']
        results = results.reset_index()

        # Calculate percentage of total
        total_count = results['count'].sum()
        results['pct_of_total'] = (results['count'] / total_count * 100).round(1)

        return results.sort_values('count', ascending=False)

    def percentile_rank(self, project_id: str,
                        metric_column: str) -> Dict[str, Any]:
        """Get percentile rank for a specific project.

        Returns ``{}`` when no data is loaded and ``{'error': ...}`` when the
        project is not found or its metric value is missing. Otherwise
        returns the project's value, the percentage of non-null values
        strictly below it, and its percent difference from the dataset mean
        and median for the same metric.
        """
        if self.df is None or self.df.empty:
            return {}

        project = self.df[self.df['project_id'] == project_id]
        if project.empty:
            return {'error': 'Project not found'}

        value = project[metric_column].values[0]
        if pd.isna(value):
            # A NaN compares False against everything and would be reported
            # as the 0th percentile.
            return {'error': 'Metric value missing for project'}

        all_values = self.df[metric_column].dropna()
        percentile = (all_values < value).sum() / len(all_values) * 100

        # Always derive the baseline from this metric's own column.
        # self.benchmarks is keyed by group name, so its 'all' entry may
        # belong to a different metric or to previously loaded data.
        benchmark = self._calculate_stats(all_values, metric_column)

        return {
            'project_id': project_id,
            'metric': metric_column,
            'value': round(value, 2),
            'percentile': round(percentile, 1),
            'comparison': {
                'mean': benchmark.mean,
                'median': benchmark.median,
                'vs_mean': round((value / benchmark.mean - 1) * 100, 1),
                'vs_median': round((value / benchmark.median - 1) * 100, 1)
            }
        }

    def generate_summary_stats(self) -> Dict[str, Any]:
        """Generate summary statistics for the dataset.

        Returns ``{}`` when no data is loaded. Entries for absent columns
        are ``None`` (date range) or 0 (counts and totals).
        """
        if self.df is None or self.df.empty:
            return {}

        numeric_cols = self.df.select_dtypes(include=['number']).columns

        return {
            'total_projects': len(self.df),
            'date_range': {
                'min': str(self.df['start_date'].min()) if 'start_date' in self.df.columns else None,
                'max': str(self.df['start_date'].max()) if 'start_date' in self.df.columns else None
            },
            'project_types': self.df['project_type'].nunique() if 'project_type' in self.df.columns else 0,
            'locations': self.df['location'].nunique() if 'location' in self.df.columns else 0,
            'total_value': self.df['total_cost'].sum() if 'total_cost' in self.df.columns else 0,
            'total_sf': self.df['size_sf'].sum() if 'size_sf' in self.df.columns else 0,
            'numeric_columns': list(numeric_cols)
        }

    def export_analysis(self, output_path: str,
                        metrics: Optional[List[str]] = None) -> str:
        """Export analysis results to Excel.

        Writes a 'Summary' sheet, then one 'Benchmark_<metric>' sheet per
        metric (grouped by 'project_type'), then one 'Anomalies_<metric>'
        sheet per metric that has anomalies. Metrics absent from the data
        are skipped; with no data loaded only the Summary sheet is written.
        Sheet names are cut to Excel's 31-character limit. Returns
        *output_path*.
        """
        metrics = metrics or ['cost_per_sf', 'duration_days']
        # With nothing loaded there are no columns to report on; the
        # unguarded self.df.columns would raise AttributeError mid-export.
        available = self.df.columns if self.df is not None else []

        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            # Summary
            summary = self.generate_summary_stats()
            summary_df = pd.DataFrame([{
                'Total Projects': summary.get('total_projects', 0),
                'Project Types': summary.get('project_types', 0),
                'Locations': summary.get('locations', 0),
                'Total Value ($)': summary.get('total_value', 0),
                'Total SF': summary.get('total_sf', 0)
            }])
            summary_df.to_excel(writer, sheet_name='Summary', index=False)

            # Benchmarks
            for metric in metrics:
                if metric in available:
                    benchmarks = self.calculate_benchmarks(metric, 'project_type')
                    if benchmarks:
                        bench_data = [{
                            'Segment': k,
                            'Mean': v.mean,
                            'Median': v.median,
                            'Std': v.std,
                            'Min': v.min_val,
                            'Max': v.max_val,
                            'P25': v.percentile_25,
                            'P75': v.percentile_75,
                            'Count': v.sample_size
                        } for k, v in benchmarks.items()]
                        bench_df = pd.DataFrame(bench_data)
                        sheet_name = f"Benchmark_{metric}"[:31]
                        bench_df.to_excel(writer, sheet_name=sheet_name, index=False)

            # Anomalies
            for metric in metrics:
                if metric in available:
                    anomalies = self.find_anomalies(metric)
                    if not anomalies.empty:
                        sheet_name = f"Anomalies_{metric}"[:31]
                        anomalies.to_excel(writer, sheet_name=sheet_name, index=False)

        return output_path
Quick Start
# Create analyzer
analyzer = BigDataAnalyzer("Multi-Project Analysis")

# Load data: three sample projects, one dict per row
project_rows = [
    {
        'project_id': 'P001', 'name': 'Office A', 'project_type': 'Office',
        'location': 'NYC', 'size_sf': 50000, 'duration_days': 365,
        'total_cost': 15000000, 'start_date': '2023-01-01',
    },
    {
        'project_id': 'P002', 'name': 'Office B', 'project_type': 'Office',
        'location': 'LA', 'size_sf': 75000, 'duration_days': 400,
        'total_cost': 20000000, 'start_date': '2023-03-01',
    },
    {
        'project_id': 'P003', 'name': 'Warehouse', 'project_type': 'Industrial',
        'location': 'Chicago', 'size_sf': 100000, 'duration_days': 200,
        'total_cost': 12000000, 'start_date': '2023-06-01',
    },
]
df = pd.DataFrame(project_rows)

# Add calculated metric (must exist as a column before benchmarking on it)
df['cost_per_sf'] = df['total_cost'] / df['size_sf']

analyzer.load_from_dataframe(df)

# Calculate benchmarks, one entry per project type
benchmarks = analyzer.calculate_benchmarks('cost_per_sf', 'project_type')
for segment, stats in benchmarks.items():
    print(f"{segment}: ${stats.mean:.2f}/SF (median: ${stats.median:.2f}/SF)")
Common Use Cases
1. Find Anomalies
# Rows whose cost_per_sf is more than 2 standard deviations from the mean
anomalies = analyzer.find_anomalies(
    metric_column='cost_per_sf',
    threshold_std=2.0,
)
print(f"Found {len(anomalies)} anomalous projects")
2. Trend Analysis
# Yearly mean/median/count/std of cost_per_sf, bucketed by start_date
trends = analyzer.analyze_trends(
    metric_column='cost_per_sf',
    date_column='start_date',
    period='Y',
)
print(trends)
3. Project Ranking
# Where project P001 sits among all projects on cost_per_sf
ranking = analyzer.percentile_rank(
    project_id='P001',
    metric_column='cost_per_sf',
)
print(f"Project is at {ranking['percentile']}th percentile")
4. Segment Analysis
# Per-project-type statistics for cost_per_sf
segments = analyzer.segment_analysis(
    metric_column='cost_per_sf',
    segment_column='project_type',
)
print(segments)
Resources
- DDC Book: Chapter 4.4 - Modern Data Technologies
- Apache Parquet: https://parquet.apache.org/
- Website: https://datadrivenconstruction.io