Source code for atloop.memory.compressor

"""Memory compressor for preventing unbounded growth."""

import json
import logging
from difflib import SequenceMatcher
from typing import Any, Dict, List

from atloop.memory.state import AgentState

logger = logging.getLogger(__name__)


class MemoryCompressor:
    """Compress old agent memory to prevent unbounded growth.

    Every method is a ``@staticmethod`` operating on ``state.memory``.
    Three mechanisms are applied in order by :meth:`compress_if_needed`:

    1. rule-based summarising/trimming of attempts, decisions,
       important decisions, milestones and learnings;
    2. optional LLM summarisation of old decisions;
    3. optional de-duplication of decisions and attempts.

    Summary records are dicts whose ``"type"`` is one of
    :attr:`COMPRESSED_TYPES`. They are never re-summarised on their own and
    never de-duplicated; when newer entries age out, an existing summary is
    merged (attempts, LLM decisions) or carried forward unchanged
    (rule-based decisions) so earlier history is not lost.
    """

    # Default configuration (overridden by ``memory_config`` when provided).
    # Kept as class attributes for backward compatibility.
    ATTEMPTS_KEEP_RECENT = 10
    DECISIONS_KEEP_RECENT = 5
    IMPORTANT_DECISIONS_KEEP = 20
    MILESTONES_KEEP = 20
    LEARNINGS_KEEP = 10

    # Number of most recent decisions left untouched by LLM compression.
    LLM_DECISIONS_KEEP_RECENT = 10
    # Upper bound (in characters) on the decision JSON embedded in the prompt.
    LLM_MAX_PROMPT_DATA_CHARS = 50000
    # ``"type"`` values that mark a record as an already-compressed summary.
    COMPRESSED_TYPES = ("compressed", "llm_compressed")

    @staticmethod
    def compress_if_needed(state: "AgentState", memory_config=None, llm_client=None) -> bool:
        """Compress memory if it exceeds the configured limits.

        Args:
            state: Agent state whose ``memory`` is compressed in place.
            memory_config: Optional config object. When falsy the class-level
                defaults are used and LLM compression / de-duplication are
                skipped.
            llm_client: Optional client used for LLM compression. When falsy,
                LLM compression is skipped.

        Returns:
            True if any step actually changed the memory, False otherwise.
        """
        if memory_config:
            attempts_keep = memory_config.attempts_keep_recent
            decisions_keep = memory_config.decisions_keep_recent
            important_decisions_keep = memory_config.important_decisions_keep
            milestones_keep = memory_config.milestones_keep
            learnings_keep = memory_config.learnings_keep
        else:
            attempts_keep = MemoryCompressor.ATTEMPTS_KEEP_RECENT
            decisions_keep = MemoryCompressor.DECISIONS_KEEP_RECENT
            important_decisions_keep = MemoryCompressor.IMPORTANT_DECISIONS_KEEP
            milestones_keep = MemoryCompressor.MILESTONES_KEEP
            learnings_keep = MemoryCompressor.LEARNINGS_KEEP

        memory = state.memory
        compressed = False

        # 1. Rule-based compression. The helpers report whether they changed
        # anything: a list holding "summary + keep recent" is longer than
        # `keep` but has nothing new to fold in.
        if len(memory.attempts) > attempts_keep:
            if MemoryCompressor._compress_attempts(state, attempts_keep):
                compressed = True

        if len(memory.decisions) > decisions_keep:
            if MemoryCompressor._compress_decisions(state, decisions_keep):
                compressed = True

        # Plain trims keep the most recent entries. Learnings are trimmed
        # last because decision compression appends to them.
        trim_limits = (
            ("important_decisions", important_decisions_keep),
            ("milestones", milestones_keep),
            ("learnings", learnings_keep),
        )
        for attr_name, keep in trim_limits:
            entries = getattr(memory, attr_name)
            if len(entries) > keep:
                setattr(memory, attr_name, MemoryCompressor._split_recent(entries, keep)[1])
                logger.info("[MemoryCompressor] 修剪 %s 到 %s 个", attr_name, keep)
                compressed = True

        # 2. LLM compression (if enabled and the size threshold is exceeded).
        if memory_config and memory_config.llm_compression_enabled and llm_client:
            try:
                # Estimate memory size from the fully formatted context.
                memory_context = state.memory.get_formatted_context(
                    state=state,
                    max_length=999999,
                )
                memory_size = len(memory_context)
                if memory_size > memory_config.llm_compression_threshold:
                    logger.info(
                        "[MemoryCompressor] Memory size (%s chars) exceeds LLM compression "
                        "threshold (%s), triggering LLM compression",
                        memory_size,
                        memory_config.llm_compression_threshold,
                    )
                    if MemoryCompressor._compress_with_llm(state, memory_config, llm_client):
                        compressed = True
            except Exception as e:  # best effort: never break the agent loop
                logger.warning(
                    f"[MemoryCompressor] LLM compression failed: {e}, "
                    "continuing with rule-based compression only"
                )

        # 3. De-duplication (if enabled).
        if memory_config and memory_config.deduplication_enabled:
            try:
                if MemoryCompressor._deduplicate_memory(state, memory_config):
                    compressed = True
            except Exception as e:  # best effort: never break the agent loop
                logger.warning(
                    f"[MemoryCompressor] Deduplication failed: {e}, "
                    "continuing without deduplication"
                )

        # Compression of modified-file contents is not done here; per the
        # original note it is handled by
        # AgentLoop._compress_modified_files_if_needed after each modification.
        return compressed

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_compressed_record(record: Any) -> bool:
        """Return True if *record* is a summary produced by this class."""
        return (
            isinstance(record, dict)
            and record.get("type") in MemoryCompressor.COMPRESSED_TYPES
        )

    @staticmethod
    def _split_recent(items: List[Any], keep: int) -> tuple:
        """Split *items* into ``(old, recent)`` with at most *keep* recent items.

        Uses an explicit cut index rather than ``items[-keep:]`` because
        ``items[-0:]`` is the whole list, which would silently keep everything
        when ``keep == 0``. A negative *keep* is treated as 0.
        """
        cut = max(len(items) - max(keep, 0), 0)
        return list(items[:cut]), list(items[cut:])

    @staticmethod
    def _original_count(record: Any) -> int:
        """Return how many original entries *record* stands for.

        A raw entry counts as 1. A summary record contributes its
        ``original_count``; summaries without a usable count count as 1.
        """
        if not MemoryCompressor._is_compressed_record(record):
            return 1
        count = record.get("original_count")
        if isinstance(count, int) and count >= 0:
            return count
        return 1

    @staticmethod
    def _collect_attempt_stats(attempts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate totals over raw attempts and earlier summary records.

        Returns:
            Dict with ``total`` (int), ``successful`` (int), ``files`` (set)
            and ``tools`` (set).
        """
        total = 0
        successful = 0
        files = set()
        tools = set()
        for attempt in attempts:
            total += MemoryCompressor._original_count(attempt)
            if MemoryCompressor._is_compressed_record(attempt):
                # Merge what an earlier compression pass recorded; summaries
                # written before these keys existed simply contribute nothing.
                carried_success = attempt.get("success_count")
                if isinstance(carried_success, int):
                    successful += carried_success
                files.update(attempt.get("files_modified") or [])
                tools.update(attempt.get("tools_used") or [])
                continue
            if attempt.get("success", False):
                successful += 1
            files.update(attempt.get("files") or [])
            for result in attempt.get("results") or []:
                if isinstance(result, dict):
                    tools.add(result.get("tool", "unknown"))
        return {"total": total, "successful": successful, "files": files, "tools": tools}

    # ------------------------------------------------------------------
    # Rule-based compression
    # ------------------------------------------------------------------
    @staticmethod
    def _compress_attempts(state: "AgentState", keep_recent: int = None) -> bool:
        """Fold attempts older than the *keep_recent* newest into one summary.

        The resulting list is ``[summary_record] + recent``. An existing
        summary at the head is merged into the new one (cumulative counts),
        and nothing happens when only a summary has aged out.

        Returns:
            True if ``state.memory.attempts`` was rewritten.
        """
        if keep_recent is None:
            keep_recent = MemoryCompressor.ATTEMPTS_KEEP_RECENT

        attempts = state.memory.attempts
        if len(attempts) <= keep_recent:
            return False

        old, recent = MemoryCompressor._split_recent(attempts, keep_recent)
        fresh = [a for a in old if not MemoryCompressor._is_compressed_record(a)]
        if not fresh:
            # Only an earlier summary is "old": re-summarising it would
            # replace real history with a summary of a summary.
            return False

        stats = MemoryCompressor._collect_attempt_stats(old)
        compressed_record = {
            "step": 0,
            "type": "compressed",
            "summary": MemoryCompressor._summarize_attempts(old),
            "original_count": stats["total"],  # cumulative across passes
            "compressed_at_step": state.step,
            "files": [],  # No files in compressed record
            "success": None,  # Mixed results
            "results": [],
            # Carried forward so a later pass can merge instead of losing them.
            "success_count": stats["successful"],
            "files_modified": sorted(stats["files"], key=str),
            "tools_used": sorted(stats["tools"], key=str),
        }

        state.memory.attempts = [compressed_record] + recent
        logger.info(
            "[MemoryCompressor] 压缩了 %s 个旧 attempts,保留 %s 个最近的",
            len(fresh),
            len(recent),
        )
        return True

    @staticmethod
    def _compress_decisions(state: "AgentState", keep_recent: int = None) -> bool:
        """Summarise decisions older than the *keep_recent* newest into a learning.

        The summary of the aged-out raw decisions is appended to
        ``state.memory.learnings``. Summary records already present among the
        old decisions (e.g. from LLM compression) are kept in front of the
        recent decisions rather than being discarded.

        Returns:
            True if ``state.memory.decisions`` was rewritten.
        """
        if keep_recent is None:
            keep_recent = MemoryCompressor.DECISIONS_KEEP_RECENT

        decisions = state.memory.decisions
        if len(decisions) <= keep_recent:
            return False

        old, recent = MemoryCompressor._split_recent(decisions, keep_recent)
        carried = [d for d in old if MemoryCompressor._is_compressed_record(d)]
        fresh = [d for d in old if not MemoryCompressor._is_compressed_record(d)]
        if not fresh:
            return False

        summary = MemoryCompressor._summarize_decisions(fresh)
        state.memory.learnings.append(f"[Step {state.step}] 历史决策总结: {summary}")
        state.memory.decisions = carried + recent
        logger.info(
            "[MemoryCompressor] 压缩了 %s 个旧 decisions,保留 %s 个最近的",
            len(fresh),
            len(recent),
        )
        return True

    @staticmethod
    def _summarize_attempts(attempts: List[Dict[str, Any]]) -> str:
        """Return a one-line summary of *attempts*.

        Earlier summary records in the list contribute their recorded totals
        instead of being counted as a single attempt.
        """
        if not attempts:
            return "无历史尝试"

        stats = MemoryCompressor._collect_attempt_stats(attempts)
        return (
            f"历史 {stats['total']} 次尝试:成功 {stats['successful']} 次,"
            f"修改了 {len(stats['files'])} 个文件,使用了 {len(stats['tools'])} 种工具"
        )

    @staticmethod
    def _summarize_decisions(decisions: List[Dict[str, Any]]) -> str:
        """Summarise *decisions* using factual fields only.

        Reports the number of decisions and actions, the distribution of
        ``stop_reason`` values, verification outcomes and the three most used
        tools. The model's own reasoning text is deliberately not read.
        """
        if not decisions:
            return "无历史决策"

        total = len(decisions)
        total_actions = 0
        stop_reasons = {}
        verified_ok = 0
        verified_failed = 0
        tools_used = {}

        for decision in decisions:
            actions = decision.get("actions") or []
            total_actions += len(actions)

            reason = decision.get("stop_reason", "unknown")
            stop_reasons[reason] = stop_reasons.get(reason, 0) + 1

            # Only explicit True/False count; anything else is "unknown".
            verification = decision.get("verification_success")
            if verification is True:
                verified_ok += 1
            elif verification is False:
                verified_failed += 1

            for action in actions:
                if isinstance(action, dict):
                    tool = action.get("tool", "unknown")
                    tools_used[tool] = tools_used.get(tool, 0) + 1

        summary_parts = [f"历史 {total} 个决策,共执行了 {total_actions} 个动作"]

        if stop_reasons:
            reasons_str = ", ".join(f"{k}:{v}" for k, v in stop_reasons.items())
            summary_parts.append(f"停止原因分布: {reasons_str}")

        if verified_ok > 0 or verified_failed > 0:
            summary_parts.append(
                f"验证结果: 成功 {verified_ok} 次, "
                f"失败 {verified_failed} 次"
            )

        if tools_used:
            # Stable sort: ties keep first-seen order.
            top_tools = sorted(tools_used.items(), key=lambda item: item[1], reverse=True)[:3]
            tools_str = ", ".join(f"{tool}({count})" for tool, count in top_tools)
            summary_parts.append(f"常用工具: {tools_str}")

        return "。".join(summary_parts)

    # ------------------------------------------------------------------
    # LLM compression
    # ------------------------------------------------------------------
    @staticmethod
    def _compress_with_llm(state: "AgentState", memory_config, llm_client) -> bool:
        """Use the LLM to summarise old decisions into one ``llm_compressed`` record.

        The newest :attr:`LLM_DECISIONS_KEEP_RECENT` decisions are left
        untouched. On any failure (including an empty answer) this falls back
        to rule-based :meth:`_compress_decisions`.

        Args:
            state: Agent state.
            memory_config: Config providing ``llm_compression_target``.
            llm_client: Client exposing ``chat.complete(...)``.

        Returns:
            True if ``state.memory.decisions`` was rewritten.
        """
        recent_count = MemoryCompressor.LLM_DECISIONS_KEEP_RECENT
        if len(state.memory.decisions) <= recent_count:
            return False

        old_decisions, recent_decisions = MemoryCompressor._split_recent(
            state.memory.decisions, recent_count
        )
        fresh_count = sum(
            1 for d in old_decisions if not MemoryCompressor._is_compressed_record(d)
        )
        if fresh_count == 0:
            # Nothing but an earlier summary has aged out; leave it intact.
            return False

        logger.info("[MemoryCompressor] 开始 LLM 压缩: %s 个旧决策", len(old_decisions))

        try:
            # Send factual fields only. current_step_thoughts, plan and
            # llm_output are deliberately excluded to avoid feeding the
            # model's own reasoning back to it.
            filtered_decisions = []
            for decision in old_decisions:
                if MemoryCompressor._is_compressed_record(decision):
                    # Keep the earlier summary so its content survives
                    # re-compression instead of collapsing to empty fields.
                    filtered_decisions.append(
                        {
                            "type": decision.get("type"),
                            "summary": decision.get("summary"),
                            "original_count": decision.get("original_count"),
                        }
                    )
                    continue
                filtered_decisions.append(
                    {
                        "step": decision.get("step"),
                        "stop_reason": decision.get("stop_reason"),
                        "actions_count": decision.get("actions_count"),
                        "verification_success": decision.get("verification_success"),
                        "actions": decision.get("actions", []),
                    }
                )

            # Cap the embedded data so the prompt stays within context limits.
            max_data_size = MemoryCompressor.LLM_MAX_PROMPT_DATA_CHARS
            decisions_json = json.dumps(filtered_decisions, ensure_ascii=False, indent=2)
            if len(decisions_json) > max_data_size:
                decisions_json = decisions_json[:max_data_size] + "\n... [数据已截断]"

            compression_prompt = "\n".join(
                [
                    "请将以下历史决策压缩为简洁的摘要,保留关键信息:",
                    "",
                    decisions_json,
                    "",
                    "要求:",
                    "1. 保留任务目标、关键决策、重要里程碑",
                    "2. 移除重复和冗余信息",
                    "3. 保留工具执行结果的关键信息(错误、成功状态)",
                    f"4. 摘要长度控制在 {memory_config.llm_compression_target // 2} 字符以内",
                    "5. 使用结构化格式(Markdown)",
                    "6. ⚠️ **重要**:只提取事实信息(工具、动作、结果),"
                    "不要包含任何 LLM 的思考过程、假设或推理",
                    "",
                    "注意:这些是历史决策的事实信息(工具、动作、结果),不包含 LLM 的思考过程。",
                    "",
                    "输出格式:",
                    "## 压缩摘要",
                    "[摘要内容 - 只包含事实信息]",
                    "",
                    "## 关键信息",
                    "- 任务目标:...",
                    "- 关键决策:...",
                    "- 重要里程碑:...",
                ]
            )

            # Plain completion call (not plan_and_act); the signature follows
            # the original note: complete(messages, *, system=..., params=...).
            from lexilux import ChatParams

            chat_params = ChatParams(temperature=0.3, max_tokens=4000)
            result = llm_client.chat.complete(
                compression_prompt,
                system="你是一个记忆压缩专家。请将历史决策压缩为简洁的摘要,保留关键信息。注意:只处理事实信息,不包含 LLM 的思考过程。",
                params=chat_params,
            )
            compressed_summary = result.text
            if not compressed_summary or not str(compressed_summary).strip():
                # An empty answer would replace real history with nothing.
                raise ValueError("LLM returned an empty summary")

            compressed_record = {
                "type": "llm_compressed",
                "summary": compressed_summary,
                # Cumulative: includes what earlier summaries stood for.
                "original_count": sum(
                    MemoryCompressor._original_count(d) for d in old_decisions
                ),
                "compressed_at_step": state.step,
                "compression_target": memory_config.llm_compression_target,
            }

            state.memory.decisions = [compressed_record] + recent_decisions
            logger.info(
                "[MemoryCompressor] ✓ LLM 压缩完成: %s 个决策压缩为摘要,保留 %s 个最近的",
                len(old_decisions),
                len(recent_decisions),
            )
            return True

        except Exception as e:
            logger.error(f"[MemoryCompressor] LLM 压缩失败: {e},回退到基于规则的压缩")
            # Fall back to rule-based compression.
            return MemoryCompressor._compress_decisions(state, recent_count)

    # ------------------------------------------------------------------
    # De-duplication
    # ------------------------------------------------------------------
    @staticmethod
    def _dedupe_records(records, signature_of, similarity_of, threshold) -> list:
        """Return *records* without near-duplicates, preserving order.

        A record is dropped only when its signature was already seen AND its
        similarity to some previously kept raw record is >= *threshold*; the
        earlier record is the one that stays. Summary records always pass
        through and are never used as comparison targets.
        """
        unique = []
        kept_raw = []
        seen_signatures = set()
        for record in records:
            if MemoryCompressor._is_compressed_record(record):
                unique.append(record)
                continue
            signature = signature_of(record)
            if signature not in seen_signatures:
                seen_signatures.add(signature)
            elif any(similarity_of(record, kept) >= threshold for kept in kept_raw):
                continue
            unique.append(record)
            kept_raw.append(record)
        return unique

    @staticmethod
    def _deduplicate_memory(state: "AgentState", memory_config) -> bool:
        """De-duplicate similar decisions and attempts in place.

        Args:
            state: Agent state.
            memory_config: Config providing
                ``deduplication_similarity_threshold``.

        Returns:
            True if at least one decision or attempt was removed.
        """
        threshold = memory_config.deduplication_similarity_threshold

        # Separate flags: a change in decisions must not trigger a bogus
        # rewrite/log for attempts.
        decisions_changed = False
        decisions = state.memory.decisions
        if len(decisions) > 1:
            unique_decisions = MemoryCompressor._dedupe_records(
                decisions,
                MemoryCompressor._get_decision_signature,
                MemoryCompressor._calculate_similarity,
                threshold,
            )
            if len(unique_decisions) != len(decisions):
                decisions_changed = True
                state.memory.decisions = unique_decisions
                logger.info(
                    "[MemoryCompressor] 去重完成: %s 个决策 -> %s 个唯一决策",
                    len(decisions),
                    len(unique_decisions),
                )

        attempts_changed = False
        attempts = state.memory.attempts
        if len(attempts) > 1:
            unique_attempts = MemoryCompressor._dedupe_records(
                attempts,
                MemoryCompressor._get_attempt_signature,
                MemoryCompressor._calculate_attempt_similarity,
                threshold,
            )
            if len(unique_attempts) != len(attempts):
                attempts_changed = True
                state.memory.attempts = unique_attempts
                logger.info(
                    "[MemoryCompressor] 去重完成: %s 个尝试 -> %s 个唯一尝试",
                    len(attempts),
                    len(unique_attempts),
                )

        return decisions_changed or attempts_changed

    @staticmethod
    def _get_decision_signature(decision: Dict[str, Any]) -> str:
        """Return a cheap signature for *decision* used as a dedup pre-filter.

        Built from the step, the first 50 characters of the thought text
        (``current_step_thoughts``, falling back to the legacy
        ``thought_summary``), the number of actions and the stop reason.
        """
        step = decision.get("step", "")
        thought = (
            decision.get("current_step_thoughts")
            or decision.get("thought_summary")  # backward compatibility
            or ""  # both keys may be present with a None value
        )
        thought = str(thought)[:50]
        actions_count = len(decision.get("actions") or [])
        stop_reason = decision.get("stop_reason", "")
        return f"{step}:{thought}:{actions_count}:{stop_reason}"

    @staticmethod
    def _get_attempt_signature(attempt: Dict[str, Any]) -> str:
        """Return a cheap signature for *attempt* used as a dedup pre-filter.

        Built from the step, the first five file names (sorted) and the
        success flag.
        """
        step = attempt.get("step", "")
        files = sorted(attempt.get("files") or [])
        files_str = ",".join(files[:5])
        success = attempt.get("success", False)
        return f"{step}:{files_str}:{success}"

    @staticmethod
    def _calculate_similarity(decision1: Dict[str, Any], decision2: Dict[str, Any]) -> float:
        """Return the similarity of two decisions in the range 0.0-1.0.

        The base score is the ``SequenceMatcher`` ratio of the thought texts
        (0.0 when either is missing). If both decisions have actions and use
        the same tools in the same order, 0.2 is added, capped at 1.0.
        """

        def thought_text(decision: Dict[str, Any]) -> str:
            # ``or ""`` matters: str(None) would be the truthy string "None"
            # and make two thought-less decisions look identical.
            return str(
                decision.get("current_step_thoughts")
                or decision.get("thought_summary")  # backward compatibility
                or ""
            )

        def tool_sequence(decision: Dict[str, Any]) -> list:
            actions = decision.get("actions") or []
            return [a.get("tool", "") for a in actions if isinstance(a, dict)]

        thought1 = thought_text(decision1)
        thought2 = thought_text(decision2)
        if thought1 and thought2:
            similarity = SequenceMatcher(None, thought1, thought2).ratio()
        else:
            similarity = 0.0

        if decision1.get("actions") and decision2.get("actions"):
            if tool_sequence(decision1) == tool_sequence(decision2):
                similarity = min(1.0, similarity + 0.2)

        return similarity

    @staticmethod
    def _calculate_attempt_similarity(attempt1: Dict[str, Any], attempt2: Dict[str, Any]) -> float:
        """Return the Jaccard similarity of the file sets of two attempts.

        Two attempts without files are considered identical (1.0); if only
        one has files the similarity is 0.0.
        """
        files1 = set(attempt1.get("files") or [])
        files2 = set(attempt2.get("files") or [])

        if not files1 and not files2:
            return 1.0  # Both empty
        if not files1 or not files2:
            return 0.0

        # Both sets are non-empty here, so the union cannot be empty.
        return len(files1 & files2) / len(files1 | files2)