Source code for atloop.api.runner

"""Task runner API - single execution method (uses varlord via ConfigLoader)."""

import logging
from typing import Any, Dict, Optional

from atloop.config.loader import ConfigLoader  # Uses varlord internally
from atloop.config.models import Budget, TaskSpec
from atloop.orchestrator import AgentLoop

logger = logging.getLogger(__name__)


[docs] def load_task_spec( goal: str, workspace_root: str, budget: Optional[Dict[str, int]] = None, ) -> TaskSpec: """ Load task specification. Args: goal: Task goal workspace_root: Workspace root directory budget: Budget dictionary Returns: TaskSpec instance """ import uuid from pathlib import Path task_id = str(uuid.uuid4()) # Get default budget from config config = ConfigLoader.get() default_budget = config.runtime.default_budget # Override with provided budget if budget: budget_obj = Budget( max_llm_calls=budget.get("max_llm_calls", default_budget.max_llm_calls), max_tool_calls=budget.get("max_tool_calls", default_budget.max_tool_calls), max_wall_time_sec=budget.get("max_wall_time_sec", default_budget.max_wall_time_sec), ) else: budget_obj = default_budget return TaskSpec( task_id=task_id, goal=goal, workspace_root=str(Path(workspace_root).resolve()), budget=budget_obj, )
[docs] class TaskRunner: """Task runner - single responsibility: execute tasks (uses varlord via ConfigLoader)."""
[docs] def __init__(self, atloop_dir: Optional[str] = None): """Initialize runner.""" logger.debug(f"[TaskRunner] Initializing with atloop_dir: {atloop_dir}") self.atloop_dir = atloop_dir # Setup config once ConfigLoader.setup(atloop_dir=atloop_dir) logger.debug("[TaskRunner] Config setup complete")
[docs] def execute( self, goal: str, workspace_root: Optional[str] = None, upload_workspace: Optional[bool] = None, budget: Optional[Dict[str, int]] = None, ) -> Dict[str, Any]: """ Execute task - single method. Args: goal: Task goal/prompt workspace_root: Workspace root directory (overrides config default) upload_workspace: Whether to upload workspace files to sandbox (overrides config default) budget: Optional budget override Returns: Execution result """ logger.debug( f"[TaskRunner] Execute called with goal (length: {len(goal)}), " f"workspace_root: {workspace_root}, upload_workspace: {upload_workspace}" ) try: # Get config (uses varlord global config) config = ConfigLoader.get() logger.debug(f"[TaskRunner] Config loaded: ai={config.ai.completion.model}") # Use workspace_root from parameter or config (defaults to current directory) if workspace_root is None: workspace_root = config.runtime.workspace_root if workspace_root is None: from pathlib import Path workspace_root = str(Path.cwd()) logger.debug( f"[TaskRunner] Using current directory as workspace: {workspace_root}" ) else: from pathlib import Path workspace_root = str(Path(workspace_root).resolve()) logger.debug(f"[TaskRunner] Using workspace_root from config: {workspace_root}") else: from pathlib import Path workspace_root = str(Path(workspace_root).resolve()) logger.debug(f"[TaskRunner] Using workspace_root from parameter: {workspace_root}") # Use upload_workspace from parameter or config if upload_workspace is None: upload_workspace = config.runtime.upload_workspace logger.debug(f"[TaskRunner] Using upload_workspace from config: {upload_workspace}") else: logger.debug( f"[TaskRunner] Using upload_workspace from parameter: {upload_workspace}" ) # Create task spec logger.debug("[TaskRunner] Creating task spec") task_spec = load_task_spec( goal=goal, workspace_root=workspace_root, budget=budget, ) logger.debug(f"[TaskRunner] Task spec created: task_id={task_spec.task_id}") # Sandbox session: from config (will fallback to task_id in Coordinator) # Agent session: from config agent_session_id = config.runtime.default_agent_session_id # Create agent loop (creates coordinator and sandbox adapter) logger.info("[TaskRunner] Creating agent loop") loop = AgentLoop(task_spec, config, agent_session_id=agent_session_id) # Upload workspace files to sandbox before execution (only if requested) if upload_workspace: try: logger.info("[TaskRunner] Uploading workspace files to sandbox") sandbox_adapter = loop.coordinator.sandbox if not sandbox_adapter.upload_workspace(task_spec.workspace_root): logger.error("[TaskRunner] Failed to upload workspace to sandbox") return {"success": False, "error": "Failed to upload workspace to sandbox"} logger.info("[TaskRunner] Workspace files uploaded to sandbox successfully") except Exception as e: logger.error(f"[TaskRunner] Error uploading workspace: {e}") logger.debug( f"[TaskRunner] Exception details: {type(e).__name__}: {e}", exc_info=True ) return {"success": False, "error": f"Failed to upload workspace: {e}"} else: logger.info( "[TaskRunner] Skipping workspace upload (configure via runtime.upload_workspace)" ) # Execute logger.info("[TaskRunner] Starting agent loop") report = loop.run() logger.info(f"[TaskRunner] Agent loop completed: status={report.get('status')}") # Sync files back from sandbox to local workspace # Required for both local_test and remote modes: files created/modified in sandbox # need to be downloaded to local workspace. noxrunner 2.0.0+ provides unified # download_workspace() that works correctly for all backends. try: logger.info("[TaskRunner] Syncing files from sandbox to workspace") # AgentLoop -> WorkflowCoordinator -> SandboxAdapter (always present) sandbox_adapter = loop.coordinator.sandbox # Ensure sandbox is initialized (may not be if loop.run() failed early) if not sandbox_adapter._initialized: sandbox_adapter.initialize() success = sandbox_adapter.download_workspace(task_spec.workspace_root) if success: logger.info( f"[TaskRunner] Files synced successfully from sandbox to {task_spec.workspace_root}" ) else: logger.warning("[TaskRunner] File sync failed, but continuing") except Exception as e: logger.error(f"[TaskRunner] Error syncing files from sandbox: {e}") logger.debug( f"[TaskRunner] Exception details: {type(e).__name__}: {e}", exc_info=True ) # Don't fail the task if sync fails, but log the error return { "success": report.get("status") == "success", "task_id": task_spec.task_id, "status": report.get("status"), "report": report, } except Exception as e: logger.error(f"[TaskRunner] Execute failed: {e}") logger.debug(f"[TaskRunner] Exception details: {type(e).__name__}: {e}", exc_info=True) return {"success": False, "error": str(e)}