Asi performing-ot-vulnerability-assessment-with-claroty
install
source · Clone the upstream repo
git clone https://github.com/plurigrid/asi
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/plurigrid/asi "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/asi/skills/performing-ot-vulnerability-assessment-with-claroty" ~/.claude/skills/plurigrid-asi-performing-ot-vulnerability-assessment-with-claroty && rm -rf "$T"
manifest: plugins/asi/skills/performing-ot-vulnerability-assessment-with-claroty/SKILL.md
Performing OT Vulnerability Assessment with Claroty
When to Use
- When conducting scheduled OT vulnerability assessments per IEC 62443 or NERC CIP requirements
- When deploying Claroty xDome for the first time and performing initial asset discovery and risk assessment
- When correlating newly published ICS-CERT advisories against your OT asset inventory
- When prioritizing OT vulnerability remediation with limited maintenance windows
- When generating compliance evidence for CIP-010-4 vulnerability assessment requirements
Do not use for active vulnerability scanning of PLCs and safety systems (see performing-ot-network-security-assessment for passive approaches), for IT-only vulnerability management (see standard vulnerability scanners), or for penetration testing (see performing-ics-penetration-testing).
Prerequisites
- Claroty xDome or CTD (Continuous Threat Detection) deployed with sensors on the OT network
- Network SPAN/TAP access for passive asset discovery
- CISA ICS-CERT advisory subscription for vulnerability tracking
- Asset inventory with firmware versions for all OT devices
- Change management process for patch deployment during maintenance windows
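The inventory prerequisite can be checked before an assessment run. The following is a minimal sketch, assuming inventory records are plain dictionaries that use the same field names as the `OTAsset` dataclass in Step 1. `find_inventory_gaps` is a hypothetical helper and the sample record is made up for illustration.

```python
# Field names mirror the OTAsset dataclass from Step 1.
REQUIRED_FIELDS = (
    "asset_id", "name", "vendor", "model", "firmware_version", "asset_type",
    "purdue_level", "ip_address", "protocol", "criticality", "zone",
)


def find_inventory_gaps(records):
    """Return {asset identifier: [missing or empty fields]} for incomplete records."""
    gaps = {}
    for index, record in enumerate(records):
        missing = [
            name for name in REQUIRED_FIELDS
            if not str(record.get(name) or "").strip()
        ]
        if missing:
            # Fall back to the list position when the record has no asset_id
            key = record.get("asset_id") or f"record[{index}]"
            gaps[key] = missing
    return gaps


if __name__ == "__main__":
    sample = [{
        "asset_id": "plc-01", "name": "Line 1 PLC", "vendor": "ExampleVendor",
        "model": "X100", "firmware_version": "", "asset_type": "PLC",
        "purdue_level": "Level 0-1", "ip_address": "192.0.2.10",
        "protocol": "Modbus", "criticality": "high", "zone": "Cell A",
    }]
    print(find_inventory_gaps(sample))
```

Records without a firmware version are the ones most worth chasing, since version-based correlation cannot be done for them.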
Workflow
Step 1: Configure Asset Discovery and Vulnerability Correlation
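Configure Claroty to perform passive and safe active discovery so that it builds a complete asset inventory, including firmware versions, for vulnerability correlation. The script that follows takes such an inventory (a Claroty export or a manual inventory) and scores asset-vulnerability pairs. As written, its entry point only fetches ICS-relevant entries from the CISA Known Exploited Vulnerabilities (KEV) catalog.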
```python
#!/usr/bin/env python3
"""OT Vulnerability Assessment Manager.

Correlates OT asset inventory with ICS-CERT advisories and CVE data to
identify, prioritize, and track OT vulnerabilities. Designed to integrate
with Claroty xDome API or standalone operation.
"""

import json
import re
from dataclasses import dataclass, field
from datetime import datetime

import requests


@dataclass
class OTAsset:
    asset_id: str
    name: str
    vendor: str
    model: str
    firmware_version: str
    asset_type: str  # PLC, HMI, RTU, historian, switch, etc.
    purdue_level: str
    ip_address: str
    protocol: str
    criticality: str  # critical, high, medium, low
    zone: str


@dataclass
class OTVulnerability:
    vuln_id: str
    cve_id: str
    title: str
    severity: str  # critical, high, medium, low
    cvss_score: float
    affected_vendor: str
    affected_product: str
    affected_versions: str
    description: str
    ics_cert_advisory: str = ""
    remediation: str = ""
    patch_available: bool = False
    compensating_controls: str = ""


@dataclass
class RiskAssessment:
    asset: OTAsset
    vulnerability: OTVulnerability
    risk_score: float = 0.0
    risk_rating: str = ""
    exploitability: str = ""
    operational_impact: str = ""
    compensating_controls: list = field(default_factory=list)
    remediation_priority: int = 0


class OTVulnerabilityAssessment:
    """OT vulnerability assessment and prioritization engine."""

    def __init__(self):
        self.assets = []
        # Populate with OTVulnerability objects before calling
        # correlate_vulnerabilities(); no loader is provided here.
        self.vulnerabilities = []
        self.risk_assessments = []

    def load_assets(self, assets_data):
        """Load asset inventory from Claroty export or manual inventory."""
        for a in assets_data:
            self.assets.append(OTAsset(**a))
        print(f"[*] Loaded {len(self.assets)} OT assets")

    def fetch_ics_advisories(self):
        """Fetch ICS-relevant entries from the CISA Known Exploited Vulnerabilities catalog."""
        print("[*] Fetching ICS-relevant entries from the CISA KEV catalog...")
        url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
        ics_vendors = {
            "siemens", "schneider", "rockwell", "honeywell", "abb", "ge",
            "emerson", "yokogawa", "omron", "mitsubishi", "phoenix", "moxa",
            "advantech",
        }
        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            print(f"[WARN] Could not fetch advisories: {e}")
            return []

        ics_vulns = []
        for vuln in data.get("vulnerabilities", []):
            # Whole-word match so that short names such as "ge" or "abb"
            # do not match unrelated vendors by substring
            vendor_words = set(
                re.findall(r"[a-z0-9]+", vuln.get("vendorProject", "").lower())
            )
            if vendor_words & ics_vendors:
                ics_vulns.append(vuln)

        print(f"    Found {len(ics_vulns)} ICS-relevant known exploited vulnerabilities")
        return ics_vulns

    def correlate_vulnerabilities(self):
        """Match vulnerabilities to assets based on vendor and product."""
        print("[*] Correlating vulnerabilities to assets...")
        for asset in self.assets:
            for vuln in self.vulnerabilities:
                if (vuln.affected_vendor.lower() in asset.vendor.lower()
                        and vuln.affected_product.lower() in asset.model.lower()):
                    # Firmware version is not compared here: every
                    # vendor/product match is treated as affected
                    ra = RiskAssessment(asset=asset, vulnerability=vuln)
                    self._calculate_risk_score(ra)
                    self.risk_assessments.append(ra)
        print(f"    Correlated {len(self.risk_assessments)} asset-vulnerability pairs")

    def _calculate_risk_score(self, ra):
        """Calculate OT-specific risk score considering operational impact."""
        # Base score from CVSS
        base = ra.vulnerability.cvss_score

        # Criticality multiplier based on asset function
        criticality_weights = {
            "critical": 1.5,  # SIS, safety systems
            "high": 1.3,      # PLCs, primary control
            "medium": 1.0,    # HMIs, historians
            "low": 0.7,       # non-critical support systems
        }
        criticality = criticality_weights.get(ra.asset.criticality, 1.0)

        # Purdue level proximity factor (lower levels = higher risk)
        level_weights = {
            "Level 0-1": 1.5,
            "Level 2": 1.3,
            "Level 3": 1.0,
            "Level 3.5": 0.8,
            "Level 4": 0.6,
        }
        level_factor = level_weights.get(ra.asset.purdue_level, 1.0)

        # Network exposure reduction if compensating controls exist
        comp_reduction = 0.8 if ra.compensating_controls else 1.0

        # Scores are capped at 10.0 to stay on the CVSS scale
        score = round(base * criticality * level_factor * comp_reduction, 1)
        ra.risk_score = min(score, 10.0)

        if ra.risk_score >= 9.0:
            ra.risk_rating = "critical"
            ra.remediation_priority = 1
        elif ra.risk_score >= 7.0:
            ra.risk_rating = "high"
            ra.remediation_priority = 2
        elif ra.risk_score >= 4.0:
            ra.risk_rating = "medium"
            ra.remediation_priority = 3
        else:
            ra.risk_rating = "low"
            ra.remediation_priority = 4

    def generate_report(self):
        """Generate vulnerability assessment report."""
        # Sort by risk score descending
        sorted_ra = sorted(self.risk_assessments, key=lambda x: -x.risk_score)

        report = []
        report.append("=" * 70)
        report.append("OT VULNERABILITY ASSESSMENT REPORT")
        report.append(f"Date: {datetime.now().isoformat()}")
        report.append(f"Assets: {len(self.assets)} | Vulnerabilities: {len(self.vulnerabilities)}")
        report.append(f"Risk Assessments: {len(self.risk_assessments)}")
        report.append("=" * 70)

        for sev in ["critical", "high", "medium", "low"]:
            findings = [ra for ra in sorted_ra if ra.risk_rating == sev]
            if findings:
                report.append(f"\n--- {sev.upper()} RISK ({len(findings)}) ---")
                # Only the ten highest-scoring findings per rating are listed
                for ra in findings[:10]:
                    report.append(f"\n  Risk Score: {ra.risk_score}/10.0")
                    report.append(f"  Asset: {ra.asset.name} ({ra.asset.vendor} {ra.asset.model})")
                    report.append(f"  Zone: {ra.asset.zone} ({ra.asset.purdue_level})")
                    report.append(f"  CVE: {ra.vulnerability.cve_id} (CVSS: {ra.vulnerability.cvss_score})")
                    report.append(f"  Title: {ra.vulnerability.title}")
                    if ra.vulnerability.patch_available:
                        report.append("  Patch: Available - schedule for next maintenance window")
                    else:
                        report.append("  Patch: Not available - apply compensating controls")

        return "\n".join(report)

    def export_json(self, output_file):
        """Export assessment to JSON."""
        data = {
            "assessment_date": datetime.now().isoformat(),
            "asset_count": len(self.assets),
            "vulnerability_count": len(self.vulnerabilities),
            "risk_assessments": [
                {
                    "asset_name": ra.asset.name,
                    "asset_ip": ra.asset.ip_address,
                    "cve": ra.vulnerability.cve_id,
                    "risk_score": ra.risk_score,
                    "risk_rating": ra.risk_rating,
                    "priority": ra.remediation_priority,
                }
                for ra in sorted(self.risk_assessments, key=lambda x: -x.risk_score)
            ],
        }
        with open(output_file, "w") as f:
            json.dump(data, f, indent=2)


if __name__ == "__main__":
    assessment = OTVulnerabilityAssessment()
    advisories = assessment.fetch_ics_advisories()
    print(f"Fetched {len(advisories)} ICS-relevant entries from CISA KEV catalog")
```
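The correlation step above matches on vendor and product only and does not compare firmware versions. The following is a minimal sketch of one way to add that check. It assumes `affected_versions` holds a single comparison such as `<4.5.2` or `<=2.9` and that firmware versions are dotted integers. Both are assumptions: real advisories describe affected ranges in many formats. The helper names `parse_version` and `is_affected` are hypothetical.

```python
import re


def parse_version(text):
    """Extract the first dotted number from text, e.g. 'V4.5.2-rc1' -> (4, 5, 2)."""
    match = re.search(r"\d+(?:\.\d+)*", text or "")
    return tuple(int(part) for part in match.group().split(".")) if match else ()


def is_affected(firmware_version, affected_versions):
    """Return True if the firmware falls inside the affected range.

    Assumed spec format: an optional operator (<, <=, ==, >=, >) followed by
    a dotted version. An unparseable spec or firmware string returns True so
    the pair is kept for manual review rather than silently dropped.
    """
    spec = re.fullmatch(
        r"\s*(<=|>=|==|<|>)?\s*[vV]?(\d+(?:\.\d+)*)\s*", affected_versions or ""
    )
    firmware = parse_version(firmware_version)
    if not spec or not firmware:
        return True

    operator = spec.group(1) or "=="
    limit = parse_version(spec.group(2))

    # Pad with zeros so that (4, 5) compares equal to (4, 5, 0)
    width = max(len(firmware), len(limit))
    firmware += (0,) * (width - len(firmware))
    limit += (0,) * (width - len(limit))

    return {
        "<": firmware < limit,
        "<=": firmware <= limit,
        "==": firmware == limit,
        ">=": firmware >= limit,
        ">": firmware > limit,
    }[operator]
```

Comparing integer tuples rather than strings avoids ranking `4.10` below `4.9`. Returning `True` on unparseable input is a deliberate conservative choice for OT, where a missed vulnerable device is usually worse than an extra item to review.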
Key Concepts
| Term | Definition |
|---|---|
| Claroty xDome | Cyber-physical systems protection platform providing asset discovery, vulnerability management, and threat detection for OT/IoT environments |
| Passive Discovery | Identifying OT assets by analyzing network traffic without sending any packets, which makes it safe for production environments |
| Safe Active Query | Querying OT devices using native industrial protocols at safe rates to collect detailed asset information without disrupting operations |
| OT Risk Score | Risk rating that factors CVSS base score, asset criticality, Purdue level, and compensating controls for OT-appropriate prioritization |
| ICS-CERT Advisory | CISA-published security advisories for industrial control system vulnerabilities with vendor-specific remediation guidance |
| Virtual Patching | Deploying IPS/firewall rules to block exploitation of known vulnerabilities when firmware patches cannot be immediately applied |
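
To make the OT Risk Score definition concrete, the sketch below reproduces the weighting used by `_calculate_risk_score` in Step 1: CVSS base score times a criticality weight times a Purdue-level weight, times 0.8 when compensating controls exist, capped at 10.0. The weights and thresholds are copied from that script. The standalone function names `ot_risk_score` and `risk_rating` are hypothetical.

```python
CRITICALITY_WEIGHTS = {"critical": 1.5, "high": 1.3, "medium": 1.0, "low": 0.7}
LEVEL_WEIGHTS = {
    "Level 0-1": 1.5, "Level 2": 1.3, "Level 3": 1.0,
    "Level 3.5": 0.8, "Level 4": 0.6,
}


def ot_risk_score(cvss, criticality, purdue_level, has_compensating_controls=False):
    """Weight a CVSS base score by asset criticality and Purdue level."""
    score = cvss
    score *= CRITICALITY_WEIGHTS.get(criticality, 1.0)  # unknown values weigh 1.0
    score *= LEVEL_WEIGHTS.get(purdue_level, 1.0)
    if has_compensating_controls:
        score *= 0.8
    return min(round(score, 1), 10.0)


def risk_rating(score):
    """Map a score to (rating, remediation priority) using the Step 1 thresholds."""
    if score >= 9.0:
        return "critical", 1
    if score >= 7.0:
        return "high", 2
    if score >= 4.0:
        return "medium", 3
    return "low", 4


if __name__ == "__main__":
    # CVSS 7.5 on a high-criticality PLC at Level 0-1: 7.5 * 1.3 * 1.5 = 14.625, capped
    print(ot_risk_score(7.5, "high", "Level 0-1"))   # 10.0
    # The same CVSS on a medium-criticality asset at Level 3 is unchanged
    print(ot_risk_score(7.5, "medium", "Level 3"))   # 7.5
    # CVSS 9.8 on a low-criticality Level 4 system: 9.8 * 0.7 * 0.6 = 4.116
    print(ot_risk_score(9.8, "low", "Level 4"))      # 4.1
```

With these weights, any finding with a CVSS score of 4.0 or higher on a critical Level 0-1 asset already rates as critical, and many such findings hit the 10.0 cap. A secondary sort key (for example raw CVSS or patch availability) may be needed to break ties.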
Tools & Systems
- Claroty xDome: Comprehensive OT/IoT asset discovery, vulnerability management, and continuous threat detection platform
- Claroty CTD: Continuous Threat Detection sensor for passive network monitoring in OT environments
- CISA ICS-CERT: US government advisory service publishing ICS vulnerability notifications and mitigation guidance
- Dragos Platform: Alternative OT security platform with asset visibility and vulnerability management capabilities
- Nozomi Networks Guardian: OT monitoring platform with vulnerability correlation and risk scoring
Output Format
```
OT Vulnerability Assessment Report
=====================================
Tool: Claroty xDome / Manual Assessment
Date: YYYY-MM-DD
Assets Scanned: [N]

RISK SUMMARY:
  Critical Risk: [N] vulnerabilities on [N] assets
  High Risk:     [N] vulnerabilities on [N] assets
  Medium Risk:   [N] vulnerabilities on [N] assets
  Low Risk:      [N] vulnerabilities on [N] assets

TOP RISKS:
  [Risk Score] [CVE-ID] on [Asset Name] ([Zone])
    Remediation: [Patch/Compensating Control]
    Timeline: [Next maintenance window / Immediate]
```
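
The RISK SUMMARY block can be filled from the `risk_assessments` list that `export_json` writes in Step 1. The following is a minimal sketch, assuming each record carries the `asset_name`, `cve`, and `risk_rating` keys from that export and that "[N] vulnerabilities on [N] assets" means distinct CVEs and distinct assets per rating. `render_risk_summary` is a hypothetical helper, and the sample records use placeholder identifiers.

```python
from collections import defaultdict


def render_risk_summary(risk_assessments):
    """Build the RISK SUMMARY lines from exported risk assessment records."""
    cves = defaultdict(set)
    assets = defaultdict(set)
    for record in risk_assessments:
        cves[record["risk_rating"]].add(record["cve"])
        assets[record["risk_rating"]].add(record["asset_name"])

    lines = ["RISK SUMMARY:"]
    for rating in ("critical", "high", "medium", "low"):
        label = f"{rating.capitalize()} Risk:"
        # Pad labels to the width of "Critical Risk:" so the counts line up
        lines.append(
            f"  {label:<14} {len(cves[rating])} vulnerabilities "
            f"on {len(assets[rating])} assets"
        )
    return "\n".join(lines)


if __name__ == "__main__":
    records = [
        {"asset_name": "PLC-01", "cve": "CVE-0000-0001", "risk_rating": "critical"},
        {"asset_name": "PLC-02", "cve": "CVE-0000-0001", "risk_rating": "critical"},
        {"asset_name": "HMI-01", "cve": "CVE-0000-0002", "risk_rating": "medium"},
    ]
    print(render_risk_summary(records))
```

Counting distinct CVEs separately from distinct assets keeps one widely deployed vulnerable firmware from inflating the vulnerability count.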