# claude-code-plugins-plus-skills · clari-performance-tuning

## Install

**Source** · Clone the upstream repo:

```shell
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
```

**Claude Code** · Install the skill into `~/.claude/skills/`:

```shell
T=$(mktemp -d) &&
  git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" &&
  mkdir -p ~/.claude/skills &&
  cp -r "$T/plugins/saas-packs/clari-pack/skills/clari-performance-tuning" \
    ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-clari-performance-tuning &&
  rm -rf "$T"
```

Manifest: `plugins/saas-packs/clari-pack/skills/clari-performance-tuning/SKILL.md`
# Clari Performance Tuning

## Overview

Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.

## Instructions

### Parallel Multi-Period Export
```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    results = {}

    def export_period(period: str) -> tuple[str, list[dict]]:
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(export_period, p): p
            for p in periods
        }
        for future in as_completed(futures):
            period, entries = future.result()
            results[period] = entries
            print(f"  {period}: {len(entries)} entries")

    return results
```
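As a quick runnability check, the pattern can be exercised against a stub client. The sketch below mirrors the helper above in compact form; `StubClient`, the forecast name `net_new_arr`, and the period labels are all hypothetical stand-ins, not real Clari identifiers:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class StubClient:
    """Stand-in for a Clari API client; returns one canned entry per period."""

    def export_and_download(self, forecast_name: str, period: str) -> dict:
        return {"entries": [{"period": period, "owner_email": "rep@example.com"}]}


def parallel_export(client, forecast_name, periods, max_workers=3):
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its period so results can be keyed on completion.
        futures = {
            executor.submit(client.export_and_download, forecast_name, p): p
            for p in periods
        }
        for future in as_completed(futures):
            period = futures[future]
            results[period] = future.result().get("entries", [])
    return results


periods = ["2025_Q1", "2025_Q2", "2025_Q3", "2025_Q4"]
results = parallel_export(StubClient(), "net_new_arr", periods)
```

All four periods come back keyed by name, regardless of completion order.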
### Cache Export Results
```python
import hashlib
import json
from datetime import datetime, timedelta
from pathlib import Path


class ExportCache:
    def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, forecast: str, period: str) -> str:
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()

    def get(self, forecast: str, period: str) -> list[dict] | None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(meta["cached_at"])
        if datetime.utcnow() - cached_at > self.ttl:
            return None
        return meta["entries"]

    def set(self, forecast: str, period: str, entries: list[dict]):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        path.write_text(json.dumps({
            "cached_at": datetime.utcnow().isoformat(),
            "entries": entries,
        }))
```
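The usual way to consume the cache is a read-through wrapper: check `get()` first, hit the API only on a miss, then `set()`. The sketch below uses an in-memory stand-in with the same `get`/`set` interface so it runs without touching disk; `MemoryCache` and `CountingClient` are hypothetical test doubles:

```python
class MemoryCache:
    """In-memory stand-in matching ExportCache's get/set interface."""

    def __init__(self):
        self._store = {}

    def get(self, forecast, period):
        return self._store.get((forecast, period))

    def set(self, forecast, period, entries):
        self._store[(forecast, period)] = entries


class CountingClient:
    """Hypothetical client that counts how many API calls it serves."""

    def __init__(self):
        self.calls = 0

    def export_and_download(self, forecast, period):
        self.calls += 1
        return {"entries": [{"period": period}]}


def cached_export(cache, client, forecast: str, period: str) -> list[dict]:
    entries = cache.get(forecast, period)
    if entries is None:  # cache miss: hit the API and populate the cache
        entries = client.export_and_download(forecast, period).get("entries", [])
        cache.set(forecast, period, entries)
    return entries


cache, client = MemoryCache(), CountingClient()
first = cached_export(cache, client, "net_new_arr", "2025_Q1")
second = cached_export(cache, client, "net_new_arr", "2025_Q1")
```

The second call is served from the cache, so the client sees exactly one API call. Note the `is None` check: an empty entry list is a valid cached value and must not trigger a refetch.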
### Incremental Load to Warehouse
```sql
-- Use MERGE for incremental updates instead of a full reload
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
    ON target.owner_email = source.owner_email
    AND target.time_period = source.time_period
    AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    forecast_amount = source.forecast_amount,
    quota_amount = source.quota_amount,
    crm_total = source.crm_total,
    crm_closed = source.crm_closed,
    exported_at = source.exported_at
WHEN NOT MATCHED THEN INSERT VALUES (
    source.owner_name,
    source.owner_email,
    source.forecast_amount,
    source.quota_amount,
    source.crm_total,
    source.crm_closed,
    source.adjustment_amount,
    source.time_period,
    source.exported_at,
    source.forecast_name
);
```
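Warehouses without `MERGE` support (SQLite, for example) can get the same incremental behavior with `INSERT ... ON CONFLICT DO UPDATE`. A minimal SQLite sketch, with the column set trimmed for brevity and the key columns matching the merge condition above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE clari_forecasts (
        owner_email     TEXT,
        time_period     TEXT,
        forecast_name   TEXT,
        forecast_amount REAL,
        PRIMARY KEY (owner_email, time_period, forecast_name)
    )
""")


def upsert(rows):
    # Insert new keys; on a key collision, update the amount in place.
    conn.executemany("""
        INSERT INTO clari_forecasts
            (owner_email, time_period, forecast_name, forecast_amount)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (owner_email, time_period, forecast_name)
        DO UPDATE SET forecast_amount = excluded.forecast_amount
    """, rows)


upsert([("rep@example.com", "2025_Q1", "net_new_arr", 100.0)])
upsert([("rep@example.com", "2025_Q1", "net_new_arr", 250.0)])  # same key: update
count, amount = conn.execute(
    "SELECT COUNT(*), MAX(forecast_amount) FROM clari_forecasts"
).fetchone()
```

Re-running the load with refreshed data leaves one row per key with the latest amount, rather than duplicating rows or requiring a truncate-and-reload.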
## Performance Benchmarks
| Optimization | Before | After |
|---|---|---|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |
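The exact numbers depend on API latency, period count, and worker count. The sequential-vs-parallel comparison can be reproduced locally against a client with simulated latency; the 50 ms delay and `SlowClient` below are arbitrary stand-ins, not measured Clari figures:

```python
import time
from concurrent.futures import ThreadPoolExecutor


class SlowClient:
    """Simulates a Clari export call with fixed latency."""

    def export_and_download(self, forecast, period):
        time.sleep(0.05)  # stand-in for a multi-second API round trip
        return {"entries": []}


def timed(fn):
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


client = SlowClient()
periods = ["2025_Q1", "2025_Q2", "2025_Q3", "2025_Q4"]

# Sequential: four calls back to back.
sequential = timed(lambda: [client.export_and_download("f", p) for p in periods])


# Parallel: four calls overlapped across four workers.
def parallel():
    with ThreadPoolExecutor(max_workers=4) as ex:
        list(ex.map(lambda p: client.export_and_download("f", p), periods))


parallel_time = timed(parallel)
```

With enough workers to cover every period, wall-clock time approaches the latency of a single call instead of the sum of all calls.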
## Next Steps

For cost optimization, see `clari-cost-tuning`.