Skills worldmonitor
install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/worldmonitor" ~/.claude/skills/terminalskills-skills-worldmonitor && rm -rf "$T"
manifest:
skills/worldmonitor/SKILL.mdsafety · automated scan (medium risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- pip install
- references API keys
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
World Monitor
Overview
Build real-time intelligence dashboards that ingest data from multiple sources (RSS, news APIs, social media), use AI to categorize, summarize, deduplicate, and score severity, then push updates to a live dashboard. Think of it as your own AI-powered situation room.
Instructions
When a user asks to build a news monitoring system, intelligence dashboard, or event aggregation feed:
- Define scope — What topics/regions/industries to monitor?
- Select sources — RSS feeds, NewsAPI, social APIs, custom scrapers
- Set up pipeline — Ingest → Deduplicate → Classify → Summarize → Score
- Build output — API + WebSocket for real-time push, alert rules
Source Ingestion (Python)
"""Fetch from multiple source types in parallel.""" import asyncio, hashlib, feedparser, httpx from datetime import datetime, timezone class NewsItem: def __init__(self, title: str, content: str, source: str, url: str, published: datetime): self.title, self.content, self.source, self.url = title, content, source, url self.published = published self.id = hashlib.sha256(f"{title}{url}".encode()).hexdigest()[:16] async def fetch_rss(feeds: list[str]) -> list[NewsItem]: items = [] async with httpx.AsyncClient(timeout=15) as client: responses = await asyncio.gather(*[client.get(url) for url in feeds], return_exceptions=True) for resp in responses: if isinstance(resp, Exception): continue feed = feedparser.parse(resp.text) for entry in feed.entries[:20]: items.append(NewsItem( title=entry.get("title", ""), content=entry.get("summary", ""), source=feed.feed.get("title", "RSS"), url=entry.get("link", ""), published=datetime.now(timezone.utc), )) return items async def fetch_newsapi(query: str, api_key: str) -> list[NewsItem]: async with httpx.AsyncClient() as client: resp = await client.get("https://newsapi.org/v2/everything", params={"q": query, "sortBy": "publishedAt", "pageSize": 50}, headers={"X-Api-Key": api_key}) data = resp.json() return [ NewsItem(title=a["title"], content=a.get("description", ""), source=a["source"]["name"], url=a["url"], published=datetime.fromisoformat(a["publishedAt"].replace("Z", "+00:00"))) for a in data.get("articles", []) ]
Deduplication
from difflib import SequenceMatcher def deduplicate(items: list[NewsItem], threshold: float = 0.75) -> list[NewsItem]: unique, seen_titles = [], [] for item in sorted(items, key=lambda x: x.published, reverse=True): is_dup = any(SequenceMatcher(None, item.title.lower(), s.lower()).ratio() > threshold for s in seen_titles) if not is_dup: unique.append(item) seen_titles.append(item.title) return unique
AI Classification & Summarization
import json from openai import OpenAI client = OpenAI() CATEGORIES = ["geopolitics", "technology", "finance", "security", "climate", "health", "regulation", "market-move"] def analyze_article(item: NewsItem) -> dict: response = client.chat.completions.create( model="gpt-4o-mini", response_format={"type": "json_object"}, messages=[ {"role": "system", "content": f"Analyze this news. Return JSON: {{category: one of {CATEGORIES}, severity: 1-10, summary: '2-3 sentences', entities: [], sentiment: 'positive|negative|neutral', actionable: bool}}"}, {"role": "user", "content": f"Title: {item.title}\n\nContent: {item.content[:2000]}"}, ], ) analysis = json.loads(response.choices[0].message.content) return {**analysis, "id": item.id, "title": item.title, "url": item.url, "source": item.source}
Dashboard API (Node.js)
import express from "express"; import { WebSocketServer, WebSocket } from "ws"; import { createServer } from "http"; interface IntelItem { id: string; title: string; summary: string; category: string; severity: number; source: string; url: string; timestamp: string; } const app = express(); const server = createServer(app); const wss = new WebSocketServer({ server }); let feed: IntelItem[] = []; const clients = new Set<WebSocket>(); wss.on("connection", (ws) => { clients.add(ws); ws.send(JSON.stringify({ type: "init", items: feed.slice(0, 50) })); ws.on("close", () => clients.delete(ws)); }); function broadcast(item: IntelItem) { const msg = JSON.stringify({ type: "new", item }); clients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) ws.send(msg); }); } app.post("/api/ingest", express.json(), (req, res) => { const item: IntelItem = { ...req.body, timestamp: new Date().toISOString() }; feed.unshift(item); feed = feed.slice(0, 1000); broadcast(item); res.json({ ok: true }); }); app.get("/api/feed", (req, res) => { let items = feed; const { category, minSeverity, limit } = req.query; if (category) items = items.filter((i) => i.category === category); if (minSeverity) items = items.filter((i) => i.severity >= Number(minSeverity)); res.json(items.slice(0, Number(limit) || 50)); }); server.listen(3000, () => console.log("Intelligence dashboard on :3000"));
Alert Rules
import httpx ALERT_RULES = [ {"category": "security", "min_severity": 7, "channel": "slack"}, {"category": "market-move", "min_severity": 8, "channel": "email"}, {"category": "*", "min_severity": 9, "channel": "all"}, ] async def check_alerts(item: dict): for rule in ALERT_RULES: cat_match = rule["category"] == "*" or rule["category"] == item["category"] if cat_match and item["severity"] >= rule["min_severity"]: await send_alert(rule["channel"], item) async def send_alert(channel: str, item: dict): msg = f"[{item['category'].upper()}] Severity {item['severity']}/10\n{item['title']}\n{item['summary']}" if channel in ("slack", "all"): await httpx.AsyncClient().post("https://hooks.slack.com/services/YOUR/WEBHOOK", json={"text": msg})
Examples
Example 1: Tech Industry News Monitor
RSS_FEEDS = [ "https://feeds.arstechnica.com/arstechnica/index", "https://techcrunch.com/feed/", "https://www.theverge.com/rss/index.xml", ] async def run_tech_monitor(): items = await fetch_rss(RSS_FEEDS) unique = deduplicate(items) for item in unique[:20]: analysis = analyze_article(item) async with httpx.AsyncClient() as c: await c.post("http://localhost:3000/api/ingest", json=analysis) await check_alerts(analysis) # Output: {"category": "technology", "severity": 4, "summary": "Apple announced..."}
Example 2: Geopolitical Event Monitoring with Scheduled Polling
async def monitor_loop(interval_minutes: int = 15): """Run the full pipeline on a schedule.""" while True: items = await fetch_rss(RSS_FEEDS) newsapi_items = await fetch_newsapi("geopolitics OR sanctions", NEWSAPI_KEY) all_items = items + newsapi_items unique = deduplicate(all_items) for item in unique: analysis = analyze_article(item) async with httpx.AsyncClient() as c: await c.post("http://localhost:3000/api/ingest", json=analysis) await check_alerts(analysis) await asyncio.sleep(interval_minutes * 60) # Runs every 15 min, deduplicates across sources, alerts on severity >= 7
Guidelines
- Dedup aggressively — The same story appears across 20+ outlets. Dedup by title similarity
- Batch AI calls — Process 10 articles per LLM call instead of 1 to save ~90% on API costs
- Severity calibration — Periodically review severity scores. LLMs tend to over-rate severity
- Source diversity — Mix mainstream, niche, and social sources for balanced coverage
- Rate limit respect — Cache RSS feeds for 15-30 min. Don't hammer free APIs
- Historical storage — Keep analyzed articles in a DB for trend analysis over time
Dependencies
pip install feedparser httpx openai # Python pipeline npm install express ws # Node.js dashboard