Source code for atloop.memory.summarizer

"""Memory summarizer for condensing memory into prompts."""

from typing import Any, Optional

from atloop.config.loader import ConfigLoader
from atloop.memory.formatter import ToolResultFormatter
from atloop.memory.state import AgentState


[docs] class MemorySummarizer: """Summarize agent memory for LLM input."""
[docs] @staticmethod def get_memory_overview(state: AgentState) -> str: """ Get a brief overview of memory for terminal output. Args: state: Agent state Returns: Brief overview string (single line, compact format) """ parts = [] # Created files count if state.memory.created_files: parts.append(f"📁 {len(state.memory.created_files)} files") # Show last file name (truncated if too long) last_file = state.memory.created_files[-1] if len(last_file) > 30: last_file = "..." + last_file[-27:] parts.append(f"Latest: {last_file}") # Recent attempts if state.memory.attempts: last_attempt = state.memory.attempts[-1] success = last_attempt.get("success", False) files = last_attempt.get("files", []) status = "✓" if success else "✗" parts.append(f"{status} Modified {len(files)} files") # Budget usage parts.append(f"💰 LLM:{state.budget_used.llm_calls} Tools:{state.budget_used.tool_calls}") # Long-term memory preview if state.memory.plan or state.memory.task_summary: long_term_parts = [] if state.memory.plan: from atloop.memory.plan import PlanManager plan_str = PlanManager.plan_to_string(state.memory.plan) if plan_str: plan_preview = plan_str[:40] + "..." if len(plan_str) > 40 else plan_str long_term_parts.append(f"Plan: {plan_preview}") if state.memory.important_decisions: long_term_parts.append(f"Decisions:{len(state.memory.important_decisions)}") if state.memory.milestones: long_term_parts.append(f"Milestones:{len(state.memory.milestones)}") if long_term_parts: parts.append(f"≡ {' | '.join(long_term_parts)}") # Last error (if any, very brief) if state.last_error.summary: error_preview = state.last_error.summary[:50] if len(state.last_error.summary) > 50: error_preview += "..." # Extract first line or key info error_first_line = error_preview.split("\n")[0] parts.append(f"⚠️ {error_first_line}") return " | ".join(parts) if parts else "No memory information"
[docs] @staticmethod def summarize( state: AgentState, max_length: Optional[int] = None, task_goal: Optional[str] = None, tool_registry: Optional[Any] = None, ) -> str: """ Summarize agent state memory. Args: state: Agent state max_length: Maximum summary length (defaults to config value) task_goal: Optional task goal for completion detection Returns: Summary string """ config = ConfigLoader.get() if max_length is None: max_length = config.memory.summary_default_limit parts = [] # If memory is completely empty, return a minimal summary if ( not state.memory.task_summary and not state.memory.plan and not state.memory.decisions and not state.memory.attempts and not state.memory.important_decisions and not state.memory.milestones and not state.memory.learnings and not state.memory.llm_responses and not state.memory.tool_results_history and not state.memory.modified_files_content ): return "Initial state: Task just started, no operations executed yet." # Long-term memory: Task summary (shown first, persists across steps) if state.memory.task_summary: parts.append("## ≡ Task Overview (Long-term Memory)") parts.append(state.memory.task_summary) parts.append("") # Show loaded skills from skill cache skill_contents = [] if state.memory.skill_cache: for skill_name, skill_data in list(state.memory.skill_cache.items())[-3:]: metadata = skill_data.get("metadata", {}) resources = skill_data.get("resources", {}) skill_body = metadata.get("body", "") loaded_step = metadata.get("loaded_at_step", "?") if skill_body: skill_contents.append( { "name": skill_name, "content": skill_body, "step": loaded_step, "resources": resources, # Include resource info } ) if skill_contents: parts.append("## 📚 Loaded Skills (Complete Content - Use These Guidelines)") # Show most recent skills (last 3 to avoid too much content) for skill in skill_contents[-3:]: parts.append(f"### Skill: {skill['name']} (Loaded at Step {skill['step']})") parts.append(f"```\n{skill['content']}\n```") parts.append("") # Show cached resources if available resources = skill.get("resources", {}) if resources: cached_resources = [] for resource_type in ["scripts", "references", "assets"]: type_resources = resources.get(resource_type, {}) if type_resources: resource_list = [] for res_name, res_data in type_resources.items(): if isinstance(res_data, dict): res_step = res_data.get("loaded_at_step", "?") resource_list.append(f"{res_name} (Step {res_step})") else: resource_list.append(res_name) if resource_list: cached_resources.append( f"**{resource_type.capitalize()}**: {', '.join(resource_list)}" ) if cached_resources: parts.append("**Cached Resources:**") parts.extend(f"- {r}" for r in cached_resources) parts.append("") parts.append( "**Note**: Use `load_skill_resource` to load additional resources when needed." ) parts.append("") if len(skill_contents) > 3: parts.append( f"*Note: {len(skill_contents) - 3} more skills were loaded earlier. " f"See Recent Attempts for details.*" ) parts.append("") # Long-term memory: Current plan (can be dynamically updated) if state.memory.plan: from atloop.memory.plan import PlanManager, PlanStep parts.append("## 📝 Current Execution Plan (Long-term Memory, Dynamically Updated)") # Convert plan to string representation plan_str = PlanManager.plan_to_string(state.memory.plan) if plan_str: parts.append(plan_str) # Show progress if structured if ( isinstance(state.memory.plan, list) and state.memory.plan and isinstance(state.memory.plan[0], (PlanStep, dict)) ): progress = PlanManager.get_progress(state) if progress["total"] > 0: parts.append( f"\nProgress: {progress['completed']}/{progress['total']} completed " f"({progress['completion_rate'] * 100:.0f}%), " f"{progress['in_progress']} in progress, {progress['pending']} pending" ) parts.append("") # Long-term memory: Important decisions (sorted by importance) if state.memory.important_decisions: from atloop.memory.scorer import ImportanceScorer parts.append("## 🎯 Important Decisions (Long-term Memory)") # Score and sort by importance scored_decisions = [] for decision in state.memory.important_decisions: score = ImportanceScorer.score_decision(decision) scored_decisions.append((score, decision)) # Sort by score (descending) and take top 5 scored_decisions.sort(key=lambda x: x[0], reverse=True) for score, decision in scored_decisions[:5]: step = decision.get("step", "?") content = decision.get("content", "") # Show importance indicator importance_indicator = "⭐" * min(3, int(score * 3) + 1) parts.append(f"- {importance_indicator} Step {step}: {content}") parts.append("") # Long-term memory: Milestones (sorted by importance) if state.memory.milestones: from atloop.memory.scorer import ImportanceScorer parts.append("## 🏆 Achieved Milestones (Long-term Memory)") # Score and sort by importance scored_milestones = [] for milestone in state.memory.milestones: score = ImportanceScorer.score_milestone(milestone) scored_milestones.append((score, milestone)) # Sort by score (descending) and take top 5 scored_milestones.sort(key=lambda x: x[0], reverse=True) for score, milestone in scored_milestones[:5]: step = milestone.get("step", "?") content = milestone.get("content", "") importance_indicator = "⭐" * min(3, int(score * 3) + 1) parts.append(f"- {importance_indicator} Step {step}: {content}") parts.append("") # Long-term memory: Learnings (sorted by importance) if state.memory.learnings: from atloop.memory.scorer import ImportanceScorer parts.append("## 💡 Important Learnings (Long-term Memory)") # Score and sort by importance scored_learnings = [] for learning in state.memory.learnings: score = ImportanceScorer.score_learning(learning) scored_learnings.append((score, learning)) # Sort by score (descending) and take top 3 scored_learnings.sort(key=lambda x: x[0], reverse=True) for score, learning in scored_learnings[:3]: importance_indicator = "⭐" * min(3, int(score * 3) + 1) parts.append(f"- {importance_indicator} {learning}") parts.append("") # Recent Steps Summary (FACTS ONLY - no LLM interpretations) # NOTE: We intentionally do NOT show current_step_thoughts or LLM plans here # to prevent feedback loops where LLM's previous hypotheses become "facts" if state.memory.decisions: parts.append("## Recent Steps (Facts Only)") for decision in state.memory.decisions[-3:]: step = decision.get("step", "?") actions = decision.get("actions", []) actions_count = len(actions) stop_reason = decision.get("stop_reason", "?") # Show only factual information: what tools were called tools_used = [a.get("tool", "?") for a in actions[:3]] tools_str = ", ".join(tools_used) if len(actions) > 3: tools_str += f" ... (+{len(actions) - 3} more)" parts.append( f"- Step {step}: {actions_count} actions [{tools_str}] ({stop_reason})" ) # NOTE: llm_responses are NOT shown to LLM to prevent feedback loops # They are preserved in memory for debugging only # Recent File Modifications - extracted from tool_results_history # NOTE: This replaces the old "Recent Attempts" section # We extract file modifications from tool_results_history to have a single source of truth if state.memory.tool_results_history: # Extract file modifications by step step_files = {} for tool_result in state.memory.tool_results_history: step = tool_result.get("step", 0) modified_files = tool_result.get("modified_files", []) if modified_files: if step not in step_files: step_files[step] = [] step_files[step].extend(modified_files) # Display recent file modifications (last 3 steps) if step_files: parts.append("\n## Recent File Modifications") recent_steps = sorted(step_files.keys(), reverse=True)[:3] for step in recent_steps: files = list(set(step_files[step])) # Remove duplicates parts.append(f"- Step {step}: Modified {len(files)} files") if files: parts.append(f" Files: {', '.join(files[:5])}") if len(files) > 5: parts.append(f" ... (+{len(files) - 5} more)") # Backward compatibility: Also show attempts if tool_results_history is empty # This handles cases where old data doesn't have tool_results_history elif state.memory.attempts: parts.append("\n## Recent File Modifications") for attempt in state.memory.attempts[-3:]: step = attempt.get("step", "?") files = attempt.get("files", []) parts.append(f"- Step {step}: Modified {len(files)} files") if files: parts.append(f" Files: {', '.join(files[:5])}") if len(files) > 5: parts.append(f" ... (+{len(files) - 5} more)") # Task completion status check (add at the beginning for visibility) # Check if task goal matches created files for simple "write code" tasks if task_goal and state.memory.created_files: task_goal_lower = task_goal.lower() # Simple heuristic: if goal contains "write" and "code" and file is created, task might be complete if ("write" in task_goal_lower or "create" in task_goal_lower) and ( "code" in task_goal_lower or "file" in task_goal_lower or "python" in task_goal_lower ): parts.insert(0, "\n## ✓ Task Completion Status") parts.insert(1, f"**Task Goal**: {task_goal}") parts.insert(2, f"**Created Files**: {', '.join(state.memory.created_files)}") parts.insert(3, "") parts.insert( 4, "**Analysis**: File(s) have been created. For simple 'write code' tasks, this typically means the task is complete.", ) parts.insert( 5, "**Recommendation**: If the created file(s) satisfy the task goal, please set `stop_reason='done'`.", ) parts.insert(6, "") # Created files (for resume capability) - Important but after long-term memory if state.memory.created_files: parts.insert(0, "\n## ⚠️⚠️⚠️ Created Files (CRITICAL: Do NOT recreate!)") parts.insert(1, f"**{len(state.memory.created_files)} files created**:") for i, file_path in enumerate(state.memory.created_files[-20:], 1): # Last 20 files parts.insert(1 + i, f"- ✓ {file_path}") if len(state.memory.created_files) > 20: parts.insert( 1 + len(state.memory.created_files[-20:]) + 1, f"... ({len(state.memory.created_files) - 20} more files)", ) insert_pos = ( 1 + min(20, len(state.memory.created_files)) + (2 if len(state.memory.created_files) > 20 else 1) ) parts.insert(insert_pos, "") parts.insert(insert_pos + 1, "🚨🚨🚨 **CRITICAL WARNING**:") parts.insert(insert_pos + 2, "1. **These files already exist, DO NOT recreate them!**") parts.insert( insert_pos + 3, "2. If task requires multiple files, continue creating **remaining files** (not in the list above)", ) parts.insert( insert_pos + 4, "3. If files above need modification, use `edit_file` tool, do NOT use `write_file` to recreate", ) parts.insert( insert_pos + 5, "4. **Before creating any new file, check the list above to ensure no duplicates**", ) parts.insert( insert_pos + 6, "5. If a file is in the list above, it already exists - use `read_file` to read or `edit_file` to modify", ) # Key files if state.memory.key_files: parts.append("\n## Key Files") for key_file in state.memory.key_files[-5:]: # Last 5 path = key_file.get("path", "?") reason = key_file.get("reason", "") parts.append(f"- {path}: {reason}") # Phase 5: Recently modified files content (auto-read) if state.memory.modified_files_content: parts.append("\n## Recently Modified File Content (Auto-read)") # Sort by importance, take top N sorted_files = sorted( state.memory.modified_files_content, key=lambda x: (x.get("importance_score", 0), x.get("last_modified_step", 0)), reverse=True, ) # Show recently modified, most important files (max 5) max_files_to_show = 5 total_size = 0 max_total_size = 20000 # Max 20KB content (~5k tokens) for file_record in sorted_files[:max_files_to_show]: path = file_record.get("path", "?") content = file_record.get("content", "") step = file_record.get("last_modified_step", "?") size = file_record.get("size", 0) importance = file_record.get("importance_score", 0) # If total size exceeds limit, truncate content if total_size + size > max_total_size: remaining = max_total_size - total_size if remaining > 100: # At least show 100 chars content = ( content[:remaining] + f"\n... [File too large, truncated, full content {size} bytes]" ) else: content = f"[File too large ({size} bytes), content not shown]" parts.append(f"\n### {path} (Step {step}, Importance: {importance:.2f})") parts.append(f"```\n{content}\n```") total_size += 100 # Estimate continue parts.append(f"\n### {path} (Step {step}, Importance: {importance:.2f})") # Display strategy based on file size if size > 10000: # Larger than 10KB # Show first 5000 chars and last 500 chars preview = ( content[:5000] + f"\n... [Omitted {size - 5500} chars] ...\n" + content[-500:] ) parts.append(f"```\n{preview}\n```") else: parts.append(f"```\n{content}\n```") total_size += min(size, max_total_size - total_size) if total_size >= max_total_size: remaining_files = len(sorted_files) - max_files_to_show if remaining_files > 0: parts.append(f"\n... [{remaining_files} more files not shown]") break # Notes if state.memory.notes: parts.append("\n## Important Notes") for note in state.memory.notes[-3:]: # Last 3 parts.append(f"- {note}") # NOTE: Loop detection and intervention is now handled by LoopDetector # which provides more robust pattern detection and graduated interventions. # The intervention messages are injected at PlanPhase level, not here. # Last error (includes all recent tool execution results) # CRITICAL: This is the PRIMARY source of tool execution info for LLM # Must include ALL outputs, especially stderr which often contains critical error info if state.last_error.summary: parts.append("\n## Last Tool Execution Result (Most Important)") parts.append("⚠️ Key Points:") parts.append( " - Even if exit_code=0, error messages in stderr (e.g., 'not found', 'error', 'failed') need to be handled" ) parts.append(" - Please carefully check complete content of stderr and stdout") parts.append( " - For shell commands, stderr usually contains the real execution status" ) parts.append("") parts.append(f"{state.last_error.summary}") if state.last_error.repro_cmd: parts.append(f"\nRepro Command: {state.last_error.repro_cmd}") if state.last_error.raw_stderr_tail: # Show more of stderr tail for detailed analysis config = ConfigLoader.get() stderr_tail_limit = config.memory.summary_stderr_tail stderr_tail = ( state.last_error.raw_stderr_tail[-stderr_tail_limit:] if len(state.last_error.raw_stderr_tail) > stderr_tail_limit else state.last_error.raw_stderr_tail ) parts.append( f"\nComplete Stderr Details ({len(state.last_error.raw_stderr_tail)} chars):\n{stderr_tail}" ) # Recent Tool Execution Results - unified display from tool_results_history # This is the ONLY place where tool execution results are shown if state.memory.tool_results_history: parts.append("\n## Recent Tool Execution Results") formatted = ToolResultFormatter.format_results_list( state.memory.tool_results_history, tool_registry=tool_registry, max_count=5, ) if formatted: parts.append(formatted) parts.append("") # NOTE: Removed duplicate "Recent Tool Execution Results" section # Tool execution results are now only shown once in the section above # (from tool_results_history using ToolResultFormatter) summary = "\n".join(parts) # Smart truncation with importance-based prioritization # Priority order: long-term memory > last_error > high-importance items > others config = ConfigLoader.get() min_effective_limit = config.memory.summary_min_effective_length effective_max_length = max(max_length, min_effective_limit) if len(summary) > effective_max_length: # Strategy: Keep long-term memory + last_error + high-importance items # Find section boundaries long_term_end = summary.find("## Recent Decisions") last_error_start = summary.find("## Last Tool Execution Result") # Calculate what we can keep if long_term_end > 0: long_term_section = summary[:long_term_end] else: long_term_section = "" # Try to preserve last_error section if last_error_start > 0: # Keep long-term + last_error remaining = effective_max_length - len(long_term_section) if remaining > 0: # Find end of last_error section (before "## Recent Tool Execution Results" or end) recent_exec_start = summary.find( "## Recent Tool Execution Results", last_error_start ) if recent_exec_start > 0: last_error_section = summary[last_error_start:recent_exec_start] else: # Take as much as we can last_error_section = summary[ last_error_start : last_error_start + remaining ] # Truncate last_error_section if needed if len(long_term_section) + len(last_error_section) > effective_max_length: available = ( effective_max_length - len(long_term_section) - 100 ) # Reserve 100 chars for message last_error_section = last_error_section[:available] + "..." summary = long_term_section + "\n" + last_error_section if len(summary) < effective_max_length: summary += "\n[Summary truncated, but preserved long-term memory and last tool execution result...]" else: summary = ( long_term_section + "\n[Summary truncated, but preserved long-term memory...]" ) else: # Fallback: simple truncation summary = summary[:effective_max_length] + "\n[Summary truncated...]" return summary if summary.strip() else "No memory information"