Detect and analyze covert communication channels used by malware including DNS tunneling, ICMP exfiltration, steganographic HTTP, and protocol abuse for C2 and data exfiltration.
git clone https://github.com/plurigrid/asi
T=$(mktemp -d) && git clone --depth=1 https://github.com/plurigrid/asi "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/asi/skills/analyzing-network-covert-channels-in-malware" ~/.claude/skills/plurigrid-asi-analyzing-network-covert-channels-in-malware && rm -rf "$T"
Analyzing Network Covert Channels in Malware
Overview
Malware uses covert channels to disguise C2 communication and data exfiltration as legitimate-looking network traffic. DNS tunneling encodes data in DNS queries and responses (used by tools such as iodine and dnscat2, and by malware families such as FrameworkPOS). ICMP tunneling hides data in echo request/reply payloads (icmpsh, ptunnel). HTTP covert channels embed C2 data in headers, cookies, or steganographic images. Protocol abuse relies on protocols that firewalls already allow in order to bypass filtering. Modern ML-based detectors report 99%+ recall on DNS tunneling, though low-throughput exfiltration remains challenging because it generates little traffic to measure. Campaigns reported through 2024, including TrkCdn and SecShow (tracked by Palo Alto Networks Unit 42) and Savvy Seahorse, show that DNS abuse remains prevalent.
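To illustrate why tunneled DNS names stand out, the following minimal sketch compares the length and Shannon entropy of ordinary hostnames with a name carrying Base32-encoded data. The helper names (`shannon_entropy`, `encode_as_labels`) and the placeholder payload are assumptions made for this illustration; real tools such as iodine or dnscat2 use their own encodings.

```python
import base64
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Shannon entropy of a string, in bits per character."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


def encode_as_labels(data: bytes, label_len: int = 63) -> str:
    """Base32-encode data and split it into DNS labels (63 chars max each)."""
    encoded = base64.b32encode(data).decode("ascii").rstrip("=").lower()
    return ".".join(
        encoded[i:i + label_len] for i in range(0, len(encoded), label_len)
    )


if __name__ == "__main__":
    placeholder_payload = bytes(range(64))  # stand-in for exfiltrated data
    tunneled = encode_as_labels(placeholder_payload)
    for name in ("www", "mail", tunneled):
        flat = name.replace(".", "")
        print(f"{len(flat):4d} chars  entropy={shannon_entropy(flat):.2f}  {name[:40]}")
```

Encoded names are both much longer and closer to a uniform character distribution than human-chosen hostnames, which is why length and entropy are common tunneling indicators.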
When to Use
- When investigating incidents where malware is suspected of hiding C2 or exfiltration traffic in DNS, ICMP, or HTTP
- When building detection rules or threat hunting queries for tunneling and protocol abuse
- When SOC analysts need a structured procedure for covert channel analysis
- When validating that security monitoring covers covert channel techniques
Prerequisites
- Python 3.9+ with scapy, dpkt, dnslib
- Wireshark/tshark for PCAP analysis
- Zeek (formerly Bro) for network monitoring
- DNS query logging infrastructure
- Understanding of the DNS, ICMP, and HTTP protocols at the packet level
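When DNS query logs come from Zeek rather than a PCAP, a similar first pass can run over `dns.log`. The sketch below assumes Zeek's default tab-separated log format, in which a `#fields` header line names the columns and unset values are written as `-`. The function names and the 50-character threshold are illustrative choices, not part of Zeek.

```python
from collections import defaultdict


def read_zeek_tsv(lines):
    """Yield one dict per record from an iterable of Zeek TSV log lines."""
    fields = []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            # Column names follow the "#fields" token, separated by tabs
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#") and fields:
            yield dict(zip(fields, line.split("\t")))


def long_queries_by_domain(records, min_len=50):
    """Group unusually long query names by their last two labels."""
    hits = defaultdict(list)
    for rec in records:
        query = rec.get("query", "-")
        if query != "-" and len(query) >= min_len:
            # Naive base-domain guess; multi-label suffixes need a suffix list
            base = ".".join(query.rstrip(".").split(".")[-2:])
            hits[base].append(query)
    return dict(hits)


if __name__ == "__main__":
    import sys

    with open(sys.argv[1], encoding="utf-8", errors="replace") as handle:
        for domain, queries in long_queries_by_domain(read_zeek_tsv(handle)).items():
            print(f"{domain}: {len(queries)} long queries")
```

Run it as `python3 script.py dns.log`, where the script file name is whatever you saved the sketch as. Domains it surfaces are candidates for the deeper PCAP analysis in the workflow, not confirmed tunnels.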
Workflow
Step 1: DNS and ICMP Tunneling Detection
#!/usr/bin/env python3
"""Detect DNS tunneling and covert channels in network traffic."""
import sys
import math
from collections import Counter, defaultdict

try:
    from scapy.all import rdpcap, DNS, DNSQR, IP, ICMP
except ImportError:
    print("pip install scapy")
    sys.exit(1)


def entropy(data):
    """Shannon entropy of a string, in bits per character."""
    if not data:
        return 0
    freq = Counter(data)
    length = len(data)
    return -sum((c / length) * math.log2(c / length) for c in freq.values())


def analyze_dns_tunneling(pcap_path):
    """Detect DNS tunneling indicators in PCAP."""
    packets = rdpcap(pcap_path)
    domain_stats = defaultdict(lambda: {
        "queries": 0,
        "total_qname_len": 0,
        "subdomain_lengths": [],
        "query_types": Counter(),
        "unique_subdomains": set(),
    })

    for pkt in packets:
        # Count queries only (qr == 0); responses echo the question section
        # and would otherwise double every statistic.
        if pkt.haslayer(DNS) and pkt.haslayer(DNSQR) and pkt[DNS].qr == 0:
            qname = pkt[DNSQR].qname.decode('utf-8', errors='replace').rstrip('.')
            qtype = pkt[DNSQR].qtype
            parts = qname.split('.')
            if len(parts) >= 3:
                # Heuristic: treat the last two labels as the registered domain.
                # Multi-label suffixes such as co.uk need a public suffix list.
                base_domain = '.'.join(parts[-2:])
                subdomain = '.'.join(parts[:-2])
                stats = domain_stats[base_domain]
                stats["queries"] += 1
                stats["total_qname_len"] += len(qname)
                stats["subdomain_lengths"].append(len(subdomain))
                stats["query_types"][qtype] += 1
                stats["unique_subdomains"].add(subdomain)

    # Score domains for tunneling indicators
    suspicious = []
    for domain, stats in domain_stats.items():
        if stats["queries"] < 5:
            continue

        avg_subdomain_len = (sum(stats["subdomain_lengths"]) /
                             len(stats["subdomain_lengths"]))
        unique_ratio = len(stats["unique_subdomains"]) / stats["queries"]

        # Calculate subdomain entropy over all unique subdomains combined
        all_subdomains = ''.join(stats["unique_subdomains"])
        sub_entropy = entropy(all_subdomains)

        score = 0
        reasons = []
        if avg_subdomain_len > 30:
            score += 30
            reasons.append(f"Long subdomains (avg {avg_subdomain_len:.0f} chars)")
        if unique_ratio > 0.9:
            score += 25
            reasons.append(f"High uniqueness ({unique_ratio:.2%})")
        if sub_entropy > 4.0:
            score += 25
            reasons.append(f"High entropy ({sub_entropy:.2f})")
        if stats["query_types"].get(16, 0) > 10:  # TXT records
            score += 20
            reasons.append(f"Many TXT queries ({stats['query_types'][16]})")

        if score >= 50:
            suspicious.append({
                "domain": domain,
                "score": score,
                "queries": stats["queries"],
                "avg_subdomain_length": round(avg_subdomain_len, 1),
                "unique_subdomains": len(stats["unique_subdomains"]),
                "subdomain_entropy": round(sub_entropy, 2),
                "reasons": reasons,
            })

    return sorted(suspicious, key=lambda x: -x["score"])


def analyze_icmp_tunneling(pcap_path):
    """Detect ICMP tunneling in PCAP."""
    packets = rdpcap(pcap_path)
    icmp_stats = defaultdict(lambda: {"count": 0, "payload_sizes": [], "payloads": []})

    for pkt in packets:
        if pkt.haslayer(ICMP) and pkt.haslayer(IP):
            src = pkt[IP].src
            dst = pkt[IP].dst
            key = f"{src}->{dst}"
            payload = bytes(pkt[ICMP].payload)
            icmp_stats[key]["count"] += 1
            icmp_stats[key]["payload_sizes"].append(len(payload))
            # Keep a sample of oversized payloads for manual review
            if len(payload) > 64:
                icmp_stats[key]["payloads"].append(payload[:100])

    suspicious = []
    for flow, stats in icmp_stats.items():
        if stats["count"] < 5:
            continue
        avg_size = sum(stats["payload_sizes"]) / len(stats["payload_sizes"])
        if avg_size > 64 or stats["count"] > 100:
            suspicious.append({
                "flow": flow,
                "packets": stats["count"],
                "avg_payload_size": round(avg_size, 1),
                "reason": "Large/frequent ICMP payloads suggest tunneling",
            })

    return suspicious


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <pcap_file>")
        sys.exit(1)

    print("[+] DNS Tunneling Analysis")
    dns_results = analyze_dns_tunneling(sys.argv[1])
    for r in dns_results:
        print(f"  {r['domain']} (score: {r['score']})")
        for reason in r['reasons']:
            print(f"    - {reason}")

    print("\n[+] ICMP Tunneling Analysis")
    icmp_results = analyze_icmp_tunneling(sys.argv[1])
    for r in icmp_results:
        print(f"  {r['flow']}: {r['reason']}")
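The script above covers DNS and ICMP only. For the HTTP header and cookie channels mentioned in the overview, a comparable first-pass check might look like the sketch below. It assumes the headers have already been extracted into a dictionary (for example from tshark or Zeek output). The function name `flag_header_values` and both thresholds are hypothetical choices for illustration, and legitimate session tokens will also match, so the output is a review list rather than a verdict.

```python
import math
from collections import Counter


def shannon_entropy(text):
    """Shannon entropy of a string, in bits per character."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


def flag_header_values(headers, min_len=40, min_entropy=4.5):
    """Return header values that are long, space-free, and high-entropy."""
    findings = []
    for name, value in headers.items():
        if name.lower() in ("cookie", "set-cookie"):
            # Cookies carry several name=value pairs; check each value alone
            candidates = [p.split("=", 1)[-1].strip() for p in value.split(";")]
        else:
            candidates = [value.strip()]
        for candidate in candidates:
            # Encoded blobs rarely contain spaces; this skips most
            # natural-language values such as User-Agent strings
            if " " in candidate or len(candidate) < min_len:
                continue
            ent = shannon_entropy(candidate)
            if ent >= min_entropy:
                findings.append({
                    "header": name,
                    "length": len(candidate),
                    "entropy": round(ent, 2),
                })
    return findings


if __name__ == "__main__":
    sample = {
        "Host": "example.com",
        "Cookie": "lang=en; theme=dark",
    }
    print(flag_header_values(sample))
```

Comparing flagged values across requests to the same host helps separate a stable session token from a value that changes with every request, which is more typical of an encoded data channel.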
Validation Criteria
- DNS tunneling detected via entropy, subdomain length, and query volume analysis
- ICMP covert channels identified through payload size anomalies
- Tunneling domains distinguished from legitimate CDN/cloud traffic
- Data exfiltration volume estimated from captured traffic
- C2 communication patterns and beaconing intervals extracted
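The last two criteria are not covered by the workflow script. A minimal sketch of how they could be approached, assuming you have already collected per-domain query timestamps (in seconds, as floats) and the set of observed subdomains: `beacon_profile` uses the coefficient of variation of inter-query intervals as a regularity measure, and `estimate_exfil_bytes` assumes a Base32-style encoding of 5 bits per character. Both function names, the 0.2 jitter threshold, and the encoding assumption are illustrative and should be tuned to the sample under analysis.

```python
import statistics


def beacon_profile(timestamps, max_jitter=0.2):
    """Summarize inter-arrival times; low variation suggests beaconing."""
    ts = sorted(timestamps)
    if len(ts) < 3:
        return None  # too few events to measure regularity
    intervals = [later - earlier for earlier, later in zip(ts, ts[1:])]
    mean = statistics.mean(intervals)
    stdev = statistics.pstdev(intervals)
    # Coefficient of variation: 0 means perfectly regular intervals
    cv = stdev / mean if mean > 0 else float("inf")
    return {
        "mean_interval": mean,
        "cv": cv,
        "likely_beacon": cv <= max_jitter,
    }


def estimate_exfil_bytes(subdomains, bits_per_char=5):
    """Upper-bound estimate of data carried in subdomain labels.

    Assumes every character encodes bits_per_char bits (5 for Base32,
    4 for hex, 6 for Base64-like alphabets) and ignores protocol overhead.
    """
    chars = sum(len(s.replace(".", "")) for s in subdomains)
    return chars * bits_per_char // 8


if __name__ == "__main__":
    regular = [0.0, 60.0, 120.0, 180.0, 240.0]
    print(beacon_profile(regular))
    print(estimate_exfil_bytes(["aaaaaaaa", "bbbb.cccc"]))
```

Scapy exposes packet times through `pkt.time`; converting them with `float()` before passing them in avoids mixing numeric types. The volume figure is an upper bound because tunneling tools spend some characters on session IDs and sequence numbers.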