Source code for atloop.orchestrator.coordinator
"""Workflow coordinator - manages all components."""
import logging
from typing import Optional
from atloop.config.loop_detection import DEFAULT_LOOP_DETECTION_CONFIG
from atloop.config.models import AtloopConfig, TaskSpec
from atloop.llm import LLMClient
from atloop.logging import EventLogger
from atloop.memory.progress_tracker import ProgressTracker
from atloop.orchestrator.budget import BudgetManager
from atloop.orchestrator.job_state import JobState
from atloop.orchestrator.loop_detector import LoopDetector
from atloop.orchestrator.state.manager import StateManager
from atloop.orchestrator.state_machine import Phase, StateMachine
from atloop.orchestrator.verifier import Verifier
from atloop.retrieval import (
ContextPackBuilder,
ProjectProfileDetector,
WorkspaceIndexer,
)
from atloop.runtime import SandboxAdapter
from atloop.tools.runtime import ToolRuntime
logger = logging.getLogger(__name__)
[docs]
class WorkflowCoordinator:
"""Workflow coordinator - single entry point for all components."""
[docs]
def __init__(
self,
task_spec: TaskSpec,
config: AtloopConfig,
agent_session_id: Optional[str] = None,
):
"""Initialize coordinator."""
logger.debug(f"[Coordinator] Initializing for task: {task_spec.task_id}")
self.task_spec = task_spec
self.config = config
# Sandbox session: use config default or task_id
sandbox_session_id = config.sandbox.default_session_id
if not sandbox_session_id:
sandbox_session_id = task_spec.task_id
logger.debug(f"[Coordinator] Using sandbox session: {sandbox_session_id}")
# Agent session: for resuming/continuing runs (optional)
self.agent_session_id = agent_session_id
if agent_session_id:
logger.debug(f"[Coordinator] Using agent session: {agent_session_id}")
# Infrastructure
logger.debug("[Coordinator] Creating sandbox adapter")
self.sandbox = SandboxAdapter(config.sandbox, sandbox_session_id)
logger.debug("[Coordinator] Creating LLM client")
self.llm_client = LLMClient(config, workspace_root=task_spec.workspace_root)
logger.debug("[Coordinator] Creating tool runtime")
self.tool_runtime = ToolRuntime(self.sandbox, skill_loader=self.llm_client.skill_loader)
# Retrieval
logger.debug("[Coordinator] Creating retrieval components")
self.indexer = WorkspaceIndexer(self.tool_runtime)
self.profile_detector = ProjectProfileDetector(self.tool_runtime)
self.context_builder: Optional[ContextPackBuilder] = None
self.verifier: Optional[Verifier] = None
# State
logger.debug("[Coordinator] Creating state manager")
from atloop.config.loader import ConfigLoader
atloop_dir = ConfigLoader.get_atloop_dir()
runs_dir = atloop_dir / "runs"
job_state = JobState(flow_id=f"atloop-{task_spec.task_id}")
state_file = runs_dir / task_spec.task_id / "agent_state.json"
self.state_manager = StateManager(state_file, job_state)
self.state_manager.load()
# State machine
logger.debug("[Coordinator] Creating state machine")
self.state_machine = StateMachine()
if self.state_manager.agent_state.step == 0:
logger.debug("[Coordinator] Initial step is 0, setting phase to DISCOVER")
self.state_manager.update(phase="DISCOVER")
self.state_machine.current_phase = Phase.DISCOVER
# Budget
logger.debug("[Coordinator] Creating budget manager")
self.budget_manager = BudgetManager(task_spec.budget)
# Logging
logger.debug("[Coordinator] Creating event logger")
self.event_logger = EventLogger(
task_id=task_spec.task_id,
)
# Loop detection and progress tracking
logger.debug("[Coordinator] Creating progress tracker and loop detector")
loop_detection_config = getattr(config, "loop_detection", None)
if loop_detection_config is None:
loop_detection_config = DEFAULT_LOOP_DETECTION_CONFIG
self.progress_tracker = ProgressTracker()
self.loop_detector = LoopDetector(loop_detection_config)
# Restore progress tracker from saved state if available
if self.state_manager.agent_state.memory.action_history:
self.progress_tracker = ProgressTracker.from_dict(
{
"action_history": self.state_manager.agent_state.memory.action_history,
"created_files": list(self.state_manager.agent_state.memory.created_files)
if self.state_manager.agent_state.memory.created_files
else [],
"modified_files": [],
}
)
logger.debug(
f"[Coordinator] Restored progress tracker with "
f"{len(self.progress_tracker.action_history)} actions"
)
logger.info(f"[Coordinator] Initialization complete for task: {task_spec.task_id}")
# Initialize long-term memory with task context
self._initialize_long_term_memory()
def _initialize_long_term_memory(self) -> None:
"""Initialize long-term memory with task context at startup."""
from atloop.memory.memory_manager import MemoryManager
state = self.state_manager.agent_state
# Only initialize if not already set (to support resume)
if not state.memory.task_summary:
# Create task summary from goal
summary_parts = [f"**Goal**: {self.task_spec.goal}"]
task_summary = "\n".join(summary_parts)
MemoryManager.update_task_summary(state, task_summary)
logger.info("[Coordinator] Initialized task_summary in long-term memory")
# Save state after initialization
self.state_manager.save()
[docs]
def initialize(self) -> bool:
"""Initialize workspace - single method."""
logger.debug("[Coordinator] Starting workspace initialization")
try:
logger.debug("[Coordinator] Bootstrapping indexer")
self.indexer.bootstrap()
logger.debug("[Coordinator] Detecting project profile")
profile = self.profile_detector.detect()
logger.debug(f"[Coordinator] Detected profile: {profile}")
logger.debug("[Coordinator] Creating context builder and verifier")
self.context_builder = ContextPackBuilder(self.indexer, profile)
self.verifier = Verifier(self.tool_runtime, profile)
logger.debug("[Coordinator] Resetting LLM history")
self.llm_client.reset_history()
logger.info("[Coordinator] Workspace initialization complete")
return True
except Exception as e:
logger.error(f"[Coordinator] Initialize failed: {e}")
logger.debug(f"[Coordinator] Exception details: {type(e).__name__}: {e}", exc_info=True)
return False
@property
def job_state(self) -> JobState:
"""Get job state."""
return self.state_manager._job_state