Source code for atloop.llm.schema

"""Action JSON schema definition and validation."""

import json
import logging
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    from atloop.tools.registry import ToolRegistry

logger = logging.getLogger(__name__)

# Try to import optional JSON repair libraries
try:
    from json_repair import repair_json

    JSON_REPAIR_AVAILABLE = True
except ImportError:
    JSON_REPAIR_AVAILABLE = False
    logger.debug("json-repair not available, will use fallback JSON repair methods")

try:
    import json5

    JSON5_AVAILABLE = True
except ImportError:
    JSON5_AVAILABLE = False
    logger.debug("json5 not available, will use standard JSON parsing only")

# Action JSON Schema
# Note: Tool enum is now dynamic - generated from ToolRegistry at runtime
# This schema is used for JSON structure validation only, not tool enumeration
ACTION_JSON_SCHEMA = {
    "type": "object",
    "required": ["actions", "stop_reason"],
    "properties": {
        "current_step_thoughts": {"type": "string"},
        "plan": {"type": "array", "items": {"type": "string"}},
        "actions": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["tool", "args"],
                "properties": {
                    "tool": {"type": "string"},  # No enum - validated dynamically via ToolRegistry
                    "args": {"type": "object"},
                },
            },
        },
        "stop_reason": {
            "type": "string",
            "enum": ["continue", "done", "fail"],
        },
        "result_message": {"type": "string"},
    },
}

# DEPRECATED: VALID_TOOLS is no longer used for validation
# Tool validation is now done dynamically via ToolRegistry
# Kept for backward compatibility in error messages only
VALID_TOOLS = set()  # Empty set - will be populated dynamically if needed


[docs] class ActionJSONValidationError(ValueError): """Exception raised when ActionJSON validation fails."""
[docs] def __init__(self, message: str, data: Optional[Dict[str, Any]] = None): """ Initialize validation error. Args: message: Validation error message data: The invalid data that failed validation (for debugging) """ super().__init__(message) self.message = message self.data = data
[docs] class ActionJSON: """Action JSON data structure. Design principle: Fail Fast - Data validation happens at construction time - Invalid data is rejected immediately with clear error messages - Downstream code can trust that ActionJSON instances are valid """
[docs] def __init__( self, actions: List[Dict[str, Any]], stop_reason: str, current_step_thoughts: Optional[str] = None, plan: Optional[List[str]] = None, result_message: Optional[str] = None, ): """ Initialize Action JSON. Args: actions: List of action dictionaries stop_reason: Stop reason (continue, done, fail) current_step_thoughts: Optional current step thoughts (not a summary) plan: Optional plan steps result_message: Optional result message Raises: ActionJSONValidationError: If data is invalid """ # Type checks for constructor arguments (defensive programming at API boundary) if not isinstance(actions, list): raise ActionJSONValidationError( f"'actions' must be a list, but got {type(actions).__name__}." ) if not isinstance(stop_reason, str): raise ActionJSONValidationError( f"'stop_reason' must be a string, but got {type(stop_reason).__name__}." ) if stop_reason not in ["continue", "done", "fail"]: raise ActionJSONValidationError( f"Invalid stop_reason: '{stop_reason}'. Must be one of: 'continue', 'done', 'fail'." ) # Validate each action for i, action in enumerate(actions): if not isinstance(action, dict): raise ActionJSONValidationError( f"action[{i}] must be a dictionary, but got {type(action).__name__}." ) if "tool" not in action: raise ActionJSONValidationError(f"action[{i}] missing required field: 'tool'.") if "args" not in action: raise ActionJSONValidationError(f"action[{i}] missing required field: 'args'.") self.actions = actions self.stop_reason = stop_reason self.current_step_thoughts = current_step_thoughts self.plan = plan or [] self.result_message = result_message
[docs] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" result = { "actions": self.actions, "stop_reason": self.stop_reason, } if self.current_step_thoughts: result["current_step_thoughts"] = self.current_step_thoughts if self.plan: result["plan"] = self.plan if self.result_message: result["result_message"] = self.result_message return result
[docs] @classmethod def from_dict( cls, data: Dict[str, Any], validate: bool = True, tool_registry: Optional["ToolRegistry"] = None, ) -> "ActionJSON": """ Create from dictionary with validation. Design principle: Validate at the boundary - Data entering the system is validated immediately - Invalid data is rejected with clear error messages - Downstream code can trust the data structure Args: data: Dictionary containing action JSON data validate: Whether to validate the data (default: True) tool_registry: Optional ToolRegistry instance for dynamic tool validation. If provided, validates tool existence and delegates argument validation to tool.validate_args(). If not provided, only performs structural checks. Returns: ActionJSON instance Raises: ActionJSONValidationError: If data is invalid and validate=True TypeError: If data is not a dictionary """ # Type check at boundary if not isinstance(data, dict): raise TypeError( f"ActionJSON.from_dict() expects a dict, but got {type(data).__name__}." ) # Validate data structure if requested if validate: is_valid, error_msg = validate_action_json(data, tool_registry=tool_registry) if not is_valid: raise ActionJSONValidationError(error_msg, data=data) # Extract and construct (data is now guaranteed to be valid) # Support both old and new field names for backward compatibility current_step_thoughts = data.get("current_step_thoughts") or data.get("thought_summary") return cls( actions=data.get("actions", []), stop_reason=data.get("stop_reason", "continue"), current_step_thoughts=current_step_thoughts, plan=data.get("plan"), result_message=data.get("result_message"), )
[docs] def validate_action_json( data: Dict[str, Any], tool_registry: Optional["ToolRegistry"] = None ) -> Tuple[bool, Optional[str]]: """ Validate Action JSON structure with detailed error messages. This function performs structural validation only. Tool-specific argument validation is delegated to each tool's validate_args() method. Args: data: Action JSON dictionary tool_registry: Optional ToolRegistry instance for dynamic tool validation. If provided, validates tool existence and delegates argument validation to tool.validate_args(). If not provided, only performs structural checks. Returns: Tuple of (is_valid, error_message) """ # Check required fields if "actions" not in data: return ( False, "Missing required field: 'actions'. Your JSON must include an 'actions' array.", ) if "stop_reason" not in data: return ( False, "Missing required field: 'stop_reason'. Your JSON must include a 'stop_reason' field (one of: 'continue', 'done', 'fail').", ) # Check stop_reason if data["stop_reason"] not in ["continue", "done", "fail"]: return ( False, f"Invalid stop_reason: '{data['stop_reason']}'. Must be one of: 'continue', 'done', 'fail'.", ) # Check actions if not isinstance(data["actions"], list): return False, f"'actions' must be a list/array, but got {type(data['actions']).__name__}." # Get available tools from registry if provided available_tools = None if tool_registry: available_tools = set(tool_registry.list_tools()) # Count write_file actions - only one allowed per response write_file_count = 0 for i, action in enumerate(data["actions"]): if not isinstance(action, dict): return ( False, f"action[{i}] must be a dictionary/object, but got {type(action).__name__}.", ) if "tool" not in action: tool_list_msg = ( f" (one of: {sorted(available_tools)})" if available_tools else "" ) return ( False, f"action[{i}] missing required field: 'tool'. Each action must have a 'tool' field{tool_list_msg}.", ) if "args" not in action: return ( False, f"action[{i}] missing required field: 'args'. Each action must have an 'args' object/dictionary.", ) tool = action["tool"] if not isinstance(tool, str): return False, f"action[{i}].tool must be a string, but got {type(tool).__name__}." # Check args type BEFORE tool validation (type check is structural, not tool-specific) if not isinstance(action["args"], dict): return ( False, f"action[{i}].args must be a dictionary/object, but got {type(action['args']).__name__}.", ) # Validate tool existence if registry is available if tool_registry: if tool not in available_tools: return ( False, f"action[{i}] invalid tool: '{tool}'. Valid tools are: {sorted(available_tools)}.", ) # Delegate tool-specific argument validation to the tool itself tool_instance = tool_registry.get(tool) if tool_instance: is_valid, error_msg = tool_instance.validate_args(action["args"]) if not is_valid: return ( False, f"action[{i}] (tool='{tool}') invalid arguments: {error_msg or 'Validation failed'}.", ) # Track write_file actions for enforcement rule if tool == "write_file": write_file_count += 1 # Enforce single file creation per response if write_file_count > 1: return ( False, f"Only one 'write_file' action allowed per response (found {write_file_count}). Create files one at a time to avoid token limit issues.", ) return True, None
def extract_json_from_text(text: str) -> Optional[str]: """ Extract JSON from text (handles cases where LLM adds extra text). Improved extraction logic: 1. Try to find JSON object markers (```json, ```, {) 2. Handle nested braces correctly 3. Handle strings with escaped quotes 4. Try multiple extraction strategies 5. If multiple codeblocks found, try all and return the first valid one Args: text: Text that may contain JSON Returns: Extracted JSON string or None """ # Strategy 1: Look for code block markers (```json or ```) # Try all codeblocks and find the best one json_block_markers = [ ("```json", "```"), ("```", "```"), ] candidates = [] for start_marker, end_marker in json_block_markers: # Find all occurrences of this marker type start_idx = 0 while True: start_idx = text.find(start_marker, start_idx) if start_idx == -1: break # Find the end marker after start marker content_start = start_idx + len(start_marker) end_idx = text.find(end_marker, content_start) if end_idx != -1: json_candidate = text[content_start:end_idx].strip() candidates.append((json_candidate, start_marker)) start_idx = end_idx + len(end_marker) else: break # Try all candidates and return the first valid one (with required fields) valid_json = None if candidates: logger.debug(f"[extract_json_from_text] Found {len(candidates)} codeblock(s), trying all to find valid JSON") for json_candidate, marker_type in candidates: # Try to parse it try: # Fast path: try direct parsing first parsed = json.loads(json_candidate) # Validate it has required fields for ActionJSON if isinstance(parsed, dict) and "actions" in parsed and "stop_reason" in parsed: logger.debug(f"[extract_json_from_text] Found valid JSON in {marker_type} codeblock") valid_json = json_candidate break # Found valid one, stop searching except json.JSONDecodeError: # If direct parsing fails, try json repair if available if JSON_REPAIR_AVAILABLE: try: repaired_json = repair_json(json_candidate) # Verify the repaired JSON is valid parsed = json.loads(repaired_json) # Validate it has required fields if isinstance(parsed, dict) and "actions" in parsed and "stop_reason" in parsed: logger.debug(f"[extract_json_from_text] Found valid JSON in {marker_type} codeblock (repaired)") valid_json = repaired_json break # Found valid one, stop searching except Exception: # Repair failed, continue to next candidate pass if valid_json: return valid_json # Strategy 2: Find first { and match braces (handling strings) # Skip codeblocks we already tried - look for JSON outside codeblocks start_idx = 0 while True: start_idx = text.find("{", start_idx) if start_idx == -1: return None # Check if this { is inside a codeblock we already tried in_codeblock = False for start_marker, end_marker in json_block_markers: # Find the codeblock that contains this position codeblock_start = text.rfind(start_marker, 0, start_idx) if codeblock_start != -1: codeblock_end = text.find(end_marker, codeblock_start + len(start_marker)) if codeblock_end != -1 and start_idx < codeblock_end: # This { is inside a codeblock, skip it in_codeblock = True start_idx = codeblock_end + len(end_marker) break if not in_codeblock: break # Found a { outside codeblocks # Find matching closing brace, handling strings with escaped quotes brace_count = 0 in_string = False escape_next = False for i in range(start_idx, len(text)): char = text[i] if escape_next: escape_next = False continue if char == "\\": escape_next = True continue if char == '"' and not escape_next: in_string = not in_string continue if not in_string: if char == "{": brace_count += 1 elif char == "}": brace_count -= 1 if brace_count == 0: # Found complete JSON object json_candidate = text[start_idx : i + 1] # Validate it has required fields try: parsed = json.loads(json_candidate) if isinstance(parsed, dict) and "actions" in parsed and "stop_reason" in parsed: logger.debug("[extract_json_from_text] Found valid JSON outside codeblocks") return json_candidate except json.JSONDecodeError: pass # If validation fails, still return it (will be handled by caller) return json_candidate return None
[docs] def parse_action_json( text: str, max_retries: int = 2, tool_registry: Optional["ToolRegistry"] = None, ) -> Tuple[Optional[ActionJSON], Optional[str], Dict[str, str]]: """ Parse Action JSON from text with improved error handling. Also extracts file contents from placeholders (TYPE_descriptive-name format) that follow the JSON in the format: ---((WRITE_FILE_CONTENT_descriptive-name))--- <file content> ---((SHELL_COMMAND_descriptive-name))--- <command> ... Tries multiple strategies: 1. Direct JSON parsing 2. Extract JSON from code blocks (```json or ```) 3. Extract JSON by matching braces (handling strings) 4. Fix common JSON errors 5. Use json-repair if available 6. Use json5 if available Args: text: Text containing JSON and optionally file contents max_retries: Maximum number of retries (unused, kept for compatibility) tool_registry: Optional ToolRegistry instance for dynamic tool validation. If provided, validates tool existence and delegates argument validation to tool.validate_args(). If not provided, only performs structural checks. Returns: Tuple of (ActionJSON or None, error_message, file_contents_dict) file_contents_dict maps placeholder names (e.g., "WRITE_FILE_CONTENT_descriptive-name") to actual content """ if not text or not text.strip(): return None, "Empty text provided. Your response must contain valid JSON.", {} # Extract file contents from placeholders (e.g., ---(FILE_CONTENT_#1)--- ... ---(FILE_CONTENT_#2)---) file_contents = _extract_file_contents(text) if file_contents: logger.debug( f"[parse_action_json] Extracted {len(file_contents)} file contents: " f"keys={list(file_contents.keys())}" ) # Remove file content sections from text to get pure JSON json_text = _remove_file_content_sections(text) # Strategy 1: Try direct JSON parsing first try: data = json.loads(json_text) is_valid, error = validate_action_json(data, tool_registry=tool_registry) if is_valid: # Data already validated, skip validation in from_dict() for performance return ActionJSON.from_dict(data, validate=False, tool_registry=tool_registry), None, file_contents else: return None, error, file_contents # Return detailed validation error except json.JSONDecodeError as e: # Store the JSON decode error for later use json_decode_error = str(e) except Exception as e: return None, f"Unexpected error during JSON parsing: {e}", file_contents # Strategy 2: Try to extract JSON from text (handles code blocks, extra text) json_str = extract_json_from_text(json_text) if json_str: try: data = json.loads(json_str) is_valid, error = validate_action_json(data, tool_registry=tool_registry) if is_valid: return ActionJSON.from_dict(data, tool_registry=tool_registry), None, file_contents else: return None, error, file_contents # Return detailed validation error except json.JSONDecodeError as e: return ( None, f"Extracted JSON is invalid: {e}. Please ensure your JSON is properly formatted with matching braces and quotes.", file_contents, ) except Exception as e: return None, f"Unexpected error while parsing extracted JSON: {e}", file_contents # Strategy 3: Try to fix common JSON errors (especially for long text content) # This is critical for handling long text in write_file content that may have unescaped characters fixed_json_str = _fix_json_errors(json_text if json_str is None else json_str) if fixed_json_str: try: data = json.loads(fixed_json_str) is_valid, error = validate_action_json(data, tool_registry=tool_registry) if is_valid: logger.info("[parse_action_json] ✓ 使用JSON修复成功解析") return ActionJSON.from_dict(data, tool_registry=tool_registry), None, file_contents else: return None, error, file_contents except json.JSONDecodeError: pass # Strategy 4: Try json-repair if available (most powerful) - prioritize this if JSON_REPAIR_AVAILABLE: try: json_to_repair = json_text if json_str is None else json_str repaired_json = repair_json(json_to_repair) data = json.loads(repaired_json) is_valid, error = validate_action_json(data, tool_registry=tool_registry) if is_valid: logger.info("[parse_action_json] ✓ 使用json-repair成功修复并解析") return ActionJSON.from_dict(data, tool_registry=tool_registry), None, file_contents else: return None, error, file_contents except Exception as e: logger.debug(f"[parse_action_json] json-repair修复失败: {e}") # Strategy 5: Try json5 if available (supports more lenient JSON) if JSON5_AVAILABLE: try: json_to_parse = json_text if json_str is None else json_str data = json5.loads(json_to_parse) is_valid, error = validate_action_json(data, tool_registry=tool_registry) if is_valid: logger.info("[parse_action_json] ✓ 使用json5成功解析") return ActionJSON.from_dict(data, tool_registry=tool_registry), None, file_contents else: return None, error, file_contents except Exception as e: logger.debug(f"[parse_action_json] json5解析失败: {e}") # If all strategies fail, return detailed error error_msg = "Could not extract valid JSON from text. " if "json_decode_error" in locals(): error_msg += f"JSON parse error: {json_decode_error}. " error_msg += "Please ensure your response is valid JSON with the required fields: 'actions' (array) and 'stop_reason' (string: 'continue', 'done', or 'fail')." error_msg += " For long text content, use placeholders (FILE_CONTENT_#1, FILE_CONTENT_#2, etc.) and provide content after the JSON." return None, error_msg, file_contents
def _fix_json_errors(json_str: str) -> Optional[str]: """ Fix common JSON errors in LLM output, especially for long text content. Fixes: 1. Unescaped quotes in strings (especially in long text content) 2. Unescaped newlines, tabs, and control characters 3. Trailing commas 4. Missing commas 5. Single quotes (convert to double quotes where safe) Args: json_str: JSON string that may contain errors Returns: Fixed JSON string, or None if fixing is not possible """ if not json_str or not json_str.strip(): return None try: # Quick check: if already valid, return as-is json.loads(json_str) return json_str except json.JSONDecodeError: pass # Try to fix common errors fixed = json_str # 1. Remove comments (single-line and multi-line) lines = fixed.split("\n") fixed_lines = [] for line in lines: if "//" in line: # Only remove comment if we're not inside a string quote_count = line.count('"') - line.count('\\"') if quote_count % 2 == 0: # Even number of quotes = not in string line = line.split("//")[0].rstrip() fixed_lines.append(line) fixed = "\n".join(fixed_lines) fixed = re.sub(r"/\*.*?\*/", "", fixed, flags=re.DOTALL) # 2. Fix trailing commas fixed = re.sub(r",\s*}", "}", fixed) fixed = re.sub(r",\s*]", "]", fixed) # 3. Fix missing commas between objects/arrays fixed = re.sub(r"}\s*{", "}, {", fixed) fixed = re.sub(r"]\s*{", "], {", fixed) fixed = re.sub(r'}\s*"', '}, "', fixed) fixed = re.sub(r']\s*"', '], "', fixed) # 4. Most critical: Fix unescaped control characters in strings # This is the main issue with long text content (newlines, tabs, etc.) fixed = _escape_control_chars_safe(fixed) # 5. Try to fix unescaped quotes in strings (very carefully, conservative approach) # This is risky, so we do it last and only if the JSON is still invalid # Only fix quotes that are clearly inside string values and clearly problematic try: json.loads(fixed) # Already valid after control char fix, don't risk breaking it return fixed except json.JSONDecodeError: # Still invalid, try fixing quotes (but be very conservative) fixed = _fix_unescaped_quotes_in_strings(fixed) # Verify the fix worked try: json.loads(fixed) return fixed except json.JSONDecodeError: # Fix didn't work, but return it anyway for json5 to try return fixed def _escape_control_chars_safe(text: str) -> str: """ Safely escape control characters in JSON strings. Only escapes control characters that are inside string values, not in keys or outside strings. Args: text: JSON text Returns: Text with control characters escaped """ result = [] i = 0 in_string = False escape_next = False while i < len(text): char = text[i] if escape_next: result.append(char) escape_next = False i += 1 continue if char == "\\": result.append(char) escape_next = True i += 1 continue if char == '"': in_string = not in_string result.append(char) i += 1 continue if in_string: # Inside a string: escape control characters if char == "\n": result.append("\\n") elif char == "\t": result.append("\\t") elif char == "\r": result.append("\\r") elif ord(char) < 32: # Other control characters result.append(f"\\u{ord(char):04x}") else: result.append(char) else: # Outside string: keep as-is result.append(char) i += 1 return "".join(result) def _fix_unescaped_quotes_in_strings(text: str) -> str: """ Fix unescaped quotes inside string values. This is very tricky - we need to be conservative to avoid breaking valid JSON. Only fix quotes that are clearly inside string values and clearly unescaped. Strategy: When we encounter a quote inside a string, check if it's followed by valid JSON structure. If not, it's likely an unescaped quote in content. Args: text: JSON text Returns: Text with unescaped quotes in strings fixed (conservatively) """ result = [] i = 0 in_string = False escape_next = False while i < len(text): char = text[i] if escape_next: result.append(char) escape_next = False i += 1 continue if char == "\\": result.append(char) escape_next = True i += 1 continue if char == '"': if not in_string: # Starting a new string in_string = True result.append(char) else: # Inside a string - check if this is the closing quote # Look ahead to see if this is followed by valid JSON structure lookahead = text[i + 1 :].lstrip() # Check for common patterns that indicate this is a closing quote: # - Followed by : (key-value separator) # - Followed by , (array/object separator) # - Followed by } or ] (structure end) # - Followed by whitespace then one of the above is_closing_quote = ( lookahead.startswith(":") or lookahead.startswith(",") or lookahead.startswith("}") or lookahead.startswith("]") or not lookahead # End of text ) if is_closing_quote: # This is a closing quote in_string = False result.append(char) else: # This might be an unescaped quote inside the string # But be conservative - only escape if it's clearly wrong # Check if next non-whitespace char is a letter/digit (likely content) next_char = lookahead[0] if lookahead else "" if next_char.isalnum() or next_char in ".,;:!?": # Likely an unescaped quote in content - escape it result.append('\\"') else: # Might be valid - keep as-is result.append(char) i += 1 continue result.append(char) i += 1 return "".join(result) def _clean_extracted_content(content: str, placeholder_type: str) -> str: """ Clean extracted placeholder content by removing markdown artifacts and trailing whitespace. This function removes: - Markdown code block markers (```) at the start or end for executable types (SHELL_COMMAND, etc.) - Trailing whitespace and newlines - Common markdown artifacts that LLMs sometimes include Important: For file content types (WRITE_FILE_CONTENT, EDIT_FILE_CONTENT, APPEND_FILE_CONTENT), code blocks in the middle are preserved (they're part of the actual file content, e.g., markdown files with code examples). However, code block markers at the very start or end are removed as they are markdown artifacts, not part of the file content. Args: content: Raw extracted content placeholder_type: Type of placeholder (SHELL_COMMAND, PYTHON_SCRIPT, etc.) Returns: Cleaned content """ import re if not content: return content # Remove leading/trailing whitespace first content = content.rstrip() # For file content types, we need to be careful: # - Remove code block markers at the very start/end (they're artifacts) # - But preserve code blocks in the middle (they're part of the content) file_content_types = ("WRITE_FILE_CONTENT", "EDIT_FILE_CONTENT", "APPEND_FILE_CONTENT") if placeholder_type in file_content_types: # For file content, remove code block markers only at the very start and end # This prevents markdown artifacts from polluting the file content original_content = content # Remove code block markers at the very start (must be at beginning of content) # Pattern: ``` optionally followed by language identifier, then optional whitespace/newline content = re.sub(r'^```[a-z]*\s*\n?', '', content, flags=re.MULTILINE) # Remove code block markers at the very end (must be at end of content) # Pattern: optional whitespace/newline, then ``` content = re.sub(r'\n?\s*```\s*$', '', content, flags=re.MULTILINE) # Remove any trailing backticks that might be left over (but preserve content in the middle) while content and content.endswith('`') and len(content) > 1: # Check if removing the backtick would leave valid content test_content = content[:-1].rstrip() if test_content and not test_content.endswith('`'): content = test_content else: break # Log if we removed codeblock markers if content != original_content: logger.debug(f"[_clean_extracted_content] Removed codeblock markers from {placeholder_type} content") return content.rstrip() # For executable types (SHELL_COMMAND, PYTHON_SCRIPT, SHELL_SCRIPT), remove code block markers # These are markdown artifacts, not part of the actual code # Remove markdown code block markers (```) that might be at the very end # Pattern: optional whitespace, then ```, then optional language identifier, then end of string content = re.sub(r'\s*```[a-z]*\s*$', '', content, flags=re.MULTILINE) # Remove markdown code block markers at the very start # Only match if it's at the beginning of the entire content content = re.sub(r'^```[a-z]*\s*\n?', '', content, flags=re.MULTILINE) # Remove trailing backticks that might be left over (but preserve content in the middle) # Only remove if they're at the very end while content and content.endswith('`') and not content.rstrip('`').endswith('`'): content = content[:-1] # Be more aggressive with trailing cleanup for executable types content = content.rstrip() # Remove trailing backticks and newlines (with safety limit) max_iterations = 10 # Prevent infinite loop iterations = 0 while iterations < max_iterations and (content.endswith('`') or content.endswith('\n')): new_content = content.rstrip('`\n') if new_content == content: # No change, break to avoid infinite loop break content = new_content iterations += 1 return content def _extract_file_contents(text: str) -> Dict[str, str]: """ Extract contents from type-specific placeholders in the format: ---((WRITE_FILE_CONTENT_descriptive-name))--- <file content> ---((EDIT_FILE_CONTENT_descriptive-name))--- <old>old_string</old><new>new_string</new> ---((SHELL_COMMAND_descriptive-name))--- <command> ---((PYTHON_SCRIPT_descriptive-name))--- <python code> ... Args: text: Full text containing JSON and placeholder contents Returns: Dictionary mapping placeholder names (e.g., "WRITE_FILE_CONTENT_descriptive-name") to content Content is cleaned to remove markdown artifacts while preserving indentation and formatting. """ from atloop.llm.placeholder_patterns import ( extract_placeholder_name, find_placeholder_delimiters, ) file_contents = {} matches = find_placeholder_delimiters(text) for i, match in enumerate(matches): placeholder_type, placeholder = extract_placeholder_name(match) # Start position is after the delimiter (skip the newline if present) start_pos = match.end() # Skip leading newline if present if start_pos < len(text) and text[start_pos] == "\n": start_pos += 1 # Find the end position (next placeholder or end of text) if i + 1 < len(matches): end_pos = matches[i + 1].start() else: end_pos = len(text) # Extract content content = text[start_pos:end_pos] # Clean content to remove markdown artifacts (especially for commands/scripts) # placeholder_type is already the type (e.g., "SHELL_COMMAND"), placeholder is the full name content = _clean_extracted_content(content, placeholder_type) file_contents[placeholder] = content return file_contents def _remove_file_content_sections(text: str) -> str: """ Remove placeholder content sections from text, leaving only JSON. Removes sections like: ---((WRITE_FILE_CONTENT_descriptive-name))--- <content> ---((SHELL_COMMAND_descriptive-name))--- <command> ... Also handles cases where the entire output is wrapped in code blocks (```json or ```). This ensures compatibility even if LLM doesn't follow the no-codeblock rule. Args: text: Full text containing JSON and placeholder contents Returns: Text with placeholder content sections removed, and outer code blocks stripped if present """ from atloop.llm.placeholder_patterns import PLACEHOLDER_SECTION_REGEX # First, try to strip outer code blocks if the entire text is wrapped # This handles cases where LLM wraps everything in ```json ... ``` or ``` ... ``` import re stripped_text = text.strip() # Check for code block markers at start and end # Try ```json first (more specific), then generic ``` if stripped_text.startswith('```json'): # Remove ```json at start (with optional whitespace/newline) stripped_text = re.sub(r'^```json\s*\n?', '', stripped_text, count=1) # Remove ``` at end (with optional whitespace/newline before it) stripped_text = re.sub(r'\n?```\s*$', '', stripped_text, count=1) stripped_text = stripped_text.strip() elif stripped_text.startswith('```'): # Remove ``` at start (with optional whitespace/newline) stripped_text = re.sub(r'^```\s*\n?', '', stripped_text, count=1) # Remove ``` at end (with optional whitespace/newline before it) stripped_text = re.sub(r'\n?```\s*$', '', stripped_text, count=1) stripped_text = stripped_text.strip() # Remove all placeholder content sections result = PLACEHOLDER_SECTION_REGEX.sub("", stripped_text) return result.strip()