Source code for atloop.llm.client

"""LLM client wrapper for lexilux."""

import logging
import time
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

from lexilux import Chat, ChatContinue, ChatHistory, ChatParams, ChatResult

# Note: ChatHistory no longer holds conversation state (Memory does); it is only
# used to build the one-round history when continuing a truncated response.
from atloop.config.models import AtloopConfig
from atloop.llm.prompts import PromptLoader
from atloop.llm.schema import (
    ActionJSON,
    parse_action_json,
)
from atloop.output.emitter import OutputEventEmitter
from atloop.output.events import LLMCallEvent, LLMResultEvent, LLMStreamEvent
from atloop.skills import EnhancedSkillLoader

logger = logging.getLogger(__name__)

# Import placeholder patterns from centralized module
from atloop.llm.placeholder_patterns import (
    PARTIAL_PLACEHOLDER_REGEX,
    PLACEHOLDER_DELIMITER_REGEX,
)


[docs] class LLMClient: """LLM client wrapper with cache-optimized history management."""
def __init__(self, config: AtloopConfig, workspace_root: Optional[str] = None):
    """
    Initialize LLM client.

    Creates the lexilux ``Chat`` client, the prompt loader and the skill
    loader, then assembles ``self.system_prompt`` once from the "system"
    template, the generated tool schema and the skill descriptions.

    Args:
        config: atloop configuration
        workspace_root: Optional workspace root path for project skills
    """
    self.config = config
    completion = config.ai.completion
    self.chat = Chat(
        base_url=completion.api_base,
        api_key=completion.api_key,
        model=completion.model,
        timeout_s=120.0,
    )

    # Initialize prompt loader (default to English)
    language = getattr(config, "prompt_language", "en")
    self.prompt_loader = PromptLoader(language=language)
    logger.debug(f"[LLMClient] Initialized PromptLoader with language: {language}")

    # Initialize enhanced skill loader with multiple directories.
    # NOTE(review): function-scope import kept from the original; presumably it
    # avoids an import cycle with the config package - confirm.
    from atloop.config.loader import ConfigLoader

    builtin_skills_dir = Path(__file__).parent.parent / "skills" / "builtin"
    project_dir = Path(workspace_root) if workspace_root else None
    candidate_dirs = [
        ConfigLoader.get_atloop_dir() / "skills",
        Path.home() / ".atloop" / "skills",
    ]

    # Keep only directories that exist, and keep each physical directory once:
    # both candidates may point at the same location, which would otherwise be
    # handed to the loader twice.
    additional_dirs = []
    seen_dirs = set()
    for directory in candidate_dirs:
        if not directory.exists():
            continue
        resolved = directory.resolve()
        if resolved in seen_dirs:
            continue
        seen_dirs.add(resolved)
        additional_dirs.append(directory)

    self.skill_loader = EnhancedSkillLoader(
        builtin_skills_dir=builtin_skills_dir,
        project_dir=project_dir,
        additional_dirs=additional_dirs,
    )

    # Initialize fixed system prompt (never changes - preserves cache)
    system_template = self.prompt_loader.load("system")
    tool_schema = self.generate_tool_schema()
    skills_info = self._get_skills_info()
    self.system_prompt = system_template.replace("{TOOL_SCHEMA}", tool_schema)

    if skills_info:
        self.system_prompt += (
            f"\n\nAvailable Skills:\n{skills_info}\n\n"
            "If a task matches a skill description, use `load_skill` to get the skill's "
            "main content and resource list, then use `load_skill_resource` to load "
            "specific resources when needed."
        )
        logger.info(
            f"[LLMClient] Added {len(self.skill_loader.skills) if self.skill_loader else 0} skills to system prompt"
        )
        logger.debug(f"[LLMClient] Skills list:\n{skills_info}")
    else:
        logger.warning(
            f"[LLMClient] No skills found (skill_loader={self.skill_loader is not None})"
        )
def generate_tool_schema(self) -> str:
    """
    Generate comprehensive tool schema description for prompt.

    Builds a ``ToolRegistry`` on a dummy sandbox, asks every registered tool
    for its detailed description (falling back to ``tool.description`` when
    that call fails), groups the tools into categories and renders them as
    Markdown. If the registry cannot be built at all, only the top-level
    heading is returned.

    Within a category, tools listed in the priority list come first in the
    order of that list; the remaining tools follow alphabetically.

    Returns:
        Formatted tool schema description string
    """
    # tool name -> description text rendered into the prompt
    descriptions = {}
    try:
        from atloop.runtime.sandbox_adapter import SandboxAdapter
        from atloop.tools.registry import ToolRegistry

        dummy_sandbox = SandboxAdapter(self.config.sandbox, "dummy")
        registry = ToolRegistry(dummy_sandbox, skill_loader=self.skill_loader)

        for tool_name, tool in registry.tools.items():
            try:
                descriptions[tool_name] = tool.get_detailed_description()
            except Exception as e:
                logger.warning(
                    f"[LLMClient] Failed to get detailed description for {tool_name}: {e}"
                )
                descriptions[tool_name] = tool.description

        logger.debug(
            f"[LLMClient] Auto-extracted detailed descriptions for {len(descriptions)} tools"
        )
    except Exception as e:
        # Best-effort: a broken registry must not prevent the client from starting.
        logger.warning(
            f"[LLMClient] Failed to auto-extract tool descriptions: {e}. Using fallback."
        )
        descriptions = {}

    # Category -> member tools; anything not listed lands in "Other".
    category_members = {
        "File Operations": (
            "write_file",
            "append_file",
            "edit_file",
            "multi_edit_file",
            "read_file",
            "read_skill_file",
        ),
        "Execution Tools": ("run", "run_shell_script_string", "run_python_script_string"),
        "Search & Discovery": ("search", "glob"),
        "Interaction & Skills": ("todo_write", "todo_read", "load_skill", "load_skill_resource"),
    }
    category_of = {
        tool_name: category
        for category, members in category_members.items()
        for tool_name in members
    }

    # Priority order: most commonly used tools first
    priority_order = [
        "edit_file",  # Most common for modifications
        "append_file",  # Common for large files
        "write_file",  # Common for new files
        "read_file",  # Common for reading
        "run",  # Common for commands
    ]
    priority_rank = {tool_name: rank for rank, tool_name in enumerate(priority_order)}
    unranked = len(priority_order)

    categorized = {category: [] for category in category_members}
    categorized["Other"] = []
    for tool_name in descriptions:
        categorized[category_of.get(tool_name, "Other")].append(tool_name)

    tools_desc = ["## Available Tools\n"]
    for category, tool_names in categorized.items():
        if not tool_names:
            continue
        # Rank by position in priority_order (not mere membership), then by name.
        tool_names.sort(key=lambda name: (priority_rank.get(name, unranked), name))
        tools_desc.append(f"\n### {category}\n")
        for tool_name in tool_names:
            tools_desc.extend((f"#### `{tool_name}`", "", descriptions[tool_name], ""))

    return "\n".join(tools_desc)
def _get_skills_info(self) -> str:
    """
    Return the skill descriptions shown in the system prompt (metadata only).

    Returns:
        Whatever ``self.skill_loader.get_descriptions()`` yields, or an empty
        string when no skill loader is set
    """
    loader = self.skill_loader
    return loader.get_descriptions() if loader else ""
def load_prompt_template(self, template_name: str) -> str:
    """
    Fetch a prompt template through the client's PromptLoader.

    Args:
        template_name: Template name (system, developer)

    Returns:
        Template content
    """
    logger.debug(f"[LLMClient] Loading prompt template: {template_name}")
    template = self.prompt_loader.load(template_name)
    return template
def build_user_message(
    self,
    goal: str,
    budget: Dict[str, int],
    state_summary: Optional[str] = None,
    memory_context: Optional[str] = None,
    project_profile: Optional[str] = None,
    relevant_files: Optional[str] = None,
    recent_error: Optional[str] = None,
    current_diff: Optional[str] = None,
    test_results: Optional[str] = None,
    verification_success: Optional[bool] = None,
) -> str:
    """
    Build user message from context (append-only mode for cache optimization).

    Loads the "developer" template and fills its ``{PLACEHOLDER}`` tokens.
    The system prompt is fixed and never changes, preserving LLM cache.

    Substitution is done in a single pass over the template, so text inserted
    for one placeholder is never re-scanned: a goal, diff or file snippet that
    itself contains e.g. ``{STATE_SUMMARY}`` is kept verbatim.

    Args:
        goal: Task goal
        budget: Budget dictionary (missing keys fall back to 30 LLM calls,
            200 tool calls and 1800 seconds)
        state_summary: State summary, used only when memory_context is None
        memory_context: Memory context; takes precedence over state_summary
            whenever it is not None (an empty string counts as provided)
        project_profile: Project profile
        relevant_files: Relevant file snippets
        recent_error: Recent error
        current_diff: Current diff
        test_results: Test results; when empty, no test section is rendered
        verification_success: Verification success status (True / False / None)

    Returns:
        User message string
    """
    import re  # function-scope: only the single-pass substitution needs it

    template = self.load_prompt_template("developer")

    test_results_section = ""
    if test_results:
        completion_reminder = ""
        if verification_success is True:
            test_status = "✓ **Tests Passed**"
            # Guide LLM to make correct decision when verification passes
            completion_reminder = """
🚨 **DECISION REQUIRED**: Tests have PASSED. You must now decide:

1. **If task goal is achieved**: Set stop_reason="done" immediately. Do not use "continue".
2. **If task goal is NOT yet achieved**: Set stop_reason="continue" and explain what remains.

**Key principle**: When verification passes AND goal is achieved, the task is complete. Use stop_reason="done" to signal completion.
"""
        elif verification_success is False:
            test_status = "❌ **Tests Failed**"
        else:
            test_status = "⚠️ **Test Status Unknown**"

        test_results_section = f"""
### Latest Test/Verification Results

{test_status}

{test_results}
{completion_reminder}
"""

    # Use memory_context if provided, otherwise fall back to state_summary (backward compatibility)
    if memory_context is not None:
        memory_content = memory_context
    else:
        memory_content = state_summary or "Initial state"
    logger.debug(
        f"[LLMClient] build_user_message: memory_content length={len(memory_content) if memory_content else 0}"
    )

    replacements = {
        "{GOAL}": str(goal),
        "{MAX_LLM_CALLS}": str(budget.get("max_llm_calls", 30)),
        "{MAX_TOOL_CALLS}": str(budget.get("max_tool_calls", 200)),
        "{MAX_WALL_TIME_SEC}": str(budget.get("max_wall_time_sec", 1800)),
        "{STATE_SUMMARY}": str(memory_content),
        "{PROJECT_PROFILE}": str(project_profile or "Not identified"),
        "{RELEVANT_FILES}": str(relevant_files or "None"),
        "{RECENT_ERROR}": str(recent_error or "None"),
        "{CURRENT_DIFF}": str(current_diff or "No changes"),
        "{TEST_RESULTS}": str(test_results_section),
    }

    summary_value = replacements["{STATE_SUMMARY}"]
    logger.debug(
        f"[LLMClient] Replacing {{STATE_SUMMARY}}: length={len(summary_value)}, "
        f"preview={summary_value[:100] if summary_value else 'None'}..."
    )
    if "{STATE_SUMMARY}" not in template:
        # Without the token the memory content silently never reaches the model.
        logger.error(
            "[LLMClient] Error: {STATE_SUMMARY} placeholder not found in developer template; "
            "memory context was not injected"
        )

    # One regex alternation over all tokens; the callback looks up the matched
    # token, so substituted values are never scanned for further placeholders.
    token_pattern = re.compile("|".join(re.escape(token) for token in replacements))
    return token_pattern.sub(lambda match: replacements[match.group(0)], template)
def build_prompt(
    self,
    goal: str,
    budget: Dict[str, int],
    state_summary: Optional[str] = None,
    project_profile: Optional[str] = None,
    relevant_files: Optional[str] = None,
    recent_error: Optional[str] = None,
    current_diff: Optional[str] = None,
    test_results: Optional[str] = None,
    verification_success: Optional[bool] = None,
) -> str:
    """
    Build complete prompt from templates (DEPRECATED - for backward compatibility).

    Delegates to build_user_message() with the same arguments and prefixes the
    result with the system prompt, separated by a blank line. New code should
    call build_user_message() directly for cache optimization.

    Args:
        goal: Task goal
        budget: Budget dictionary
        state_summary: State summary
        project_profile: Project profile
        relevant_files: Relevant file snippets
        recent_error: Recent error
        current_diff: Current diff
        test_results: Test results
        verification_success: Verification success status

    Returns:
        Complete prompt string
    """
    context = {
        "goal": goal,
        "budget": budget,
        "state_summary": state_summary,
        "project_profile": project_profile,
        "relevant_files": relevant_files,
        "recent_error": recent_error,
        "current_diff": current_diff,
        "test_results": test_results,
        "verification_success": verification_success,
    }
    user_part = self.build_user_message(**context)
    return f"{self.system_prompt}\n\n{user_part}"
def plan_and_act(
    self,
    user_message: str,
    max_retries: int = 2,
    stream_callback: Optional[Callable[[str], None]] = None,
    step: Optional[int] = None,
    task_id: Optional[str] = None,
) -> Tuple[Optional[ActionJSON], Optional[str], Dict[str, Any], Optional[str], Dict[str, str]]:
    """
    Call LLM to plan and generate actions.

    Memory-only mode (no conversation history is kept between calls):
    - Fixed system prompt (set once, never changes - preserves cache)
    - Each call is independent with full context in user_message
    - The response is streamed; a response cut off with finish_reason
      "length" is continued via _handle_truncation()
    - When the output cannot be parsed as Action JSON, the call is repeated
      with the parse error appended to the message (see _build_retry_message)

    Output events (LLMCallEvent / LLMResultEvent) are emitted only when
    ``step`` is not None and ``task_id`` is truthy; LLMResultEvent is emitted
    only for a successfully parsed response.

    Args:
        user_message: Required user message content containing all context
            (including memory summary)
        max_retries: Maximum retry attempts if JSON parsing fails
        stream_callback: Optional callback function to receive streaming output chunks
        step: Optional step number for event emission
        task_id: Optional task ID for event emission

    Returns:
        Tuple of (ActionJSON or None, error_message, usage_info, full_output_text,
        file_contents_dict). usage_info holds the token counts of the most
        recent attempt only (it is overwritten, not accumulated, per attempt).
        file_contents_dict maps placeholder names to actual file content and
        is empty on failure.

    Raises:
        ValueError: If user_message is empty or whitespace only
    """
    if not user_message or not user_message.strip():
        raise ValueError("user_message is required and cannot be empty (no history mode)")

    usage_info = {"total_tokens": 0, "input_tokens": 0, "output_tokens": 0}
    full_output = ""
    # Parse error of the previous attempt; fed into the retry message.
    error = None

    self._log_prompt_size(user_message)

    # Get event emitter if step and task_id are provided
    event_emitter = None
    if step is not None and task_id:
        event_emitter = OutputEventEmitter()

    llm_call_start_time = None

    for attempt in range(max_retries + 1):
        try:
            current_message = self._build_retry_message(user_message, attempt, error)
            self._print_streaming_status(stream_callback)

            chat_params = ChatParams(
                temperature=0.3,
                max_tokens=self.config.ai.performance.max_tokens_output,
            )

            # Emit LLM call event before calling
            if event_emitter:
                llm_call_start_time = time.time()
                event_emitter.emit(
                    LLMCallEvent(
                        step=step,
                        task_id=task_id,
                        model=self.config.ai.completion.model,
                        prompt_length=len(current_message),
                    )
                )

            initial_result = self._stream_initial_response(
                current_message, chat_params, stream_callback, step, task_id, event_emitter
            )
            # Returns initial_result unchanged unless it was cut off by the token limit.
            result = self._handle_truncation(
                initial_result, current_message, stream_callback, step, task_id, event_emitter
            )

            usage_info["total_tokens"] = result.usage.total_tokens
            usage_info["input_tokens"] = result.usage.input_tokens
            usage_info["output_tokens"] = result.usage.output_tokens
            full_output = result.text

            self._log_final_output(full_output, initial_result)

            action_json, error, file_contents = parse_action_json(full_output)
            self._log_file_contents_extraction(file_contents, action_json, full_output)

            # Emit LLM result event
            if event_emitter and action_json and llm_call_start_time:
                # ActionJSON.actions is List[Dict[str, Any]], not objects with attributes
                actions_list = [
                    {"tool": action.get("tool", ""), "args": action.get("args", {})}
                    for action in action_json.actions
                ]
                duration_ms = int((time.time() - llm_call_start_time) * 1000)
                event_emitter.emit(
                    LLMResultEvent(
                        step=step,
                        task_id=task_id,
                        model=self.config.ai.completion.model,
                        tokens_in=result.usage.input_tokens,
                        tokens_out=result.usage.output_tokens,
                        actions=actions_list,
                        stop_reason=action_json.stop_reason,
                        duration_ms=duration_ms,
                        full_response=full_output,
                    )
                )

            if action_json:
                return action_json, None, usage_info, full_output, file_contents
            else:
                # Parse failure: retry with the error appended, or give up on the last attempt.
                if attempt < max_retries:
                    continue
                else:
                    return (
                        None,
                        f"Failed to parse Action JSON after {max_retries + 1} attempts: {error}",
                        usage_info,
                        full_output,
                        {},
                    )
        except Exception as e:
            error_str = str(e)
            if "400" in error_str and "Bad Request" in error_str:
                logger.warning(
                    f"[LLMClient] 400 Bad Request on attempt {attempt + 1}/{max_retries + 1}. "
                    f"This may indicate the prompt is too large. Error: {error_str[:200]}"
                )
                # NOTE(review): the flattened source does not show whether this retry
                # check sits inside the 400 branch (only 400s are retried) or at
                # handler level (every exception is retried) - confirm against the
                # original file.
                if attempt < max_retries:
                    continue
            return None, f"LLM call failed: {e}", usage_info, full_output, {}

    # Only reached when the loop body never ran (negative max_retries).
    return None, "Max retries exceeded", usage_info, full_output, {}
def _log_prompt_size(self, user_message: str) -> None:
    """Log the combined size of system prompt and user message.

    The token figure is a rough estimate of one token per four characters.
    """
    system_chars = len(self.system_prompt) if self.system_prompt else 0
    user_chars = len(user_message)
    total_with_system = user_chars + system_chars
    logger.info(
        f"[LLMClient] Prompt size estimate: ~{total_with_system} characters (~{total_with_system // 4} tokens) "
        f"(user_message: {user_chars} chars, system: {system_chars} chars)"
    )


def _build_retry_message(self, user_message: str, attempt: int, error: Optional[str]) -> str:
    """Build message for retry attempt with error correction if needed.

    Args:
        user_message: Original user message
        attempt: Zero-based attempt index
        error: Parse error from the previous attempt, if any

    Returns:
        user_message unchanged on the first attempt or when there is no
        error; otherwise user_message followed by a JSON-format reminder that
        quotes the error.
    """
    if attempt <= 0 or not error:
        return user_message

    error_correction = f"""

[IMPORTANT] JSON parsing failed - please fix the following issue:

Error details: {error}

Please ensure your response is valid JSON format with the following required fields:
- "actions": array containing tool call objects
- "stop_reason": string, value must be "continue", "done", or "fail"

Each action must contain:
- "tool": tool name ("run" or "write_file")
- "args": parameter object

Example format:
{{
  "actions": [
    {{"tool": "run", "args": {{"cmd": "command"}}}},
    {{"tool": "write_file", "args": {{"path": "file.py", "content": "..."}}}}
  ],
  "stop_reason": "continue"
}}

Please output only valid JSON, do not add any other text, comments, or explanations."""
    return user_message + error_correction


def _print_streaming_status(self, stream_callback: Optional[Callable[[str], None]]) -> None:
    """Print streaming status message (intentionally a no-op).

    Note: This is kept for backward compatibility but output system handles
    LLM status display via events. This may be removed in future.
    """
    # Suppress direct print in favor of event-driven output
    # Output system will handle status display via LLMCallEvent and LLMStreamEvent
    pass


def _stream_initial_response(
    self,
    current_message: str,
    chat_params: ChatParams,
    stream_callback: Optional[Callable[[str], None]],
    step: Optional[int] = None,
    task_id: Optional[str] = None,
    event_emitter: Optional[OutputEventEmitter] = None,
) -> ChatResult:
    """Stream initial LLM response and return result.

    Every non-empty delta is forwarded to ``stream_callback`` and emitted as
    an LLMStreamEvent (when the respective sink is given). An exception
    raised while iterating the stream is logged and swallowed; the result is
    then built from whatever the stream iterator accumulated so far.

    Args:
        current_message: User message for this attempt
        chat_params: Sampling parameters passed to the chat client
        stream_callback: Optional receiver for each text delta
        step: Step number for events (0 is used when None)
        task_id: Task ID for events ("" is used when None)
        event_emitter: Optional emitter for LLMStreamEvent

    Returns:
        ChatResult obtained from the stream iterator
    """
    stream_iterator = self.chat.stream(
        current_message,
        system=self.system_prompt,
        include_usage=True,
        params=chat_params,
    )

    chunk_count = 0
    total_delta_length = 0

    try:
        for chunk in stream_iterator:
            chunk_count += 1
            if chunk.delta:
                total_delta_length += len(chunk.delta)
                if stream_callback:
                    # Don't print directly - let output system handle it via events
                    stream_callback(chunk.delta)
                # Emit stream event
                if event_emitter:
                    event_emitter.emit(
                        LLMStreamEvent(
                            step=step or 0,
                            task_id=task_id or "",
                            chunk=chunk.delta,
                            is_complete=False,
                        )
                    )
            if chunk.done:
                # Emit stream complete event
                if event_emitter:
                    event_emitter.emit(
                        LLMStreamEvent(
                            step=step or 0,
                            task_id=task_id or "",
                            chunk="",
                            is_complete=True,
                        )
                    )
                break
    except Exception as stream_error:
        # Deliberate best-effort: keep the partial result instead of failing the call.
        logger.error(f"[LLMClient] Error during streaming: {stream_error}")
        logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")

    logger.debug(
        f"[LLMClient] Streaming stats: {chunk_count} chunks, total delta length: {total_delta_length} chars"
    )

    # Don't print directly - let output system handle it via events
    # Status is shown via LLMResultEvent
    initial_result = stream_iterator.result.to_chat_result()
    result_length = len(initial_result.text)
    logger.info(
        f"[LLMClient] Initial streaming result: finish_reason={initial_result.finish_reason}, "
        f"length={result_length} chars, chunks={chunk_count}, "
        f"delta_total_length={total_delta_length}"
    )

    if result_length == 0:
        logger.error(
            f"[LLMClient] Critical error: Initial result text is empty! "
            f"chunks={chunk_count}, delta_total_length={total_delta_length}"
        )
    elif result_length != total_delta_length:
        logger.warning(
            f"[LLMClient] Warning: Initial result length ({result_length}) "
            f"does not match delta total length ({total_delta_length})!"
        )
    else:
        preview = initial_result.text[:100].replace("\n", "\\n")
        logger.debug(f"[LLMClient] Initial result preview: {preview}...")

    return initial_result


def _handle_truncation(
    self,
    initial_result: ChatResult,
    current_message: str,
    stream_callback: Optional[Callable[[str], None]],
    step: Optional[int] = None,
    task_id: Optional[str] = None,
    event_emitter: Optional[OutputEventEmitter] = None,
) -> ChatResult:
    """Handle response truncation by continuing generation.

    Only a result with finish_reason "length" is continued (up to 5 rounds);
    any other result is returned as is. If continuation raises, the initial
    (possibly incomplete) result is returned instead.

    ``step``, ``task_id`` and ``event_emitter`` are accepted but not used here.
    """
    if initial_result.finish_reason != "length":
        return initial_result

    max_continue_attempts = 5
    try:
        result = self._continue_with_streaming(
            self.chat,
            initial_result,
            max_continue_attempts,
            original_user_message=current_message,
            stream_callback=stream_callback,
        )
        logger.info(
            f"[LLMClient] Continue generation successful, final result length: {len(result.text)} chars"
        )
    except Exception as continue_error:
        logger.error(f"[LLMClient] Error during continue generation: {continue_error}")
        logger.warning(
            f"[LLMClient] Using initial result (may be incomplete), length: {len(initial_result.text)} chars"
        )
        result = initial_result
        placeholders_in_initial = [
            match.group(0) for match in PLACEHOLDER_DELIMITER_REGEX.finditer(initial_result.text)
        ]
        if placeholders_in_initial:
            logger.warning(
                f"[LLMClient] Initial result contains placeholders {placeholders_in_initial}, "
                f"but file content may be incomplete due to continue failure!"
            )
    return result


def _log_final_output(self, full_output: str, initial_result: ChatResult) -> None:
    """Log final output statistics (length, truncation, placeholder delimiters)."""
    logger.info(
        f"[LLMClient] Final full_output length: {len(full_output)} chars, "
        f"finish_reason={initial_result.finish_reason}"
    )

    if len(full_output) == 0:
        logger.error(
            f"[LLMClient] Critical error: Final full_output is empty! "
            f"Initial result length: {len(initial_result.text)} chars"
        )
    elif len(full_output) < len(initial_result.text):
        logger.warning(
            f"[LLMClient] Warning: Final full_output ({len(full_output)} chars) "
            f"is shorter than initial result ({len(initial_result.text)} chars)!"
        )

    if initial_result.finish_reason == "length":
        logger.info(
            f"[LLMClient] Continued after truncation, final full_output length: {len(full_output)} chars"
        )

    placeholders_found = [
        match.group(0) for match in PLACEHOLDER_DELIMITER_REGEX.finditer(full_output)
    ]
    logger.info(f"[LLMClient] Found placeholders: {placeholders_found}")


def _log_file_contents_extraction(
    self,
    file_contents: Dict[str, str],
    action_json: Optional[ActionJSON],
    full_output: str,
) -> None:
    """Log placeholder contents extraction results.

    Placeholder contents (---((TYPE_name))--- blocks) are expected when the
    LLM uses tools with placeholders. For tools without placeholders, empty
    file_contents is normal. This method only logs; it changes nothing.
    """
    from atloop.orchestrator.phases.placeholder_replacer import PlaceholderReplacer

    # Collect placeholders that actions expect
    expected_placeholders = []
    if action_json:
        for action in action_json.actions:
            tool = action.get("tool")
            args = action.get("args", {})
            # Use shared method to get placeholder field value
            field_name, value = PlaceholderReplacer.get_placeholder_field_value(tool, args)
            if field_name and PlaceholderReplacer._is_valid_placeholder(value):
                expected_placeholders.append(value)

    if file_contents:
        logger.info(
            f"[LLMClient] Extracted {len(file_contents)} file content placeholders: {list(file_contents.keys())}"
        )
        for placeholder, content in file_contents.items():
            logger.debug(f"[LLMClient] {placeholder}: {len(content)} chars")

        # Check if any expected placeholders are missing
        missing = [p for p in expected_placeholders if p not in file_contents]
        if missing:
            # This is a normal business case - LLM may provide placeholders in next iteration
            logger.debug(
                f"[LLMClient] Actions reference placeholders {missing} "
                f"but corresponding ---((TYPE_name))--- blocks not found in LLM output "
                f"(will retry in next iteration)"
            )
    elif expected_placeholders:
        # Actions use placeholders but nothing was extracted - also a normal case
        logger.debug(
            f"[LLMClient] Actions reference placeholders {expected_placeholders} "
            f"but no ---((TYPE_name))--- blocks found in LLM output "
            f"(full_output length: {len(full_output)}, will retry in next iteration)"
        )
    # else: no actions with placeholders (e.g., only run/read_file) - nothing to log


def _continue_with_streaming(
    self,
    chat: Chat,
    initial_result: ChatResult,
    max_continues: int,
    original_user_message: str,
    stream_callback: Optional[Callable[[str], None]] = None,
) -> ChatResult:
    """
    Continue generation with streaming output when response is truncated.

    Each round builds a one-round history (original user message, all text
    produced so far as the assistant turn, then a "please continue" user
    turn) and streams the reply. Rounds repeat while the latest result ends
    with finish_reason "length" and fewer than max_continues rounds have run.

    Deltas are delivered to stream_callback only; nothing is written to
    stdout, in line with the event-driven output used by the rest of the
    client.

    Args:
        chat: Chat client instance
        initial_result: Initial result (finish_reason == "length")
        max_continues: Maximum number of continuation attempts
        original_user_message: Original user message with full context
        stream_callback: Optional callback function to receive streaming output chunks

    Returns:
        The merged result of all rounds. If a round raises, the merge of the
        rounds completed so far (or initial_result when none completed) is
        returned instead.
    """
    start_time = time.time()
    continue_prompt = (
        "Your output was truncated. Please continue outputting the remaining content."
    )

    all_results = [initial_result]
    current_result = initial_result
    continue_count = 0
    accumulated_text = initial_result.text

    logger.info(
        f"[LLMClient] Starting streaming continue generation (max {max_continues} attempts)..."
    )

    while current_result.finish_reason == "length" and continue_count < max_continues:
        continue_count += 1
        logger.info(f"[LLMClient] Starting continue generation attempt {continue_count}...")

        continue_history = ChatHistory()
        continue_history.add_user(original_user_message)
        continue_history.add_assistant(accumulated_text)
        continue_history.add_user(continue_prompt)

        chat_params = ChatParams(
            temperature=0.3,
            max_tokens=self.config.ai.performance.max_tokens_output,
        )

        try:
            continue_stream = chat.stream_with_history(
                continue_history,
                message=None,
                include_usage=True,
                params=chat_params,
            )

            for chunk in continue_stream:
                if chunk.delta and stream_callback:
                    stream_callback(chunk.delta)
                if chunk.done:
                    break

            continue_result = continue_stream.result.to_chat_result()
            all_results.append(continue_result)
            current_result = continue_result
            accumulated_text += continue_result.text

            logger.info(
                f"[LLMClient] Continue generation attempt {continue_count} completed, "
                f"length: {len(continue_result.text)} chars, "
                f"finish_reason: {continue_result.finish_reason}, "
                f"accumulated text length: {len(accumulated_text)} chars"
            )

            placeholders_in_continue = [
                match.group(0)
                for match in PLACEHOLDER_DELIMITER_REGEX.finditer(continue_result.text)
            ]
            if placeholders_in_continue:
                logger.info(
                    f"[LLMClient] Continue result contains placeholders: {placeholders_in_continue}"
                )

            # Detect partial placeholders (may be cut off in streaming)
            partial_matches = [
                match.group(0)
                for match in PARTIAL_PLACEHOLDER_REGEX.finditer(continue_result.text)
            ]
            if partial_matches:
                logger.warning(
                    f"[LLMClient] Continue result may contain partial placeholders: {partial_matches}"
                )

        except Exception as e:
            logger.error(
                f"[LLMClient] Continue generation attempt {continue_count} failed: {e}"
            )
            logger.error(f"[LLMClient] Exception details: {type(e).__name__}: {e}")
            logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")

            if len(all_results) <= 1:
                logger.warning(
                    f"[LLMClient] Continue failed and only initial result available, "
                    f"returning initial result (may be incomplete), length: {len(initial_result.text)} chars"
                )
                return initial_result

            # Keep what the completed rounds produced rather than dropping it.
            merged = ChatContinue.merge_results(*all_results)
            logger.warning(
                f"[LLMClient] Continue failed, returning partial merged result (may be incomplete), "
                f"length: {len(merged.text)} chars, contains {len(all_results)} results"
            )
            placeholders_in_merged = [
                match.group(0) for match in PLACEHOLDER_DELIMITER_REGEX.finditer(merged.text)
            ]
            if placeholders_in_merged:
                logger.warning(
                    f"[LLMClient] Partial result contains placeholders {placeholders_in_merged}, "
                    f"but file content may be incomplete due to continue failure!"
                )
            return merged

    if len(all_results) == 1:
        full_result = all_results[0]
    else:
        full_result = ChatContinue.merge_results(*all_results)
        logger.debug(
            f"[LLMClient] Merged result: {len(all_results)} results, "
            f"total length: {len(full_result.text)} chars"
        )
        for i, r in enumerate(all_results):
            logger.debug(
                f"[LLMClient] Result {i + 1}: {len(r.text)} chars, finish_reason: {r.finish_reason}"
            )

    elapsed_time = time.time() - start_time
    logger.info(
        f"[LLMClient] Streaming continue generation completed, elapsed: {elapsed_time:.2f}s, "
        f"total length: {len(full_result.text)} chars, {continue_count} continues"
    )

    all_placeholders = [
        match.group(0) for match in PLACEHOLDER_DELIMITER_REGEX.finditer(full_result.text)
    ]
    logger.info(f"[LLMClient] Merged complete text contains placeholders: {all_placeholders}")

    return full_result
def reset_history(self):
    """
    Reset conversation history (for new task).

    NOTE: Retained only so existing callers keep working. The client holds no
    ChatHistory of its own anymore (history is managed through Memory), so
    there is nothing to reset and the call has no effect.
    """
    return None
def add_tool_results_to_history(
    self, actions: List[Dict[str, Any]], results: List[Dict[str, Any]]
):
    """
    Add tool execution results to conversation history.

    NOTE: Retained only so existing callers keep working. The client holds no
    ChatHistory of its own anymore; tool results are stored in Memory and
    reach the model through the next memory summary, so both arguments are
    ignored and the call has no effect.

    Args:
        actions: List of actions that were executed
        results: List of tool execution results
    """
    return None