Source code for atloop.orchestrator.phases.act

"""ACT phase implementation."""

import logging
from typing import TYPE_CHECKING

from atloop.config.limits import (
    ERROR_SUMMARY_LIMIT_FILE_VIEW,
    ERROR_SUMMARY_LIMIT_NORMAL,
    STDERR_TAIL_LIMIT,
    STDOUT_STDERR_LIMIT_FILE_VIEW,
    STDOUT_STDERR_LIMIT_NORMAL,
    STDOUT_STDERR_LIMIT_OTHER,
    is_file_view_command,
)
from atloop.llm import ActionJSON
from atloop.orchestrator.executor.tool_executor import ToolExecutor
from atloop.orchestrator.phases.base import BasePhase, PhaseContext, PhaseResult
from atloop.orchestrator.phases.stop_reason_handler import StopReasonHandler
from atloop.orchestrator.state_machine import Phase

if TYPE_CHECKING:
    from atloop.orchestrator.coordinator import WorkflowCoordinator

logger = logging.getLogger(__name__)


[docs] class ActPhase(BasePhase): """ACT phase: Execute tool calls."""
[docs] def __init__(self, coordinator: "WorkflowCoordinator"): """Initialize ACT phase.""" super().__init__(coordinator) self.executor = ToolExecutor(coordinator) logger.debug("[ActPhase] Initialized with ToolExecutor")
[docs] def execute(self, context: PhaseContext) -> PhaseResult: """ Execute ACT phase. Args: context: Phase execution context Returns: Phase execution result """ logger.info(f"[ActPhase] Entering ACT phase (Step {context.step})") state = self.coordinator.state_manager.agent_state try: # Get actions from job_state actions_dict = self.coordinator.job_state.shared_data.get("actions", {}) logger.debug( f"[ActPhase] ACT phase: actions_dict keys = " f"{list(actions_dict.keys()) if actions_dict else 'None'}" ) if not actions_dict or "actions" not in actions_dict: logger.warning("[ActPhase] No actions found, transitioning back to DISCOVER") self.coordinator.state_manager.update(phase="DISCOVER") self._transition(Phase.DISCOVER) return PhaseResult( success=True, data={}, next_phase=Phase.DISCOVER, ) try: action_json = ActionJSON.from_dict(actions_dict) logger.debug(f"[ActPhase] Parsed ActionJSON: {len(action_json.actions)} actions") except Exception as e: logger.error(f"[ActPhase] Invalid Action JSON: {e}") state.last_error.summary = f"Invalid Action JSON: {e}" self.coordinator.state_manager.update(phase="DISCOVER") self._transition(Phase.DISCOVER) return PhaseResult( success=False, data={}, next_phase=Phase.DISCOVER, error=f"Invalid Action JSON: {e}", ) # Execute actions logger.debug(f"[ActPhase] Executing {len(action_json.actions)} actions") results = [] modified_files = [] for i, action in enumerate(action_json.actions): tool = action.get("tool") args = action.get("args", {}) logger.debug( f"[ActPhase] Executing action {i + 1}/{len(action_json.actions)}: {tool}" ) # Execute tool via executor result = self.executor._execute_action(action) logger.debug( f"[ActPhase] Action {i + 1} completed: success={result.get('success', False)}" ) # Add tool name to result result["tool"] = tool if tool == "run": result["command"] = args.get("cmd", "") results.append(result) # Process result for LLM stderr = result.get("stderr", "") stdout = result.get("stdout", "") error_msg = result.get("error", "") # Build comprehensive result summary result_parts = [] result_parts.append(f"Tool: {tool}") result_parts.append( "⚠️ Important: Please carefully read the stdout and stderr content below to determine if the command succeeded." ) if tool == "run": cmd = args.get("cmd", "") if cmd: result_parts.append(f"Command: {cmd}") state.last_error.repro_cmd = cmd if error_msg: result_parts.append(f"Error: {error_msg}") # Include FULL stderr if stderr: if tool == "run": cmd = args.get("cmd", "") max_stderr = ( STDOUT_STDERR_LIMIT_FILE_VIEW if is_file_view_command(cmd) else STDOUT_STDERR_LIMIT_NORMAL ) else: max_stderr = STDOUT_STDERR_LIMIT_OTHER if len(stderr) > max_stderr: omitted = len(stderr) - max_stderr stderr_preview = ( stderr[: max_stderr // 2] + f"\n... [omitted {omitted} chars in middle] ...\n" + stderr[-max_stderr // 2 :] ) else: stderr_preview = stderr result_parts.append(f"Stderr ({len(stderr)} chars):\n{stderr_preview}") # Include FULL stdout if stdout: if tool == "run": cmd = args.get("cmd", "") max_stdout = ( STDOUT_STDERR_LIMIT_FILE_VIEW if is_file_view_command(cmd) else STDOUT_STDERR_LIMIT_NORMAL ) else: max_stdout = STDOUT_STDERR_LIMIT_OTHER if len(stdout) > max_stdout: omitted = len(stdout) - max_stdout stdout_preview = ( stdout[: max_stdout // 2] + f"\n... [omitted {omitted} chars in middle] ...\n" + stdout[-max_stdout // 2 :] ) else: stdout_preview = stdout result_parts.append(f"Stdout ({len(stdout)} chars):\n{stdout_preview}") # Update last_error result_summary = "\n".join(result_parts) if result_summary: if tool == "run": cmd = args.get("cmd", "") max_summary = ( ERROR_SUMMARY_LIMIT_FILE_VIEW if is_file_view_command(cmd) else ERROR_SUMMARY_LIMIT_NORMAL ) else: max_summary = ERROR_SUMMARY_LIMIT_NORMAL state.last_error.summary = result_summary[:max_summary] state.last_error.raw_stderr_tail = stderr[-STDERR_TAIL_LIMIT:] if stderr else "" logger.debug( f"[ActPhase] Updated last_error: summary_length={len(state.last_error.summary)}" ) # Track modified files if tool == "write_file": file_path = args.get("path", "") if file_path: modified_files.append(file_path) if file_path not in state.memory.created_files: state.memory.created_files.append(file_path) logger.info( f"[ActPhase] Tracking newly created file: {file_path} (total: {len(state.memory.created_files)})" ) # Update current_diff to show file creation file_content = args.get("content", "") if file_content: # Create a simple diff showing file was created diff_content = f"+++ {file_path}\n@@ -0,0 +1,{len(file_content.splitlines())} @@\n" for line in file_content.splitlines()[:50]: # First 50 lines diff_content += f"+{line}\n" if len(file_content.splitlines()) > 50: diff_content += ( f"... ({len(file_content.splitlines()) - 50} more lines)\n" ) state.artifacts.current_diff = diff_content[ :5000 ] # Limit diff size logger.debug( f"[ActPhase] Updated current_diff after file creation: {file_path}" ) self.coordinator.state_manager.save() # Update budget state.budget_used.tool_calls += 1 self.coordinator.budget_manager.budget_used.tool_calls += 1 logger.debug( f"[ActPhase] Budget updated: tool_calls={state.budget_used.tool_calls}" ) # Record attempt success = all(r.get("ok", False) for r in results) state.memory.attempts.append( { "step": state.step, "files": modified_files, "success": success, "results": results, } ) logger.debug( f"[ActPhase] Recorded attempt: success={success}, files={len(modified_files)}" ) # Auto-detect milestones if success and modified_files: if len(modified_files) >= 3: from atloop.memory.memory_manager import MemoryManager milestone_content = f"Successfully modified {len(modified_files)} files: {', '.join(modified_files[:3])}" if len(modified_files) > 3: milestone_content += " etc" MemoryManager.add_milestone(state, milestone_content) self.coordinator.state_manager.save() logger.debug(f"[ActPhase] Added milestone: {milestone_content}") # Check and apply pending stop_reason using unified handler pending_stop_reason = self.coordinator.job_state.shared_data.pop( "pending_stop_reason", None ) if pending_stop_reason: logger.info( f"[ActPhase] Applying pending stop_reason='{pending_stop_reason}' " f"after actions execution (Step {state.step})" ) return StopReasonHandler.apply_pending_stop_reason( pending_stop_reason=pending_stop_reason, step=state.step, verification_success=state.artifacts.verification_success, event_logger=self.coordinator.event_logger, state_manager=self.coordinator.state_manager, state_machine=self.coordinator.state_machine, ) # Transition to VERIFY logger.debug("[ActPhase] Transitioning to VERIFY phase") self._transition(Phase.VERIFY) self.coordinator.state_manager.update(phase="VERIFY") logger.info("[ActPhase] Successfully transitioned to VERIFY phase") return PhaseResult( success=True, data={"results": results}, next_phase=Phase.VERIFY, ) except Exception as e: logger.error(f"[ActPhase] ACT phase error: {e}") logger.debug(f"[ActPhase] Exception details: {type(e).__name__}: {e}", exc_info=True) state = self.coordinator.state_manager.agent_state state.last_error.summary = f"ACT phase error: {e}" self.coordinator.state_manager.update(phase="FAIL") self._transition(Phase.FAIL) return PhaseResult( success=False, data={}, next_phase=Phase.FAIL, error=str(e), )