Source code for atloop.orchestrator.agent_loop
"""Agent loop - thin wrapper."""
import logging
from typing import Any, Dict, Optional
from atloop.config.models import AtloopConfig, TaskSpec
from atloop.orchestrator.coordinator import WorkflowCoordinator
from atloop.orchestrator.workflow import Workflow
logger = logging.getLogger(__name__)
[docs]
class AgentLoop:
    """Thin entry point that runs a single task through a workflow.

    The constructor builds a ``WorkflowCoordinator`` for the task and wraps it
    in a ``Workflow``. :meth:`run` executes that workflow and turns any
    ``Exception`` raised during execution into a failure result instead of
    letting it propagate.
    """

    def __init__(
        self,
        task_spec: TaskSpec,
        config: AtloopConfig,
        agent_session_id: Optional[str] = None,
    ):
        """Create the coordinator and workflow for a task.

        Args:
            task_spec: Task to run; its ``task_id`` is logged on start-up.
            config: Configuration passed to the coordinator unchanged.
            agent_session_id: Optional session identifier, forwarded to the
                coordinator as a keyword argument.
        """
        # Logging calls pass values as arguments rather than pre-formatting
        # them, so the message is only rendered if the record is emitted.
        logger.debug("[AgentLoop] Initializing for task: %s", task_spec.task_id)
        self.coordinator = WorkflowCoordinator(task_spec, config, agent_session_id=agent_session_id)
        self.workflow = Workflow(self.coordinator)
        logger.debug("[AgentLoop] Initialization complete")

    def run(self) -> Dict[str, Any]:
        """Run the workflow and return its result.

        Returns:
            The value returned by ``self.workflow.run()`` when it completes
            and its ``status``/``step`` entries can be read for logging.
            If anything inside the ``try`` block raises an ``Exception``, a
            dict with ``"status": "failure"``, the task's ``task_id`` and a
            ``"reason"`` string containing the exception message is returned
            instead.

        Note:
            The traceback is attached to the DEBUG record only; the ERROR
            record carries just the exception message.
        """
        logger.info("[AgentLoop] Starting task: %s", self.coordinator.task_spec.task_id)
        try:
            result = self.workflow.run()
            status = result.get("status")
            logger.info(
                "[AgentLoop] Task completed: status=%s, step=%s",
                status,
                result.get("step"),
            )
            # The whole result may be large; deferring the formatting avoids
            # rendering it when DEBUG logging is disabled.
            logger.debug("[AgentLoop] Result details: %s", result)
            return result
        except Exception as e:
            # Boundary handler: report the failure to the caller as a result
            # dict rather than raising.
            logger.error("[AgentLoop] Task failed: %s", e)
            logger.debug(
                "[AgentLoop] Exception details: %s: %s",
                type(e).__name__,
                e,
                exc_info=True,
            )
            return {
                "status": "failure",
                "task_id": self.coordinator.task_spec.task_id,
                "reason": f"Execution error: {e}",
            }