"""Action JSON schema definition and validation."""
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Try to import optional JSON repair libraries
try:
from json_repair import repair_json
JSON_REPAIR_AVAILABLE = True
except ImportError:
JSON_REPAIR_AVAILABLE = False
logger.debug("json-repair not available, will use fallback JSON repair methods")
try:
import json5
JSON5_AVAILABLE = True
except ImportError:
JSON5_AVAILABLE = False
logger.debug("json5 not available, will use standard JSON parsing only")
# Action JSON Schema
ACTION_JSON_SCHEMA = {
"type": "object",
"required": ["actions", "stop_reason"],
"properties": {
"thought_summary": {"type": "string"},
"plan": {"type": "array", "items": {"type": "string"}},
"actions": {
"type": "array",
"items": {
"type": "object",
"required": ["tool", "args"],
"properties": {
"tool": {
"type": "string",
"enum": [
"run",
"write_file",
"append_file",
"read_file",
"read_skill_file",
"edit_file",
"multi_edit_file",
"glob",
"search",
"todo_write",
"todo_read",
"skill",
],
},
"args": {"type": "object"},
},
},
},
"stop_reason": {
"type": "string",
"enum": ["continue", "done", "fail"],
},
"result_message": {"type": "string"},
},
}
# Valid tool names
VALID_TOOLS = {
"run",
"write_file",
"append_file", # Append content to files
"read_file", # Enhanced file reading with type detection (sandbox files)
"read_skill_file", # Read files from skill directories (skill files only, stored locally)
"edit_file", # Git-style diff editing
"multi_edit_file", # Batch multi-file editing
"glob", # File matching with glob patterns
"search", # Enhanced search with regex, context lines, file filtering
"todo_write", # Write and manage todo lists
"todo_read", # Read todo lists
"skill", # Load skill knowledge on-demand
}
[docs]
class ActionJSON:
"""Action JSON data structure."""
[docs]
def __init__(
self,
actions: List[Dict[str, Any]],
stop_reason: str,
thought_summary: Optional[str] = None,
plan: Optional[List[str]] = None,
result_message: Optional[str] = None,
):
"""
Initialize Action JSON.
Args:
actions: List of action dictionaries
stop_reason: Stop reason (continue, done, fail)
thought_summary: Optional thought summary
plan: Optional plan steps
result_message: Optional result message
"""
self.actions = actions
self.stop_reason = stop_reason
self.thought_summary = thought_summary
self.plan = plan or []
self.result_message = result_message
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
result = {
"actions": self.actions,
"stop_reason": self.stop_reason,
}
if self.thought_summary:
result["thought_summary"] = self.thought_summary
if self.plan:
result["plan"] = self.plan
if self.result_message:
result["result_message"] = self.result_message
return result
[docs]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ActionJSON":
"""Create from dictionary."""
return cls(
actions=data.get("actions", []),
stop_reason=data.get("stop_reason", "continue"),
thought_summary=data.get("thought_summary"),
plan=data.get("plan"),
result_message=data.get("result_message"),
)
[docs]
def validate_action_json(data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate Action JSON structure with detailed error messages.
Args:
data: Action JSON dictionary
Returns:
Tuple of (is_valid, error_message)
"""
# Check required fields
if "actions" not in data:
return (
False,
"Missing required field: 'actions'. Your JSON must include an 'actions' array.",
)
if "stop_reason" not in data:
return (
False,
"Missing required field: 'stop_reason'. Your JSON must include a 'stop_reason' field (one of: 'continue', 'done', 'fail').",
)
# Check stop_reason
if data["stop_reason"] not in ["continue", "done", "fail"]:
return (
False,
f"Invalid stop_reason: '{data['stop_reason']}'. Must be one of: 'continue', 'done', 'fail'.",
)
# Check actions
if not isinstance(data["actions"], list):
return False, f"'actions' must be a list/array, but got {type(data['actions']).__name__}."
# Count write_file actions - only one allowed per response
write_file_count = 0
for i, action in enumerate(data["actions"]):
if not isinstance(action, dict):
return (
False,
f"action[{i}] must be a dictionary/object, but got {type(action).__name__}.",
)
if "tool" not in action:
return (
False,
f"action[{i}] missing required field: 'tool'. Each action must have a 'tool' field (one of: {sorted(VALID_TOOLS)}).",
)
if "args" not in action:
return (
False,
f"action[{i}] missing required field: 'args'. Each action must have an 'args' object/dictionary.",
)
tool = action["tool"]
if not isinstance(tool, str):
return False, f"action[{i}].tool must be a string, but got {type(tool).__name__}."
if tool not in VALID_TOOLS:
return (
False,
f"action[{i}] invalid tool: '{tool}'. Valid tools are: {sorted(VALID_TOOLS)}.",
)
if not isinstance(action["args"], dict):
return (
False,
f"action[{i}].args must be a dictionary/object, but got {type(action['args']).__name__}.",
)
# Validate tool-specific args
if tool == "run":
if "cmd" not in action["args"]:
return (
False,
f"action[{i}] (tool='run') missing required arg: 'cmd'. The 'run' tool requires a 'cmd' string argument.",
)
elif tool == "write_file":
if "path" not in action["args"]:
return (
False,
f"action[{i}] (tool='write_file') missing required arg: 'path'. The 'write_file' tool requires a 'path' string argument.",
)
if "content" not in action["args"]:
return (
False,
f"action[{i}] (tool='write_file') missing required arg: 'content'. The 'write_file' tool requires a 'content' string argument.",
)
write_file_count += 1
elif tool == "append_file":
if "path" not in action["args"]:
return (
False,
f"action[{i}] (tool='append_file') missing required arg: 'path'. The 'append_file' tool requires a 'path' string argument.",
)
if "content" not in action["args"]:
return (
False,
f"action[{i}] (tool='append_file') missing required arg: 'content'. The 'append_file' tool requires a 'content' string argument.",
)
elif tool == "read_file":
if "path" not in action["args"]:
return (
False,
f"action[{i}] (tool='read_file') missing required arg: 'path'. The 'read_file' tool requires a 'path' string argument.",
)
elif tool == "read_local_file":
if "path" not in action["args"]:
return (
False,
f"action[{i}] (tool='read_skill_file') missing required arg: 'path'. The 'read_skill_file' tool requires a 'path' string argument.",
)
elif tool == "edit_file":
if "path" not in action["args"]:
return (
False,
f"action[{i}] (tool='edit_file') missing required arg: 'path'. The 'edit_file' tool requires a 'path' string argument.",
)
if "content" not in action["args"]:
return (
False,
f"action[{i}] (tool='edit_file') missing required arg: 'content'. The 'edit_file' tool requires a 'content' string argument in format: <old>old_string</old><new>new_string</new>.",
)
elif tool == "multi_edit_file":
if "edits" not in action["args"]:
return (
False,
f"action[{i}] (tool='multi_edit_file') missing required arg: 'edits'. The 'multi_edit_file' tool requires an 'edits' array argument.",
)
if not isinstance(action["args"].get("edits"), list):
return (
False,
f"action[{i}] (tool='multi_edit_file') invalid arg: 'edits' must be an array of edit objects.",
)
if len(action["args"].get("edits", [])) == 0:
return (
False,
f"action[{i}] (tool='multi_edit_file') invalid arg: 'edits' array must contain at least one edit.",
)
elif tool == "glob":
if "pattern" not in action["args"]:
return (
False,
f"action[{i}] (tool='glob') missing required arg: 'pattern'. The 'glob' tool requires a 'pattern' string argument.",
)
elif tool == "search":
if "query" not in action["args"]:
return (
False,
f"action[{i}] (tool='search') missing required arg: 'query'. The 'search' tool requires a 'query' string argument.",
)
if "output_mode" in action["args"] and action["args"]["output_mode"] not in [
"content",
"files_with_matches",
"count",
]:
return (
False,
f"action[{i}] (tool='search') invalid arg: 'output_mode' must be one of 'content', 'files_with_matches', 'count'.",
)
elif tool == "todo_write":
if "todos" not in action["args"]:
return (
False,
f"action[{i}] (tool='todo_write') missing required arg: 'todos'. The 'todo_write' tool requires a 'todos' array argument.",
)
if not isinstance(action["args"].get("todos"), list):
return (
False,
f"action[{i}] (tool='todo_write') invalid arg: 'todos' must be an array.",
)
if len(action["args"].get("todos", [])) == 0:
return (
False,
f"action[{i}] (tool='todo_write') invalid arg: 'todos' array cannot be empty.",
)
for j, todo in enumerate(action["args"].get("todos", [])):
if not isinstance(todo, dict):
return False, f"action[{i}] (tool='todo_write') todo[{j}] must be a dictionary."
if "content" not in todo:
return (
False,
f"action[{i}] (tool='todo_write') todo[{j}] missing required field: 'content'.",
)
if "activeForm" not in todo:
return (
False,
f"action[{i}] (tool='todo_write') todo[{j}] missing required field: 'activeForm'.",
)
if "status" not in todo:
return (
False,
f"action[{i}] (tool='todo_write') todo[{j}] missing required field: 'status'.",
)
if todo.get("status") not in ["pending", "in_progress", "completed"]:
return (
False,
f"action[{i}] (tool='todo_write') todo[{j}] invalid status: must be 'pending', 'in_progress', or 'completed'.",
)
elif tool == "skill":
if "name" not in action["args"]:
return (
False,
f"action[{i}] (tool='skill') missing required arg: 'name'. The 'skill' tool requires a 'name' string argument.",
)
# Enforce single file creation per response
if write_file_count > 1:
return (
False,
f"Only one 'write_file' action allowed per response (found {write_file_count}). Create files one at a time to avoid token limit issues.",
)
return True, None
def extract_json_from_text(text: str) -> Optional[str]:
"""
Extract JSON from text (handles cases where LLM adds extra text).
Improved extraction logic:
1. Try to find JSON object markers (```json, ```, {)
2. Handle nested braces correctly
3. Handle strings with escaped quotes
4. Try multiple extraction strategies
Args:
text: Text that may contain JSON
Returns:
Extracted JSON string or None
"""
# Strategy 1: Look for code block markers (```json or ```)
json_block_markers = [
("```json", "```"),
("```", "```"),
]
for start_marker, end_marker in json_block_markers:
start_idx = text.find(start_marker)
if start_idx != -1:
# Find the end marker after start marker
content_start = start_idx + len(start_marker)
end_idx = text.find(end_marker, content_start)
if end_idx != -1:
json_candidate = text[content_start:end_idx].strip()
# Try to parse it
try:
json.loads(json_candidate)
return json_candidate
except json.JSONDecodeError:
pass
# Strategy 2: Find first { and match braces (handling strings)
start_idx = text.find("{")
if start_idx == -1:
return None
# Find matching closing brace, handling strings with escaped quotes
brace_count = 0
in_string = False
escape_next = False
for i in range(start_idx, len(text)):
char = text[i]
if escape_next:
escape_next = False
continue
if char == "\\":
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == "{":
brace_count += 1
elif char == "}":
brace_count -= 1
if brace_count == 0:
# Found complete JSON object
return text[start_idx : i + 1]
return None
[docs]
def parse_action_json(
text: str, max_retries: int = 2
) -> Tuple[Optional[ActionJSON], Optional[str], Dict[str, str]]:
"""
Parse Action JSON from text with improved error handling.
Also extracts file contents from placeholders (FILE_CONTENT_#1, FILE_CONTENT_#2, etc.)
that follow the JSON in the format:
---(FILE_CONTENT_#1)---
<file content>
---(FILE_CONTENT_#2)---
<file content>
...
Tries multiple strategies:
1. Direct JSON parsing
2. Extract JSON from code blocks (```json or ```)
3. Extract JSON by matching braces (handling strings)
4. Fix common JSON errors
5. Use json-repair if available
6. Use json5 if available
Args:
text: Text containing JSON and optionally file contents
max_retries: Maximum number of retries (unused, kept for compatibility)
Returns:
Tuple of (ActionJSON or None, error_message, file_contents_dict)
file_contents_dict maps placeholder names (e.g., "FILE_CONTENT_#1") to actual content
"""
if not text or not text.strip():
return None, "Empty text provided. Your response must contain valid JSON.", {}
# Extract file contents from placeholders (e.g., ---(FILE_CONTENT_#1)--- ... ---(FILE_CONTENT_#2)---)
file_contents = _extract_file_contents(text)
# Remove file content sections from text to get pure JSON
json_text = _remove_file_content_sections(text)
# Strategy 1: Try direct JSON parsing first
try:
data = json.loads(json_text)
is_valid, error = validate_action_json(data)
if is_valid:
return ActionJSON.from_dict(data), None, file_contents
else:
return None, error, file_contents # Return detailed validation error
except json.JSONDecodeError as e:
# Store the JSON decode error for later use
json_decode_error = str(e)
except Exception as e:
return None, f"Unexpected error during JSON parsing: {e}", file_contents
# Strategy 2: Try to extract JSON from text (handles code blocks, extra text)
json_str = extract_json_from_text(json_text)
if json_str:
try:
data = json.loads(json_str)
is_valid, error = validate_action_json(data)
if is_valid:
return ActionJSON.from_dict(data), None, file_contents
else:
return None, error, file_contents # Return detailed validation error
except json.JSONDecodeError as e:
return (
None,
f"Extracted JSON is invalid: {e}. Please ensure your JSON is properly formatted with matching braces and quotes.",
file_contents,
)
except Exception as e:
return None, f"Unexpected error while parsing extracted JSON: {e}", file_contents
# Strategy 3: Try to fix common JSON errors (especially for long text content)
# This is critical for handling long text in write_file content that may have unescaped characters
fixed_json_str = _fix_json_errors(json_text if json_str is None else json_str)
if fixed_json_str:
try:
data = json.loads(fixed_json_str)
is_valid, error = validate_action_json(data)
if is_valid:
logger.info("[parse_action_json] ✅ 使用JSON修复成功解析")
return ActionJSON.from_dict(data), None, file_contents
else:
return None, error, file_contents
except json.JSONDecodeError:
pass
# Strategy 4: Try json-repair if available (most powerful) - prioritize this
if JSON_REPAIR_AVAILABLE:
try:
json_to_repair = json_text if json_str is None else json_str
repaired_json = repair_json(json_to_repair)
data = json.loads(repaired_json)
is_valid, error = validate_action_json(data)
if is_valid:
logger.info("[parse_action_json] ✅ 使用json-repair成功修复并解析")
return ActionJSON.from_dict(data), None, file_contents
else:
return None, error, file_contents
except Exception as e:
logger.debug(f"[parse_action_json] json-repair修复失败: {e}")
# Strategy 5: Try json5 if available (supports more lenient JSON)
if JSON5_AVAILABLE:
try:
json_to_parse = json_text if json_str is None else json_str
data = json5.loads(json_to_parse)
is_valid, error = validate_action_json(data)
if is_valid:
logger.info("[parse_action_json] ✅ 使用json5成功解析")
return ActionJSON.from_dict(data), None, file_contents
else:
return None, error, file_contents
except Exception as e:
logger.debug(f"[parse_action_json] json5解析失败: {e}")
# If all strategies fail, return detailed error
error_msg = "Could not extract valid JSON from text. "
if "json_decode_error" in locals():
error_msg += f"JSON parse error: {json_decode_error}. "
error_msg += "Please ensure your response is valid JSON with the required fields: 'actions' (array) and 'stop_reason' (string: 'continue', 'done', or 'fail')."
error_msg += " For long text content, use placeholders (FILE_CONTENT_#1, FILE_CONTENT_#2, etc.) and provide content after the JSON."
return None, error_msg, file_contents
def _fix_json_errors(json_str: str) -> Optional[str]:
"""
Fix common JSON errors in LLM output, especially for long text content.
Fixes:
1. Unescaped quotes in strings (especially in long text content)
2. Unescaped newlines, tabs, and control characters
3. Trailing commas
4. Missing commas
5. Single quotes (convert to double quotes where safe)
Args:
json_str: JSON string that may contain errors
Returns:
Fixed JSON string, or None if fixing is not possible
"""
if not json_str or not json_str.strip():
return None
try:
# Quick check: if already valid, return as-is
json.loads(json_str)
return json_str
except json.JSONDecodeError:
pass
# Try to fix common errors
fixed = json_str
# 1. Remove comments (single-line and multi-line)
lines = fixed.split("\n")
fixed_lines = []
for line in lines:
if "//" in line:
# Only remove comment if we're not inside a string
quote_count = line.count('"') - line.count('\\"')
if quote_count % 2 == 0: # Even number of quotes = not in string
line = line.split("//")[0].rstrip()
fixed_lines.append(line)
fixed = "\n".join(fixed_lines)
fixed = re.sub(r"/\*.*?\*/", "", fixed, flags=re.DOTALL)
# 2. Fix trailing commas
fixed = re.sub(r",\s*}", "}", fixed)
fixed = re.sub(r",\s*]", "]", fixed)
# 3. Fix missing commas between objects/arrays
fixed = re.sub(r"}\s*{", "}, {", fixed)
fixed = re.sub(r"]\s*{", "], {", fixed)
fixed = re.sub(r'}\s*"', '}, "', fixed)
fixed = re.sub(r']\s*"', '], "', fixed)
# 4. Most critical: Fix unescaped control characters in strings
# This is the main issue with long text content (newlines, tabs, etc.)
fixed = _escape_control_chars_safe(fixed)
# 5. Try to fix unescaped quotes in strings (very carefully, conservative approach)
# This is risky, so we do it last and only if the JSON is still invalid
# Only fix quotes that are clearly inside string values and clearly problematic
try:
json.loads(fixed)
# Already valid after control char fix, don't risk breaking it
return fixed
except json.JSONDecodeError:
# Still invalid, try fixing quotes (but be very conservative)
fixed = _fix_unescaped_quotes_in_strings(fixed)
# Verify the fix worked
try:
json.loads(fixed)
return fixed
except json.JSONDecodeError:
# Fix didn't work, but return it anyway for json5 to try
return fixed
def _escape_control_chars_safe(text: str) -> str:
"""
Safely escape control characters in JSON strings.
Only escapes control characters that are inside string values,
not in keys or outside strings.
Args:
text: JSON text
Returns:
Text with control characters escaped
"""
result = []
i = 0
in_string = False
escape_next = False
while i < len(text):
char = text[i]
if escape_next:
result.append(char)
escape_next = False
i += 1
continue
if char == "\\":
result.append(char)
escape_next = True
i += 1
continue
if char == '"':
in_string = not in_string
result.append(char)
i += 1
continue
if in_string:
# Inside a string: escape control characters
if char == "\n":
result.append("\\n")
elif char == "\t":
result.append("\\t")
elif char == "\r":
result.append("\\r")
elif ord(char) < 32: # Other control characters
result.append(f"\\u{ord(char):04x}")
else:
result.append(char)
else:
# Outside string: keep as-is
result.append(char)
i += 1
return "".join(result)
def _fix_unescaped_quotes_in_strings(text: str) -> str:
"""
Fix unescaped quotes inside string values.
This is very tricky - we need to be conservative to avoid breaking valid JSON.
Only fix quotes that are clearly inside string values and clearly unescaped.
Strategy: When we encounter a quote inside a string, check if it's followed
by valid JSON structure. If not, it's likely an unescaped quote in content.
Args:
text: JSON text
Returns:
Text with unescaped quotes in strings fixed (conservatively)
"""
result = []
i = 0
in_string = False
escape_next = False
while i < len(text):
char = text[i]
if escape_next:
result.append(char)
escape_next = False
i += 1
continue
if char == "\\":
result.append(char)
escape_next = True
i += 1
continue
if char == '"':
if not in_string:
# Starting a new string
in_string = True
result.append(char)
else:
# Inside a string - check if this is the closing quote
# Look ahead to see if this is followed by valid JSON structure
lookahead = text[i + 1 :].lstrip()
# Check for common patterns that indicate this is a closing quote:
# - Followed by : (key-value separator)
# - Followed by , (array/object separator)
# - Followed by } or ] (structure end)
# - Followed by whitespace then one of the above
is_closing_quote = (
lookahead.startswith(":")
or lookahead.startswith(",")
or lookahead.startswith("}")
or lookahead.startswith("]")
or not lookahead # End of text
)
if is_closing_quote:
# This is a closing quote
in_string = False
result.append(char)
else:
# This might be an unescaped quote inside the string
# But be conservative - only escape if it's clearly wrong
# Check if next non-whitespace char is a letter/digit (likely content)
next_char = lookahead[0] if lookahead else ""
if next_char.isalnum() or next_char in ".,;:!?":
# Likely an unescaped quote in content - escape it
result.append('\\"')
else:
# Might be valid - keep as-is
result.append(char)
i += 1
continue
result.append(char)
i += 1
return "".join(result)
def _extract_file_contents(text: str) -> Dict[str, str]:
"""
Extract file contents from placeholders in the format:
---(FILE_CONTENT_#1)---
<file content>
---(FILE_CONTENT_#2)---
<file content>
...
For edit_file, the content format is:
---(FILE_CONTENT_#N)---
<old>old_string</old><new>new_string</new>
Args:
text: Full text containing JSON and file contents
Returns:
Dictionary mapping placeholder names (e.g., "FILE_CONTENT_#1") to content
"""
file_contents = {}
# Pattern to match: ---(FILE_CONTENT_#N)---
pattern = r"---\(FILE_CONTENT_#(\d+)\)---"
matches = list(re.finditer(pattern, text))
for i, match in enumerate(matches):
placeholder = f"FILE_CONTENT_#{match.group(1)}"
start_pos = match.end()
# Find the end position (next placeholder or end of text)
if i + 1 < len(matches):
end_pos = matches[i + 1].start()
else:
end_pos = len(text)
# Extract content (strip leading/trailing whitespace)
content = text[start_pos:end_pos].strip()
file_contents[placeholder] = content
return file_contents
def _remove_file_content_sections(text: str) -> str:
"""
Remove file content sections from text, leaving only JSON.
Removes sections like:
---(FILE_CONTENT_#1)---
<content>
---(FILE_CONTENT_#2)---
<content>
Args:
text: Full text containing JSON and file contents
Returns:
Text with file content sections removed
"""
# Pattern to match: ---(FILE_CONTENT_#N)--- ... (until next placeholder or end)
pattern = r"---\(FILE_CONTENT_#\d+\)---.*?(?=---\(FILE_CONTENT_#\d+\)---|$)"
# Remove all file content sections
result = re.sub(pattern, "", text, flags=re.DOTALL)
return result.strip()