"""PLAN phase implementation."""
import logging
from atloop.memory.summarizer import MemorySummarizer
from atloop.orchestrator.phases.base import BasePhase, PhaseContext, PhaseResult
from atloop.orchestrator.phases.stop_reason_handler import StopReasonHandler
from atloop.orchestrator.state_machine import Phase
logger = logging.getLogger(__name__)
[docs]
class PlanPhase(BasePhase):
"""PLAN phase: Call LLM to get next actions."""
[docs]
def execute(self, context: PhaseContext) -> PhaseResult:
"""
Execute PLAN phase.
Args:
context: Phase execution context
Returns:
Phase execution result
"""
logger.debug(f"[PlanPhase] Executing PLAN phase at step {context.step}")
state = self.coordinator.state_manager.agent_state
try:
# Rebuild context pack with latest state
logger.debug("[PlanPhase] Building context pack with latest state")
memory_config = getattr(self.coordinator.config, "memory", None)
if memory_config:
memory_summary_max_length = getattr(
self.coordinator, "_memory_summary_max_length", memory_config.summary_max_length
)
logger.debug(
f"[PlanPhase] Using memory config: max_length={memory_summary_max_length}"
)
else:
memory_summary_max_length = getattr(
self.coordinator, "_memory_summary_max_length", 64000
)
logger.debug(
f"[PlanPhase] Using default memory summary max length: {memory_summary_max_length}"
)
memory_summary = MemorySummarizer.summarize(
state,
max_length=memory_summary_max_length,
task_goal=self.coordinator.task_spec.goal,
)
logger.debug(
f"[PlanPhase] Memory summary length: {len(memory_summary)} chars "
f"(max: {memory_summary_max_length})"
)
# Extract keywords
logger.debug("[PlanPhase] Extracting keywords")
keywords = self._extract_keywords()
logger.debug(f"[PlanPhase] Extracted {len(keywords)} keywords: {keywords[:5]}")
# Build context pack
logger.debug("[PlanPhase] Building context pack")
context_pack = self.coordinator.context_builder.build(
goal=self.coordinator.task_spec.goal,
constraints=self.coordinator.task_spec.constraints,
recent_error=state.last_error.summary,
current_diff=state.artifacts.current_diff,
test_results=state.artifacts.test_results,
verification_success=state.artifacts.verification_success,
memory_summary=memory_summary,
keywords=keywords,
)
logger.debug(
f"[PlanPhase] Context pack built: project_profile={context_pack.project_profile}"
)
# Build user message
logger.debug("[PlanPhase] Building user message")
user_message = self.coordinator.llm_client.build_user_message(
goal=self.coordinator.task_spec.goal,
constraints=self.coordinator.task_spec.constraints,
budget=self.coordinator.task_spec.budget.to_dict(),
state_summary=memory_summary,
project_profile=context_pack.project_profile,
relevant_files=context_pack.relevant_files,
recent_error=context_pack.recent_error,
current_diff=context_pack.current_diff,
test_results=context_pack.test_results,
verification_success=context_pack.verification_success,
)
logger.debug(f"[PlanPhase] User message built: length={len(user_message)} chars")
# Log LLM call
full_prompt_for_log = f"{self.coordinator.llm_client.system_prompt}\n\n{user_message}"
self.coordinator.event_logger.log_llm_call(
step=state.step,
prompt=full_prompt_for_log,
tokens_in=None,
model=self.coordinator.config.ai.completion.model,
)
logger.debug("[PlanPhase] LLM call logged")
# Call LLM
def stream_callback(delta: str):
pass
logger.debug("[PlanPhase] Calling LLM")
action_json, error, usage, full_output, file_contents = (
self.coordinator.llm_client.plan_and_act(
user_message,
stream_callback=stream_callback,
)
)
logger.debug(
f"[PlanPhase] LLM call completed: action_json={action_json is not None}, error={error}"
)
# Update budget
state.budget_used.llm_calls += 1
self.coordinator.budget_manager.budget_used.llm_calls += 1
logger.debug(f"[PlanPhase] Budget updated: llm_calls={state.budget_used.llm_calls}")
# Handle LLM error
if action_json is None:
logger.warning(f"[PlanPhase] LLM call failed: {error}")
# Check if it's a 400 Bad Request
if "400" in error and "Bad Request" in error:
logger.warning(
"[PlanPhase] 400 Bad Request detected, attempting to reduce memory summary size"
)
memory_config = getattr(self.coordinator.config, "memory", None)
if memory_config:
min_length = memory_config.summary_min_effective_length
default_max = memory_config.summary_max_length
else:
min_length = 16000
default_max = 64000
current_max = getattr(
self.coordinator, "_memory_summary_max_length", default_max
)
logger.warning(
f"[PlanPhase] 400 Bad Request detected. "
f"Current memory_summary_max_length: {current_max}. "
f"Reducing by 20% for next attempt."
)
self.coordinator._memory_summary_max_length = max(
min_length, int(current_max * 0.8)
)
logger.info(
f"[PlanPhase] New memory_summary_max_length: "
f"{self.coordinator._memory_summary_max_length}"
)
if self.coordinator._memory_summary_max_length <= 20000:
self.coordinator.event_logger.log_llm_result(
step=state.step,
actions=[],
stop_reason="error",
error=f"{error} (attempted to reduce prompt size but still failed)",
llm_output=full_output,
)
self.coordinator.state_manager.update(phase="FAIL")
self._transition(Phase.FAIL)
return PhaseResult(
success=False,
data={},
next_phase=Phase.FAIL,
error=f"LLM call failed: {error} (prompt may be too large)",
)
else:
logger.info(
"[PlanPhase] Continuing to next iteration with smaller memory summary"
)
return PhaseResult(
success=True,
data={},
next_phase=Phase.DISCOVER,
)
# For other errors, fail immediately
self.coordinator.event_logger.log_llm_result(
step=state.step,
actions=[],
stop_reason="error",
error=error,
llm_output=full_output,
)
self.coordinator.state_manager.update(phase="FAIL")
self._transition(Phase.FAIL)
return PhaseResult(
success=False,
data={},
next_phase=Phase.FAIL,
error=f"LLM call failed: {error}",
)
# Process actions
actions = action_json.actions
stop_reason = action_json.stop_reason
logger.debug(
f"[PlanPhase] LLM response: stop_reason={stop_reason}, actions={len(actions)}"
)
# Replace placeholders
logger.debug(
f"[PlanPhase] Preparing to replace placeholders, file_contents keys: "
f"{list(file_contents.keys())}"
)
if file_contents:
logger.info(
f"[PlanPhase] Received {len(file_contents)} file content placeholders: "
f"{list(file_contents.keys())}"
)
actions = self._replace_file_content_placeholders(actions, file_contents)
# Debug: Check for remaining placeholders
for action in actions:
if action.get("tool") in ["write_file", "append_file", "edit_file"]:
content = action.get("args", {}).get("content", "")
if content.startswith("FILE_CONTENT_#"):
logger.error(
f"[PlanPhase] Error: Action still has placeholder {content} that was not replaced!"
)
logger.error(
f"[PlanPhase] Available file_contents keys: "
f"{list(file_contents.keys())}"
)
# Log LLM result
self.coordinator.event_logger.log_llm_result(
step=state.step,
actions=[a.to_dict() if hasattr(a, "to_dict") else a for a in actions],
stop_reason=stop_reason,
tokens_out=usage.get("output_tokens") if usage else None,
llm_output=full_output,
)
# Store decision in memory
decision_record = {
"step": state.step,
"stop_reason": stop_reason,
"actions_count": len(actions),
"verification_success": state.artifacts.verification_success,
}
if action_json:
decision_record["thought_summary"] = action_json.thought_summary
decision_record["plan"] = action_json.plan
decision_record["actions"] = [
a.to_dict() if hasattr(a, "to_dict") else a for a in actions
]
if full_output:
decision_record["llm_output"] = full_output
state.memory.decisions.append(decision_record)
logger.info(
f"[PlanPhase] Stored decision to memory.decisions "
f"(Step {state.step}, stop_reason={stop_reason}, "
f"actions={len(actions)}, total decisions={len(state.memory.decisions)})"
)
# Store LLM response
if action_json and full_output:
llm_response_record = {
"step": state.step,
"thought_summary": action_json.thought_summary,
"plan": action_json.plan,
"actions": [a.to_dict() if hasattr(a, "to_dict") else a for a in actions],
"stop_reason": stop_reason,
"llm_output": full_output,
}
state.memory.llm_responses.append(llm_response_record)
logger.info(
f"[PlanPhase] Stored LLM response to memory.llm_responses "
f"(total responses={len(state.memory.llm_responses)})"
)
# Handle stop_reason using unified handler
next_phase, pending_stop_reason, phase_result = StopReasonHandler.process_stop_reason(
stop_reason=stop_reason,
actions=actions,
action_json=action_json,
verification_success=state.artifacts.verification_success,
step=state.step,
event_logger=self.coordinator.event_logger,
state_manager=self.coordinator.state_manager,
state_machine=self.coordinator.state_machine,
job_state=self.coordinator.job_state,
)
logger.debug(
f"[PlanPhase] Stop reason processed: stop_reason={stop_reason}, "
f"next_phase={next_phase}, pending_stop_reason={pending_stop_reason}"
)
return phase_result
except Exception as e:
import traceback
error_trace = traceback.format_exc()
logger.error(f"[PlanPhase] PLAN phase error: {e}")
logger.debug(
f"[PlanPhase] Exception details: {type(e).__name__}: {e}\n{error_trace}",
exc_info=True,
)
self.coordinator.state_manager.agent_state.last_error.summary = (
f"PLAN phase error: {e}\n{error_trace[:5000]}"
)
self.coordinator.state_manager.update(phase="FAIL")
self._transition(Phase.FAIL)
return PhaseResult(
success=False,
data={},
next_phase=Phase.FAIL,
error=str(e),
)
def _extract_keywords(self) -> list[str]:
"""Extract keywords from state."""
keywords = []
state = self.coordinator.state_manager.agent_state
if self.coordinator.task_spec.goal:
keywords.extend(
self.coordinator.indexer.extract_keywords(self.coordinator.task_spec.goal)
)
if state.last_error.summary:
keywords.extend(self.coordinator.indexer.extract_keywords(state.last_error.summary))
return keywords[:10]
def _replace_file_content_placeholders(
self, actions: list[dict], file_contents: dict[str, str]
) -> list[dict]:
"""Replace FILE_CONTENT_#N placeholders with actual content."""
modified_actions = []
for action in actions:
tool = action.get("tool")
args = action.get("args", {})
if tool == "write_file":
content = args.get("content", "")
if content in file_contents:
args = args.copy()
args["content"] = file_contents[content]
action = action.copy()
action["args"] = args
logger.info(
f"[PlanPhase] Replaced placeholder {content} with actual file content "
f"({len(file_contents[content])} chars)"
)
logger.debug(
f"[PlanPhase] File content preview: {file_contents[content][:200]}..."
)
elif content.startswith("FILE_CONTENT_#") and content not in file_contents:
logger.error(
f"[PlanPhase] Error: Placeholder {content} not found in file_contents!"
)
logger.error(
f"[PlanPhase] Available file_contents keys: {list(file_contents.keys())}"
)
elif tool == "append_file":
content = args.get("content", "")
if content in file_contents:
args = args.copy()
args["content"] = file_contents[content]
action = action.copy()
action["args"] = args
logger.info(
f"[PlanPhase] Replaced placeholder {content} with append_file actual content "
f"({len(file_contents[content])} chars)"
)
elif content.startswith("FILE_CONTENT_#") and content not in file_contents:
logger.warning(f"[PlanPhase] Placeholder {content} not found in file_contents")
elif tool == "edit_file":
content = args.get("content", "")
if content in file_contents:
args = args.copy()
args["content"] = file_contents[content]
action = action.copy()
action["args"] = args
logger.info(
f"[PlanPhase] Replaced placeholder {content} with edit_file actual content "
f"({len(file_contents[content])} chars)"
)
elif content.startswith("FILE_CONTENT_#") and content not in file_contents:
logger.warning(f"[PlanPhase] Placeholder {content} not found in file_contents")
modified_actions.append(action)
return modified_actions