Source code for atloop.orchestrator.coordinator

"""Workflow coordinator - manages all components."""

import logging
from typing import Optional

from atloop.config.loop_detection import DEFAULT_LOOP_DETECTION_CONFIG
from atloop.config.models import AtloopConfig, TaskSpec
from atloop.llm import LLMClient
from atloop.logging import EventLogger
from atloop.memory.progress_tracker import ProgressTracker
from atloop.orchestrator.budget import BudgetManager
from atloop.orchestrator.job_state import JobState
from atloop.orchestrator.loop_detector import LoopDetector
from atloop.orchestrator.state.manager import StateManager
from atloop.orchestrator.state_machine import Phase, StateMachine
from atloop.orchestrator.verifier import Verifier
from atloop.retrieval import (
    ContextPackBuilder,
    ProjectProfileDetector,
    WorkspaceIndexer,
)
from atloop.runtime import SandboxAdapter
from atloop.tools.runtime import ToolRuntime

logger = logging.getLogger(__name__)


class WorkflowCoordinator:
    """Workflow coordinator - single entry point for all components.

    Constructing a coordinator wires together the sandbox adapter, LLM
    client, tool runtime, retrieval helpers, persisted agent state, state
    machine, budget manager, event logger, progress tracker and loop
    detector for one task. :meth:`initialize` must be called afterwards to
    create the workspace-dependent ``context_builder`` and ``verifier``,
    which are ``None`` until then.
    """

    def __init__(
        self,
        task_spec: TaskSpec,
        config: AtloopConfig,
        agent_session_id: Optional[str] = None,
    ):
        """Initialize coordinator.

        Args:
            task_spec: Task description; its ``task_id``, ``workspace_root``,
                ``budget`` and ``goal`` are read here.
            config: Global configuration; ``config.sandbox`` configures the
                sandbox adapter and an optional ``loop_detection`` attribute
                overrides the default loop-detection settings.
            agent_session_id: Optional agent session identifier used for
                resuming/continuing runs. It is only stored and logged here.
        """
        logger.debug("[Coordinator] Initializing for task: %s", task_spec.task_id)
        self.task_spec = task_spec
        self.config = config

        # Sandbox session: use config default or task_id
        sandbox_session_id = config.sandbox.default_session_id or task_spec.task_id
        logger.debug("[Coordinator] Using sandbox session: %s", sandbox_session_id)

        # Agent session: for resuming/continuing runs (optional)
        self.agent_session_id = agent_session_id
        if agent_session_id:
            logger.debug("[Coordinator] Using agent session: %s", agent_session_id)

        # Infrastructure
        logger.debug("[Coordinator] Creating sandbox adapter")
        self.sandbox = SandboxAdapter(config.sandbox, sandbox_session_id)
        logger.debug("[Coordinator] Creating LLM client")
        self.llm_client = LLMClient(config, workspace_root=task_spec.workspace_root)
        logger.debug("[Coordinator] Creating tool runtime")
        self.tool_runtime = ToolRuntime(
            self.sandbox, skill_loader=self.llm_client.skill_loader
        )

        # Retrieval
        logger.debug("[Coordinator] Creating retrieval components")
        self.indexer = WorkspaceIndexer(self.tool_runtime)
        self.profile_detector = ProjectProfileDetector(self.tool_runtime)
        # Both are created by initialize() once the project profile is known.
        self.context_builder: Optional[ContextPackBuilder] = None
        self.verifier: Optional[Verifier] = None

        # State
        logger.debug("[Coordinator] Creating state manager")
        # Imported locally, as in the original code.
        from atloop.config.loader import ConfigLoader

        atloop_dir = ConfigLoader.get_atloop_dir()
        runs_dir = atloop_dir / "runs"
        job_state = JobState(flow_id=f"atloop-{task_spec.task_id}")
        state_file = runs_dir / task_spec.task_id / "agent_state.json"
        self.state_manager = StateManager(state_file, job_state)
        self.state_manager.load()

        # State machine
        logger.debug("[Coordinator] Creating state machine")
        self.state_machine = StateMachine()
        if self.state_manager.agent_state.step == 0:
            # Step 0 means no step has been recorded yet: start in DISCOVER.
            logger.debug("[Coordinator] Initial step is 0, setting phase to DISCOVER")
            self.state_manager.update(phase="DISCOVER")
            self.state_machine.current_phase = Phase.DISCOVER

        # Budget
        logger.debug("[Coordinator] Creating budget manager")
        self.budget_manager = BudgetManager(task_spec.budget)

        # Logging
        logger.debug("[Coordinator] Creating event logger")
        self.event_logger = EventLogger(
            task_id=task_spec.task_id,
        )

        # Loop detection and progress tracking
        logger.debug("[Coordinator] Creating progress tracker and loop detector")
        loop_detection_config = getattr(config, "loop_detection", None)
        if loop_detection_config is None:
            loop_detection_config = DEFAULT_LOOP_DETECTION_CONFIG
        self.loop_detector = LoopDetector(loop_detection_config)

        # Restore progress tracker from saved state if available; otherwise
        # start with an empty one.
        memory = self.state_manager.agent_state.memory
        if memory.action_history:
            created_files = list(memory.created_files) if memory.created_files else []
            # NOTE(review): modified_files is always restored as empty here;
            # confirm whether the saved state is expected to carry it.
            self.progress_tracker = ProgressTracker.from_dict(
                {
                    "action_history": memory.action_history,
                    "created_files": created_files,
                    "modified_files": [],
                }
            )
            logger.debug(
                "[Coordinator] Restored progress tracker with %d actions",
                len(self.progress_tracker.action_history),
            )
        else:
            self.progress_tracker = ProgressTracker()

        logger.info(
            "[Coordinator] Initialization complete for task: %s", task_spec.task_id
        )

        # Initialize long-term memory with task context
        self._initialize_long_term_memory()

    def _initialize_long_term_memory(self) -> None:
        """Initialize long-term memory with task context at startup.

        When the loaded state has no ``task_summary`` yet, a summary is built
        from the task goal, stored through ``MemoryManager`` and the state is
        saved. An existing summary is left untouched to support resume.
        """
        from atloop.memory.memory_manager import MemoryManager

        state = self.state_manager.agent_state

        # Only initialize if not already set (to support resume)
        if state.memory.task_summary:
            return

        # Create task summary from goal
        task_summary = f"**Goal**: {self.task_spec.goal}"
        MemoryManager.update_task_summary(state, task_summary)
        logger.info("[Coordinator] Initialized task_summary in long-term memory")

        # Save state after initialization
        # NOTE(review): assumed to belong to the "summary was just created"
        # branch; confirm it was not meant to run unconditionally.
        self.state_manager.save()

    def initialize(self) -> bool:
        """Initialize workspace - single method.

        Bootstraps the indexer, detects the project profile, creates the
        context builder and verifier from that profile, and resets the LLM
        history.

        Returns:
            ``True`` on success. ``False`` if any step raised; the error is
            logged (with traceback at debug level) instead of propagating.
        """
        logger.debug("[Coordinator] Starting workspace initialization")
        try:
            logger.debug("[Coordinator] Bootstrapping indexer")
            self.indexer.bootstrap()

            logger.debug("[Coordinator] Detecting project profile")
            profile = self.profile_detector.detect()
            logger.debug("[Coordinator] Detected profile: %s", profile)

            logger.debug("[Coordinator] Creating context builder and verifier")
            self.context_builder = ContextPackBuilder(self.indexer, profile)
            self.verifier = Verifier(self.tool_runtime, profile)

            logger.debug("[Coordinator] Resetting LLM history")
            self.llm_client.reset_history()

            logger.info("[Coordinator] Workspace initialization complete")
            return True
        except Exception as e:
            # Deliberate catch-all boundary: callers get a boolean result.
            logger.error("[Coordinator] Initialize failed: %s", e)
            logger.debug(
                "[Coordinator] Exception details: %s: %s",
                type(e).__name__,
                e,
                exc_info=True,
            )
            return False

    @property
    def job_state(self) -> JobState:
        """Get job state.

        Returns the job state object held by the state manager.
        """
        # NOTE(review): reads StateManager's private ``_job_state`` attribute.
        return self.state_manager._job_state