Claude-skill-registry equipment-telematics
Integrate and analyze telematics data from heavy construction equipment. Track location, utilization, fuel consumption, maintenance needs, and operator behavior.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/equipment-telematics" ~/.claude/skills/majiayu000-claude-skill-registry-equipment-telematics && rm -rf "$T"
manifest:
skills/data/equipment-telematics/SKILL.md · source content
Equipment Telematics
Overview
Integrate telematics data from heavy construction equipment (excavators, cranes, loaders, trucks) to monitor utilization, track location, analyze fuel efficiency, predict maintenance needs, and ensure safe operation.
Telematics Data Flow
┌─────────────────────────────────────────────────────────────────┐ │ EQUIPMENT TELEMATICS │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ EQUIPMENT TELEMATICS ANALYTICS │ │ ───────── ────────── ───────── │ │ │ │ 🚜 Excavator ────┐ 📍 Location 📊 Utilization│ │ 🏗️ Crane ────┼──────→ 🔧 Engine Hours ────────→ ⛽ Fuel │ │ 🚛 Truck ────┤ ⛽ Fuel Level 🔧 Maintenance│ │ 🚧 Loader ────┘ ⚡ Performance 👷 Operator │ │ │ │ METRICS TRACKED: │ │ • GPS location and geofencing │ │ • Engine hours and idle time │ │ • Fuel consumption rate │ │ • Load cycles and productivity │ │ • Fault codes and diagnostics │ │ • Operator behavior and safety │ │ │ └─────────────────────────────────────────────────────────────────┘
Technical Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import statistics
import math


class EquipmentType(Enum):
    """Kinds of heavy equipment the fleet can contain."""

    EXCAVATOR = "excavator"
    CRANE = "crane"
    LOADER = "loader"
    BULLDOZER = "bulldozer"
    DUMP_TRUCK = "dump_truck"
    CONCRETE_MIXER = "concrete_mixer"
    FORKLIFT = "forklift"
    COMPACTOR = "compactor"
    GRADER = "grader"
    TELEHANDLER = "telehandler"


class OperatingStatus(Enum):
    """Operating state attached to a telematics reading."""

    OPERATING = "operating"
    IDLE = "idle"
    OFF = "off"
    MAINTENANCE = "maintenance"
    FAULT = "fault"


class FaultSeverity(Enum):
    """Severity levels for fault codes."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    SHUTDOWN = "shutdown"


@dataclass
class GPSLocation:
    """A GPS fix; its timestamp is also used as the reading's timestamp."""

    latitude: float   # decimal degrees (converted with math.radians)
    longitude: float  # decimal degrees (converted with math.radians)
    altitude: float = 0.0
    speed: float = 0.0    # units not fixed by this module
    heading: float = 0.0  # units not fixed by this module
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class TelematicsReading:
    """One sensor snapshot reported by a piece of equipment."""

    equipment_id: str
    timestamp: datetime
    location: GPSLocation
    engine_hours: float
    fuel_level: float  # Percentage
    fuel_rate: float  # L/hr
    engine_rpm: int
    hydraulic_temp: float  # presumably degrees Celsius - confirm with data source
    coolant_temp: float    # presumably degrees Celsius - confirm with data source
    operating_status: OperatingStatus
    load_percentage: float = 0.0
    operator_id: str = ""


@dataclass
class FaultCode:
    """A fault raised against a piece of equipment."""

    code: str
    description: str
    severity: FaultSeverity
    timestamp: datetime
    equipment_id: str
    resolved: bool = False


@dataclass
class Equipment:
    """Static details and running counters for one machine."""

    id: str
    name: str
    equipment_type: EquipmentType
    make: str
    model: str
    year: int
    serial_number: str
    hourly_rate: float = 0.0
    fuel_capacity: float = 0.0  # Liters
    current_hours: float = 0.0
    # 0 (the default) means "no service scheduled yet"; the first ingested
    # reading schedules it from SERVICE_INTERVALS.
    next_service_hours: float = 0.0
    assigned_site: str = ""
    assigned_operator: str = ""


@dataclass
class Geofence:
    """Circular boundary around a centre point."""

    id: str
    name: str
    center_lat: float
    center_lon: float
    radius_meters: float
    # Empty list means the fence applies to every piece of equipment.
    allowed_equipment: List[str] = field(default_factory=list)


@dataclass
class UtilizationReport:
    """Utilization metrics for one machine over a reporting period."""

    equipment_id: str
    period_start: datetime
    period_end: datetime
    total_hours: float
    operating_hours: float
    idle_hours: float
    off_hours: float
    utilization_pct: float
    idle_pct: float
    fuel_consumed: float
    fuel_efficiency: float  # L/operating hour
    cycles: int


class EquipmentTelematics:
    """Integrate and analyze equipment telematics data."""

    # Maintenance intervals by type (hours)
    SERVICE_INTERVALS = {
        EquipmentType.EXCAVATOR: 250,
        EquipmentType.CRANE: 200,
        EquipmentType.LOADER: 250,
        EquipmentType.BULLDOZER: 250,
        EquipmentType.DUMP_TRUCK: 300,
    }

    # Interval (hours) used for equipment types missing from SERVICE_INTERVALS
    DEFAULT_SERVICE_INTERVAL = 250

    # A service counts as "due soon" when fewer than this many hours remain
    SERVICE_WARNING_HOURS = 50

    # Typical fuel rates (L/hr)
    TYPICAL_FUEL_RATES = {
        EquipmentType.EXCAVATOR: 15,
        EquipmentType.CRANE: 12,
        EquipmentType.LOADER: 18,
        EquipmentType.BULLDOZER: 25,
        EquipmentType.DUMP_TRUCK: 20,
    }

    # Idle intervals are charged this fraction of the reported fuel rate
    IDLE_FUEL_FACTOR = 0.3

    def __init__(self, fleet_name: str):
        """Create an empty fleet called ``fleet_name``."""
        self.fleet_name = fleet_name
        self.equipment: Dict[str, Equipment] = {}
        self.readings: List[TelematicsReading] = []
        self.faults: List[FaultCode] = []
        self.geofences: Dict[str, Geofence] = {}

    def register_equipment(self, id: str, name: str,
                           equipment_type: EquipmentType,
                           make: str, model: str, year: int,
                           serial_number: str,
                           hourly_rate: float = 0,
                           fuel_capacity: float = 0) -> Equipment:
        """Register equipment in fleet.

        An existing entry with the same ``id`` is replaced.

        Returns:
            The newly created Equipment record.
        """
        equipment = Equipment(
            id=id,
            name=name,
            equipment_type=equipment_type,
            make=make,
            model=model,
            year=year,
            serial_number=serial_number,
            hourly_rate=hourly_rate,
            fuel_capacity=fuel_capacity,
        )
        self.equipment[id] = equipment
        return equipment

    def add_geofence(self, id: str, name: str,
                     center_lat: float, center_lon: float,
                     radius_meters: float,
                     allowed_equipment: Optional[List[str]] = None) -> Geofence:
        """Add geofence boundary.

        Args:
            allowed_equipment: Equipment ids the fence applies to. ``None``
                or an empty list makes it apply to all equipment.

        Returns:
            The newly created Geofence.
        """
        geofence = Geofence(
            id=id,
            name=name,
            center_lat=center_lat,
            center_lon=center_lon,
            radius_meters=radius_meters,
            # Copy so later changes to the caller's list do not alter the fence.
            allowed_equipment=list(allowed_equipment) if allowed_equipment else [],
        )
        self.geofences[id] = geofence
        return geofence

    def ingest_reading(self, equipment_id: str,
                       location: GPSLocation,
                       engine_hours: float,
                       fuel_level: float,
                       fuel_rate: float,
                       engine_rpm: int,
                       hydraulic_temp: float,
                       coolant_temp: float,
                       load_percentage: float = 0,
                       operator_id: str = "") -> TelematicsReading:
        """Ingest telematics reading from equipment.

        Derives the operating status from RPM and load, stores the reading,
        updates the equipment's engine hours and runs the diagnostic and
        geofence checks (which may append faults).

        Returns:
            The stored TelematicsReading, stamped with ``location.timestamp``.

        Raises:
            ValueError: If ``equipment_id`` has not been registered.
        """
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        # Determine operating status
        if engine_rpm == 0:
            status = OperatingStatus.OFF
        elif engine_rpm < 800 or load_percentage < 10:
            status = OperatingStatus.IDLE
        else:
            status = OperatingStatus.OPERATING

        reading = TelematicsReading(
            equipment_id=equipment_id,
            timestamp=location.timestamp,
            location=location,
            engine_hours=engine_hours,
            fuel_level=fuel_level,
            fuel_rate=fuel_rate,
            engine_rpm=engine_rpm,
            hydraulic_temp=hydraulic_temp,
            coolant_temp=coolant_temp,
            operating_status=status,
            load_percentage=load_percentage,
            operator_id=operator_id,
        )
        self.readings.append(reading)

        # Update equipment status
        self.equipment[equipment_id].current_hours = engine_hours

        # Check for issues
        self._check_diagnostics(equipment_id, reading)
        self._check_geofence(equipment_id, location)

        return reading

    def _service_interval(self, equip: Equipment) -> float:
        """Return the service interval (hours) for this equipment's type."""
        return self.SERVICE_INTERVALS.get(equip.equipment_type,
                                          self.DEFAULT_SERVICE_INTERVAL)

    def _ensure_service_schedule(self, equip: Equipment,
                                 engine_hours: float) -> None:
        """Schedule the next service if none has been set yet.

        ``next_service_hours`` defaults to 0, which would otherwise make every
        machine look overdue as soon as it reports engine hours. The next
        service is placed at the first interval boundary above
        ``engine_hours``.
        """
        if equip.next_service_hours > 0:
            return
        interval = self._service_interval(equip)
        completed = max(0, math.floor(engine_hours / interval))
        equip.next_service_hours = float((completed + 1) * interval)

    def _check_diagnostics(self, equipment_id: str, reading: TelematicsReading):
        """Check for diagnostic issues.

        Raises faults for high hydraulic/coolant temperature, low fuel and
        upcoming or overdue service.
        """
        equip = self.equipment[equipment_id]

        # High temperature warning
        if reading.hydraulic_temp > 90:
            self._add_fault(equipment_id, "HYD_TEMP_HIGH",
                            "Hydraulic temperature high", FaultSeverity.WARNING)

        if reading.coolant_temp > 100:
            self._add_fault(equipment_id, "COOLANT_TEMP_HIGH",
                            "Coolant temperature critical", FaultSeverity.CRITICAL)

        # Low fuel warning
        if reading.fuel_level < 15:
            self._add_fault(equipment_id, "FUEL_LOW",
                            "Fuel level below 15%", FaultSeverity.WARNING)

        # Service due
        self._ensure_service_schedule(equip, reading.engine_hours)
        hours_to_service = equip.next_service_hours - reading.engine_hours

        if hours_to_service < 0:
            self._add_fault(equipment_id, "SERVICE_OVERDUE",
                            "Maintenance service overdue", FaultSeverity.WARNING)
        elif hours_to_service < self.SERVICE_WARNING_HOURS:
            self._add_fault(equipment_id, "SERVICE_DUE",
                            f"Service due in {hours_to_service:.0f} hours",
                            FaultSeverity.INFO)

    def _check_geofence(self, equipment_id: str, location: GPSLocation):
        """Check geofence violations.

        A GEOFENCE_EXIT fault is raised when the location lies outside a
        fence that applies to this equipment (a fence with no
        ``allowed_equipment`` applies to everything).
        """
        for geofence in self.geofences.values():
            applies = (not geofence.allowed_equipment or
                       equipment_id in geofence.allowed_equipment)
            if not applies:
                continue

            # Calculate distance from center
            distance = self._haversine_distance(
                location.latitude, location.longitude,
                geofence.center_lat, geofence.center_lon
            )

            if distance > geofence.radius_meters:
                self._add_fault(equipment_id, "GEOFENCE_EXIT",
                                f"Equipment left {geofence.name} boundary",
                                FaultSeverity.WARNING)

    def _haversine_distance(self, lat1: float, lon1: float,
                            lat2: float, lon2: float) -> float:
        """Calculate distance between two coordinates in meters."""
        R = 6371000  # Earth radius in meters

        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        delta_phi = math.radians(lat2 - lat1)
        delta_lambda = math.radians(lon2 - lon1)

        a = (math.sin(delta_phi / 2) ** 2 +
             math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)
        # Rounding can push `a` marginally outside [0, 1] for near-antipodal
        # points, which would make sqrt(1 - a) raise a domain error.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return R * c

    def _add_fault(self, equipment_id: str, code: str,
                   description: str, severity: FaultSeverity):
        """Add fault code.

        Does nothing when an unresolved fault with the same code already
        exists for the equipment.
        """
        # Check if same fault already active
        already_active = any(
            f.equipment_id == equipment_id and f.code == code and not f.resolved
            for f in self.faults
        )
        if already_active:
            return

        self.faults.append(FaultCode(
            code=code,
            description=description,
            severity=severity,
            timestamp=datetime.now(),
            equipment_id=equipment_id,
        ))

    def get_current_status(self, equipment_id: str) -> Dict:
        """Get current status of equipment.

        Returns:
            A dict built from the reading with the newest timestamp, or
            ``{"equipment": <Equipment>, "status": "no_data"}`` when the
            equipment has no readings.

        Raises:
            ValueError: If ``equipment_id`` has not been registered.
        """
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        equip = self.equipment[equipment_id]

        # Get latest reading
        readings = [r for r in self.readings if r.equipment_id == equipment_id]
        if not readings:
            return {"equipment": equip, "status": "no_data"}

        latest = max(readings, key=lambda r: r.timestamp)

        # Active faults
        active_faults = [f for f in self.faults
                         if f.equipment_id == equipment_id and not f.resolved]

        return {
            "equipment_id": equip.id,
            "name": equip.name,
            "type": equip.equipment_type.value,
            "status": latest.operating_status.value,
            "location": {
                "lat": latest.location.latitude,
                "lon": latest.location.longitude,
                "speed": latest.location.speed,
            },
            "engine_hours": latest.engine_hours,
            "fuel_level": latest.fuel_level,
            "fuel_rate": latest.fuel_rate,
            "temps": {
                "hydraulic": latest.hydraulic_temp,
                "coolant": latest.coolant_temp,
            },
            "operator": latest.operator_id,
            "active_faults": len(active_faults),
            "last_update": latest.timestamp,
        }

    def _safe_status(self, equipment_id: str) -> Optional[Dict]:
        """Return the current status, or None if it cannot be determined.

        Reports are best-effort: one machine with unusable data must not
        prevent the rest of the fleet from being summarised.
        """
        try:
            return self.get_current_status(equipment_id)
        except (ValueError, TypeError, AttributeError):
            return None

    def calculate_utilization(self, equipment_id: str,
                              start_date: datetime,
                              end_date: datetime) -> Optional[UtilizationReport]:
        """Calculate utilization metrics for equipment.

        Each interval between consecutive readings is attributed to the
        status of the earlier reading. Time in the period that is neither
        operating nor idle is reported as off.

        Returns:
            A UtilizationReport, or None when there are no readings for the
            equipment inside ``[start_date, end_date]``.
        """
        readings = sorted(
            (r for r in self.readings
             if r.equipment_id == equipment_id
             and start_date <= r.timestamp <= end_date),
            key=lambda r: r.timestamp,
        )

        if not readings:
            return None

        total_hours = (end_date - start_date).total_seconds() / 3600

        operating_hours = 0
        idle_hours = 0
        fuel_consumed = 0

        # Calculate from readings
        for prev, curr in zip(readings, readings[1:]):
            interval_hours = (curr.timestamp - prev.timestamp).total_seconds() / 3600

            if prev.operating_status == OperatingStatus.OPERATING:
                operating_hours += interval_hours
                fuel_consumed += prev.fuel_rate * interval_hours
            elif prev.operating_status == OperatingStatus.IDLE:
                idle_hours += interval_hours
                # Idle uses ~30% fuel
                fuel_consumed += prev.fuel_rate * interval_hours * self.IDLE_FUEL_FACTOR

        off_hours = total_hours - operating_hours - idle_hours
        running_hours = operating_hours + idle_hours

        utilization_pct = (operating_hours / total_hours * 100) if total_hours > 0 else 0
        idle_pct = (idle_hours / running_hours * 100) if running_hours > 0 else 0
        fuel_efficiency = (fuel_consumed / operating_hours) if operating_hours > 0 else 0

        return UtilizationReport(
            equipment_id=equipment_id,
            period_start=start_date,
            period_end=end_date,
            total_hours=total_hours,
            operating_hours=operating_hours,
            idle_hours=idle_hours,
            off_hours=off_hours,
            utilization_pct=utilization_pct,
            idle_pct=idle_pct,
            fuel_consumed=fuel_consumed,
            fuel_efficiency=fuel_efficiency,
            cycles=0,  # Would need load cycle detection
        )

    def get_fleet_summary(self) -> Dict:
        """Get summary of entire fleet.

        Returns:
            Dict with ``total_equipment``, counts ``by_status`` and
            ``by_type``, the number of unresolved ``active_faults`` and a
            ``service_due`` list for machines with a scheduled service less
            than SERVICE_WARNING_HOURS away. Machines without a service
            schedule are not listed as due.
        """
        summary = {
            "total_equipment": len(self.equipment),
            "by_status": {},
            "by_type": {},
            "active_faults": 0,
            "service_due": [],
        }
        by_status = summary["by_status"]
        by_type = summary["by_type"]

        for equip in self.equipment.values():
            # Count by type
            eq_type = equip.equipment_type.value
            by_type[eq_type] = by_type.get(eq_type, 0) + 1

            # Get current status
            status = self._safe_status(equip.id)
            if status is None:
                by_status["unknown"] = by_status.get("unknown", 0) + 1
                continue

            op_status = status.get("status", "unknown")
            by_status[op_status] = by_status.get(op_status, 0) + 1

            # Check service due (only meaningful once a service is scheduled)
            if equip.next_service_hours > 0:
                hours_to_service = equip.next_service_hours - equip.current_hours
                if hours_to_service < self.SERVICE_WARNING_HOURS:
                    summary["service_due"].append({
                        "equipment": equip.name,
                        "hours_remaining": hours_to_service,
                    })

        # Count active faults
        summary["active_faults"] = sum(1 for f in self.faults if not f.resolved)

        return summary

    def predict_maintenance(self, equipment_id: str) -> Dict:
        """Predict maintenance needs based on usage patterns.

        Average daily engine hours are estimated from readings in the last
        seven days and used to project when ``next_service_hours`` will be
        reached.

        Returns:
            ``{"prediction": "insufficient_data"}`` with fewer than two recent
            readings; otherwise a dict with the projected service date
            (None when the machine accrued no hours).

        Raises:
            ValueError: If ``equipment_id`` has not been registered.
        """
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        equip = self.equipment[equipment_id]

        # Calculate average daily hours
        week_ago = datetime.now() - timedelta(days=7)
        recent_readings = [r for r in self.readings
                           if r.equipment_id == equipment_id
                           and r.timestamp > week_ago]

        if len(recent_readings) < 2:
            return {"prediction": "insufficient_data"}

        engine_hours = [r.engine_hours for r in recent_readings]
        timestamps = [r.timestamp for r in recent_readings]
        hours_used = max(engine_hours) - min(engine_hours)

        # Use the fractional span: timedelta.days truncates, so 1.9 days of
        # data would count as 1 day and nearly double the daily average.
        # Spans under a day are still treated as one full day.
        span_days = (max(timestamps) - min(timestamps)).total_seconds() / 86400
        days = max(span_days, 1.0)

        daily_hours = hours_used / days

        # Predict service date
        hours_to_service = equip.next_service_hours - equip.current_hours

        service_date = None
        if daily_hours > 0:
            days_to_service = hours_to_service / daily_hours
            try:
                service_date = datetime.now() + timedelta(days=days_to_service)
            except OverflowError:
                # Projection lies beyond what datetime can represent.
                service_date = None

        return {
            "equipment_id": equipment_id,
            "current_hours": equip.current_hours,
            "next_service_hours": equip.next_service_hours,
            "hours_to_service": hours_to_service,
            "avg_daily_hours": daily_hours,
            "predicted_service_date": service_date,
            "service_type": "Routine maintenance",
            "estimated_downtime_hours": 8,
        }

    def _format_status_row(self, equip: Equipment) -> str:
        """Return the markdown table row for one machine in the report."""
        no_data_row = (
            f"| {equip.name} | {equip.equipment_type.value} | ⚠️ No data | - | - | - |"
        )

        status = self._safe_status(equip.id)
        if status is None or status.get("status") == "no_data":
            return no_data_row

        icons = {"operating": "✅", "idle": "⏸️"}
        try:
            status_icon = icons.get(status['status'], "⏹️")
            return (
                f"| {equip.name} | {equip.equipment_type.value} | "
                f"{status_icon} {status['status']} | {status['engine_hours']:.0f} | "
                f"{status['fuel_level']:.0f}% | {status['active_faults']} |"
            )
        except (KeyError, TypeError, ValueError):
            # Unformattable values are reported rather than aborting the report.
            return no_data_row

    def generate_report(self) -> str:
        """Generate fleet telematics report.

        Returns:
            A markdown document with the fleet summary, status distribution,
            per-equipment status table, upcoming services and up to ten
            active faults.
        """
        summary = self.get_fleet_summary()

        lines = [
            "# Equipment Telematics Report",
            "",
            f"**Fleet:** {self.fleet_name}",
            f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
            "## Fleet Summary",
            "",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total Equipment | {summary['total_equipment']} |",
            f"| Active Faults | {summary['active_faults']} |",
            f"| Service Due | {len(summary['service_due'])} |",
            "",
            "## Status Distribution",
            "",
        ]

        lines.extend(f"- {status}: {count}"
                     for status, count in summary["by_status"].items())

        # Equipment details
        lines.extend([
            "",
            "## Equipment Status",
            "",
            "| Equipment | Type | Status | Hours | Fuel | Faults |",
            "|-----------|------|--------|-------|------|--------|",
        ])
        lines.extend(self._format_status_row(equip)
                     for equip in self.equipment.values())

        # Service due
        if summary["service_due"]:
            lines.extend([
                "",
                "## Service Due Soon",
                "",
                "| Equipment | Hours Remaining |",
                "|-----------|-----------------|",
            ])
            lines.extend(f"| {svc['equipment']} | {svc['hours_remaining']:.0f} |"
                         for svc in summary["service_due"])

        # Active faults
        active_faults = [f for f in self.faults if not f.resolved]
        if active_faults:
            lines.extend([
                "",
                "## Active Faults",
                "",
                "| Equipment | Code | Description | Severity |",
                "|-----------|------|-------------|----------|",
            ])
            severe = (FaultSeverity.CRITICAL, FaultSeverity.SHUTDOWN)
            for fault in active_faults[:10]:
                sev_icon = "🔴" if fault.severity in severe else "🟡"
                equip = self.equipment.get(fault.equipment_id)
                lines.append(
                    f"| {equip.name if equip else fault.equipment_id} | "
                    f"{fault.code} | {fault.description} | {sev_icon} {fault.severity.value} |"
                )

        return "\n".join(lines)
Quick Start
from datetime import datetime, timedelta

# Initialize telematics system
telematics = EquipmentTelematics("Site A Fleet")

# Register equipment
telematics.register_equipment(
    "EX-001", "Excavator #1", EquipmentType.EXCAVATOR,
    make="Caterpillar", model="320", year=2022,
    serial_number="CAT320X12345",
    hourly_rate=150, fuel_capacity=400
)

telematics.register_equipment(
    "CR-001", "Tower Crane #1", EquipmentType.CRANE,
    make="Liebherr", model="200EC-H", year=2021,
    serial_number="LH200EC54321",
    hourly_rate=200, fuel_capacity=300
)

# Add geofence for site boundary
telematics.add_geofence(
    "SITE-A", "Site A Boundary",
    center_lat=40.7128, center_lon=-74.0060,
    radius_meters=500
)

# Ingest telematics reading
location = GPSLocation(
    latitude=40.7128,
    longitude=-74.0059,
    speed=5.0,
    timestamp=datetime.now()
)

telematics.ingest_reading(
    "EX-001",
    location,
    engine_hours=1250.5,
    fuel_level=65.0,
    fuel_rate=18.5,
    engine_rpm=1800,
    hydraulic_temp=75.0,
    coolant_temp=85.0,
    load_percentage=75,
    operator_id="OP-101"
)

# Get current status
status = telematics.get_current_status("EX-001")
print(f"Excavator status: {status['status']}")
print(f"Location: {status['location']}")
print(f"Fuel: {status['fuel_level']}%")

# Calculate utilization
util = telematics.calculate_utilization(
    "EX-001",
    datetime.now() - timedelta(days=7),
    datetime.now()
)
if util:
    print(f"Utilization: {util.utilization_pct:.1f}%")
    print(f"Fuel efficiency: {util.fuel_efficiency:.1f} L/hr")

# Predict maintenance
# predict_maintenance needs at least two readings from the last seven days;
# with fewer it returns {"prediction": "insufficient_data"}.
maintenance = telematics.predict_maintenance("EX-001")
if maintenance.get("prediction") == "insufficient_data":
    print("Maintenance prediction: not enough readings yet")
else:
    # The value is a datetime (or None), not a number of days.
    print(f"Predicted service date: {maintenance.get('predicted_service_date')}")

# Fleet summary
summary = telematics.get_fleet_summary()
print(f"Fleet: {summary['total_equipment']} units")

# Generate report
print(telematics.generate_report())
Requirements
# No installation needed: the code uses only the Python standard library (no external dependencies).