"""LLM client wrapper for lexilux."""
import logging
import time
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from lexilux import Chat, ChatContinue, ChatHistory, ChatParams, ChatResult
# Note: ChatHistory no longer holds conversation state between calls; it is still used
# to build the one-round history when continuing a truncated response.
from atloop.config.models import AtloopConfig
from atloop.llm.prompts import PromptLoader
from atloop.llm.schema import (
ActionJSON,
parse_action_json,
)
from atloop.output.emitter import OutputEventEmitter
from atloop.output.events import LLMCallEvent, LLMResultEvent, LLMStreamEvent
from atloop.skills import EnhancedSkillLoader
logger = logging.getLogger(__name__)
# Import placeholder patterns from centralized module
from atloop.llm.placeholder_patterns import (
PARTIAL_PLACEHOLDER_REGEX,
PLACEHOLDER_DELIMITER_REGEX,
)
[docs]
class LLMClient:
"""LLM client wrapper with cache-optimized history management."""
[docs]
def __init__(self, config: AtloopConfig, workspace_root: Optional[str] = None):
    """
    Build the chat client, prompt loader, skill loader and the fixed system prompt.

    Args:
        config: atloop configuration
        workspace_root: Optional workspace root path for project skills
    """
    self.config = config
    self.chat = Chat(
        base_url=config.ai.completion.api_base,
        api_key=config.ai.completion.api_key,
        model=config.ai.completion.model,
        timeout_s=120.0,
    )

    # Prompt language falls back to English when the config has no such attribute.
    prompt_language = getattr(config, "prompt_language", "en")
    self.prompt_loader = PromptLoader(language=prompt_language)
    logger.debug(f"[LLMClient] Initialized PromptLoader with language: {prompt_language}")

    # Skill sources: built-in package skills, the workspace, and extra directories.
    from atloop.config.loader import ConfigLoader

    package_root = Path(__file__).parent.parent
    workspace_dir = None if not workspace_root else Path(workspace_root)
    candidate_dirs = (
        ConfigLoader.get_atloop_dir() / "skills",
        Path.home() / ".atloop" / "skills",
    )
    self.skill_loader = EnhancedSkillLoader(
        builtin_skills_dir=package_root / "skills" / "builtin",
        project_dir=workspace_dir,
        # Directories that do not exist are dropped before reaching the loader.
        additional_dirs=[folder for folder in candidate_dirs if folder.exists()],
    )

    # The system prompt is assembled once here and never rebuilt afterwards.
    template_text = self.prompt_loader.load("system")
    schema_text = self.generate_tool_schema()
    skills_text = self._get_skills_info()
    self.system_prompt = template_text.replace("{TOOL_SCHEMA}", schema_text)

    if not skills_text:
        logger.warning(
            f"[LLMClient] No skills found (skill_loader={self.skill_loader is not None})"
        )
        return

    self.system_prompt += f"\n\nAvailable Skills:\n{skills_text}\n\nIf a task matches a skill description, use `load_skill` to get the skill's main content and resource list, then use `load_skill_resource` to load specific resources when needed."
    logger.info(
        f"[LLMClient] Added {len(self.skill_loader.skills) if self.skill_loader else 0} skills to system prompt"
    )
    logger.debug(f"[LLMClient] Skills list:\n{skills_text}")
def _get_skills_info(self) -> str:
"""
Get skills information for system prompt (Layer 1: metadata only).
Returns:
Formatted string with skill descriptions, or empty string if no skills
"""
if not self.skill_loader:
return ""
return self.skill_loader.get_descriptions()
[docs]
def load_prompt_template(self, template_name: str) -> str:
    """
    Fetch a prompt template through the client's PromptLoader.

    Args:
        template_name: Template name (system, developer)

    Returns:
        Template content
    """
    logger.debug(f"[LLMClient] Loading prompt template: {template_name}")
    template_content = self.prompt_loader.load(template_name)
    return template_content
[docs]
def build_user_message(
    self,
    goal: str,
    budget: Dict[str, int],
    state_summary: Optional[str] = None,
    memory_context: Optional[str] = None,  # New parameter
    project_profile: Optional[str] = None,
    relevant_files: Optional[str] = None,
    recent_error: Optional[str] = None,
    current_diff: Optional[str] = None,
    test_results: Optional[str] = None,
    verification_success: Optional[bool] = None,
) -> str:
    """
    Build user message from context (append-only mode for cache optimization).

    The "developer" template is loaded and its placeholders are filled in a
    single pass, so placeholder-looking text inside a substituted value (for
    example a file snippet that literally contains ``{CURRENT_DIFF}``) is left
    untouched instead of being replaced by a later substitution.

    Args:
        goal: Task goal
        budget: Budget dictionary (``max_llm_calls``, ``max_tool_calls``,
            ``max_wall_time_sec``; missing keys fall back to 30 / 200 / 1800)
        state_summary: State summary, used only when ``memory_context`` is None
        memory_context: Memory context; when not None it takes precedence over
            ``state_summary`` for the ``{STATE_SUMMARY}`` placeholder
        project_profile: Project profile
        relevant_files: Relevant file snippets
        recent_error: Recent error
        current_diff: Current diff
        test_results: Test results
        verification_success: Verification success status (True / False / None)

    Returns:
        User message string (to be appended to history)
    """
    # Function-scope import keeps this block self-contained.
    import re

    template = self.load_prompt_template("developer")

    test_results_section = ""
    if test_results:
        completion_reminder = ""
        if verification_success is True:
            test_status = "✓ **Tests Passed**"
            # Guide LLM to make correct decision when verification passes
            completion_reminder = """
🚨 **DECISION REQUIRED**: Tests have PASSED. You must now decide:
1. **If task goal is achieved**: Set stop_reason="done" immediately. Do not use "continue".
2. **If task goal is NOT yet achieved**: Set stop_reason="continue" and explain what remains.
**Key principle**: When verification passes AND goal is achieved, the task is complete. Use stop_reason="done" to signal completion.
"""
        elif verification_success is False:
            test_status = "❌ **Tests Failed**"
        else:
            test_status = "⚠️ **Test Status Unknown**"
        test_results_section = f"""
### Latest Test/Verification Results
{test_status}
{test_results}
{completion_reminder}
"""

    # Use memory_context if provided, otherwise fall back to state_summary (backward compatibility)
    memory_content = (
        memory_context if memory_context is not None else (state_summary or "Initial state")
    )
    logger.debug(
        f"[LLMClient] build_user_message: memory_content length={len(memory_content) if memory_content else 0}"
    )

    replacements = {
        "{GOAL}": str(goal),
        "{MAX_LLM_CALLS}": str(budget.get("max_llm_calls", 30)),
        "{MAX_TOOL_CALLS}": str(budget.get("max_tool_calls", 200)),
        "{MAX_WALL_TIME_SEC}": str(budget.get("max_wall_time_sec", 1800)),
        "{STATE_SUMMARY}": str(memory_content),  # Use memory_context or state_summary
        "{PROJECT_PROFILE}": str(project_profile or "Not identified"),
        "{RELEVANT_FILES}": str(relevant_files or "None"),
        "{RECENT_ERROR}": str(recent_error or "None"),
        "{CURRENT_DIFF}": str(current_diff or "No changes"),
        "{TEST_RESULTS}": str(test_results_section),
    }
    logger.debug(
        f"[LLMClient] Replacing {{STATE_SUMMARY}}: length={len(str(memory_content))}, preview={str(memory_content)[:100] if memory_content else 'None'}..."
    )

    # One regex pass over the template only: inserted values are never re-scanned,
    # so user-supplied text cannot be clobbered by another placeholder's value.
    placeholder_pattern = re.compile("|".join(re.escape(key) for key in replacements))
    user_message = placeholder_pattern.sub(
        lambda match: replacements[match.group(0)], template
    )

    # Checked against the template, not the output: the memory content itself may
    # legitimately contain the literal text "{STATE_SUMMARY}".
    if "{STATE_SUMMARY}" in template:
        logger.debug("[LLMClient] {STATE_SUMMARY} placeholder successfully replaced")
    else:
        logger.warning(
            "[LLMClient] Developer template has no {STATE_SUMMARY} placeholder; "
            "memory context was not included"
        )
    return user_message
[docs]
def build_prompt(
self,
goal: str,
budget: Dict[str, int],
state_summary: Optional[str] = None,
project_profile: Optional[str] = None,
relevant_files: Optional[str] = None,
recent_error: Optional[str] = None,
current_diff: Optional[str] = None,
test_results: Optional[str] = None,
verification_success: Optional[bool] = None,
) -> str:
"""
Build complete prompt from templates (DEPRECATED - for backward compatibility).
This method is kept for backward compatibility but should be replaced with
build_user_message() for cache optimization.
Args:
goal: Task goal
budget: Budget dictionary
state_summary: State summary
project_profile: Project profile
relevant_files: Relevant file snippets
recent_error: Recent error
current_diff: Current diff
test_results: Test results
verification_success: Verification success status
Returns:
Complete prompt string
"""
user_message = self.build_user_message(
goal=goal,
budget=budget,
state_summary=state_summary,
project_profile=project_profile,
relevant_files=relevant_files,
recent_error=recent_error,
current_diff=current_diff,
test_results=test_results,
verification_success=verification_success,
)
return f"{self.system_prompt}\n\n{user_message}"
[docs]
def plan_and_act(
    self,
    user_message: str,
    max_retries: int = 2,
    stream_callback: Optional[Callable[[str], None]] = None,
    step: Optional[int] = None,
    task_id: Optional[str] = None,
) -> Tuple[Optional[ActionJSON], Optional[str], Dict[str, Any], Optional[str], Dict[str, str]]:
    """
    Call LLM to plan and generate actions.

    Memory-only mode (no ChatHistory):
    - Fixed system prompt (set once, never changes - preserves cache)
    - Each call is independent with full context in user_message
    - All history is managed through Memory, not conversation history
    - Uses lexilux 2.0.0 stream() for ensuring complete responses

    Every failed attempt (exception or unparsable output) is logged before
    the next retry so that retries are visible in the logs.

    Args:
        user_message: Required user message content containing all context (including memory summary)
        max_retries: Maximum retry attempts if JSON parsing fails
        stream_callback: Optional callback function to receive streaming output chunks
        step: Optional step number for event emission
        task_id: Optional task ID for event emission

    Returns:
        Tuple of (ActionJSON or None, error_message, usage_info, full_output_text, file_contents_dict)
        file_contents_dict maps placeholder names to actual file content

    Raises:
        ValueError: If user_message is empty or whitespace-only.
    """
    if not user_message or not user_message.strip():
        raise ValueError("user_message is required and cannot be empty (no history mode)")

    usage_info = {"total_tokens": 0, "input_tokens": 0, "output_tokens": 0}
    full_output = ""
    error = None
    self._log_prompt_size(user_message)

    # Events are only emitted when both step and task_id are provided
    event_emitter = None
    if step is not None and task_id:
        event_emitter = OutputEventEmitter()
    llm_call_start_time = None

    for attempt in range(max_retries + 1):
        try:
            # On retries the previous parse error is appended as a correction hint
            current_message = self._build_retry_message(user_message, attempt, error)
            self._print_streaming_status(stream_callback)
            chat_params = ChatParams(
                temperature=0.3,
                max_tokens=self.config.ai.performance.max_tokens_output,
            )
            # Emit LLM call event before calling
            if event_emitter:
                llm_call_start_time = time.time()
                event_emitter.emit(
                    LLMCallEvent(
                        step=step,
                        task_id=task_id,
                        model=self.config.ai.completion.model,
                        prompt_length=len(current_message),
                    )
                )
            initial_result = self._stream_initial_response(
                current_message, chat_params, stream_callback, step, task_id, event_emitter
            )
            result = self._handle_truncation(
                initial_result, current_message, stream_callback, step, task_id, event_emitter
            )
            # usage_info reflects the most recent attempt only
            usage_info["total_tokens"] = result.usage.total_tokens
            usage_info["input_tokens"] = result.usage.input_tokens
            usage_info["output_tokens"] = result.usage.output_tokens
            full_output = result.text
            self._log_final_output(full_output, initial_result)
            action_json, error, file_contents = parse_action_json(full_output)
            self._log_file_contents_extraction(file_contents, action_json, full_output)
            # Emit LLM result event
            if event_emitter and action_json and llm_call_start_time:
                # ActionJSON.actions is List[Dict[str, Any]], not objects with attributes
                actions_list = [
                    {"tool": action.get("tool", ""), "args": action.get("args", {})}
                    for action in action_json.actions
                ]
                duration_ms = int((time.time() - llm_call_start_time) * 1000)
                event_emitter.emit(
                    LLMResultEvent(
                        step=step,
                        task_id=task_id,
                        model=self.config.ai.completion.model,
                        tokens_in=result.usage.input_tokens,
                        tokens_out=result.usage.output_tokens,
                        actions=actions_list,
                        stop_reason=action_json.stop_reason,
                        duration_ms=duration_ms,
                        full_response=full_output,
                    )
                )
            if action_json:
                return action_json, None, usage_info, full_output, file_contents
            # Parsing failed: record it instead of retrying silently
            logger.warning(
                f"[LLMClient] Failed to parse Action JSON on attempt "
                f"{attempt + 1}/{max_retries + 1}: {error}"
            )
            if attempt < max_retries:
                continue
            return (
                None,
                f"Failed to parse Action JSON after {max_retries + 1} attempts: {error}",
                usage_info,
                full_output,
                {},
            )
        except Exception as e:
            error_str = str(e)
            if "400" in error_str and "Bad Request" in error_str:
                logger.warning(
                    f"[LLMClient] 400 Bad Request on attempt {attempt + 1}/{max_retries + 1}. "
                    f"This may indicate the prompt is too large. Error: {error_str[:200]}"
                )
            else:
                # Previously swallowed without a trace when a retry was still available
                logger.warning(
                    f"[LLMClient] LLM call failed on attempt {attempt + 1}/{max_retries + 1}: "
                    f"{type(e).__name__}: {error_str[:200]}"
                )
            logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")
            if attempt < max_retries:
                continue
            return None, f"LLM call failed: {e}", usage_info, full_output, {}
    # Only reached when the loop body never runs (negative max_retries)
    return None, "Max retries exceeded", usage_info, full_output, {}
def _log_prompt_size(self, user_message: str) -> None:
    """Log the character count (and a chars/4 token estimate) of system prompt plus user message."""
    system_chars = len(self.system_prompt) if self.system_prompt else 0
    user_chars = len(user_message)
    combined_chars = system_chars + user_chars
    estimated_tokens = combined_chars // 4
    logger.info(
        f"[LLMClient] Prompt size estimate: ~{combined_chars} characters (~{estimated_tokens} tokens) "
        f"(user_message: {user_chars} chars, system: {system_chars} chars)"
    )
def _build_retry_message(self, user_message: str, attempt: int, error: Optional[str]) -> str:
"""Build message for retry attempt with error correction if needed."""
if attempt > 0 and error:
error_correction = f"""
[IMPORTANT] JSON parsing failed - please fix the following issue:
Error details: {error}
Please ensure your response is valid JSON format with the following required fields:
- "actions": array containing tool call objects
- "stop_reason": string, value must be "continue", "done", or "fail"
Each action must contain:
- "tool": tool name ("run" or "write_file")
- "args": parameter object
Example format:
{{
"actions": [
{{"tool": "run", "args": {{"cmd": "command"}}}},
{{"tool": "write_file", "args": {{"path": "file.py", "content": "..."}}}}
],
"stop_reason": "continue"
}}
Please output only valid JSON, do not add any other text, comments, or explanations."""
return user_message + error_correction
return user_message
def _print_streaming_status(self, stream_callback: Optional[Callable[[str], None]]) -> None:
"""Print streaming status message.
Note: This is kept for backward compatibility but output system
handles LLM status display via events. This may be removed in future.
"""
# Suppress direct print in favor of event-driven output
# Output system will handle status display via LLMCallEvent and LLMStreamEvent
pass
def _stream_initial_response(
    self,
    current_message: str,
    chat_params: ChatParams,
    stream_callback: Optional[Callable[[str], None]],
    step: Optional[int] = None,
    task_id: Optional[str] = None,
    event_emitter: Optional[OutputEventEmitter] = None,
) -> ChatResult:
    """Stream initial LLM response and return result.

    Args:
        current_message: User message sent together with the fixed system prompt
        chat_params: Generation parameters passed to the chat client
        stream_callback: Optional callback invoked with every non-empty delta
        step: Step number put on stream events (0 when None)
        task_id: Task ID put on stream events ("" when None)
        event_emitter: Optional emitter; when set, an LLMStreamEvent is emitted
            per delta and once more on completion

    Returns:
        The ChatResult built from the stream's accumulated result. Errors raised
        while iterating the stream are logged and the (possibly partial) result
        is still returned.
    """
    stream_iterator = self.chat.stream(
        current_message,
        system=self.system_prompt,
        include_usage=True,
        params=chat_params,
    )
    chunk_count = 0
    total_delta_length = 0
    try:
        for chunk in stream_iterator:
            chunk_count += 1
            if chunk.delta:
                total_delta_length += len(chunk.delta)
                if stream_callback:
                    # Don't print directly - let output system handle it via events
                    stream_callback(chunk.delta)
                # Emit stream event
                if event_emitter:
                    event_emitter.emit(
                        LLMStreamEvent(
                            step=step or 0,
                            task_id=task_id or "",
                            chunk=chunk.delta,
                            is_complete=False,
                        )
                    )
            if chunk.done:
                # Emit stream complete event
                if event_emitter:
                    event_emitter.emit(
                        LLMStreamEvent(
                            step=step or 0,
                            task_id=task_id or "",
                            chunk="",
                            is_complete=True,
                        )
                    )
                break
    except Exception as stream_error:
        # Deliberate best effort: keep whatever was streamed before the failure
        logger.error(f"[LLMClient] Error during streaming: {stream_error}")
        logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")
    logger.debug(
        f"[LLMClient] Streaming stats: {chunk_count} chunks, total delta length: {total_delta_length} chars"
    )
    # Don't print directly - let output system handle it via events
    # Status is shown via LLMResultEvent
    initial_result = stream_iterator.result.to_chat_result()
    logger.info(
        f"[LLMClient] Initial streaming result: finish_reason={initial_result.finish_reason}, "
        f"length={len(initial_result.text)} chars, chunks={chunk_count}, "
        f"delta_total_length={total_delta_length}"
    )
    # Sanity checks: the result text should be non-empty and match the streamed deltas
    if len(initial_result.text) == 0:
        logger.error(
            f"[LLMClient] Critical error: Initial result text is empty! "
            f"chunks={chunk_count}, delta_total_length={total_delta_length}"
        )
    elif len(initial_result.text) != total_delta_length:
        logger.warning(
            f"[LLMClient] Warning: Initial result length ({len(initial_result.text)}) "
            f"does not match delta total length ({total_delta_length})!"
        )
    else:
        preview = initial_result.text[:100].replace("\n", "\\n")
        logger.debug(f"[LLMClient] Initial result preview: {preview}...")
    return initial_result
def _handle_truncation(
    self,
    initial_result: ChatResult,
    current_message: str,
    stream_callback: Optional[Callable[[str], None]],
    step: Optional[int] = None,
    task_id: Optional[str] = None,
    event_emitter: Optional[OutputEventEmitter] = None,
) -> ChatResult:
    """Continue a truncated response; return the initial result otherwise.

    A result whose finish_reason is "length" is handed to
    _continue_with_streaming (up to 5 continuation attempts). If that raises,
    the failure is logged and the initial result is returned as-is. The step,
    task_id and event_emitter arguments are accepted but not used here.
    """
    if initial_result.finish_reason != "length":
        return initial_result

    continuation_limit = 5
    # Truncation handling - output system will show this via events if needed
    try:
        continued = self._continue_with_streaming(
            self.chat,
            initial_result,
            continuation_limit,
            original_user_message=current_message,
            stream_callback=stream_callback,
        )
        logger.info(
            f"[LLMClient] Continue generation successful, final result length: {len(continued.text)} chars"
        )
        return continued
    except Exception as continue_error:
        logger.error(f"[LLMClient] Error during continue generation: {continue_error}")
        logger.warning(
            f"[LLMClient] Using initial result (may be incomplete), length: {len(initial_result.text)} chars"
        )
        # Placeholders in a truncated result hint at incomplete file content
        found_delimiters = []
        for hit in PLACEHOLDER_DELIMITER_REGEX.finditer(initial_result.text):
            found_delimiters.append(hit.group(0))
        if found_delimiters:
            logger.warning(
                f"[LLMClient] Initial result contains placeholders {found_delimiters}, "
                f"but file content may be incomplete due to continue failure!"
            )
        return initial_result
def _log_final_output(self, full_output: str, initial_result: ChatResult) -> None:
    """Log size and placeholder statistics for the final LLM output."""
    final_length = len(full_output)
    initial_length = len(initial_result.text)
    was_truncated = initial_result.finish_reason == "length"

    logger.info(
        f"[LLMClient] Final full_output length: {final_length} chars, "
        f"finish_reason={initial_result.finish_reason}"
    )
    if final_length == 0:
        logger.error(
            f"[LLMClient] Critical error: Final full_output is empty! "
            f"Initial result length: {initial_length} chars"
        )
    elif final_length < initial_length:
        logger.warning(
            f"[LLMClient] Warning: Final full_output ({final_length} chars) "
            f"is shorter than initial result ({initial_length} chars)!"
        )
    if was_truncated:
        logger.info(
            f"[LLMClient] Continued after truncation, final full_output length: {final_length} chars"
        )
    # NOTE(review): kept at function level (logged on every call); the source
    # nesting of this step is ambiguous - confirm against the repository.
    delimiters_seen = []
    for hit in PLACEHOLDER_DELIMITER_REGEX.finditer(full_output):
        delimiters_seen.append(hit.group(0))
    logger.info(f"[LLMClient] Found placeholders: {delimiters_seen}")
def _log_file_contents_extraction(
    self,
    file_contents: Dict[str, str],
    action_json: Optional[ActionJSON],
    full_output: str,
) -> None:
    """Log which placeholder content blocks were extracted and which are missing.

    Placeholder contents (---((TYPE_name))--- blocks) are expected when the LLM
    uses tools with placeholders. For tools without placeholders, empty
    file_contents is normal and nothing is logged.
    """
    from atloop.orchestrator.phases.placeholder_replacer import PlaceholderReplacer

    # Placeholders the parsed actions refer to
    referenced = []
    for action in action_json.actions if action_json else ():
        field_name, candidate = PlaceholderReplacer.get_placeholder_field_value(
            action.get("tool"), action.get("args", {})
        )
        if field_name and PlaceholderReplacer._is_valid_placeholder(candidate):
            referenced.append(candidate)

    if not file_contents:
        if referenced:
            # Actions use placeholders but nothing was extracted. Treated as a
            # normal case: the LLM may provide the blocks in the next iteration.
            logger.debug(
                f"[LLMClient] Actions reference placeholders {referenced} "
                f"but no ---((TYPE_name))--- blocks found in LLM output "
                f"(full_output length: {len(full_output)}, will retry in next iteration)"
            )
        # Otherwise no placeholder-using actions: empty file_contents is expected.
        return

    logger.info(
        f"[LLMClient] Extracted {len(file_contents)} file content placeholders: {list(file_contents.keys())}"
    )
    for name, body in file_contents.items():
        logger.debug(f"[LLMClient] {name}: {len(body)} chars")

    # Referenced placeholders without a matching extracted block
    absent = [name for name in referenced if name not in file_contents]
    if absent:
        # Also a normal case - the agent loop can handle this gracefully
        logger.debug(
            f"[LLMClient] Actions reference placeholders {absent} "
            f"but corresponding ---((TYPE_name))--- blocks not found in LLM output "
            f"(will retry in next iteration)"
        )
def _continue_with_streaming(
    self,
    chat: Chat,
    initial_result: ChatResult,
    max_continues: int,
    original_user_message: str,
    stream_callback: Optional[Callable[[str], None]] = None,
) -> ChatResult:
    """
    Continue generation with streaming output when response is truncated.

    Simple approach: construct a one-round history (user prompt + AI response),
    then use a continue prompt to ask LLM to continue output. The loop repeats
    while the latest result still has finish_reason == "length" and fewer than
    max_continues continuations have been made.

    Args:
        chat: Chat client instance
        initial_result: Initial result (finish_reason == "length")
        max_continues: Maximum number of continuation attempts
        original_user_message: Original user message with full context
        stream_callback: Optional callback function to receive streaming output chunks

    Returns:
        Merged complete result. If a continuation attempt raises, the results
        collected so far are merged and returned (or the initial result when
        nothing else was collected); the exception is logged, not re-raised.
    """
    start_time = time.time()
    continue_prompt = (
        "Your output was truncated. Please continue outputting the remaining content."
    )
    # all_results collects every partial result for the final merge;
    # accumulated_text is the assistant text replayed to the model each round.
    all_results = [initial_result]
    current_result = initial_result
    continue_count = 0
    accumulated_text = initial_result.text
    logger.info(
        f"[LLMClient] Starting streaming continue generation (max {max_continues} attempts)..."
    )
    while current_result.finish_reason == "length" and continue_count < max_continues:
        continue_count += 1
        logger.info(f"[LLMClient] Starting continue generation attempt {continue_count}...")
        # NOTE(review): this method still prints to stdout, while the other
        # streaming paths in this class avoid direct printing - confirm intent.
        if stream_callback:
            print(
                f"\n [Continue generation {continue_count}/{max_continues}] ",
                end="",
                flush=True,
            )
        # Fresh one-round history each attempt: original prompt, everything
        # generated so far, then the continue request.
        # NOTE(review): self.system_prompt is not passed here; whether the
        # continuation call needs it depends on the lexilux API - TODO confirm.
        continue_history = ChatHistory()
        continue_history.add_user(original_user_message)
        continue_history.add_assistant(accumulated_text)
        continue_history.add_user(continue_prompt)
        chat_params = ChatParams(
            temperature=0.3,
            max_tokens=self.config.ai.performance.max_tokens_output,
        )
        try:
            continue_stream = chat.stream_with_history(
                continue_history,
                message=None,
                include_usage=True,
                params=chat_params,
            )
            for chunk in continue_stream:
                if chunk.delta:
                    if stream_callback:
                        print(chunk.delta, end="", flush=True)
                        stream_callback(chunk.delta)
                if chunk.done:
                    break
            continue_result = continue_stream.result.to_chat_result()
            all_results.append(continue_result)
            current_result = continue_result
            accumulated_text += continue_result.text
            logger.info(
                f"[LLMClient] Continue generation attempt {continue_count} completed, "
                f"length: {len(continue_result.text)} chars, "
                f"finish_reason: {continue_result.finish_reason}, "
                f"accumulated text length: {len(accumulated_text)} chars"
            )
            # Diagnostics only: report placeholder delimiters seen in this segment
            placeholders_in_continue = [
                match.group(0)
                for match in PLACEHOLDER_DELIMITER_REGEX.finditer(continue_result.text)
            ]
            if placeholders_in_continue:
                logger.info(
                    f"[LLMClient] Continue result contains placeholders: {placeholders_in_continue}"
                )
            # Detect partial placeholders (may be cut off in streaming)
            partial_matches = [
                match.group(0)
                for match in PARTIAL_PLACEHOLDER_REGEX.finditer(continue_result.text)
            ]
            if partial_matches:
                logger.warning(
                    f"[LLMClient] Continue result may contain partial placeholders: {partial_matches}"
                )
        except Exception as e:
            # A failed attempt ends the loop: return what has been collected so far
            logger.error(
                f"[LLMClient] Continue generation attempt {continue_count} failed: {e}"
            )
            logger.error(f"[LLMClient] Exception details: {type(e).__name__}: {e}")
            logger.debug(f"[LLMClient] Exception traceback: {traceback.format_exc()}")
            if len(all_results) > 1:
                merged = ChatContinue.merge_results(*all_results)
                logger.warning(
                    f"[LLMClient] Continue failed, returning partial merged result (may be incomplete), "
                    f"length: {len(merged.text)} chars, contains {len(all_results)} results"
                )
                placeholders_in_merged = [
                    match.group(0)
                    for match in PLACEHOLDER_DELIMITER_REGEX.finditer(merged.text)
                ]
                if placeholders_in_merged:
                    logger.warning(
                        f"[LLMClient] Partial result contains placeholders {placeholders_in_merged}, "
                        f"but file content may be incomplete due to continue failure!"
                    )
                return merged
            else:
                logger.warning(
                    f"[LLMClient] Continue failed and only initial result available, "
                    f"returning initial result (may be incomplete), length: {len(initial_result.text)} chars"
                )
                return initial_result
    # Loop ended normally: either the model finished or max_continues was reached
    if len(all_results) == 1:
        full_result = all_results[0]
    else:
        full_result = ChatContinue.merge_results(*all_results)
        logger.debug(
            f"[LLMClient] Merged result: {len(all_results)} results, "
            f"total length: {len(full_result.text)} chars"
        )
        for i, r in enumerate(all_results):
            logger.debug(
                f"[LLMClient] Result {i + 1}: {len(r.text)} chars, finish_reason: {r.finish_reason}"
            )
    elapsed_time = time.time() - start_time
    logger.info(
        f"[LLMClient] Streaming continue generation completed, elapsed: {elapsed_time:.2f}s, "
        f"total length: {len(full_result.text)} chars, {continue_count} continues"
    )
    all_placeholders = [
        match.group(0) for match in PLACEHOLDER_DELIMITER_REGEX.finditer(full_result.text)
    ]
    logger.info(f"[LLMClient] Merged complete text contains placeholders: {all_placeholders}")
    return full_result
[docs]
def reset_history(self):
    """
    Reset conversation history (for new task) - intentional no-op.

    NOTE: Retained only for backward compatibility. This client keeps no
    conversation history of its own; all history is managed through Memory.
    """
    return None
[docs]
def add_tool_results_to_history(
    self, actions: List[Dict[str, Any]], results: List[Dict[str, Any]]
):
    """
    Add tool execution results to conversation history - intentional no-op.

    NOTE: Retained only for backward compatibility. Both arguments are
    ignored; per the original notes, tool results are stored in Memory and
    included in the next memory summary.

    Args:
        actions: List of actions that were executed
        results: List of tool execution results
    """
    return None