"""ACT phase implementation."""
import logging
from typing import TYPE_CHECKING, Any, Dict, List
from atloop.llm import ActionJSON, ActionJSONValidationError
from atloop.orchestrator.executor.tool_executor import ToolExecutor
from atloop.orchestrator.phases.act_result_processor import (
ErrorStateManager,
FileChangeTracker,
ToolResultFormatter,
)
from atloop.orchestrator.phases.base import BasePhase, PhaseContext, PhaseResult
from atloop.orchestrator.phases.placeholder_replacer import PlaceholderReplacer
from atloop.orchestrator.phases.stop_reason_handler import StopReasonHandler
from atloop.orchestrator.state_machine import Phase
# Imported for type annotations only; avoids a runtime import cycle with the coordinator.
if TYPE_CHECKING:
    from atloop.orchestrator.coordinator import WorkflowCoordinator

logger = logging.getLogger(__name__)
class ActPhase(BasePhase):
    """ACT phase: Execute tool calls.

    Reads the ``actions`` payload left in ``coordinator.job_state.shared_data``,
    validates it with ``ActionJSON.from_dict``, runs every action through a
    ``ToolExecutor``, records the outcome in agent memory and then transitions
    to VERIFY (or back to DISCOVER when there is nothing valid to execute).
    """

    # Execution priority per tool name (lower runs first). Any tool not listed
    # gets _DEFAULT_TOOL_PRIORITY and therefore runs after all file writes.
    _TOOL_PRIORITY: Dict[str, int] = {
        "write_file": 1,
        "append_file": 2,
        "edit_file": 3,
    }
    _DEFAULT_TOOL_PRIORITY = 4

    # Tools whose history records carry the step's modified_files list.
    _FILE_WRITE_TOOLS = frozenset({"write_file", "edit_file", "append_file"})

    def __init__(self, coordinator: "WorkflowCoordinator"):
        """Initialize ACT phase.

        Args:
            coordinator: Workflow coordinator; also handed to the ToolExecutor
                that runs individual actions.
        """
        super().__init__(coordinator)
        self.executor = ToolExecutor(coordinator)
        logger.debug("[ActPhase] Initialized with ToolExecutor")

    def execute(self, context: PhaseContext) -> PhaseResult:
        """
        Execute ACT phase.

        Args:
            context: Phase execution context

        Returns:
            Phase execution result. ``next_phase`` is DISCOVER when no actions
            are present or they fail validation, ``None`` when executed actions
            reported errors (the Workflow picks the recovery phase), otherwise
            VERIFY or whatever StopReasonHandler returns for a pending
            stop_reason.

        Raises:
            Exception: Unexpected exceptions are logged and re-raised so the
                Workflow can classify them and drive recovery.
        """
        logger.info(f"[ActPhase] Entering ACT phase (Step {context.step})")
        state = self.coordinator.state_manager.agent_state
        try:
            # Design principle: validate at the boundary -- ActionJSON.from_dict()
            # checks the structure (and tool names, via the registry).
            actions_dict = self.coordinator.job_state.shared_data.get("actions")
            if not actions_dict:
                logger.warning("[ActPhase] No actions found, transitioning back to DISCOVER")
                return self._return_to_discover(state)

            try:
                action_json = ActionJSON.from_dict(
                    actions_dict,
                    validate=True,
                    tool_registry=self.coordinator.tool_runtime.registry,
                )
            except ActionJSONValidationError as e:
                logger.error(f"[ActPhase] Action JSON validation failed: {e.message}")
                return self._return_to_discover(state, f"Invalid Action JSON: {e.message}")
            except TypeError as e:
                # Raised e.g. when the payload is not a dict.
                logger.error(f"[ActPhase] Invalid Action JSON type: {e}")
                return self._return_to_discover(state, f"Invalid Action JSON type: {e}")

            logger.debug(
                f"[ActPhase] Parsed and validated ActionJSON: {len(action_json.actions)} actions"
            )

            # Loop intervention is handled entirely in PlanPhase by
            # LoopInterventionExecutor; ActPhase executes whatever it receives.
            logger.debug(f"[ActPhase] Executing {len(action_json.actions)} actions")
            results, modified_files = self._execute_actions(action_json.actions, state)

            success = all(r.get("ok", False) for r in results)
            self._update_memory_after_execution(state, results, modified_files, success)

            if not success:
                error_messages = [
                    r.get("error") or r.get("stderr", "")
                    for r in results
                    if r.get("error") or r.get("stderr")
                ]
                if error_messages:
                    # Detailed error info was already recorded in state by
                    # ErrorStateManager.update_error_state() (called from
                    # _process_action_result). PhaseResult.error is only used
                    # for logging and error classification.
                    logger.warning(
                        "[ActPhase] Actions completed with errors. "
                        "Detailed error info already set in state.last_error.summary. "
                        "Workflow will handle error recovery."
                    )
                    return PhaseResult(
                        success=False,
                        data={"results": results},
                        next_phase=None,  # Let Workflow decide recovery phase
                        error="\n".join(error_messages[:3]),  # First 3 errors only
                        recoverable=True,  # Workflow will verify
                        error_already_set_in_state=True,
                    )
                # NOTE(review): results with ok=False but neither "error" nor
                # "stderr" fall through to the success path below -- confirm
                # this is intended.

            # NOTE(review): pending_stop_reason is only consumed on this path;
            # after a failed step it stays in shared_data for a later ACT.
            pending_stop_reason = self.coordinator.job_state.shared_data.pop(
                "pending_stop_reason", None
            )
            if pending_stop_reason:
                logger.info(
                    f"[ActPhase] Applying pending stop_reason='{pending_stop_reason}' "
                    f"after actions execution (Step {state.step})"
                )
                return StopReasonHandler.apply_pending_stop_reason(
                    pending_stop_reason=pending_stop_reason,
                    step=state.step,
                    verification_success=state.artifacts.verification_success,
                    event_logger=self.coordinator.event_logger,
                    state_manager=self.coordinator.state_manager,
                    state_machine=self.coordinator.state_machine,
                )

            logger.debug("[ActPhase] Transitioning to VERIFY phase")
            self._transition(Phase.VERIFY)
            self.coordinator.state_manager.update(phase="VERIFY")
            logger.info("[ActPhase] Successfully transitioned to VERIFY phase")
            return PhaseResult(
                success=True,
                data={"results": results},
                next_phase=Phase.VERIFY,
            )
        except Exception as e:
            # Workflow owns unified error handling: log and re-raise.
            logger.error(f"[ActPhase] ACT phase exception: {e}")
            raise

    def _return_to_discover(self, state: Any, error: str = "") -> PhaseResult:
        """Move the workflow back to DISCOVER and build the matching result.

        Args:
            state: Agent state; ``state.last_error.summary`` is set when
                ``error`` is non-empty.
            error: Error message. Empty means "nothing to do" (a successful
                result); non-empty produces a failed result carrying it.

        Returns:
            PhaseResult with ``next_phase=Phase.DISCOVER``.
        """
        if error:
            state.last_error.summary = error
        self.coordinator.state_manager.update(phase="DISCOVER")
        self._transition(Phase.DISCOVER)
        if not error:
            return PhaseResult(success=True, data={}, next_phase=Phase.DISCOVER)
        return PhaseResult(success=False, data={}, next_phase=Phase.DISCOVER, error=error)

    def _execute_actions(
        self, actions: List[Dict[str, Any]], state: Any
    ) -> tuple[List[Dict[str, Any]], List[str]]:
        """
        Execute all actions and process their results.

        Design principle: Trust validated data -- actions have been validated
        by ActionJSON.from_dict(), so this focuses on execution logic.

        Actions run in _sort_actions() order (write_file -> append_file ->
        edit_file -> everything else). Placeholder info saved by PlanPhase is
        keyed by each action's *original* index, so that index is carried
        through the sort alongside the action.

        Side effects: records each action in the progress tracker, increments
        the tool-call budgets, stores per-action placeholder info on
        ``state._act_phase_placeholder_info`` (consumed by
        _update_memory_after_execution), removes ``placeholder_info`` from
        job_state.shared_data and refreshes ``state.memory.action_history``.

        Args:
            actions: List of actions to execute (validated by ActionJSON)
            state: Agent state

        Returns:
            Tuple of (results, modified_files); results are in execution order.
        """
        import time

        # Sorting (original_index, action) pairs replaces actions.index(),
        # which returned the first match for duplicate actions (wrong
        # placeholder info) and was O(n^2). sorted() is stable, so the order is
        # identical to _sort_actions().
        ordered = sorted(enumerate(actions), key=lambda pair: self._action_priority(pair[1]))
        sorted_actions = [action for _, action in ordered]
        if sorted_actions != actions:
            logger.info(
                f"[ActPhase] Actions were reordered for correct execution sequence. "
                f"Original order: {[a.get('tool') for a in actions]}, "
                f"Sorted order: {[a.get('tool') for a in sorted_actions]}"
            )

        shared_data = self.coordinator.job_state.shared_data
        placeholder_info_dict = shared_data.get("placeholder_info") or {}

        results: List[Dict[str, Any]] = []
        modified_files: List[str] = []
        placeholder_info: List[Dict[str, Any]] = []

        for position, (original_index, action) in enumerate(ordered, start=1):
            logger.debug(
                f"[ActPhase] Executing action {position}/{len(actions)}: {action.get('tool')}"
            )
            placeholder_data = placeholder_info_dict.get(original_index, {})
            tool = action.get("tool", "unknown")
            args = action.get("args", {})
            placeholder_name = placeholder_data.get("placeholder")
            placeholder_info.append(
                {
                    "tool": tool,
                    "placeholder": placeholder_name,
                    # Without a placeholder, record the real args: PlanPhase's
                    # saved copy if present, else the args actually executed.
                    "args": None if placeholder_name else placeholder_data.get("args", args),
                }
            )

            self._validate_action(action, position)
            result = self._execute_single_action(action)
            results.append(result)
            # Format, update error state, cache skills, track file changes.
            self._process_action_result(action, result, state, modified_files)

            self.coordinator.progress_tracker.record_action(
                step=state.step,
                tool=tool,
                args=args,
                result=result,
                timestamp=time.time(),
            )

            # NOTE(review): both counters are incremented -- confirm that
            # state.budget_used and budget_manager.budget_used are distinct
            # objects, otherwise each call is counted twice.
            state.budget_used.tool_calls += 1
            self.coordinator.budget_manager.budget_used.tool_calls += 1
            logger.debug(f"[ActPhase] Budget updated: tool_calls={state.budget_used.tool_calls}")

        # Handed to _update_memory_after_execution, which deletes it again.
        state._act_phase_placeholder_info = placeholder_info
        shared_data.pop("placeholder_info", None)

        # Persist action history in memory.
        state.memory.action_history = [
            a.to_dict() for a in self.coordinator.progress_tracker.action_history
        ]
        return results, modified_files

    @classmethod
    def _action_priority(cls, action: Dict[str, Any]) -> int:
        """Return the execution priority of an action (lower runs first)."""
        return cls._TOOL_PRIORITY.get(action.get("tool", ""), cls._DEFAULT_TOOL_PRIORITY)

    def _sort_actions(self, actions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Sort actions to ensure correct execution order.

        Required order:
        1. write_file (create new files)
        2. append_file (append to existing files)
        3. edit_file (modify existing files)
        4. All other operations (read_file, run, load_skill, etc.)

        The sort is stable, so actions with equal priority keep their relative
        order. This is a defensive measure in case the LLM ignores ordering
        instructions.

        Args:
            actions: List of actions to sort

        Returns:
            New sorted list of actions (the input is not modified).
        """
        return sorted(actions, key=self._action_priority)

    @staticmethod
    def _ensure_skill_cache_entry(state: Any, skill_name: str) -> Dict[str, Any]:
        """Return ``state.memory.skill_cache[skill_name]``, creating an empty entry if absent."""
        cache = state.memory.skill_cache
        if skill_name not in cache:
            cache[skill_name] = {
                "metadata": {},
                "resources": {
                    "scripts": {},
                    "references": {},
                    "assets": {},
                },
            }
        return cache[skill_name]

    @staticmethod
    def _extract_section(text: str, header: str, next_header: str) -> str:
        """Return the stripped text between ``header`` and the next ``next_header``.

        Returns "" when ``header`` is absent. When ``next_header`` does not
        occur after ``header``, everything after ``header`` is returned.
        """
        start = text.find(header)
        if start == -1:
            return ""
        start += len(header)
        end = text.find(next_header, start)
        return (text[start:] if end == -1 else text[start:end]).strip()

    def _cache_skill_metadata(
        self, state: Any, args: Dict[str, Any], result: Dict[str, Any], tool_name: str
    ) -> None:
        """
        Cache skill metadata in memory.skill_cache.

        Does nothing when the result is not ok or ``args["name"]`` is missing.

        Args:
            state: Agent state
            args: Tool arguments (``name`` is the skill name)
            result: Tool execution result; ``stdout`` holds the formatted skill
                text, and ``meta`` may hold ``description`` and ``resources``.
            tool_name: Tool name (unused; kept for a uniform signature)
        """
        if not result.get("ok"):
            return
        skill_name = args.get("name")
        if not skill_name:
            return

        # Tolerate an explicit None for meta/stdout.
        meta = result.get("meta") or {}
        resources = meta.get("resources", {})
        entry = self._ensure_skill_cache_entry(state, skill_name)

        # Expected stdout format:
        # # Skill: name\n**Source**: ...\n\n## Description\n...\n\n## Main Content\n...\n\n## Available Resources...
        stdout = result.get("stdout") or ""
        description = self._extract_section(stdout, "## Description", "## Main Content")
        body = self._extract_section(stdout, "## Main Content", "## Available Resources")

        entry["metadata"] = {
            "name": skill_name,
            "description": description or meta.get("description", ""),
            "body": body or stdout,  # Fallback to full stdout if parsing fails
            "loaded_at_step": state.step,
        }
        # Resource list kept for reference.
        entry["_resource_list"] = resources
        logger.debug(f"[ActPhase] Cached skill metadata: {skill_name} (Step {state.step})")

    def _cache_skill_resource(
        self, state: Any, args: Dict[str, Any], result: Dict[str, Any], tool_name: str
    ) -> None:
        """
        Cache skill resource content in memory.skill_cache.

        Does nothing when the result is not ok or any of ``skill_name``,
        ``resource_type`` or ``resource_name`` is missing. Logs a warning and
        skips when the result carries no content.

        Args:
            state: Agent state
            args: Tool arguments (``skill_name``, ``resource_type``, ``resource_name``)
            result: Tool execution result; content is read from ``meta["_content"]``.
            tool_name: Tool name (unused; kept for a uniform signature)
        """
        if not result.get("ok"):
            return
        skill_name = args.get("skill_name")
        resource_type = args.get("resource_type")
        resource_name = args.get("resource_name")
        if not all([skill_name, resource_type, resource_name]):
            return

        meta = result.get("meta") or {}
        content = meta.get("_content")
        if not content:
            logger.warning(
                f"[ActPhase] No content found in result meta for {skill_name}/{resource_type}/{resource_name}"
            )
            return

        entry = self._ensure_skill_cache_entry(state, skill_name)
        entry["resources"].setdefault(resource_type, {})[resource_name] = {
            "content": content,
            "loaded_at_step": state.step,
        }
        logger.debug(
            f"[ActPhase] Cached skill resource: {skill_name}/{resource_type}/{resource_name} "
            f"({len(content)} chars, Step {state.step})"
        )

    def _validate_action(self, action: Dict[str, Any], action_index: int) -> None:
        """
        Check an action for unreplaced placeholders before execution.

        This only logs an error; it never raises or blocks execution.

        Args:
            action: Action dictionary
            action_index: 1-based position of the action (for logging)
        """
        tool = action.get("tool")
        args = action.get("args", {})
        field_name, value = PlaceholderReplacer.get_placeholder_field_value(tool, args)
        if field_name and PlaceholderReplacer._is_valid_placeholder(value):
            logger.error(
                f"[ActPhase] ❌ CRITICAL: {tool} action {action_index} has unreplaced placeholder "
                f"{value} in field '{field_name}'! This indicates placeholder replacement failed in PlanPhase."
            )

    def _execute_single_action(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a single action and return its result.

        Exceptions raised by the executor are converted into an error result
        (``ok``/``success`` False, ``exit_code`` 1), so the remaining actions
        still run and the LLM sees the failure.

        Args:
            action: Action dictionary

        Returns:
            Tool execution result
        """
        tool = action.get("tool")
        args = action.get("args", {})
        try:
            result = self.executor._execute_action(action)
        except Exception as e:
            logger.warning(
                f"[ActPhase] Action raised exception: {e}. "
                f"Converting to error result for LLM to handle."
            )
            error_result = {
                "success": False,
                "ok": False,
                "tool": tool,
                "stdout": "",
                "stderr": f"Execution error: {str(e)}",
                "error": str(e),
                "exit_code": 1,
            }
            if tool == "run":
                error_result["command"] = args.get("cmd", "")
            return error_result
        logger.debug(f"[ActPhase] Action completed: success={result.get('success', False)}")
        return result

    @staticmethod
    def _make_fallback_tool(tool_name: str) -> Any:
        """Build a minimal BaseTool stand-in used only for result formatting."""
        # Local imports, as in the original code path that only needs them here.
        from atloop.tools.base import BaseTool
        from atloop.tools.output_semantic_type import OutputSemanticType

        class FallbackTool(BaseTool):
            def __init__(self, name: str):
                self._name = name

            @property
            def name(self) -> str:
                return self._name

            @property
            def description(self) -> str:
                return f"Fallback tool for {self._name}"

            def execute(self, args):
                raise NotImplementedError("Fallback tool cannot execute")

            @property
            def output_semantic_type(self):
                return OutputSemanticType.STATUS_MESSAGE

        return FallbackTool(tool_name)

    def _process_action_result(
        self,
        action: Dict[str, Any],
        result: Dict[str, Any],
        state: Any,
        modified_files: List[str],
    ) -> None:
        """
        Process a single action result: format, update error state, track files.

        Args:
            action: Original action dictionary
            result: Tool execution result
            state: Agent state
            modified_files: List that FileChangeTracker appends modified files to
        """
        tool_name = action.get("tool")
        args = action.get("args", {})

        tool = self.coordinator.tool_runtime.registry.get(tool_name)
        if not tool:
            # Should not happen in normal operation (tools are validated by
            # ActionJSON); formatting still needs a BaseTool-like object.
            logger.warning(
                f"[ActPhase] Tool instance not found for '{tool_name}', creating fallback tool"
            )
            tool = self._make_fallback_tool(tool_name)

        result_summary = ToolResultFormatter.format_result_summary(tool, args, result)
        ErrorStateManager.update_error_state(state, tool_name, args, result, result_summary)

        if tool_name == "load_skill":
            self._cache_skill_metadata(state, args, result, tool_name)
        elif tool_name == "load_skill_resource":
            self._cache_skill_resource(state, args, result, tool_name)

        # Track file changes so modified_files_content is available as LLM context.
        file_path = args.get("path", "")
        file_content = args.get("content", "")
        if tool_name == "write_file":
            FileChangeTracker.track_file_creation(
                state, self.coordinator, file_path, file_content, modified_files
            )
        elif tool_name in ("edit_file", "append_file"):
            # The content here is the edit/append payload, not the full file;
            # it is stored so the LLM knows what changed.
            FileChangeTracker.track_file_modification(
                state, self.coordinator, file_path, file_content, modified_files
            )

    def _update_memory_after_execution(
        self,
        state: Any,
        results: List[Dict[str, Any]],
        modified_files: List[str],
        success: bool,
    ) -> None:
        """
        Update memory with execution results and detect milestones.

        Appends one entry to ``state.memory.attempts``, one record per result to
        ``state.memory.tool_results_history`` (using the placeholder info stored
        by _execute_actions, which is then removed), and adds a milestone when
        the step succeeded and modified at least 3 files.

        Args:
            state: Agent state
            results: Tool execution results, in execution order
            modified_files: List of modified files
            success: Whether all actions succeeded
        """
        # Tool results live in tool_results_history, not in the attempt entry.
        # Each stored record gets its own copy of modified_files so the entries
        # do not share one mutable list.
        state.memory.attempts.append(
            {
                "step": state.step,
                "files": list(modified_files),
                "success": success,
            }
        )
        logger.debug(f"[ActPhase] Recorded attempt: success={success}, files={len(modified_files)}")

        placeholder_info = getattr(state, "_act_phase_placeholder_info", [])
        if len(placeholder_info) != len(results):
            # zip() below stops at the shorter list; surface the mismatch.
            logger.warning(
                f"[ActPhase] Placeholder info count ({len(placeholder_info)}) does not match "
                f"result count ({len(results)}); unmatched results are not recorded"
            )
        for result, placeholder_data in zip(results, placeholder_info):
            tool = placeholder_data["tool"]
            args = placeholder_data["args"]
            state.memory.tool_results_history.append(
                {
                    "step": state.step,
                    "tool": tool,
                    "args": args if args is not None else {},
                    "placeholder": placeholder_data["placeholder"],  # None if no placeholder
                    "result": result,
                    "modified_files": list(modified_files)
                    if tool in self._FILE_WRITE_TOOLS
                    else [],
                }
            )

        if hasattr(state, "_act_phase_placeholder_info"):
            delattr(state, "_act_phase_placeholder_info")
        logger.debug(f"[ActPhase] Recorded {len(results)} tool results to tool_results_history")

        # Auto-detect milestones: a successful step touching 3+ files.
        if success and len(modified_files) >= 3:
            from atloop.memory.memory_manager import MemoryManager

            milestone_content = (
                f"Successfully modified {len(modified_files)} files: "
                f"{', '.join(modified_files[:3])}"
            )
            if len(modified_files) > 3:
                milestone_content += " etc"
            MemoryManager.add_milestone(state, milestone_content)
            self.coordinator.state_manager.save()
            logger.debug(f"[ActPhase] Added milestone: {milestone_content}")