"""LLM client wrapper for lexilux."""
import logging
import re
import time
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from lexilux import Chat, ChatContinue, ChatHistory, ChatParams, ChatResult
# Note: ChatHistory is kept for backward compatibility but no longer used
from atloop.config.models import AtloopConfig
from atloop.llm.prompts import PromptLoader
from atloop.llm.schema import (
VALID_TOOLS,
ActionJSON,
parse_action_json,
)
from atloop.skills import EnhancedSkillLoader
logger = logging.getLogger(__name__)
[docs]
class LLMClient:
"""LLM client wrapper with cache-optimized history management."""
[docs]
def __init__(self, config: AtloopConfig, workspace_root: Optional[str] = None):
"""
Initialize LLM client.
Args:
config: atloop configuration
workspace_root: Optional workspace root path for project skills
"""
self.config = config
self.chat = Chat(
base_url=config.ai.completion.api_base,
api_key=config.ai.completion.api_key,
model=config.ai.completion.model,
timeout_s=120.0,
)
# Initialize prompt loader (default to English)
language = getattr(config, "prompt_language", "en")
self.prompt_loader = PromptLoader(language=language)
logger.debug(f"[LLMClient] Initialized PromptLoader with language: {language}")
# Initialize enhanced skill loader with multiple directories
builtin_skills_dir = Path(__file__).parent.parent / "skills" / "builtin"
project_dir = Path(workspace_root) if workspace_root else None
additional_dirs = [Path(d) for d in config.skills_dirs]
self.skill_loader = EnhancedSkillLoader(
builtin_skills_dir=builtin_skills_dir,
project_dir=project_dir,
additional_dirs=additional_dirs,
)
# Initialize fixed system prompt (never changes - preserves cache)
system_template = self.prompt_loader.load("system")
tool_schema = self.generate_tool_schema()
skills_info = self._get_skills_info()
self.system_prompt = system_template.replace("{TOOL_SCHEMA}", tool_schema)
if skills_info:
self.system_prompt += f"\n\n⚠️ Available Skills:\n{skills_info}\n\n**Important**: If a task matches a Skill description, you must immediately use the 'skill' tool to load the Skill's detailed content. Skills provide professional domain knowledge and best practice guidance."
logger.info(
f"[LLMClient] Added {len(self.skill_loader.skills) if self.skill_loader else 0} skills to system prompt"
)
logger.debug(f"[LLMClient] Skills list:\n{skills_info}")
else:
logger.warning(
f"[LLMClient] No skills found (skill_loader={self.skill_loader is not None})"
)
def _get_skills_info(self) -> str:
"""
Get skills information for system prompt (Layer 1: metadata only).
Returns:
Formatted string with skill descriptions, or empty string if no skills
"""
if not self.skill_loader:
return ""
return self.skill_loader.get_descriptions()
[docs]
def get_skill_content(self, skill_name: str) -> Optional[str]:
"""
Get full skill content (Layer 2: full SKILL.md body).
This is called when the 'skill' tool is used. The content is returned
as a tool result (user message), preserving cache.
Args:
skill_name: Skill name
Returns:
Full skill content, or None if skill not found
"""
if not self.skill_loader:
return None
return self.skill_loader.get_skill_content(skill_name)
[docs]
def load_prompt_template(self, template_name: str) -> str:
"""
Load prompt template using PromptLoader.
Args:
template_name: Template name (system, developer)
Returns:
Template content
"""
logger.debug(f"[LLMClient] Loading prompt template: {template_name}")
return self.prompt_loader.load(template_name)
[docs]
def build_user_message(
self,
goal: str,
constraints: List[str],
budget: Dict[str, int],
state_summary: Optional[str] = None,
project_profile: Optional[str] = None,
relevant_files: Optional[str] = None,
recent_error: Optional[str] = None,
current_diff: Optional[str] = None,
test_results: Optional[str] = None,
verification_success: Optional[bool] = None,
) -> str:
"""
Build user message from context (append-only mode for cache optimization).
This method builds a user message that will be appended to the conversation history.
The system prompt is fixed and never changes, preserving LLM cache.
Args:
goal: Task goal
constraints: Task constraints
budget: Budget dictionary
state_summary: State summary
project_profile: Project profile
relevant_files: Relevant file snippets
recent_error: Recent error
current_diff: Current diff
test_results: Test results
verification_success: Verification success status
Returns:
User message string (to be appended to history)
"""
developer_prompt = self.load_prompt_template("developer")
test_results_section = ""
if test_results:
if verification_success is True:
test_status = "✅ **Tests Passed**"
# Guide LLM to make correct decision when verification passes
completion_reminder = """
🚨 **DECISION REQUIRED**: Tests have PASSED. You must now decide:
1. **If task goal is achieved**: Set stop_reason="done" immediately. Do not use "continue".
2. **If task goal is NOT yet achieved**: Set stop_reason="continue" and explain what remains.
**Key principle**: When verification passes AND goal is achieved, the task is complete. Use stop_reason="done" to signal completion.
"""
elif verification_success is False:
test_status = "❌ **Tests Failed**"
completion_reminder = ""
else:
test_status = "⚠️ **Test Status Unknown**"
completion_reminder = ""
test_results_section = f"""
### Latest Test/Verification Results
{test_status}
{test_results}
{completion_reminder}
"""
logger.debug(
f"[LLMClient] build_user_message: state_summary length={len(state_summary) if state_summary else 0}"
)
replacements = {
"{GOAL}": goal,
"{CONSTRAINTS}": "\n".join(f"- {c}" for c in constraints) if constraints else "None",
"{MAX_LLM_CALLS}": str(budget.get("max_llm_calls", 30)),
"{MAX_TOOL_CALLS}": str(budget.get("max_tool_calls", 200)),
"{MAX_WALL_TIME_SEC}": str(budget.get("max_wall_time_sec", 1800)),
"{STATE_SUMMARY}": state_summary or "Initial state",
"{PROJECT_PROFILE}": project_profile or "Not identified",
"{RELEVANT_FILES}": relevant_files or "None",
"{RECENT_ERROR}": recent_error or "None",
"{CURRENT_DIFF}": current_diff or "No changes",
"{TEST_RESULTS}": test_results_section,
}
for placeholder, value in replacements.items():
if placeholder == "{STATE_SUMMARY}":
logger.debug(
f"[LLMClient] Replacing {{STATE_SUMMARY}}: length={len(str(value))}, preview={str(value)[:100] if value else 'None'}..."
)
developer_prompt = developer_prompt.replace(placeholder, str(value))
if "{STATE_SUMMARY}" in developer_prompt:
logger.error("[LLMClient] Error: {STATE_SUMMARY} placeholder was not replaced!")
else:
logger.debug("[LLMClient] {STATE_SUMMARY} placeholder successfully replaced")
return developer_prompt
[docs]
def build_prompt(
self,
goal: str,
constraints: List[str],
budget: Dict[str, int],
state_summary: Optional[str] = None,
project_profile: Optional[str] = None,
relevant_files: Optional[str] = None,
recent_error: Optional[str] = None,
current_diff: Optional[str] = None,
test_results: Optional[str] = None,
verification_success: Optional[bool] = None,
) -> str:
"""
Build complete prompt from templates (DEPRECATED - for backward compatibility).
This method is kept for backward compatibility but should be replaced with
build_user_message() for cache optimization.
Args:
goal: Task goal
constraints: Task constraints
budget: Budget dictionary
state_summary: State summary
project_profile: Project profile
relevant_files: Relevant file snippets
recent_error: Recent error
current_diff: Current diff
test_results: Test results
verification_success: Verification success status
Returns:
Complete prompt string
"""
user_message = self.build_user_message(
goal=goal,
constraints=constraints,
budget=budget,
state_summary=state_summary,
project_profile=project_profile,
relevant_files=relevant_files,
recent_error=recent_error,
current_diff=current_diff,
test_results=test_results,
verification_success=verification_success,
)
return f"{self.system_prompt}\n\n{user_message}"
[docs]
def plan_and_act(
self,
user_message: str,
max_retries: int = 2,
stream_callback: Optional[Callable[[str], None]] = None,
) -> Tuple[Optional[ActionJSON], Optional[str], Dict[str, Any], Optional[str], Dict[str, str]]:
"""
Call LLM to plan and generate actions.
Memory-only mode (no ChatHistory):
- Fixed system prompt (set once, never changes - preserves cache)
- Each call is independent with full context in user_message
- All history is managed through Memory, not conversation history
- Uses lexilux 2.0.0 stream() for ensuring complete responses
Args:
user_message: Required user message content containing all context (including memory summary)
max_retries: Maximum retry attempts if JSON parsing fails
stream_callback: Optional callback function to receive streaming output chunks
Returns:
Tuple of (ActionJSON or None, error_message, usage_info, full_output_text, file_contents_dict)
file_contents_dict maps placeholder names to actual file content
"""
if not user_message or not user_message.strip():
raise ValueError("user_message is required and cannot be empty (no history mode)")
usage_info = {"total_tokens": 0, "input_tokens": 0, "output_tokens": 0}
full_output = ""
error = None
self._log_prompt_size(user_message)
for attempt in range(max_retries + 1):
try:
current_message = self._build_retry_message(user_message, attempt, error)
self._print_streaming_status(stream_callback)
chat_params = ChatParams(
temperature=0.3,
max_tokens=self.config.ai.performance.max_tokens_output,
)
initial_result = self._stream_initial_response(
current_message, chat_params, stream_callback
)
result = self._handle_truncation(initial_result, current_message, stream_callback)
usage_info["total_tokens"] = result.usage.total_tokens
usage_info["input_tokens"] = result.usage.input_tokens
usage_info["output_tokens"] = result.usage.output_tokens
full_output = result.text
self._log_final_output(full_output, initial_result)
action_json, error, file_contents = parse_action_json(full_output)
self._log_file_contents_extraction(file_contents, action_json, full_output)
if action_json:
return action_json, None, usage_info, full_output, file_contents
else:
if attempt < max_retries:
continue
else:
return (
None,
f"Failed to parse Action JSON after {max_retries + 1} attempts: {error}",
usage_info,
full_output,
{},
)
except Exception as e:
error_str = str(e)
if "400" in error_str and "Bad Request" in error_str:
logger.warning(
f"[LLMClient] 400 Bad Request on attempt {attempt + 1}/{max_retries + 1}. "
f"This may indicate the prompt is too large. Error: {error_str[:200]}"
)
if attempt < max_retries:
continue
return None, f"LLM call failed: {e}", usage_info, full_output, {}
return None, "Max retries exceeded", usage_info, full_output, {}
def _log_prompt_size(self, user_message: str) -> None:
"""Log prompt size for monitoring."""
system_chars = len(self.system_prompt) if self.system_prompt else 0
user_chars = len(user_message)
total_with_system = user_chars + system_chars
logger.info(
f"[LLMClient] Prompt size estimate: ~{total_with_system} characters (~{total_with_system // 4} tokens) "
f"(user_message: {user_chars} chars, system: {system_chars} chars)"
)
def _build_retry_message(self, user_message: str, attempt: int, error: Optional[str]) -> str:
"""Build message for retry attempt with error correction if needed."""
if attempt > 0 and error:
error_correction = f"""
[IMPORTANT] JSON parsing failed - please fix the following issue:
Error details: {error}
Please ensure your response is valid JSON format with the following required fields:
- "actions": array containing tool call objects
- "stop_reason": string, value must be "continue", "done", or "fail"
Each action must contain:
- "tool": tool name ("run" or "write_file")
- "args": parameter object
Example format:
{{
"actions": [
{{"tool": "run", "args": {{"cmd": "command"}}}},
{{"tool": "write_file", "args": {{"path": "file.py", "content": "..."}}}}
],
"stop_reason": "continue"
}}
Please output only valid JSON, do not add any other text, comments, or explanations."""
return user_message + error_correction
return user_message
def _print_streaming_status(self, stream_callback: Optional[Callable[[str], None]]) -> None:
"""Print streaming status message."""
if stream_callback:
print(" [Streaming] ", end="", flush=True)
else:
print(" [Generating...] ", end="", flush=True)
def _stream_initial_response(
self,
current_message: str,
chat_params: ChatParams,
stream_callback: Optional[Callable[[str], None]],
) -> ChatResult:
"""Stream initial LLM response and return result."""
stream_iterator = self.chat.stream(
current_message,
system=self.system_prompt,
include_usage=True,
params=chat_params,
)
chunk_count = 0
total_delta_length = 0
try:
for chunk in stream_iterator:
chunk_count += 1
if chunk.delta:
total_delta_length += len(chunk.delta)
if stream_callback:
print(chunk.delta, end="", flush=True)
stream_callback(chunk.delta)
if chunk.done:
break
except Exception as stream_error:
logger.error(f"[LLMClient] Error during streaming: {stream_error}")
logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")
logger.debug(
f"[LLMClient] Streaming stats: {chunk_count} chunks, total delta length: {total_delta_length} chars"
)
if stream_callback:
print()
else:
print("Done")
initial_result = stream_iterator.result.to_chat_result()
logger.info(
f"[LLMClient] Initial streaming result: finish_reason={initial_result.finish_reason}, "
f"length={len(initial_result.text)} chars, chunks={chunk_count}, "
f"delta_total_length={total_delta_length}"
)
if len(initial_result.text) == 0:
logger.error(
f"[LLMClient] Critical error: Initial result text is empty! "
f"chunks={chunk_count}, delta_total_length={total_delta_length}"
)
elif len(initial_result.text) != total_delta_length:
logger.warning(
f"[LLMClient] Warning: Initial result length ({len(initial_result.text)}) "
f"does not match delta total length ({total_delta_length})!"
)
else:
preview = initial_result.text[:100].replace("\n", "\\n")
logger.debug(f"[LLMClient] Initial result preview: {preview}...")
return initial_result
def _handle_truncation(
self,
initial_result: ChatResult,
current_message: str,
stream_callback: Optional[Callable[[str], None]],
) -> ChatResult:
"""Handle response truncation by continuing generation."""
if initial_result.finish_reason == "length":
max_continue_attempts = 5
if stream_callback:
print(" [Truncation detected, continuing generation...] ", end="", flush=True)
try:
result = self._continue_with_streaming(
self.chat,
initial_result,
max_continue_attempts,
original_user_message=current_message,
stream_callback=stream_callback,
)
if stream_callback:
print()
logger.info(
f"[LLMClient] Continue generation successful, final result length: {len(result.text)} chars"
)
except Exception as continue_error:
logger.error(f"[LLMClient] Error during continue generation: {continue_error}")
logger.warning(
f"[LLMClient] Using initial result (may be incomplete), length: {len(initial_result.text)} chars"
)
result = initial_result
placeholders_in_initial = re.findall(
r"---\(FILE_CONTENT_#\d+\)---", initial_result.text
)
if placeholders_in_initial:
logger.warning(
f"[LLMClient] Initial result contains placeholders {placeholders_in_initial}, "
f"but file content may be incomplete due to continue failure!"
)
else:
result = initial_result
return result
def _log_final_output(self, full_output: str, initial_result: ChatResult) -> None:
"""Log final output statistics."""
logger.info(
f"[LLMClient] Final full_output length: {len(full_output)} chars, "
f"finish_reason={initial_result.finish_reason}"
)
if len(full_output) == 0:
logger.error(
f"[LLMClient] Critical error: Final full_output is empty! "
f"Initial result length: {len(initial_result.text)} chars"
)
elif len(full_output) < len(initial_result.text):
logger.warning(
f"[LLMClient] Warning: Final full_output ({len(full_output)} chars) "
f"is shorter than initial result ({len(initial_result.text)} chars)!"
)
if initial_result.finish_reason == "length":
logger.info(
f"[LLMClient] Continued after truncation, final full_output length: {len(full_output)} chars"
)
placeholders_found = re.findall(r"---\(FILE_CONTENT_#\d+\)---", full_output)
logger.info(f"[LLMClient] Found FILE_CONTENT placeholders: {placeholders_found}")
def _log_file_contents_extraction(
self,
file_contents: Dict[str, str],
action_json: Optional[ActionJSON],
full_output: str,
) -> None:
"""Log file contents extraction results.
File contents (---(FILE_CONTENT_#N)--- blocks) are only expected when the LLM
uses write_file/append_file/edit_file tools with FILE_CONTENT_#N placeholders.
For run/read_file/etc. actions, empty file_contents is normal - not a warning.
"""
# Collect placeholders that file-writing actions expect
expected_placeholders = []
if action_json:
for action in action_json.actions:
if action.get("tool") in ["write_file", "append_file", "edit_file"]:
content = action.get("args", {}).get("content", "")
if isinstance(content, str) and content.startswith("FILE_CONTENT_#"):
expected_placeholders.append(content)
if file_contents:
logger.info(
f"[LLMClient] Extracted {len(file_contents)} file content placeholders: {list(file_contents.keys())}"
)
for placeholder, content in file_contents.items():
logger.debug(f"[LLMClient] {placeholder}: {len(content)} chars")
# Check if any expected placeholders are missing
missing = [p for p in expected_placeholders if p not in file_contents]
if missing:
logger.warning(
f"[LLMClient] Actions reference placeholders {missing} "
f"but corresponding ---(FILE_CONTENT_#N)--- blocks not found in LLM output!"
)
elif expected_placeholders:
# File-writing actions use placeholders but we extracted nothing - LLM didn't provide content blocks
logger.warning(
f"[LLMClient] Actions reference placeholders {expected_placeholders} "
f"but no ---(FILE_CONTENT_#N)--- blocks found in LLM output! "
f"full_output length: {len(full_output)}"
)
# else: No file-writing actions with placeholders (e.g., only run/read_file) -
# empty file_contents is expected, no need to log
def _continue_with_streaming(
self,
chat: Chat,
initial_result: ChatResult,
max_continues: int,
original_user_message: str,
stream_callback: Optional[Callable[[str], None]] = None,
) -> ChatResult:
"""
Continue generation with streaming output when response is truncated.
Simple approach: construct a one-round history (user prompt + AI response),
then use a continue prompt to ask LLM to continue output.
Args:
chat: Chat client instance
initial_result: Initial result (finish_reason == "length")
max_continues: Maximum number of continuation attempts
original_user_message: Original user message with full context
stream_callback: Optional callback function to receive streaming output chunks
Returns:
Merged complete result
"""
start_time = time.time()
continue_prompt = (
"Your output was truncated. Please continue outputting the remaining content."
)
all_results = [initial_result]
current_result = initial_result
continue_count = 0
accumulated_text = initial_result.text
logger.info(
f"[LLMClient] Starting streaming continue generation (max {max_continues} attempts)..."
)
while current_result.finish_reason == "length" and continue_count < max_continues:
continue_count += 1
logger.info(f"[LLMClient] Starting continue generation attempt {continue_count}...")
if stream_callback:
print(
f"\n [Continue generation {continue_count}/{max_continues}] ",
end="",
flush=True,
)
continue_history = ChatHistory()
continue_history.add_user(original_user_message)
continue_history.add_assistant(accumulated_text)
continue_history.add_user(continue_prompt)
chat_params = ChatParams(
temperature=0.3,
max_tokens=self.config.ai.performance.max_tokens_output,
)
try:
continue_stream = chat.stream_with_history(
continue_history,
message=None,
include_usage=True,
params=chat_params,
)
for chunk in continue_stream:
if chunk.delta:
if stream_callback:
print(chunk.delta, end="", flush=True)
stream_callback(chunk.delta)
if chunk.done:
break
continue_result = continue_stream.result.to_chat_result()
all_results.append(continue_result)
current_result = continue_result
accumulated_text += continue_result.text
logger.info(
f"[LLMClient] Continue generation attempt {continue_count} completed, "
f"length: {len(continue_result.text)} chars, "
f"finish_reason: {continue_result.finish_reason}, "
f"accumulated text length: {len(accumulated_text)} chars"
)
placeholders_in_continue = re.findall(
r"---\(FILE_CONTENT_#\d+\)---", continue_result.text
)
if placeholders_in_continue:
logger.info(
f"[LLMClient] Continue result contains placeholders: {placeholders_in_continue}"
)
partial_pattern = r"---\(FILE_CONTENT_#\d+\)?---?"
partial_matches = re.findall(partial_pattern, continue_result.text)
if partial_matches:
logger.warning(
f"[LLMClient] Continue result may contain partial placeholders: {partial_matches}"
)
except Exception as e:
logger.error(
f"[LLMClient] Continue generation attempt {continue_count} failed: {e}"
)
logger.error(f"[LLMClient] Exception details: {type(e).__name__}: {e}")
logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")
if len(all_results) > 1:
merged = ChatContinue.merge_results(*all_results)
logger.warning(
f"[LLMClient] Continue failed, returning partial merged result (may be incomplete), "
f"length: {len(merged.text)} chars, contains {len(all_results)} results"
)
placeholders_in_merged = re.findall(r"---\(FILE_CONTENT_#\d+\)---", merged.text)
if placeholders_in_merged:
logger.warning(
f"[LLMClient] Partial result contains placeholders {placeholders_in_merged}, "
f"but file content may be incomplete due to continue failure!"
)
return merged
else:
logger.warning(
f"[LLMClient] Continue failed and only initial result available, "
f"returning initial result (may be incomplete), length: {len(initial_result.text)} chars"
)
return initial_result
if len(all_results) == 1:
full_result = all_results[0]
else:
full_result = ChatContinue.merge_results(*all_results)
logger.debug(
f"[LLMClient] Merged result: {len(all_results)} results, "
f"total length: {len(full_result.text)} chars"
)
for i, r in enumerate(all_results):
logger.debug(
f"[LLMClient] Result {i + 1}: {len(r.text)} chars, finish_reason: {r.finish_reason}"
)
elapsed_time = time.time() - start_time
logger.info(
f"[LLMClient] Streaming continue generation completed, elapsed: {elapsed_time:.2f}s, "
f"total length: {len(full_result.text)} chars, {continue_count} continues"
)
all_placeholders = re.findall(r"---\(FILE_CONTENT_#\d+\)---", full_result.text)
logger.info(f"[LLMClient] Merged complete text contains placeholders: {all_placeholders}")
return full_result
[docs]
def reset_history(self):
"""
Reset conversation history (for new task).
NOTE: This method is kept for backward compatibility but does nothing
since we no longer use ChatHistory. All history is managed through Memory.
"""
pass
[docs]
def add_tool_results_to_history(
self, actions: List[Dict[str, Any]], results: List[Dict[str, Any]]
):
"""
Add tool execution results to conversation history.
NOTE: This method is kept for backward compatibility but does nothing
since we no longer use ChatHistory. Tool results are stored in Memory
and included in the next memory summary.
Args:
actions: List of actions that were executed
results: List of tool execution results
"""
pass