Asi building-automated-malware-submission-pipeline
install
source · Clone the upstream repo
git clone https://github.com/plurigrid/asi
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/plurigrid/asi "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/asi/skills/building-automated-malware-submission-pipeline" ~/.claude/skills/plurigrid-asi-building-automated-malware-submission-pipeline && rm -rf "$T"
manifest: plugins/asi/skills/building-automated-malware-submission-pipeline/SKILL.md
Building Automated Malware Submission Pipeline
When to Use
Use this skill when:
- SOC teams face high volume of suspicious file alerts requiring sandbox analysis
- Manual sandbox submission creates bottlenecks in alert triage workflow
- Endpoint and email security tools quarantine files needing automated verdict determination
- Incident response requires rapid malware family identification and IOC extraction
Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.
Prerequisites
- Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
- VirusTotal API key (Enterprise for submission, free for lookup)
- MalwareBazaar API access for known malware lookup
- File collection mechanism: EDR quarantine API, email gateway export, network capture
- Python 3.8+ with the requests, vt-py, and pefile libraries
- Isolated analysis network with no production connectivity
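A short preflight check can confirm the Python prerequisites before the pipeline starts. The sketch below is illustrative only: `missing_modules` and `python_ok` are hypothetical helpers, and the import names assume that vt-py installs a module called `vt`.

```python
import importlib.util
import sys

# Import names for the libraries listed above (vt-py is imported as "vt").
REQUIRED_MODULES = ["requests", "vt", "pefile"]


def missing_modules(names):
    """Return the module names that cannot be located by this interpreter."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def python_ok(minimum=(3, 8)):
    """True when the running interpreter meets the minimum version."""
    return sys.version_info >= minimum


# Report problems instead of failing later in the middle of a submission.
problems = missing_modules(REQUIRED_MODULES)
if problems:
    print("Missing libraries:", ", ".join(problems))
if not python_ok():
    print("Python 3.8 or newer is required")
```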
Workflow
Step 1: Build File Collection Pipeline
Collect suspicious files from multiple sources:

    import email
    import hashlib
    import os
    from email import policy
    from pathlib import Path

    import requests


    class MalwareCollector:
        def __init__(self, quarantine_dir="/opt/malware_quarantine"):
            self.quarantine_dir = Path(quarantine_dir)
            # parents=True so a missing parent directory does not raise
            self.quarantine_dir.mkdir(parents=True, exist_ok=True)

        def collect_from_edr(self, edr_api_url, api_token):
            """Pull quarantined files from CrowdStrike Falcon"""
            headers = {"Authorization": f"Bearer {api_token}"}
            # Get recent quarantine events
            response = requests.get(
                f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
                headers=headers,
                params={"filter": "state:'quarantined'", "limit": 50},
                timeout=30,
            )
            response.raise_for_status()
            file_ids = response.json()["resources"]

            for file_id in file_ids:
                # Download quarantined file. Confirm against your EDR's API
                # documentation that this endpoint returns file bytes rather
                # than JSON metadata before relying on the hash below.
                dl_response = requests.get(
                    f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                    headers=headers,
                    params={"ids": file_id},
                    timeout=60,
                )
                dl_response.raise_for_status()
                file_data = dl_response.content
                sha256 = hashlib.sha256(file_data).hexdigest()
                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(file_data)
                yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

        def collect_from_email_gateway(self, smtp_quarantine_path):
            """Pull attachments from email gateway quarantine"""
            for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
                # Close the file handle as soon as the message is parsed
                with eml_file.open("rb") as fh:
                    msg = email.message_from_binary_file(fh, policy=policy.default)

                for attachment in msg.iter_attachments():
                    content = attachment.get_content()
                    if isinstance(content, str):
                        content = content.encode()
                    if not isinstance(content, bytes):
                        # e.g. an attached message/rfc822 part; skip non-file content
                        continue
                    sha256 = hashlib.sha256(content).hexdigest()
                    filename = attachment.get_filename() or "unknown"
                    filepath = self.quarantine_dir / f"{sha256}.sample"
                    filepath.write_bytes(content)
                    yield {
                        "sha256": sha256,
                        "path": str(filepath),
                        "source": "email",
                        "original_filename": filename,
                        "sender": msg["From"],
                        "subject": msg["Subject"],
                    }

        def compute_hashes(self, filepath):
            """Calculate MD5, SHA1, SHA256 for a file"""
            with open(filepath, "rb") as f:
                content = f.read()
            return {
                "md5": hashlib.md5(content).hexdigest(),
                "sha1": hashlib.sha1(content).hexdigest(),
                "sha256": hashlib.sha256(content).hexdigest(),
                "size": len(content),
            }

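Because samples are written as `<sha256>.sample`, the quarantine directory is content-addressed: identical files from different sources map to the same path. The sketch below shows how that property could be used to skip samples that were already collected. `store_sample` is a hypothetical helper, not part of the collector above.

```python
import hashlib
from pathlib import Path


def store_sample(quarantine_dir, data):
    """Write data under its SHA-256 name; report whether it was new.

    Returns (sha256, path, is_new). is_new is False when an identical
    sample is already present, so callers can skip resubmission.
    """
    sha256 = hashlib.sha256(data).hexdigest()
    path = Path(quarantine_dir) / f"{sha256}.sample"
    is_new = not path.exists()
    if is_new:
        path.write_bytes(data)
    return sha256, path, is_new
```

A caller could submit to the sandbox only when `is_new` is true, which keeps a burst of identical phishing attachments from consuming sandbox capacity.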
Step 2: Pre-Screen with Hash Lookups
Check if the file is already known before sandbox submission:

    import requests
    import vt


    class MalwarePreScreener:
        def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
            self.vt_client = vt.Client(vt_api_key)
            self.mb_api_url = mb_api_url

        def check_virustotal(self, sha256):
            """Lookup hash in VirusTotal"""
            try:
                file_obj = self.vt_client.get_object(f"/files/{sha256}")
            except vt.APIError as err:
                # NotFoundError means an unknown hash; record any other code
                # (quota, authentication) so it is visible during triage
                result = {"found": False}
                if err.code != "NotFoundError":
                    result["error"] = err.code
                return result

            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(
                    file_obj, "popular_threat_classification", {}
                ).get("suggested_threat_label", "Unknown"),
                "type": getattr(file_obj, "type_description", "Unknown"),
            }

        def check_malwarebazaar(self, sha256):
            """Lookup hash in MalwareBazaar"""
            response = requests.post(
                self.mb_api_url,
                data={"query": "get_info", "hash": sha256},
                timeout=30,
            )
            data = response.json()
            if data.get("query_status") == "ok":
                entry = data["data"][0]
                return {
                    "found": True,
                    "signature": entry.get("signature") or "Unknown",
                    "tags": entry.get("tags") or [],
                    "file_type": entry.get("file_type", "Unknown"),
                    "first_seen": entry.get("first_seen", "Unknown"),
                }
            return {"found": False}

        def pre_screen(self, sha256):
            """Run all pre-screening checks"""
            vt_result = self.check_virustotal(sha256)
            mb_result = self.check_malwarebazaar(sha256)

            verdict = "UNKNOWN"
            if vt_result["found"] and vt_result.get("malicious", 0) > 10:
                verdict = "KNOWN_MALICIOUS"
            elif mb_result["found"]:
                # MalwareBazaar is a malware repository, so a hit counts as known
                verdict = "KNOWN_MALICIOUS"
            elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
                verdict = "LIKELY_CLEAN"

            return {
                "sha256": sha256,
                "virustotal": vt_result,
                "malwarebazaar": mb_result,
                "pre_screen_verdict": verdict,
                "needs_sandbox": verdict == "UNKNOWN",
            }

        def close(self):
            self.vt_client.close()

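The VirusTotal part of the pre-screening decision can be isolated as a pure function, which makes the thresholds easy to test without any API calls. This is a sketch of that rule only; `vt_pre_screen_verdict` and its `malicious_threshold` parameter are hypothetical names, with the default of 10 taken from the code above.

```python
def vt_pre_screen_verdict(vt_result, malicious_threshold=10):
    """Map a VirusTotal lookup result to a pre-screening verdict.

    More than `malicious_threshold` detections: KNOWN_MALICIOUS.
    Found with zero detections:                 LIKELY_CLEAN.
    Not found, or only a few detections:        UNKNOWN (send to sandbox).
    """
    if not vt_result.get("found"):
        return "UNKNOWN"
    malicious = vt_result.get("malicious", 0)
    if malicious > malicious_threshold:
        return "KNOWN_MALICIOUS"
    if malicious == 0:
        return "LIKELY_CLEAN"
    return "UNKNOWN"


# A handful of detections is ambiguous, so the sample still goes to the sandbox.
for detections in (0, 4, 34):
    print(detections, vt_pre_screen_verdict({"found": True, "malicious": detections}))
```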
Step 3: Submit to Sandbox for Dynamic Analysis
Cuckoo Sandbox Submission:

    import time

    import requests


    class SandboxSubmitter:
        def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
            self.cuckoo_url = cuckoo_url

        def submit_to_cuckoo(self, filepath, timeout=300):
            """Submit file to Cuckoo Sandbox"""
            with open(filepath, "rb") as f:
                response = requests.post(
                    f"{self.cuckoo_url}/tasks/create/file",
                    files={"file": f},
                    data={
                        "timeout": timeout,  # analysis timeout in seconds
                        "options": "procmemdump=yes,route=none",
                        "priority": 2,
                        "machine": "win10_x64",
                    },
                    timeout=60,  # HTTP timeout for the upload itself
                )
            response.raise_for_status()
            return response.json()["task_id"]

        def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
            """Wait for sandbox analysis to complete"""
            elapsed = 0
            while elapsed < max_wait:
                response = requests.get(
                    f"{self.cuckoo_url}/tasks/view/{task_id}", timeout=30
                )
                status = response.json()["task"]["status"]
                if status == "reported":
                    return self.get_report(task_id)
                if status == "failed_analysis":
                    return {"error": "Analysis failed"}
                time.sleep(poll_interval)
                elapsed += poll_interval
            return {"error": "Analysis timed out"}

        def get_report(self, task_id):
            """Retrieve analysis report"""
            response = requests.get(
                f"{self.cuckoo_url}/tasks/report/{task_id}", timeout=60
            )
            report = response.json()
            network = report.get("network", {})
            behavior = report.get("behavior", {})

            # Extract key indicators
            return {
                "task_id": task_id,
                "score": report.get("info", {}).get("score", 0),
                "signatures": [
                    {
                        "name": s["name"],
                        "severity": s["severity"],
                        "description": s["description"],
                    }
                    for s in report.get("signatures", [])
                ],
                "network": {
                    "dns": [d["request"] for d in network.get("dns", [])],
                    "http": [
                        {"url": h["uri"], "method": h["method"]}
                        for h in network.get("http", [])
                    ],
                    "hosts": network.get("hosts", []),
                },
                "dropped_files": [
                    {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                    for f in report.get("dropped", [])
                ],
                "processes": [
                    {
                        "name": p["process_name"],
                        "pid": p["pid"],
                        "command_line": p.get("command_line", ""),
                    }
                    for p in behavior.get("processes", [])
                ],
                "registry_keys": list(
                    behavior.get("summary", {}).get("regkey_written", [])
                ),
            }

        def submit_to_joesandbox(self, filepath, joe_api_key,
                                 joe_url="https://jbxcloud.joesecurity.org/api"):
            """Submit to Joe Sandbox Cloud"""
            # The authentication scheme and response fields differ between
            # Joe Sandbox API versions; verify both against the vendor docs.
            with open(filepath, "rb") as f:
                response = requests.post(
                    f"{joe_url}/v2/submission/new",
                    headers={"Authorization": f"Bearer {joe_api_key}"},
                    files={"sample": f},
                    data={
                        "systems": "w10_64",
                        # Sent as 0/1 because requests would otherwise
                        # serialize Python booleans as "False"/"True"
                        "internet-access": "0",
                        "report-cache": "1",
                    },
                    timeout=60,
                )
            response.raise_for_status()
            return response.json()["data"]["webid"]

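The polling logic in `wait_for_analysis` can be exercised without a running sandbox by passing in the status lookup and the sleep function. The following is a minimal sketch under that assumption; `poll_until_done`, `fetch_status`, and the `sleep` parameter are hypothetical names, while the status strings are the ones used in the code above.

```python
import time


def poll_until_done(fetch_status, poll_interval=30, max_wait=600, sleep=time.sleep):
    """Poll fetch_status() until the task finishes, fails, or max_wait elapses.

    Returns "reported", "failed", or "timeout".
    """
    elapsed = 0
    while elapsed < max_wait:
        status = fetch_status()
        if status == "reported":
            return "reported"
        if status == "failed_analysis":
            return "failed"
        sleep(poll_interval)
        elapsed += poll_interval
    return "timeout"


# Simulated task that needs two polls before the report is ready.
statuses = iter(["pending", "running", "reported"])
outcome = poll_until_done(lambda: next(statuses), sleep=lambda _seconds: None)
print(outcome)
```

Injecting `sleep` keeps unit tests fast, and the same function could wrap any sandbox whose API exposes a task status.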
Step 4: Extract IOCs and Generate Verdict

    class VerdictGenerator:
        def __init__(self):
            self.malicious_threshold = 7  # Cuckoo score threshold

        def generate_verdict(self, pre_screen, sandbox_report):
            """Combine pre-screening and sandbox results for final verdict"""
            iocs = {
                "ips": [], "domains": [], "urls": [],
                "hashes": [], "registry_keys": [], "files_dropped": [],
            }

            # A failed or timed-out analysis returns {"error": ...}; treat it
            # as "no report" so it cannot be scored as a clean result
            sandbox_ok = bool(sandbox_report) and "error" not in sandbox_report

            # Extract IOCs from sandbox report
            if sandbox_ok:
                network = sandbox_report.get("network", {})
                iocs["domains"] = network.get("dns", [])
                iocs["ips"] = network.get("hosts", [])
                iocs["urls"] = [h["url"] for h in network.get("http", [])]
                iocs["hashes"] = [
                    f["sha256"] for f in sandbox_report.get("dropped_files", [])
                ]
                iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
                iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

            # Determine verdict
            vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
            sandbox_score = sandbox_report.get("score", 0) if sandbox_ok else 0
            signatures = sandbox_report.get("signatures", []) if sandbox_ok else []

            combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (len(signatures) * 5)

            # Samples flagged KNOWN_MALICIOUS skip the sandbox, so their score
            # contains no sandbox terms; honor the pre-screening verdict directly
            known_malicious = pre_screen.get("pre_screen_verdict") == "KNOWN_MALICIOUS"

            if known_malicious or combined_score >= 100:
                verdict = "MALICIOUS"
                confidence = "HIGH"
            elif combined_score >= 50:
                verdict = "SUSPICIOUS"
                confidence = "MEDIUM"
            elif combined_score >= 20:
                verdict = "POTENTIALLY_UNWANTED"
                confidence = "LOW"
            else:
                verdict = "CLEAN"
                confidence = "HIGH"

            # The sandbox was required but produced no usable report
            if pre_screen.get("needs_sandbox") and not sandbox_ok:
                confidence = "LOW"

            return {
                "verdict": verdict,
                "confidence": confidence,
                "combined_score": combined_score,
                "iocs": iocs,
                "vt_detections": vt_malicious,
                "sandbox_score": sandbox_score,
                "signatures": signatures,
            }

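As a worked example of the scoring arithmetic, the combined score is `vt_malicious * 2 + sandbox_score * 10 + sig_count * 5`, compared against the bands 100, 50, and 20. The sketch below restates only that formula; `combined_score` and `score_band` are hypothetical helper names, and the inputs are illustrative values.

```python
def combined_score(vt_malicious, sandbox_score, sig_count):
    """Weighted sum used above: VT detections x2, sandbox score x10, signatures x5."""
    return (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)


def score_band(score):
    """Map a combined score to the verdict bands used above."""
    if score >= 100:
        return "MALICIOUS"
    if score >= 50:
        return "SUSPICIOUS"
    if score >= 20:
        return "POTENTIALLY_UNWANTED"
    return "CLEAN"


# 34 VT detections, sandbox score 9.2, three signatures:
#   34*2 + 9.2*10 + 3*5 = 68 + 92 + 15 = 175
strong = combined_score(34, 9.2, 3)
# 3 VT detections, sandbox score 2, one signature:
#   3*2 + 2*10 + 1*5 = 6 + 20 + 5 = 31
weak = combined_score(3, 2, 1)
print(score_band(strong), score_band(weak))
```

Note how the weights interact: a sandbox score of 10 reaches the MALICIOUS band on its own, whereas VirusTotal detections alone need 50 engines to do so.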
Step 5: Push Results to SIEM

    import os

    import requests


    def push_to_splunk(verdict_result, splunk_url, splunk_token):
        """Send malware analysis verdict to Splunk HEC"""
        event = {
            "sourcetype": "malware_analysis",
            "source": "malware_pipeline",
            "event": {
                "sha256": verdict_result["sha256"],
                "verdict": verdict_result["verdict"],
                "confidence": verdict_result["confidence"],
                "score": verdict_result["combined_score"],
                "vt_detections": verdict_result["vt_detections"],
                "sandbox_score": verdict_result["sandbox_score"],
                "malware_family": verdict_result.get("threat_label", "Unknown"),
                "iocs": verdict_result["iocs"],
                "signatures": [s["name"] for s in verdict_result["signatures"]],
            },
        }
        # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
        verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"
        response = requests.post(
            f"{splunk_url}/services/collector/event",
            headers={
                "Authorization": f"Splunk {splunk_token}",
                "Content-Type": "application/json",
            },
            json=event,
            verify=verify_tls,
            timeout=30,
        )
        return response.status_code == 200


    def push_iocs_to_blocklist(iocs, firewall_api):
        """Push extracted IOCs to blocking infrastructure"""
        # Sandbox traffic also includes benign hosts (OS updates, CDNs);
        # review or allowlist indicators before blocking in production
        for ip in iocs.get("ips", []):
            requests.post(
                f"{firewall_api}/block",
                json={"type": "ip", "value": ip, "action": "block",
                      "source": "malware_pipeline"},
                timeout=30,
            )
        for domain in iocs.get("domains", []):
            requests.post(
                f"{firewall_api}/block",
                json={"type": "domain", "value": domain, "action": "sinkhole",
                      "source": "malware_pipeline"},
                timeout=30,
            )

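Sandbox network captures usually include benign traffic, so pushing every observed host to a firewall risks blocking legitimate services. One possible safeguard, sketched below, is to filter the IOC dictionary before calling `push_iocs_to_blocklist`. The `filter_iocs` helper and the contents of `DOMAIN_ALLOWLIST` are assumptions for illustration; a real deployment would maintain its own allowlist.

```python
import ipaddress

# Hypothetical allowlist; replace with domains that must never be blocked.
DOMAIN_ALLOWLIST = {"microsoft.com", "windowsupdate.com"}


def is_allowlisted(domain, allowlist=DOMAIN_ALLOWLIST):
    """True if the domain equals an allowlisted domain or is a subdomain of one."""
    domain = domain.lower().rstrip(".")
    return any(domain == a or domain.endswith("." + a) for a in allowlist)


def filter_iocs(iocs):
    """Keep only globally routable IPs and non-allowlisted domains."""
    ips = []
    for value in iocs.get("ips", []):
        try:
            if ipaddress.ip_address(value).is_global:
                ips.append(value)
        except ValueError:
            continue  # not a valid IP address; drop it
    domains = [d for d in iocs.get("domains", []) if not is_allowlisted(d)]
    return {"ips": ips, "domains": domains}


filtered = filter_iocs({
    "ips": ["192.168.56.1", "185.234.218.50", "not-an-ip"],
    "domains": ["ctldl.windowsupdate.com", "update-service.evil.com"],
})
print(filtered)
```

Dropping private and malformed addresses also prevents the sandbox's own host-only network from ending up on a block list.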
Step 6: Orchestrate the Full Pipeline

    def run_malware_pipeline(sample_path, config):
        """Execute full malware analysis pipeline"""
        collector = MalwareCollector()
        screener = MalwarePreScreener(config["vt_key"])
        submitter = SandboxSubmitter(config["cuckoo_url"])
        generator = VerdictGenerator()

        try:
            # Step 1: Hash and pre-screen
            hashes = collector.compute_hashes(sample_path)
            pre_screen = screener.pre_screen(hashes["sha256"])

            # Step 2: Submit to sandbox if unknown
            sandbox_report = None
            if pre_screen["needs_sandbox"]:
                task_id = submitter.submit_to_cuckoo(sample_path)
                sandbox_report = submitter.wait_for_analysis(task_id)

            # Step 3: Generate verdict
            verdict = generator.generate_verdict(pre_screen, sandbox_report)
            verdict["sha256"] = hashes["sha256"]
            verdict["threat_label"] = pre_screen.get("virustotal", {}).get(
                "threat_label", "Unknown"
            )

            # Step 4: Push to SIEM
            push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

            # Step 5: Block if malicious
            if verdict["verdict"] == "MALICIOUS":
                push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

            return verdict
        finally:
            # Release the VirusTotal client even if a step above raises
            screener.close()

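`run_malware_pipeline` expects a `config` dictionary with the keys `vt_key`, `cuckoo_url`, `splunk_url`, `splunk_token`, and `firewall_api`. One way to build it without hard-coding secrets is to read environment variables, as in this sketch. The variable names in `ENV_TO_CONFIG` and the `load_config` helper are hypothetical, not something the pipeline defines.

```python
import os

# Hypothetical environment variable names mapped to the config keys used above.
ENV_TO_CONFIG = {
    "VT_API_KEY": "vt_key",
    "CUCKOO_URL": "cuckoo_url",
    "SPLUNK_URL": "splunk_url",
    "SPLUNK_HEC_TOKEN": "splunk_token",
    "FIREWALL_API_URL": "firewall_api",
}


def load_config(env=None):
    """Build the pipeline config from environment variables.

    Raises ValueError naming every missing or empty setting, so a
    misconfigured deployment fails before any sample is processed.
    """
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_CONFIG if not env.get(name)]
    if missing:
        raise ValueError("missing settings: " + ", ".join(sorted(missing)))
    return {key: env[name] for name, key in ENV_TO_CONFIG.items()}
```

With the variables exported, a call such as `run_malware_pipeline(sample_path, load_config())` would supply every key the orchestrator reads.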
Key Concepts
| Term | Definition |
|---|---|
| Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |
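To make the static analysis entry concrete, the sketch below performs two of the checks it mentions without executing the sample: a magic-byte test and printable string extraction. It uses only the standard library; `looks_like_pe` and `printable_strings` are hypothetical helpers, and the magic-byte test is a coarse filter rather than full PE header parsing of the kind pefile provides.

```python
import re


def looks_like_pe(data):
    """Coarse check: Windows executables begin with the DOS magic bytes 'MZ'."""
    return data[:2] == b"MZ"


def printable_strings(data, min_len=6):
    """Return runs of at least min_len printable ASCII characters."""
    pattern = rb"[\x20-\x7e]{%d,}" % min_len
    return [match.decode("ascii") for match in re.findall(pattern, data)]


# Synthetic bytes standing in for a sample; no real malware involved.
blob = b"MZ\x90\x00\x00\x01http://example.test/payload\x00\x02ab\x00"
print(looks_like_pe(blob), printable_strings(blob))
```

Strings such as embedded URLs can be recorded as low-confidence indicators even when dynamic analysis is unavailable.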
Tools & Systems
- Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
- Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
- Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
- VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
- CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping
Common Scenarios
- Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
- EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
- Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
- Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
- Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis
Output Format

    MALWARE ANALYSIS REPORT - Pipeline Submission
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    Sample:       invoice_march.docx
    SHA256:       a1b2c3d4e5f6a7b8...
    File Type:    Microsoft Word Document (macro-enabled)

    Pre-Screening:
      VirusTotal:     34/72 malicious (Emotet.Downloader)
      MalwareBazaar:  Tags: emotet, macro, downloader

    Sandbox Analysis (Cuckoo):
      Score:          9.2/10 (MALICIOUS)
      Signatures:
        - Macro executes PowerShell download cradle (severity: 8)
        - Process injection into explorer.exe (severity: 9)
        - Connects to known Emotet C2 server (severity: 9)

    Extracted IOCs:
      C2 IPs:         185.234.218[.]50:8080, 45.77.123[.]45:443
      Domains:        update-service[.]evil[.]com
      Dropped Files:  payload.dll (SHA256: b2c3d4e5...)
      Registry:       HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

    VERDICT: MALICIOUS (Emotet Downloader) - Confidence: HIGH

    ACTIONS:
      [DONE] IOCs pushed to Splunk threat intel
      [DONE] C2 IPs blocked on firewall
      [DONE] Domain sinkholed on DNS
      [DONE] Hash blocked on endpoint

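The report shows indicators in defanged form (for example `update-service[.]evil[.]com`) so they cannot be clicked or resolved by accident. A minimal sketch of one common defanging convention follows; `defang` is a hypothetical helper that brackets every dot, which is slightly stricter than the IP formatting in the sample report.

```python
def defang(indicator):
    """Make an IP, domain, or URL non-clickable for use in reports.

    Every "." becomes "[.]" and a leading "http" scheme becomes "hxxp".
    """
    out = indicator.replace(".", "[.]")
    if out.lower().startswith("http"):
        out = "hxxp" + out[4:]
    return out


for value in ("update-service.evil.com", "185.234.218.50", "https://evil.com/a"):
    print(defang(value))
```

Defanging belongs at the reporting boundary only: the blocking step needs the original values.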