"""Sandbox adapter for noxrunner."""
from pathlib import Path
from typing import Dict, List, Optional
from noxrunner import NoxRunnerClient
from atloop.config.models import SandboxConfig
[docs]
class SandboxAdapter:
"""Adapter for noxrunner sandbox execution."""
[docs]
def __init__(self, config: SandboxConfig, session_id: str):
"""
Initialize sandbox adapter.
Args:
config: Sandbox configuration
session_id: Unique session identifier
"""
self.config = config
self.session_id = session_id
self.client = NoxRunnerClient(
base_url=config.base_url,
timeout=config.timeout,
local_test=config.local_test,
)
self._initialized = False
[docs]
def initialize(self) -> bool:
"""
Initialize sandbox (create if needed).
Returns:
True if successful
"""
if self._initialized:
return True
try:
# Create sandbox
self.client.create_sandbox(
session_id=self.session_id,
ttl_seconds=self.config.session_ttl_seconds,
image=self.config.image,
cpu_limit=self.config.cpu_limit,
memory_limit=self.config.memory_limit,
ephemeral_storage_limit=self.config.ephemeral_storage_limit,
)
self._initialized = True
return True
except Exception as e:
print(f"Failed to create sandbox: {e}")
return False
[docs]
def upload_workspace(self, workspace_path: str) -> bool:
"""
Upload workspace to sandbox.
Args:
workspace_path: Local workspace path
Returns:
True if successful
"""
if not self._initialized:
if not self.initialize():
return False
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
try:
# Upload all files in workspace
files = {}
for file_path in workspace.rglob("*"):
if file_path.is_file():
# Skip .git directory
if ".git" in file_path.parts:
continue
# Get relative path
rel_path = file_path.relative_to(workspace)
# Read file content
with open(file_path, "rb") as f:
files[str(rel_path)] = f.read()
if files:
return self.client.upload_files(
session_id=self.session_id,
files=files,
dest="/workspace",
)
return True
except Exception as e:
print(f"Failed to upload workspace: {e}")
return False
[docs]
def initialize_git(self) -> bool:
"""
Initialize git repository in sandbox if not exists.
Returns:
True if successful
"""
if not self._initialized:
if not self.initialize():
return False
try:
# Check if git exists
result = self.client.exec_shell(
self.session_id,
"git rev-parse --git-dir 2>/dev/null || echo 'not-git'",
workdir="/workspace",
)
if result["exitCode"] == 0 and "not-git" not in result["stdout"]:
# Git already initialized
return True
# Initialize git
result = self.client.exec_shell(
self.session_id,
"git init",
workdir="/workspace",
)
if result["exitCode"] != 0:
return False
# Add all files
result = self.client.exec_shell(
self.session_id,
"git add .",
workdir="/workspace",
)
if result["exitCode"] != 0:
return False
# Create initial commit
result = self.client.exec_shell(
self.session_id,
"git commit -m 'Initial commit' || git config user.email 'agent@atloop' && git config user.name 'atloop Agent' && git commit -m 'Initial commit'",
workdir="/workspace",
)
return result["exitCode"] == 0
except Exception as e:
print(f"Failed to initialize git: {e}")
return False
[docs]
def exec_shell(
self,
command: str,
workdir: str = "/workspace",
env: Optional[Dict[str, str]] = None,
timeout_seconds: int = 30,
) -> Dict:
"""
Execute shell command in sandbox.
Args:
command: Shell command to execute
workdir: Working directory
env: Environment variables
timeout_seconds: Command timeout
Returns:
Dict with exitCode, stdout, stderr, durationMs
"""
if not self._initialized:
if not self.initialize():
return {
"exitCode": 1,
"stdout": "",
"stderr": "Sandbox not initialized",
"durationMs": 0,
}
return self.client.exec_shell(
session_id=self.session_id,
command=command,
workdir=workdir,
env=env,
timeout_seconds=timeout_seconds,
)
[docs]
def exec(
self,
cmd: List[str],
workdir: str = "/workspace",
env: Optional[Dict[str, str]] = None,
timeout_seconds: int = 30,
) -> Dict:
"""
Execute a command in the sandbox (using exec, not exec_shell).
This returns the correct exit code from the command itself, not from sh.
Args:
cmd: Command to execute (list of strings)
workdir: Working directory
env: Environment variables
timeout_seconds: Command timeout
Returns:
Dict with exitCode, stdout, stderr, durationMs
"""
if not self._initialized:
if not self.initialize():
return {
"exitCode": 1,
"stdout": "",
"stderr": "Sandbox not initialized",
"durationMs": 0,
}
return self.client.exec(
session_id=self.session_id,
cmd=cmd,
workdir=workdir,
env=env,
timeout_seconds=timeout_seconds,
)
[docs]
def download_workspace(self, workspace_path: str) -> bool:
"""
Download workspace from sandbox to local directory.
This method delegates to noxrunner's download_workspace method,
which handles all the details of downloading and extracting files
regardless of the backend type (local or remote).
Args:
workspace_path: Local workspace path to download to
Returns:
True if successful
"""
if not self._initialized:
# If sandbox was never initialized, nothing to download
import logging
logger = logging.getLogger(__name__)
logger.debug("Sandbox not initialized, skipping download")
return True
try:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Downloading workspace from sandbox session {self.session_id}")
# Use noxrunner's download_workspace method
# This handles both local and remote backends transparently
success = self.client.download_workspace(
session_id=self.session_id,
local_dir=workspace_path,
src="/workspace",
)
if success:
logger.info(f"Downloaded workspace from sandbox to {workspace_path}")
else:
logger.warning("Failed to download workspace from sandbox")
return success
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to download workspace: {e}")
logger.debug(f"Exception details: {type(e).__name__}: {e}", exc_info=True)
return False