AI Session Compression Techniques
Install:

```bash
git clone https://github.com/MacPhobos/research-mind
```

Or copy just this skill into your Claude skills directory:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/MacPhobos/research-mind "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/toolchains-ai-techniques-session-compression" ~/.claude/skills/macphobos-research-mind-toolchains-ai-techniques-session-compression && rm -rf "$T"
```

Source: `.claude/skills/toolchains-ai-techniques-session-compression/skill.md`
Summary
Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently, reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Well-tuned systems achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
- Cost Reduction: 80-90% token cost savings through hierarchical memory
- Performance: 2x faster responses with compressed context
- Scalability: Handle conversations exceeding 1M tokens
- Quality: Preserve critical information with <2% accuracy loss
When to Use
Use session compression when:
- Multi-turn conversations approach context window limits (>50% capacity)
- Long-running chat sessions (customer support, tutoring, code assistants)
- Token costs become significant (high-volume applications)
- Response latency increases due to large context
- Managing conversation history across multiple sessions
Don't use when:
- Short conversations (<10 turns) fitting easily in context
- Every detail must be preserved verbatim (legal, compliance)
- Single-turn or stateless interactions
- Context window usage is <30%
Ideal scenarios:
- Chatbots with 50+ turn conversations
- AI code assistants tracking long development sessions
- Customer support with multi-session ticket history
- Educational tutors with student progress tracking
- Multi-day collaborative AI workflows
Quick Start
Basic Setup with LangChain
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

# Retrieve compressed context
context = memory.load_memory_variables({})
```
Progressive Compression Pattern
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=(0.70, 0.85, 0.95)):
        self.thresholds = thresholds
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})

        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens
        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:
            # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:
            # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:
            # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: Use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return

        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Replace old messages with summary
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent

# Usage
compressor = ProgressiveCompressor()
for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")
```
Using Anthropic Prompt Caching (90% Cost Reduction)
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {"role": "assistant", "content": "Previous response..."},
    {"role": "user", "content": "New question"}  # Not cached, changes frequently
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)
# Cache hit reduces costs by 90% for cached content
```
Core Concepts
Context Windows and Token Limits
Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
- Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
- GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
- Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)
Token estimation (a helper is sketched after this list):
- English: ~4 characters per token
- Code: ~3 characters per token
- Rule of thumb: 1 token ≈ 0.75 words
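These rules of thumb are easy to encode. A minimal sketch (the characters-per-token constants are the heuristics above, not real tokenizer output):

```python
def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough token estimate from character count.

    Uses the rules of thumb above (~4 chars/token for English,
    ~3 chars/token for code); use a real tokenizer for exact counts.
    """
    chars_per_token = 3 if is_code else 4
    return max(1, len(text) // chars_per_token)

print(estimate_tokens("Compress long AI conversations."))  # ~7 tokens
```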
Why compression matters:
- Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
- Latency: Larger contexts increase processing time
- Quality: Excessive context can dilute attention on relevant information
Compression Ratios
Compression ratio = Original tokens / Compressed tokens
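Tracking this in production is trivial; a small helper (names are illustrative):

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Compression ratio = original tokens / compressed tokens."""
    return original_tokens / max(compressed_tokens, 1)

# A transcript summarized from 8,000 to 1,000 tokens
print(compression_ratio(8_000, 1_000))  # 8.0, in the general-chat range below
```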
Industry benchmarks:
- Extractive summarization: 2-3x
- Abstractive summarization: 5-10x
- Hierarchical summarization: 20x+
- LLMLingua (prompt compression): 20x with 1.5% accuracy loss
- KVzip (KV cache compression): 3-4x with 2x speed improvement
Target ratios by use case:
- Customer support: 5-7x (preserve details)
- General chat: 8-12x (balance quality/efficiency)
- Code assistants: 3-5x (preserve technical accuracy)
- Long documents: 15-20x (extract key insights)
Progressive Compression Thresholds
Industry standard pattern:
| Context Usage | Action | Technique |
|---|---|---|
| 0-70% | No compression | Store verbatim |
| 70-85% | Light compression | Remove redundancy |
| 85-95% | Medium compression | Summarize old messages |
| 95-100% | Aggressive compression | Hierarchical + RAG |
Implementation guidelines (a dispatcher sketch follows this list):
- 70% threshold: Remove duplicate/redundant messages, semantic deduplication
- 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
- 95% threshold: Multi-level hierarchical summarization + vector store archival
- Emergency (100%): Drop least important messages, aggressive summarization
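As a quick reference, the tiers above reduce to a simple dispatcher (a sketch; the ProgressiveCompressor in Quick Start implements the same thresholds):

```python
def compression_action(usage_ratio: float) -> str:
    """Map context-window usage to the compression tier above."""
    if usage_ratio < 0.70:
        return "none: store verbatim"
    if usage_ratio < 0.85:
        return "light: remove redundancy / semantic deduplication"
    if usage_ratio < 0.95:
        return "medium: summarize messages older than ~20 turns"
    return "aggressive: hierarchical summarization + vector archival"
```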
Compression Techniques
1. Summarization Techniques
1.1 Extractive Summarization
Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression
```python
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages, restoring chronological order
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])
    return [messages[i] for i in top_indices]
```
1.2 Abstractive Summarization
Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity
```python
from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )
    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }
```
1.3 Hierarchical Summarization (Multi-Level)
Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
```
Level 0 (Raw):     [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):   [Summary1-2] [Summary3-4] [Summary5-6] [Summary7-8]
Level 2 (Group):   [Summary1-4] [Summary5-8]
Level 3 (Session): [Overall Session Summary]
```
```python
from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)
        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]

        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])

        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)

        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )
        return {
            "role": "system",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0

        # Prioritize recent raw messages
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens

        # Add summaries from higher levels
        for level in range(1, len(self.levels)):
            for summary in self.levels[level]:
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens

        return context
```
Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
1.4 Rolling Summarization (Continuous)
Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations
```python
from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10,
                 compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")
        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )
        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context
```
2. Embedding-Based Approaches
2.1 RAG (Retrieval-Augmented Generation)
Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history
```python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )
        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)
        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0

        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )
            n_results = min(10, (max_tokens - token_count) // 100)
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )
            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break
                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        return context
```
Vector database options:
- ChromaDB: Embedded, easy local development
- Pinecone: Managed, 50ms p95 latency
- Weaviate: Open-source, hybrid search
- Qdrant: High performance, payload filtering
2.2 Vector Search and Clustering
Group similar messages into clusters, represent with centroids.
Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries
```python
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return representatives."""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]
            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })
        return compressed
```
2.3 Semantic Deduplication
Remove semantically similar messages that convey redundant information.
Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations
```python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI,
                 similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages

        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break
            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]
```
3. Token-Efficient Strategies
3.1 Message Prioritization
Assign importance scores and retain only high-priority content.
Pros: Retains most important information, flexible criteria
Cons: Scoring is heuristic-based, may break flow
Best for: Mixed-importance conversations, filtering noise
```python
import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []

        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # Recency score (newer messages score higher)
        scores.append(index / max(total - 1, 1))

        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]
        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order
        return [msg for msg, score, idx in top_messages]
```
3.2 Delta Compression
Store only changes between consecutive messages.
Pros: Highly efficient for incremental changes
Cons: Reconstruction overhead, not suitable for all content
Best for: Code assistants with incremental edits
```python
import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # Compare against the most recent base message
        last_msg = self.base_messages[-1]
        if last_msg['role'] == message['role']:
            # Calculate delta
            diff = list(difflib.unified_diff(
                last_msg['content'].splitlines(),
                message['content'].splitlines(),
                lineterm=''
            ))
            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta if compression achieved
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()
        for delta_info in self.deltas:
            base_content = messages[delta_info['base_index']]['content']
            # Simplified placeholder: a full implementation would patch
            # base_content with the stored diff
            reconstructed = base_content
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })
        return messages
```
4. LangChain Memory Types
4.1 ConversationSummaryMemory
Automatically summarizes conversation as it progresses.
```python
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])
```
Pros: Automatic summarization, simple API
Cons: Every turn triggers LLM call
Best for: Medium conversations (20-50 turns)
4.2 ConversationSummaryBufferMemory
Hybrid: Recent messages verbatim, older summarized.
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})
```
Pros: Best balance of detail and compression
Cons: Requires token limit tuning
Best for: Most production applications
4.3 ConversationTokenBufferMemory
Maintains fixed token budget, drops oldest when exceeded.
```python
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)
# Simple FIFO when token limit exceeded
```
Pros: Predictable token usage, simple
Cons: Loses old information completely
Best for: Real-time chat with strict limits
4.4 VectorStoreRetrieverMemory
Stores all messages in vector database, retrieves relevant ones.
```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# Automatically retrieves most relevant context
```
Pros: Infinite conversation length, semantic retrieval
Cons: Requires vector DB, retrieval overhead
Best for: Long-running conversations, knowledge bases
5. Anthropic-Specific Patterns
5.1 Prompt Caching (90% Cost Reduction)
Cache static context to reduce token costs.
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']
    # Add cache control to the last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)
# Subsequent calls with the same cached context cost 90% less
```
Cache TTL: 5 minutes
Savings: 90% cost reduction for cached tokens
Limits: Max 4 cache breakpoints per request

Best practices:
- Cache conversation history, not current query
- Update cache when context changes significantly
- Combine with summarization for maximum efficiency
5.2 Extended Thinking for Compression Planning
Use extended thinking to plan optimal compression strategy.
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# conversation_text, current_tokens, target_tokens, and compression_ratio
# are computed elsewhere in your application
response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x

Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation
recommendation = response.content[-1].text
```
Production Patterns
Checkpointing and Persistence
Save compression state for recovery and resume.
```python
import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True
        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)
```
Resume Workflows
Continue conversations across sessions.
```python
from anthropic import Anthropic
from pathlib import Path
import json
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        Path('sessions').mkdir(exist_ok=True)
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # Build context (with compression); the recent window already
        # includes the user message appended above
        system_prompt, context = self._build_context()
        kwargs = {'system': system_prompt} if system_prompt else {}

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context,
            **kwargs
        )

        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()

        # Save state
        self._save()
        return assistant_message

    def _build_context(self):
        """Build context with compression. The summary travels in the
        system prompt because Anthropic messages only accept the
        user/assistant roles."""
        system_prompt = None
        if self.memory['summary']:
            system_prompt = (
                f"[Previous conversation summary]\n{self.memory['summary']}"
            )
        return system_prompt, self.memory['messages'][-10:]

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return

        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)

# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context
```
Hybrid Approaches (Best Practice)
Combine multiple techniques for optimal results.
```python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10

        # Rolling summary
        self.rolling_summary = None

        # Vector store (long-term)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # Compression thresholds
        self.thresholds = {
            'light': 0.70,   # Start basic compression
            'medium': 0.85,  # Aggressive summarization
            'heavy': 0.95    # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)

        # Check compression needs
        usage_ratio = self._estimate_usage()
        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)
            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]
            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])
            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )
            self.rolling_summary = response.content[0].text

            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)
            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only the 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)
        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for the current query.

        Note: the Anthropic Messages API requires alternating
        user/assistant roles, so merge or re-role these entries
        before sending a real request.
        """
        context = []
        token_count = 0

        # 1. Add rolling summary (if exists)
        if self.rolling_summary:
            summary_msg = {
                'role': 'user',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )
            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break
                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0
        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4
        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4
        return total_tokens / 200000  # Claude Sonnet context window

# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)

# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)
```
Performance Benchmarks
Compression Efficiency
| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|---|---|---|---|---|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
Token Savings by Use Case
Customer Support (50-turn conversation):
- No compression: ~8,000 tokens/request
- Rolling summary: ~2,000 tokens/request (75% reduction)
- Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)
Code Assistant (100-turn session):
- No compression: ~25,000 tokens/request
- Hierarchical: ~5,000 tokens/request (80% reduction)
- Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)
Educational Tutor (multi-session):
- No compression: Would exceed context window
- RAG + summarization: ~3,000 tokens/request
- Infinite session length enabled
Cost Analysis
Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)
1,000 conversations, 50 turns each:
No compression:
- Avg 8K tokens/request × 50K requests = 400M tokens
- Cost: $1,200

With rolling summarization:
- Avg 2K tokens/request × 50K requests = 100M tokens
- Summarization overhead: +10M tokens
- Cost: $330 (72% savings)

With hybrid system + caching:
- First turn: 2K tokens (no cache)
- Subsequent: 200 tokens effective (90% cache hit)
- Total: ~15M tokens effective
- Cost: $45 (96% savings)
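The arithmetic above is easy to reproduce. A back-of-the-envelope sketch (input-token pricing only; the constants come from the example):

```python
INPUT_PRICE_PER_MTOK = 3.00  # Claude Sonnet input price, USD per 1M tokens

def input_cost(total_tokens: int) -> float:
    """Input-side cost in USD for a given token volume."""
    return total_tokens / 1_000_000 * INPUT_PRICE_PER_MTOK

requests = 1_000 * 50  # 1,000 conversations x 50 turns

print(input_cost(8_000 * requests))               # no compression: $1,200
print(input_cost(2_000 * requests + 10_000_000))  # rolling summary: $330
print(input_cost(15_000_000))                     # hybrid + caching: $45
```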
Tool Recommendations
Memory Management Tools
Mem0 (Recommended for Production)
Best for: Hybrid memory systems with minimal code
```python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
client.add(
    [
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ],
    user_id="user123"
)

# Retrieve relevant context
context = client.search(
    "What programming language am I using?",
    user_id="user123"
)
```
Features:
- Automatic hierarchical summarization
- Built-in RAG retrieval
- Multi-user session management
- Analytics dashboard
Pricing: $0.40/1K memory operations
Zep
Best for: Low-latency production deployments
```python
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)

# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
```
Features:
- <100ms retrieval latency
- Automatic fact extraction
- Entity recognition
- Session management
Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)
ChromaDB
Best for: Self-hosted vector storage
```python
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],  # placeholder vector
    ids=["msg1"]
)

# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)
```
Features:
- Fully open-source
- Embedded or client-server
- Fast local development
Pricing: Free (self-hosted)
LangChain
Best for: Rapid prototyping and experimentation
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
```
Features:
- Multiple memory types
- Framework integration
- Extensive documentation
Pricing: Free (uses your LLM API costs)
Compression Libraries
LLMLingua
Best for: Extreme compression with minimal quality loss
```python
from llmlingua import PromptCompressor

compressor = PromptCompressor()
compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)
# Achieves 20x compression with 1.5% accuracy loss
```
Features:
- 20x compression ratios
- <2% quality degradation
- Fast inference (<500ms)
Pricing: Free (open-source)
Use Cases and Patterns
Chatbot (Customer Support)
Requirements:
- Multi-turn conversations (50-100 turns)
- Preserve customer context
- Fast response times
- Cost-efficient
Recommended approach:
- ConversationSummaryBufferMemory (LangChain)
- 70% threshold: Semantic deduplication
- 85% threshold: Rolling summarization
- Prompt caching for frequent patterns
Implementation:
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation (turns loaded from your ticketing system)
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

# Retrieve compressed context
context = memory.load_memory_variables({})
```
Code Assistant
Requirements:
- Long development sessions (100+ turns)
- Preserve technical details
- Handle large code blocks
- Track incremental changes
Recommended approach:
- Hierarchical summarization for overall context
- RAG retrieval for specific code references
- Delta compression for iterative edits
- Prompt caching for system prompts
Implementation:
```python
from anthropic import Anthropic
from openai import OpenAI

# Reuses HierarchicalMemory, RAGMemory, and DeltaCompressor
# from the sections above
client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(
            anthropic_client=client,
            openai_client=openai_client
        )
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })

        # Store in RAG for retrieval
        self.rag.add_message(code_change)

        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)
        return summary_context + rag_context
```
Educational Tutor
Requirements:
- Multi-session tracking
- Student progress persistence
- Personalized context retrieval
- Long-term knowledge retention
Recommended approach:
- VectorStoreRetrieverMemory for multi-session
- Fact extraction for student knowledge
- Progressive compression across sessions
- Resumable conversations
Implementation:
```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )
        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })
```
Best Practices
1. Choose the Right Technique for Your Use Case
- Short conversations (<20 turns): No compression needed
- Medium conversations (20-50 turns): ConversationSummaryBufferMemory
- Long conversations (50-100 turns): Hierarchical or rolling summarization
- Very long (100+ turns): Hybrid (RAG + summarization + caching)
- Multi-session: VectorStoreRetrieverMemory or Mem0
2. Implement Progressive Compression
Don't compress aggressively from the start. Use thresholds:
- 0-70%: Store verbatim
- 70-85%: Light compression (deduplication)
- 85-95%: Medium compression (summarization)
- 95-100%: Aggressive compression (hierarchical)
3. Combine Techniques
Single-technique approaches are suboptimal. Best production systems use:
- Rolling summarization (short-term)
- RAG retrieval (long-term)
- Prompt caching (cost optimization)
- Semantic deduplication (redundancy removal)
4. Monitor Quality Metrics
Track compression impact (a retention proxy is sketched after this list):
- Response relevance score
- Information retention rate
- User satisfaction metrics
- Token usage reduction
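One cheap retention signal is embedding similarity between the original history and its compressed form. A hedged sketch (an illustrative heuristic, not a validated metric; acceptable-score thresholds need calibration on your data):

```python
import numpy as np
from openai import OpenAI

client = OpenAI(api_key="your-openai-key")

def retention_score(original_text: str, compressed_text: str) -> float:
    """Cosine similarity between embeddings of the full conversation
    and its compressed form, as a rough information-retention proxy."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=[original_text, compressed_text]
    )
    a = np.array(response.data[0].embedding)
    b = np.array(response.data[1].embedding)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))
```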
5. Use Prompt Caching Strategically
Cache stable content:
- Conversation summaries
- System prompts
- Knowledge base context
- User profiles
Don't cache frequently changing content:
- Current user query
- Real-time data
- Session-specific state
6. Implement Checkpointing
Save compression state for:
- Recovery from failures
- Multi-session continuity
- Analytics and debugging
- A/B testing different strategies
7. Tune Compression Parameters
Test and optimize:
- Summary token limits
- Compression thresholds
- Retrieval result counts
- Cache TTLs
- Chunk sizes for hierarchical
8. Handle Edge Cases
Plan for (a splitter for the first item is sketched after this list):
- Very long messages (split or compress individually)
- Code blocks (preserve formatting)
- Multi-language content
- Rapidly changing context
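For the first edge case, a minimal splitter that breaks an oversized message on paragraph boundaries (a sketch using the 4-chars/token heuristic; single paragraphs longer than the budget are kept whole, and code blocks would need format-aware handling):

```python
def split_long_message(content: str, max_tokens: int = 2000) -> list:
    """Split a long message into roughly token-bounded chunks."""
    max_chars = max_tokens * 4  # ~4 characters per token
    chunks, current = [], ""
    for paragraph in content.split("\n\n"):
        if current and len(current) + len(paragraph) + 2 > max_chars:
            chunks.append(current)
            current = paragraph
        else:
            current = f"{current}\n\n{paragraph}" if current else paragraph
    if current:
        chunks.append(current)
    return chunks
```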
Troubleshooting
Problem: Summary loses critical information
Solutions:
- Lower compression ratio (less aggressive)
- Implement importance scoring to preserve key messages
- Use extractive summarization for critical sections
- Increase summary token budget
Problem: Retrieval returns irrelevant context
Solutions (a hybrid-scoring sketch for the last item follows this list):
- Improve embedding model quality
- Add metadata filtering (timestamps, topics)
- Adjust similarity threshold
- Use hybrid search (semantic + keyword)
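The last item can be as simple as blending two scores. A sketch (alpha and the term-overlap scorer are illustrative; production systems usually pair BM25 with a reranker):

```python
import numpy as np

def hybrid_score(query_emb, doc_emb, query_text: str, doc_text: str,
                 alpha: float = 0.7) -> float:
    """Blend semantic and keyword relevance: alpha weights cosine
    similarity, (1 - alpha) weights naive term overlap."""
    q, d = np.asarray(query_emb), np.asarray(doc_emb)
    semantic = float(q @ d / (np.linalg.norm(q) * np.linalg.norm(d)))

    q_terms = set(query_text.lower().split())
    d_terms = set(doc_text.lower().split())
    keyword = len(q_terms & d_terms) / max(len(q_terms), 1)

    return alpha * semantic + (1 - alpha) * keyword
```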
Problem: High latency from compression
Solutions:
- Compress asynchronously (background tasks)
- Use faster models for summarization (Haiku instead of Sonnet)
- Cache summaries more aggressively
- Reduce compression frequency
Problem: Conversations still exceeding context window
Solutions:
- Implement hierarchical compression
- Archive to vector database more aggressively
- Use more aggressive compression ratios
- Consider switching to model with larger context window
Problem: High costs despite compression
Solutions:
- Implement prompt caching
- Use cheaper models for summarization (Haiku)
- Batch summarization operations
- Reduce summarization frequency
Problem: Lost conversation continuity
Solutions:
- Increase recent message window
- Include summary in every request
- Use more descriptive summaries
- Implement session resumption with context injection
Advanced Topics
Streaming Compression
Compress in real-time as conversation progresses:
```python
import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses.

    Assumes dict-based add_message plus should_compress(), compress_async(),
    and get_context() extensions on the ProgressiveCompressor shown earlier.
    """
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message)

        # Compression happens asynchronously
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()
```
Multi-User Session Management
Handle concurrent conversations with shared context:
```python
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]
        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
```
Custom Importance Scoring
Train ML models to score message importance:
```python
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use a pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']
```
Context Window Utilization Optimization
Maximize information density within token budget:
```python
def optimize_context_allocation(max_tokens: int):
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
```
Future Directions
Emerging Techniques (2025+)
1. Infinite Attention Mechanisms
- Models with >10M token context windows (Gemini 1.5, future Claude)
- Reduces need for compression but doesn't eliminate cost concerns
2. Learned Compression Models
- Neural networks trained to compress conversation optimally
- Maintain semantic meaning while minimizing tokens
- Examples: LLMLingua v2, PromptCompressor
3. Multimodal Session Compression
- Compress conversations with images, audio, video
- Maintain cross-modal context relationships
4. Federated Memory Systems
- Distributed compression across multiple memory stores
- Privacy-preserving compression for sensitive conversations
5. Adaptive Compression Strategies
- RL-based systems that learn optimal compression per user/domain
- Dynamic threshold adjustment based on conversation importance
References
Academic Papers
- "Recursively Summarizing Enables Long-Term Dialogue Memory" (arXiv:2308.15022)
- "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
- "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)
Tools
- Mem0 - Managed memory service
- Zep - Fast memory layer
- LLMLingua - Prompt compression
- ChromaDB - Vector database
Last Updated: 2025-11-30
Version: 1.0.0
License: MIT