local api = vim.api
local fn = vim.fn
local uv = vim.uv
local curl = require("plenary.curl")
local ACPClient = require("avante.libs.acp_client")
local Utils = require("avante.utils")
local Prompts = require("avante.utils.prompts")
local Config = require("avante.config")
local Path = require("avante.path")
local PPath = require("plenary.path")
local Providers = require("avante.providers")
local LLMToolHelpers = require("avante.llm_tools.helpers")
local LLMTools = require("avante.llm_tools")
local History = require("avante.history")
local HistoryRender = require("avante.history.render")
local ACPConfirmAdapter = require("avante.ui.acp_confirm_adapter")
---@class avante.LLM
local M = {}
M.CANCEL_PATTERN = "AvanteLLMEscape"
------------------------------Prompt and type------------------------------
local group = api.nvim_create_augroup("avante_llm", { clear = true })
---@param prev_memory string | nil
---@param history_messages avante.HistoryMessage[]
---@param cb fun(memory: avante.ChatMemory | nil): nil
function M.summarize_memory(prev_memory, history_messages, cb)
local system_prompt =
[[You are an expert coding assistant. Your goal is to generate a concise, structured summary of the conversation below that captures all essential information needed to continue development after context replacement. Include tasks performed, code areas modified or reviewed, key decisions or assumptions, test results or errors, and outstanding tasks or next steps.]]
if #history_messages == 0 then
cb(nil)
return
end
local latest_timestamp = nil
local latest_message_uuid = nil
for idx = #history_messages, 1, -1 do
local message = history_messages[idx]
if not message.is_dummy then
latest_timestamp = message.timestamp
latest_message_uuid = message.uuid
break
end
end
if not latest_timestamp or not latest_message_uuid then
cb(nil)
return
end
local conversation_items = vim
.iter(history_messages)
:map(function(msg) return msg.message.role .. ": " .. HistoryRender.message_to_text(msg, history_messages) end)
:totable()
local conversation_text = table.concat(conversation_items, "\n")
local user_prompt = "Here is the conversation so far:\n"
.. conversation_text
.. "\n\nPlease summarize this conversation, covering:\n1. Tasks performed and outcomes\n2. Code files, modules, or functions modified or examined\n3. Important decisions or assumptions made\n4. Errors encountered and test or build results\n5. Remaining tasks, open questions, or next steps\nProvide the summary in a clear, concise format."
if prev_memory then user_prompt = user_prompt .. "\n\nThe previous summary is:\n\n" .. prev_memory end
local messages = {
{
role = "user",
content = user_prompt,
},
}
local response_content = ""
local provider = Providers.get_memory_summary_provider()
M.curl({
provider = provider,
prompt_opts = {
system_prompt = system_prompt,
messages = messages,
},
handler_opts = {
on_start = function(_) end,
on_chunk = function(chunk)
if not chunk then return end
response_content = response_content .. chunk
end,
on_stop = function(stop_opts)
if stop_opts.error ~= nil then
Utils.error(string.format("summarize memory failed: %s", vim.inspect(stop_opts.error)))
return
end
if stop_opts.reason == "complete" then
response_content = Utils.trim_think_content(response_content)
local memory = {
content = response_content,
last_summarized_timestamp = latest_timestamp,
last_message_uuid = latest_message_uuid,
}
cb(memory)
else
cb(nil)
end
end,
},
})
end
---@param user_input string
---@param cb fun(error: string | nil): nil
function M.generate_todos(user_input, cb)
local system_prompt =
[[You are an expert coding assistant. Please generate a todo list to complete the task based on the user input and pass the todo list to the write_todos tool.]]
local messages = {
{ role = "user", content = user_input },
}
local provider = Providers[Config.provider]
local tools = {
require("avante.llm_tools.write_todos"),
}
local history_messages = {}
cb = Utils.call_once(cb)
M.curl({
provider = provider,
prompt_opts = {
system_prompt = system_prompt,
messages = messages,
tools = tools,
},
handler_opts = {
on_start = function() end,
on_chunk = function() end,
on_messages_add = function(msgs)
msgs = vim.islist(msgs) and msgs or { msgs }
for _, msg in ipairs(msgs) do
if not msg.uuid then msg.uuid = Utils.uuid() end
local idx = nil
for i, m in ipairs(history_messages) do
if m.uuid == msg.uuid then
idx = i
break
end
end
if idx ~= nil then
history_messages[idx] = msg
else
table.insert(history_messages, msg)
end
end
end,
on_stop = function(stop_opts)
if stop_opts.error ~= nil then
Utils.error(string.format("generate todos failed: %s", vim.inspect(stop_opts.error)))
return
end
if stop_opts.reason == "tool_use" then
local pending_tools = History.get_pending_tools(history_messages)
for _, pending_tool in ipairs(pending_tools) do
if pending_tool.state == "generated" and pending_tool.name == "write_todos" then
local result = LLMTools.process_tool_use(tools, pending_tool, {
session_ctx = {},
on_complete = function() cb() end,
tool_use_id = pending_tool.id,
})
if result ~= nil then cb() end
end
end
else
cb()
end
end,
},
})
end
---@class avante.AgentLoopOptions
---@field system_prompt string
---@field user_input string
---@field tools AvanteLLMTool[]
---@field on_complete fun(error: string | nil): nil
---@field session_ctx? table
---@field on_tool_log? fun(tool_id: string, tool_name: string, log: string, state: AvanteLLMToolUseState): nil
---@field on_start? fun(): nil
---@field on_chunk? fun(chunk: string): nil
---@field on_messages_add? fun(messages: avante.HistoryMessage[]): nil
---@param opts avante.AgentLoopOptions
function M.agent_loop(opts)
local messages = {}
table.insert(messages, { role = "user", content = "" .. opts.user_input .. "" })
local memory_content = nil
local history_messages = {}
local function no_op() end
local session_ctx = opts.session_ctx or {}
local stream_options = {
ask = true,
memory = memory_content,
code_lang = "unknown",
provider = Providers[Config.provider],
get_history_messages = function() return history_messages end,
on_tool_log = opts.on_tool_log or no_op,
on_messages_add = function(msgs)
msgs = vim.islist(msgs) and msgs or { msgs }
for _, msg in ipairs(msgs) do
local idx = nil
for i, m in ipairs(history_messages) do
if m.uuid == msg.uuid then
idx = i
break
end
end
if idx ~= nil then
history_messages[idx] = msg
else
table.insert(history_messages, msg)
end
end
if opts.on_messages_add then opts.on_messages_add(msgs) end
end,
session_ctx = session_ctx,
prompt_opts = {
system_prompt = opts.system_prompt,
tools = opts.tools,
messages = messages,
},
on_start = opts.on_start or no_op,
on_chunk = opts.on_chunk or no_op,
on_stop = function(stop_opts)
if stop_opts.error ~= nil then
local err = string.format("dispatch_agent failed: %s", vim.inspect(stop_opts.error))
opts.on_complete(err)
return
end
opts.on_complete(nil)
end,
}
local function on_memory_summarize(pending_compaction_history_messages)
local compaction_history_message_uuids = {}
for _, msg in ipairs(pending_compaction_history_messages or {}) do
compaction_history_message_uuids[msg.uuid] = true
end
M.summarize_memory(memory_content, pending_compaction_history_messages or {}, function(memory)
if memory then stream_options.memory = memory.content end
local new_history_messages = {}
for _, msg in ipairs(history_messages) do
if not compaction_history_message_uuids[msg.uuid] then table.insert(new_history_messages, msg) end
end
history_messages = new_history_messages
M._stream(stream_options)
end)
end
stream_options.on_memory_summarize = on_memory_summarize
M._stream(stream_options)
end
---@param opts AvanteGeneratePromptsOptions
---@return AvantePromptOptions
function M.generate_prompts(opts)
local project_instruction_file = Config.instructions_file or "avante.md"
local project_root = Utils.root.get()
local instruction_file_path = PPath:new(project_root, project_instruction_file)
if instruction_file_path:exists() then
local lines = Utils.read_file_from_buf_or_disk(instruction_file_path:absolute())
local instruction_content = lines and table.concat(lines, "\n") or ""
if instruction_content then opts.instructions = (opts.instructions or "") .. "\n" .. instruction_content end
end
local mode = opts.mode or Config.mode
-- Check if the instructions contains an image path
local image_paths = {}
if opts.prompt_opts and opts.prompt_opts.image_paths then
image_paths = vim.list_extend(image_paths, opts.prompt_opts.image_paths)
end
Path.prompts.initialize(Path.prompts.get_templates_dir(project_root), project_root)
local system_info = Utils.get_system_info()
local selected_files = opts.selected_files or {}
if opts.selected_filepaths then
for _, filepath in ipairs(opts.selected_filepaths) do
local lines, error = Utils.read_file_from_buf_or_disk(filepath)
if error ~= nil then
Utils.error("error reading file: " .. error)
else
local content = table.concat(lines or {}, "\n")
local filetype = Utils.get_filetype(filepath)
table.insert(selected_files, { path = filepath, content = content, file_type = filetype })
end
end
end
local viewed_files = {}
if opts.history_messages then
for _, message in ipairs(opts.history_messages) do
local use = History.Helpers.get_tool_use_data(message)
if use and use.name == "view" and use.input.path then
local uniform_path = Utils.uniform_path(use.input.path)
viewed_files[uniform_path] = use.id
end
end
end
selected_files = vim.iter(selected_files):filter(function(file) return viewed_files[file.path] == nil end):totable()
local is_acp_provider = false
if not opts.provider then is_acp_provider = Config.acp_providers[Config.provider] ~= nil end
local model_name = "unknown"
local context_window = nil
local use_react_prompt = false
if not is_acp_provider then
local provider = opts.provider or Providers[Config.provider]
model_name = provider.model or "unknown"
local provider_conf = Providers.parse_config(provider)
use_react_prompt = provider_conf.use_ReAct_prompt
context_window = provider.context_window
end
local template_opts = {
ask = opts.ask, -- TODO: add mode without ask instruction
code_lang = opts.code_lang,
selected_files = selected_files,
selected_code = opts.selected_code,
recently_viewed_files = opts.recently_viewed_files,
project_context = opts.project_context,
diagnostics = opts.diagnostics,
system_info = system_info,
model_name = model_name,
memory = opts.memory,
enable_fastapply = Config.behaviour.enable_fastapply,
use_react_prompt = use_react_prompt,
}
-- Removed the original todos processing logic, now handled in context_messages
local system_prompt
if opts.prompt_opts and opts.prompt_opts.system_prompt then
system_prompt = opts.prompt_opts.system_prompt
else
system_prompt = Path.prompts.render_mode(mode, template_opts)
end
if Config.system_prompt ~= nil then
local custom_system_prompt
if type(Config.system_prompt) == "function" then custom_system_prompt = Config.system_prompt() end
if type(Config.system_prompt) == "string" then custom_system_prompt = Config.system_prompt end
if custom_system_prompt ~= nil and custom_system_prompt ~= "" and custom_system_prompt ~= "null" then
system_prompt = system_prompt .. "\n\n" .. custom_system_prompt
end
end
---@type AvanteLLMMessage[]
local context_messages = {}
if opts.prompt_opts and opts.prompt_opts.messages then
context_messages = vim.list_extend(context_messages, opts.prompt_opts.messages)
end
if opts.project_context ~= nil and opts.project_context ~= "" and opts.project_context ~= "null" then
local project_context = Path.prompts.render_file("_project.avanterules", template_opts)
if project_context ~= "" then
table.insert(context_messages, { role = "user", content = project_context, visible = false, is_context = true })
end
end
if opts.diagnostics ~= nil and opts.diagnostics ~= "" and opts.diagnostics ~= "null" then
local diagnostics = Path.prompts.render_file("_diagnostics.avanterules", template_opts)
if diagnostics ~= "" then
table.insert(context_messages, { role = "user", content = diagnostics, visible = false, is_context = true })
end
end
if #selected_files > 0 or opts.selected_code ~= nil then
local code_context = Path.prompts.render_file("_context.avanterules", template_opts)
if code_context ~= "" then
table.insert(context_messages, { role = "user", content = code_context, visible = false, is_context = true })
end
end
if opts.memory ~= nil and opts.memory ~= "" and opts.memory ~= "null" then
local memory = Path.prompts.render_file("_memory.avanterules", template_opts)
if memory ~= "" then
table.insert(context_messages, { role = "user", content = memory, visible = false, is_context = true })
end
end
local pending_compaction_history_messages = {}
if opts.prompt_opts and opts.prompt_opts.pending_compaction_history_messages then
pending_compaction_history_messages =
vim.list_extend(pending_compaction_history_messages, opts.prompt_opts.pending_compaction_history_messages)
end
if context_window and context_window > 0 then
Utils.debug("Context window", context_window)
if opts.get_tokens_usage then
local tokens_usage = opts.get_tokens_usage()
if tokens_usage and tokens_usage.prompt_tokens ~= nil and tokens_usage.completion_tokens ~= nil then
local target_tokens = context_window * 0.9
local tokens_count = tokens_usage.prompt_tokens + tokens_usage.completion_tokens
Utils.debug("Tokens count", tokens_count)
if tokens_count > target_tokens then pending_compaction_history_messages = opts.history_messages end
end
end
end
---@type AvanteLLMMessage[]
local messages = vim.deepcopy(context_messages)
for _, msg in ipairs(opts.history_messages or {}) do
local message = msg.message
if msg.is_user_submission then
message = vim.deepcopy(message)
local content = message.content
if Config.mode == "agentic" then
if type(content) == "string" then
message.content = "" .. content .. ""
elseif type(content) == "table" then
for idx, item in ipairs(content) do
if type(item) == "string" then
item = "" .. item .. ""
content[idx] = item
elseif type(item) == "table" and item.type == "text" then
item.content = "" .. item.content .. ""
content[idx] = item
end
end
end
end
end
table.insert(messages, message)
end
messages = vim
.iter(messages)
:filter(function(msg) return type(msg.content) ~= "string" or msg.content ~= "" end)
:totable()
if opts.instructions ~= nil and opts.instructions ~= "" then
messages = vim.list_extend(messages, { { role = "user", content = opts.instructions } })
end
opts.session_ctx = opts.session_ctx or {}
opts.session_ctx.system_prompt = system_prompt
opts.session_ctx.messages = messages
local tools = {}
if opts.tools then tools = vim.list_extend(tools, opts.tools) end
if opts.prompt_opts and opts.prompt_opts.tools then tools = vim.list_extend(tools, opts.prompt_opts.tools) end
-- Set tools to nil if empty to avoid sending empty arrays to APIs that require
-- tools to be either non-existent or have at least one item
if #tools == 0 then tools = nil end
local agents_rules = Prompts.get_agents_rules_prompt()
if agents_rules then system_prompt = system_prompt .. "\n\n" .. agents_rules end
local cursor_rules = Prompts.get_cursor_rules_prompt(selected_files)
if cursor_rules then system_prompt = system_prompt .. "\n\n" .. cursor_rules end
---@type AvantePromptOptions
return {
system_prompt = system_prompt,
messages = messages,
image_paths = image_paths,
tools = tools,
pending_compaction_history_messages = pending_compaction_history_messages,
}
end
---@param opts AvanteGeneratePromptsOptions
---@return integer
function M.calculate_tokens(opts)
if Config.acp_providers[Config.provider] then return 0 end
local prompt_opts = M.generate_prompts(opts)
local tokens = Utils.tokens.calculate_tokens(prompt_opts.system_prompt)
for _, message in ipairs(prompt_opts.messages) do
tokens = tokens + Utils.tokens.calculate_tokens(message.content)
end
return tokens
end
local parse_headers = function(headers_file)
local headers = {}
local file = io.open(headers_file, "r")
if file then
for line in file:lines() do
line = line:gsub("\r$", "")
local key, value = line:match("^%s*(.-)%s*:%s*(.*)$")
if key and value then headers[key] = value end
end
if Config.debug then
-- Original header file was deleted by plenary.nvim
-- see https://github.com/nvim-lua/plenary.nvim/blob/b9fd5226c2f76c951fc8ed5923d85e4de065e509/lua/plenary/curl.lua#L268
local debug_headers_file = headers_file .. ".log"
Utils.debug("curl response headers file:", debug_headers_file)
local debug_file = io.open(debug_headers_file, "a")
if debug_file then
file:seek("set")
debug_file:write(file:read("*all"))
debug_file:close()
end
end
file:close()
end
return headers
end
---@param opts avante.CurlOpts
function M.curl(opts)
local provider = opts.provider
local prompt_opts = opts.prompt_opts
local handler_opts = opts.handler_opts
local orig_on_stop = handler_opts.on_stop
local stopped = false
---@param stop_opts AvanteLLMStopCallbackOptions
handler_opts.on_stop = function(stop_opts)
if stop_opts and not stop_opts.streaming_tool_use then
if stopped then return end
stopped = true
end
if orig_on_stop then return orig_on_stop(stop_opts) end
end
local spec = provider:parse_curl_args(prompt_opts)
if not spec then
handler_opts.on_stop({ reason = "error", error = "Provider configuration error" })
return
end
---@type string
local current_event_state = nil
local turn_ctx = {}
turn_ctx.turn_id = Utils.uuid()
local response_body = ""
---@param line string
local function parse_stream_data(line)
local event = line:match("^event:%s*(.+)$")
if event then
current_event_state = event
return
end
local data_match = line:match("^data:%s*(.+)$")
if data_match then
response_body = ""
provider:parse_response(turn_ctx, data_match, current_event_state, handler_opts)
else
response_body = response_body .. line
local ok, jsn = pcall(vim.json.decode, response_body)
if ok then
if jsn.error then
handler_opts.on_stop({ reason = "error", error = jsn.error })
else
provider:parse_response(turn_ctx, response_body, current_event_state, handler_opts)
end
response_body = ""
end
end
end
local function parse_response_without_stream(data)
provider:parse_response_without_stream(data, current_event_state, handler_opts)
end
local completed = false
local active_job ---@type Job|nil
local temp_file = fn.tempname()
local curl_body_file = temp_file .. "-request-body.json"
local resp_body_file = temp_file .. "-response-body.txt"
local headers_file = temp_file .. "-response-headers.txt"
-- Check if this is a multipart form request (specifically for watsonx)
local is_multipart_form = spec.headers and spec.headers["Content-Type"] == "multipart/form-data"
local curl_options
if is_multipart_form then
-- For multipart form data, use the form parameter
-- spec.body should be a table with form field data
curl_options = {
headers = spec.headers,
proxy = spec.proxy,
insecure = spec.insecure,
form = spec.body,
raw = spec.rawArgs,
}
else
-- For regular JSON requests, encode as JSON and write to file
local json_content = vim.json.encode(spec.body)
fn.writefile(vim.split(json_content, "\n"), curl_body_file)
curl_options = {
headers = spec.headers,
proxy = spec.proxy,
insecure = spec.insecure,
body = curl_body_file,
raw = spec.rawArgs,
}
end
Utils.debug("curl request body file:", curl_body_file)
Utils.debug("curl response body file:", resp_body_file)
local function cleanup()
if Config.debug then return end
vim.schedule(function()
fn.delete(curl_body_file)
pcall(fn.delete, resp_body_file)
end)
end
local headers_reported = false
local started_job, new_active_job = pcall(
curl.post,
spec.url,
vim.tbl_extend("force", curl_options, {
dump = { "-D", headers_file },
stream = function(err, data, _)
if not headers_reported and opts.on_response_headers then
headers_reported = true
opts.on_response_headers(parse_headers(headers_file))
end
if err then
completed = true
handler_opts.on_stop({ reason = "error", error = err })
return
end
if not data then return end
if Config.debug then
if type(data) == "string" then
local file = io.open(resp_body_file, "a")
if file then
file:write(data .. "\n")
file:close()
end
end
end
vim.schedule(function()
if provider.parse_stream_data ~= nil then
provider:parse_stream_data(turn_ctx, data, handler_opts)
else
parse_stream_data(data)
end
end)
end,
on_error = function(err)
if err.exit == 23 then
local xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
if not xdg_runtime_dir or fn.isdirectory(xdg_runtime_dir) == 0 then
Utils.error(
"$XDG_RUNTIME_DIR="
.. xdg_runtime_dir
.. " is set but does not exist. curl could not write output. Please make sure it exists, or unset.",
{ title = "Avante" }
)
elseif not uv.fs_access(xdg_runtime_dir, "w") then
Utils.error(
"$XDG_RUNTIME_DIR="
.. xdg_runtime_dir
.. " exists but is not writable. curl could not write output. Please make sure it is writable, or unset.",
{ title = "Avante" }
)
end
end
active_job = nil
if not completed then
completed = true
cleanup()
handler_opts.on_stop({ reason = "error", error = err })
end
end,
callback = function(result)
active_job = nil
cleanup()
local headers_map = vim.iter(result.headers):fold({}, function(acc, value)
local pieces = vim.split(value, ":")
local key = pieces[1]
local remain = vim.list_slice(pieces, 2)
if not remain then return acc end
local val = Utils.trim_spaces(table.concat(remain, ":"))
acc[key] = val
return acc
end)
if result.status >= 400 then
if provider.on_error then
provider.on_error(result)
else
Utils.error("API request failed with status " .. result.status, { once = true, title = "Avante" })
end
local retry_after = 10
if headers_map["retry-after"] then retry_after = tonumber(headers_map["retry-after"]) or 10 end
if result.status == 429 then
handler_opts.on_stop({ reason = "rate_limit", retry_after = retry_after })
return
end
vim.schedule(function()
if not completed then
completed = true
handler_opts.on_stop({
reason = "error",
error = "API request failed with status " .. result.status .. ". Body: " .. vim.inspect(result.body),
})
end
end)
end
-- If stream is not enabled, then handle the response here
if provider:is_disable_stream() and result.status == 200 then
vim.schedule(function()
completed = true
parse_response_without_stream(result.body)
end)
end
if result.status == 200 and spec.url:match("https://openrouter.ai") then
local content_type = headers_map["content-type"]
if content_type and content_type:match("text/html") then
handler_opts.on_stop({
reason = "error",
error = "Your openrouter endpoint setting is incorrect, please set it to https://openrouter.ai/api/v1",
})
end
end
end,
})
)
if not started_job then
local error_msg = vim.inspect(new_active_job)
Utils.error("Failed to make LLM request: " .. error_msg)
handler_opts.on_stop({ reason = "error", error = error_msg })
return
end
active_job = new_active_job
api.nvim_create_autocmd("User", {
group = group,
pattern = M.CANCEL_PATTERN,
once = true,
callback = function()
-- Error: cannot resume dead coroutine
if active_job then
-- Mark as completed first to prevent error handler from running
completed = true
-- 检查 active_job 的状态
local job_is_alive = pcall(function() return active_job:is_closing() == false end)
-- 只有当 job 仍然活跃时才尝试关闭它
if job_is_alive then
-- Attempt to shutdown the active job, but ignore any errors
xpcall(function() active_job:shutdown() end, function(err)
Utils.debug("Ignored error during job shutdown: " .. vim.inspect(err))
return err
end)
else
Utils.debug("Job already closed, skipping shutdown")
end
Utils.debug("LLM request cancelled")
active_job = nil
-- Clean up and notify of cancellation
cleanup()
vim.schedule(function() handler_opts.on_stop({ reason = "cancelled" }) end)
end
end,
})
return active_job
end
local retry_timer = nil
local abort_retry_timer = false
local function stop_retry_timer()
if retry_timer then
retry_timer:stop()
pcall(function() retry_timer:close() end)
retry_timer = nil
end
end
-- Intelligently truncate chat history for session recovery to avoid token limits
---@param history_messages table[]
---@return table[]
local function truncate_history_for_recovery(history_messages)
if not history_messages or #history_messages == 0 then return {} end
-- Get configuration parameters with validation and sensible defaults
local recovery_config = Config.session_recovery or {}
local MAX_RECOVERY_MESSAGES = math.max(1, math.min(recovery_config.max_history_messages or 20, 50)) -- Increased from 10 to 20
local MAX_MESSAGE_LENGTH = math.max(100, math.min(recovery_config.max_message_length or 1000, 10000))
-- Keep recent messages starting from the newest
local truncated = {}
local count = 0
-- CRITICAL: For session recovery, prioritize keeping conversation pairs (user+assistant)
-- This preserves the full context of recent interactions
local conversation_pairs = {}
local last_user_message = nil
for i = #history_messages, 1, -1 do
local message = history_messages[i]
if message and message.message and message.message.content then
local role = message.message.role
-- Build conversation pairs for better context preservation
if role == "user" then
last_user_message = message
elseif role == "assistant" and last_user_message then
-- Found a complete conversation pair
table.insert(conversation_pairs, 1, { user = last_user_message, assistant = message })
last_user_message = nil
end
end
end
-- Add complete conversation pairs first (better context preservation)
for _, pair in ipairs(conversation_pairs) do
if count >= MAX_RECOVERY_MESSAGES then break end
-- Add user message
table.insert(truncated, 1, pair.user)
count = count + 1
if count < MAX_RECOVERY_MESSAGES then
-- Add assistant response
table.insert(truncated, 1, pair.assistant)
count = count + 1
end
end
-- Add remaining individual messages if space allows
for i = #history_messages, 1, -1 do
if count >= MAX_RECOVERY_MESSAGES then break end
local message = history_messages[i]
if message and message.message and message.message.content then
-- Skip if already added as part of conversation pair
local already_added = false
for _, added_msg in ipairs(truncated) do
if added_msg.uuid == message.uuid then
already_added = true
break
end
end
if not already_added then
-- Prioritize user messages and important assistant replies, skip verbose tool call results
local content = message.message.content
local role = message.message.role
-- Skip overly verbose tool call results with multiple code blocks
if
role == "assistant"
and type(content) == "string"
and content:match("```.*```.*```")
and #content > MAX_MESSAGE_LENGTH * 2
then
goto continue
end
-- Handle string content
if type(content) == "string" then
if #content > MAX_MESSAGE_LENGTH then
-- Truncate overly long messages
local truncated_message = vim.deepcopy(message)
truncated_message.message.content = content:sub(1, MAX_MESSAGE_LENGTH) .. "...[truncated]"
table.insert(truncated, 1, truncated_message)
else
table.insert(truncated, 1, message)
end
-- Handle table content (multimodal messages)
elseif type(content) == "table" then
local truncated_message = vim.deepcopy(message)
-- Safely handle table content
if truncated_message.message.content and type(truncated_message.message.content) == "table" then
for j, item in ipairs(truncated_message.message.content) do
-- Handle various content item types
if type(item) == "string" and #item > MAX_MESSAGE_LENGTH then
truncated_message.message.content[j] = item:sub(1, MAX_MESSAGE_LENGTH) .. "...[truncated]"
elseif
type(item) == "table"
and item.text
and type(item.text) == "string"
and #item.text > MAX_MESSAGE_LENGTH
then
-- Handle {type="text", text="..."} format
item.text = item.text:sub(1, MAX_MESSAGE_LENGTH) .. "...[truncated]"
end
end
end
table.insert(truncated, 1, truncated_message)
else
table.insert(truncated, 1, message)
end
count = count + 1
end
end
::continue::
end
return truncated
end
---@param opts AvanteLLMStreamOptions
function M._stream_acp(opts)
Utils.debug("use ACP", Config.provider)
---@type table
local tool_call_messages = {}
---@type avante.HistoryMessage
local last_tool_call_message = nil
local acp_provider = Config.acp_providers[Config.provider]
local prev_text_message_content = ""
local history_messages = {}
local get_history_messages = function()
if opts.get_history_messages then return opts.get_history_messages() end
return history_messages
end
local on_messages_add = function(messages)
if opts.on_chunk then
for _, message in ipairs(messages) do
if message.message.role == "assistant" and type(message.message.content) == "string" then
local chunk = message.message.content:sub(#prev_text_message_content + 1)
opts.on_chunk(chunk)
prev_text_message_content = message.message.content
end
end
end
if opts.on_messages_add then
opts.on_messages_add(messages)
else
for _, message in ipairs(messages) do
local idx = nil
for i, m in ipairs(history_messages) do
if m.uuid == message.uuid then
idx = i
break
end
end
if idx ~= nil then
history_messages[idx] = message
else
table.insert(history_messages, message)
end
end
end
end
local function add_tool_call_message(update)
local message = History.Message:new("assistant", {
type = "tool_use",
id = update.toolCallId,
name = update.kind or update.title,
input = update.rawInput or {},
}, {
uuid = update.toolCallId,
})
last_tool_call_message = message
message.acp_tool_call = update
if update.status == "pending" or update.status == "in_progress" then message.is_calling = true end
tool_call_messages[update.toolCallId] = message
if update.rawInput then
local description = update.rawInput.description
if description then
message.tool_use_logs = message.tool_use_logs or {}
table.insert(message.tool_use_logs, description)
end
end
on_messages_add({ message })
return message
end
local acp_client = opts.acp_client
local session_id = opts.acp_session_id
if not acp_client then
local acp_config = vim.tbl_deep_extend("force", acp_provider, {
---@type ACPHandlers
handlers = {
on_session_update = function(update)
if update.sessionUpdate == "plan" then
local todos = {}
for idx, entry in ipairs(update.entries) do
local status = "todo"
if entry.status == "in_progress" then status = "doing" end
if entry.status == "completed" then status = "done" end
---@type avante.TODO
local todo = {
id = tostring(idx),
content = entry.content,
status = status,
priority = entry.priority,
}
table.insert(todos, todo)
end
vim.schedule(function()
if opts.update_todos then opts.update_todos(todos) end
end)
return
end
if update.sessionUpdate == "agent_message_chunk" then
if update.content.type == "text" then
local messages = get_history_messages()
local last_message = messages[#messages]
if last_message and last_message.message.role == "assistant" then
local has_text = false
local content = last_message.message.content
if type(content) == "string" then
last_message.message.content = last_message.message.content .. update.content.text
has_text = true
elseif type(content) == "table" then
for idx, item in ipairs(content) do
if type(item) == "string" then
content[idx] = item .. update.content.text
has_text = true
end
if type(item) == "table" and item.type == "text" then
item.text = item.text .. update.content.text
has_text = true
end
end
end
if has_text then
on_messages_add({ last_message })
return
end
end
local message = History.Message:new("assistant", update.content.text)
on_messages_add({ message })
end
end
if update.sessionUpdate == "agent_thought_chunk" then
if update.content.type == "text" then
local messages = get_history_messages()
local last_message = messages[#messages]
if last_message and last_message.message.role == "assistant" then
local is_thinking = false
local content = last_message.message.content
if type(content) == "table" then
for idx, item in ipairs(content) do
if type(item) == "table" and item.type == "thinking" then
is_thinking = true
content[idx].thinking = content[idx].thinking .. update.content.text
end
end
end
if is_thinking then
on_messages_add({ last_message })
return
end
end
local message = History.Message:new("assistant", {
type = "thinking",
thinking = update.content.text,
})
on_messages_add({ message })
end
end
if update.sessionUpdate == "tool_call" then
add_tool_call_message(update)
local sidebar = require("avante").get()
if
Config.behaviour.acp_follow_agent_locations
and sidebar
and not sidebar.is_in_full_view -- don't follow when in Zen mode
and update.kind == "edit" -- to avoid entering more than once
and update.locations
and #update.locations > 0
then
vim.schedule(function()
if not sidebar:is_open() then return end
-- Find a valid code window (non-sidebar window)
local code_winid = nil
if sidebar.code.winid and sidebar.code.winid ~= 0 and api.nvim_win_is_valid(sidebar.code.winid) then
code_winid = sidebar.code.winid
else
-- Find first non-sidebar window in the current tab
local all_wins = api.nvim_tabpage_list_wins(0)
for _, winid in ipairs(all_wins) do
if api.nvim_win_is_valid(winid) and not sidebar:is_sidebar_winid(winid) then
code_winid = winid
break
end
end
end
if not code_winid then return end
local now = uv.now()
local last_auto_nav = vim.g.avante_last_auto_nav or 0
local grace_period = 2000
-- Check if user navigated manually recently
if now - last_auto_nav < grace_period then return end
-- Only follow first location to avoid rapid jumping
local location = update.locations[1]
if not location or not location.path then return end
local abs_path = Utils.join_paths(Utils.get_project_root(), location.path)
local bufnr = vim.fn.bufnr(abs_path, true)
if not bufnr or bufnr == -1 then return end
if not api.nvim_buf_is_loaded(bufnr) then pcall(vim.fn.bufload, bufnr) end
local ok = pcall(api.nvim_win_set_buf, code_winid, bufnr)
if not ok then return end
local line = location.line or 1
local line_count = api.nvim_buf_line_count(bufnr)
local target_line = math.min(line, line_count)
pcall(api.nvim_win_set_cursor, code_winid, { target_line, 0 })
pcall(api.nvim_win_call, code_winid, function()
vim.cmd("normal! zz") -- Center line in viewport
end)
vim.g.avante_last_auto_nav = now
end)
end
end
if update.sessionUpdate == "tool_call_update" then
local tool_call_message = tool_call_messages[update.toolCallId]
if not tool_call_message then
tool_call_message = History.Message:new("assistant", {
type = "tool_use",
id = update.toolCallId,
name = "",
})
tool_call_messages[update.toolCallId] = tool_call_message
tool_call_message.acp_tool_call = update
end
if tool_call_message.acp_tool_call then
if update.content and next(update.content) == nil then update.content = nil end
tool_call_message.acp_tool_call = vim.tbl_deep_extend("force", tool_call_message.acp_tool_call, update)
end
tool_call_message.tool_use_logs = tool_call_message.tool_use_logs or {}
tool_call_message.tool_use_log_lines = tool_call_message.tool_use_log_lines or {}
local tool_result_message
if update.status == "pending" or update.status == "in_progress" then
tool_call_message.is_calling = true
tool_call_message.state = "generating"
elseif update.status == "completed" or update.status == "failed" then
tool_call_message.is_calling = false
tool_call_message.state = "generated"
tool_result_message = History.Message:new("assistant", {
type = "tool_result",
tool_use_id = update.toolCallId,
content = nil,
is_error = update.status == "failed",
is_user_declined = update.status == "cancelled",
})
end
local messages = { tool_call_message }
if tool_result_message then table.insert(messages, tool_result_message) end
on_messages_add(messages)
end
if update.sessionUpdate == "available_commands_update" then
local commands = update.availableCommands
local has_cmp, cmp = pcall(require, "cmp")
if has_cmp then
local slash_commands_id = require("avante").slash_commands_id
if slash_commands_id ~= nil then cmp.unregister_source(slash_commands_id) end
for _, command in ipairs(commands) do
local exists = false
for _, command_ in ipairs(Config.slash_commands) do
if command_.name == command.name then
exists = true
break
end
end
if not exists then
table.insert(Config.slash_commands, {
name = command.name,
description = command.description,
details = command.description,
})
end
end
local avante = require("avante")
avante.slash_commands_id = cmp.register_source("avante_commands", require("cmp_avante.commands"):new())
end
end
end,
on_request_permission = function(tool_call, options, callback)
local sidebar = require("avante").get()
if not sidebar then
Utils.error("Avante sidebar not found")
return
end
---@cast tool_call avante.acp.ToolCall
local message = tool_call_messages[tool_call.toolCallId]
if not message then
message = add_tool_call_message(tool_call)
else
if message.acp_tool_call then
if tool_call.content and next(tool_call.content) == nil then tool_call.content = nil end
message.acp_tool_call = vim.tbl_deep_extend("force", message.acp_tool_call, tool_call)
end
end
on_messages_add({ message })
local description = HistoryRender.get_tool_display_name(message)
LLMToolHelpers.confirm(description, function(ok)
local acp_mapped_options = ACPConfirmAdapter.map_acp_options(options)
if ok and opts.session_ctx and opts.session_ctx.always_yes then
callback(acp_mapped_options.all)
elseif ok then
callback(acp_mapped_options.yes)
else
callback(acp_mapped_options.no)
end
sidebar.scroll = true
sidebar._history_cache_invalidated = true
sidebar:update_content("")
end, {
focus = true,
skip_reject_prompt = true,
permission_options = options,
}, opts.session_ctx, tool_call.kind)
end,
on_read_file = function(path, line, limit, callback, error_callback)
local abs_path = Utils.to_absolute_path(path)
local lines, err, errname = Utils.read_file_from_buf_or_disk(abs_path)
if err then
if error_callback then
local code = errname == "ENOENT" and ACPClient.ERROR_CODES.RESOURCE_NOT_FOUND or nil
error_callback(err, code)
end
return
end
lines = lines or {}
if line ~= nil and limit ~= nil then lines = vim.list_slice(lines, line, line + limit) end
local content = table.concat(lines, "\n")
if
last_tool_call_message
and last_tool_call_message.acp_tool_call
and last_tool_call_message.acp_tool_call.kind == "read"
then
if
last_tool_call_message.acp_tool_call.content
and next(last_tool_call_message.acp_tool_call.content) == nil
then
last_tool_call_message.acp_tool_call.content = {
{
type = "content",
content = {
type = "text",
text = content,
},
},
}
end
end
callback(content)
end,
on_write_file = function(path, content, callback)
local abs_path = Utils.to_absolute_path(path)
local file = io.open(abs_path, "w")
if file then
file:write(content)
file:close()
local buffers = vim.tbl_filter(
function(bufnr)
return vim.api.nvim_buf_is_valid(bufnr)
and vim.fn.fnamemodify(vim.api.nvim_buf_get_name(bufnr), ":p")
== vim.fn.fnamemodify(abs_path, ":p")
end,
vim.api.nvim_list_bufs()
)
for _, buf in ipairs(buffers) do
vim.api.nvim_buf_call(buf, function() vim.cmd("edit") end)
end
callback(nil)
return
end
callback("Failed to write file: " .. abs_path)
end,
},
})
acp_client = ACPClient:new(acp_config)
acp_client:connect(function(conn_err)
if conn_err then
opts.on_stop({ reason = "error", error = conn_err })
return
end
-- Register ACP client for global cleanup on exit (Fix Issue #2749)
local client_id = "acp_" .. tostring(acp_client) .. "_" .. os.time()
local ok, Avante = pcall(require, "avante")
if ok and Avante.register_acp_client then Avante.register_acp_client(client_id, acp_client) end
-- If we create a new client and it does not support sesion loading,
-- remove the old session
if not acp_client.agent_capabilities.loadSession then opts.acp_session_id = nil end
if opts.on_save_acp_client then opts.on_save_acp_client(acp_client) end
session_id = opts.acp_session_id
if not session_id then
M._create_acp_session_and_continue(opts, acp_client)
else
M._load_acp_session_and_continue(opts, acp_client, session_id)
end
end)
return
elseif not session_id then
M._create_acp_session_and_continue(opts, acp_client)
return
end
if opts.just_connect_acp_client then return end
M._continue_stream_acp(opts, acp_client, session_id)
end
---@param opts AvanteLLMStreamOptions
---@param acp_client avante.acp.ACPClient
function M._create_acp_session_and_continue(opts, acp_client)
local project_root = Utils.root.get()
local acp_provider = Config.acp_providers[Config.provider] or {}
local mcp_servers = acp_provider.mcp_servers or {}
acp_client:create_session(project_root, mcp_servers, function(session_id_, err)
if err then
opts.on_stop({ reason = "error", error = err })
return
end
if not session_id_ then
opts.on_stop({ reason = "error", error = "Failed to create session" })
return
end
opts.acp_session_id = session_id_
if opts.on_save_acp_session_id then opts.on_save_acp_session_id(session_id_) end
if opts.just_connect_acp_client then return end
M._continue_stream_acp(opts, acp_client, session_id_)
end)
end
---@param opts AvanteLLMStreamOptions
---@param acp_client avante.acp.ACPClient
---@param session_id string
function M._load_acp_session_and_continue(opts, acp_client, session_id)
local project_root = Utils.root.get()
acp_client:load_session(session_id, project_root, {}, function(_, err)
if err then
-- Failed to load session, create a new one. It happens after switching acp providers
M._create_acp_session_and_continue(opts, acp_client)
return
end
if opts.just_connect_acp_client then return end
M._continue_stream_acp(opts, acp_client, session_id)
end)
end
---@param opts AvanteLLMStreamOptions
---@param acp_client avante.acp.ACPClient
---@param session_id string
function M._continue_stream_acp(opts, acp_client, session_id)
local prompt = {}
local donot_use_builtin_system_prompt = opts.history_messages ~= nil and #opts.history_messages > 0
if donot_use_builtin_system_prompt then
if opts.selected_filepaths then
for _, filepath in ipairs(opts.selected_filepaths) do
local abs_path = Utils.to_absolute_path(filepath)
local file_name = vim.fn.fnamemodify(abs_path, ":t")
local prompt_item = acp_client:create_resource_link_content("file://" .. abs_path, file_name)
table.insert(prompt, prompt_item)
end
end
if opts.selected_code then
local prompt_item = {
type = "text",
text = string.format(
"\n%s\n%s\n",
opts.selected_code.path,
opts.selected_code.content
),
}
table.insert(prompt, prompt_item)
end
end
local history_messages = opts.history_messages or {}
-- DEBUG: Log history message details
Utils.debug("ACP history messages count: " .. #history_messages)
for i, msg in ipairs(history_messages) do
if msg and msg.message then
Utils.debug(
"History msg "
.. i
.. ": role="
.. (msg.message.role or "unknown")
.. ", has_content="
.. tostring(msg.message.content ~= nil)
)
if msg.message.role == "assistant" then
Utils.debug("Found assistant message " .. i .. ": " .. tostring(msg.message.content):sub(1, 100))
end
end
end
-- DEBUG: Log session recovery state
Utils.debug(
"Session recovery state: _is_session_recovery="
.. tostring(rawget(opts, "_is_session_recovery"))
.. ", acp_session_id="
.. tostring(opts.acp_session_id)
)
-- CRITICAL: Enhanced session recovery with full context preservation
if rawget(opts, "_is_session_recovery") and opts.acp_session_id then
-- For session recovery, preserve full conversation context
Utils.info("ACP session recovery: preserving full conversation context")
-- Add all recent messages (both user and assistant) for better context
local recent_messages = {}
local recovery_config = Config.session_recovery or {}
local include_history_count = recovery_config.include_history_count or 15 -- Default to 15 for better context
-- Get recent messages from truncated history
local start_idx = math.max(1, #history_messages - include_history_count + 1)
Utils.debug("Including history from index " .. start_idx .. " to " .. #history_messages)
for i = start_idx, #history_messages do
local message = history_messages[i]
if message and message.message then
table.insert(recent_messages, message)
Utils.debug("Adding message " .. i .. " to recent_messages: role=" .. (message.message.role or "unknown"))
end
end
Utils.info("ACP recovery: including " .. #recent_messages .. " recent messages")
-- DEBUG: Log what we're about to add to prompt
for i, msg in ipairs(recent_messages) do
if msg and msg.message then
Utils.debug("Adding to prompt: " .. i .. " role=" .. (msg.message.role or "unknown"))
end
end
-- CRITICAL: Add all recent messages to prompt for complete context
for _, message in ipairs(recent_messages) do
local role = message.message.role
local content = message.message.content
Utils.debug("Processing message: role=" .. (role or "unknown") .. ", content_type=" .. type(content))
-- Format based on role
local role_tag = role == "user" and "previous_user_message" or "previous_assistant_message"
if type(content) == "table" then
for _, item in ipairs(content) do
if type(item) == "string" then
table.insert(prompt, {
type = "text",
text = "<" .. role_tag .. ">" .. item .. "" .. role_tag .. ">",
})
Utils.debug("Added assistant table content: " .. item:sub(1, 50) .. "...")
elseif type(item) == "table" and item.type == "text" then
table.insert(prompt, {
type = "text",
text = "<" .. role_tag .. ">" .. item.text .. "" .. role_tag .. ">",
})
Utils.debug("Added assistant text content: " .. item.text:sub(1, 50) .. "...")
end
end
else
table.insert(prompt, {
type = "text",
text = "<" .. role_tag .. ">" .. content .. "" .. role_tag .. ">",
})
if role == "assistant" then
Utils.debug("Added assistant content: " .. tostring(content):sub(1, 50) .. "...")
end
end
end
-- Add context about session recovery with more detail
if #recent_messages > 0 then
table.insert(prompt, {
type = "text",
text = "Continuing from previous ACP session with "
.. #recent_messages
.. " recent messages preserved for context",
})
end
elseif opts.acp_session_id then
-- Original logic for non-recovery session continuation
local recovery_config = Config.session_recovery or {}
local include_history_count = recovery_config.include_history_count or 5
local user_messages_added = 0
for i = #history_messages, 1, -1 do
local message = history_messages[i]
if message.message.role == "user" and user_messages_added < include_history_count then
local content = message.message.content
if type(content) == "table" then
for _, item in ipairs(content) do
if type(item) == "string" then
table.insert(prompt, {
type = "text",
text = "" .. item .. "",
})
elseif type(item) == "table" and item.type == "text" then
table.insert(prompt, {
type = "text",
text = "" .. item.text .. "",
})
end
end
elseif type(content) == "string" then
table.insert(prompt, {
type = "text",
text = "" .. content .. "",
})
end
user_messages_added = user_messages_added + 1
end
end
-- Add context about session recovery
if user_messages_added > 0 then
table.insert(prompt, {
type = "text",
text = "Continuing from previous session with "
.. user_messages_added
.. " recent user messages",
})
end
else
if donot_use_builtin_system_prompt then
-- Include all user messages for better context preservation
for _, message in ipairs(history_messages) do
if message.message.role == "user" then
local content = message.message.content
if type(content) == "table" then
for _, item in ipairs(content) do
if type(item) == "string" then
table.insert(prompt, {
type = "text",
text = item,
})
elseif type(item) == "table" and item.type == "text" then
table.insert(prompt, {
type = "text",
text = item.text,
})
end
end
else
table.insert(prompt, {
type = "text",
text = content,
})
end
end
end
else
local prompt_opts = M.generate_prompts(opts)
table.insert(prompt, {
type = "text",
text = prompt_opts.system_prompt,
})
for _, message in ipairs(prompt_opts.messages) do
if message.role == "user" then
table.insert(prompt, {
type = "text",
text = message.content,
})
end
end
end
end
local cancelled = false
local stop_cmd_id = api.nvim_create_autocmd("User", {
group = group,
pattern = M.CANCEL_PATTERN,
once = true,
callback = function()
cancelled = true
local cancelled_text = "\n*[Request cancelled by user.]*\n"
if opts.on_chunk then opts.on_chunk(cancelled_text) end
if opts.on_messages_add then
local message = History.Message:new("assistant", cancelled_text, {
just_for_display = true,
})
opts.on_messages_add({ message })
end
acp_client:cancel_session(session_id)
opts.on_stop({ reason = "cancelled" })
end,
})
acp_client:send_prompt(session_id, prompt, function(_, err_)
if cancelled then return end
vim.schedule(function() api.nvim_del_autocmd(stop_cmd_id) end)
if err_ then
-- ACP-specific session recovery: Check for session not found error
-- Check for session recovery conditions
local recovery_config = Config.session_recovery or {}
local recovery_enabled = recovery_config.enabled ~= false -- Default enabled unless explicitly disabled
local is_session_not_found = false
if err_.code == -32603 and err_.data and err_.data.details then
local details = err_.data.details
-- Support both Claude format ("Session not found") and Gemini-CLI format ("Session not found: session-id")
is_session_not_found = details == "Session not found" or details:match("^Session not found:")
end
if recovery_enabled and is_session_not_found and not rawget(opts, "_session_recovery_attempted") then
-- Mark recovery attempt to prevent infinite loops
rawset(opts, "_session_recovery_attempted", true)
-- DEBUG: Log recovery attempt
Utils.debug("Session recovery attempt detected, setting _session_recovery_attempted flag")
-- Clear invalid session ID
if opts.on_save_acp_session_id then
opts.on_save_acp_session_id("") -- Use empty string instead of nil
end
-- Clear invalid session for recovery - let global cleanup handle ACP processes
vim.schedule(function()
opts.acp_client = nil
opts.acp_session_id = nil
end)
-- CRITICAL: Preserve full history for better context retention
-- Only truncate if explicitly configured to do so, otherwise keep full history
local original_history = opts.history_messages or {}
local truncated_history
-- Check if history truncation is explicitly enabled
local should_truncate = recovery_config.truncate_history ~= false -- Default to true for backward compatibility
-- DEBUG: Log original history details
Utils.debug("Original history for recovery: " .. #original_history .. " messages")
for i, msg in ipairs(original_history) do
if msg and msg.message then
Utils.debug("Original history " .. i .. ": role=" .. (msg.message.role or "unknown"))
end
end
if should_truncate and #original_history > 20 then -- Only truncate if history is long enough (20条)
-- Safely call truncation function
local ok, result = pcall(truncate_history_for_recovery, original_history)
if ok then
truncated_history = result
Utils.info(
"History truncated from "
.. #original_history
.. " to "
.. #truncated_history
.. " messages for recovery"
)
else
Utils.warn("Failed to truncate history for recovery: " .. tostring(result))
truncated_history = original_history -- Use full history as fallback
end
else
-- Use full history for better context retention
truncated_history = original_history
Utils.debug("Using full history for session recovery: " .. #truncated_history .. " messages")
end
-- DEBUG: Log truncated history details
Utils.debug("Truncated history for recovery: " .. #truncated_history .. " messages")
for i, msg in ipairs(truncated_history) do
if msg and msg.message then
Utils.debug("Truncated history " .. i .. ": role=" .. (msg.message.role or "unknown"))
end
end
opts.history_messages = truncated_history
Utils.info(
string.format(
"Session expired, recovering with %d recent messages (from %d total)...",
#truncated_history,
#original_history
)
)
-- CRITICAL: Use vim.schedule to move recovery out of fast event context
-- This prevents E5560 errors by avoiding vim.fn calls in fast event context
vim.schedule(function()
Utils.debug("Session recovery: clearing old session ID and retrying...")
-- Clean up recovery flags for fresh session state management
rawset(opts, "_session_recovery_attempted", nil)
-- Mark this as a recovery attempt to preserve history context
rawset(opts, "_is_session_recovery", true)
-- Update UI state if available
if opts.on_state_change then opts.on_state_change("generating") end
-- CRITICAL: Ensure history messages are preserved in recovery
Utils.info("Session recovery retry with " .. #(opts.history_messages or {}) .. " history messages")
-- DEBUG: Log recovery history details
local recovery_history = opts.history_messages or {}
Utils.debug("Recovery history messages: " .. #recovery_history)
for i, msg in ipairs(recovery_history) do
if msg and msg.message then
Utils.debug("Recovery msg " .. i .. ": role=" .. (msg.message.role or "unknown"))
if msg.message.role == "assistant" then
Utils.debug("Recovery assistant content: " .. tostring(msg.message.content):sub(1, 100))
end
end
end
-- Retry with truncated history to rebuild context in new session
M._stream_acp(opts)
end)
-- CRITICAL: Return immediately to prevent further processing in fast event context
return
end
opts.on_stop({ reason = "error", error = err_ })
return
end
opts.on_stop({ reason = "complete" })
end)
end
---@param opts AvanteLLMStreamOptions
function M._stream(opts)
-- Reset the cancellation flag at the start of a new request
if LLMToolHelpers then LLMToolHelpers.is_cancelled = false end
local acp_provider = Config.acp_providers[Config.provider]
if acp_provider then return M._stream_acp(opts) end
local provider = opts.provider or Providers[Config.provider]
opts.session_ctx = opts.session_ctx or {}
if not opts.session_ctx.on_messages_add then opts.session_ctx.on_messages_add = opts.on_messages_add end
if not opts.session_ctx.on_state_change then opts.session_ctx.on_state_change = opts.on_state_change end
if not opts.session_ctx.on_start then opts.session_ctx.on_start = opts.on_start end
if not opts.session_ctx.on_chunk then opts.session_ctx.on_chunk = opts.on_chunk end
if not opts.session_ctx.on_stop then opts.session_ctx.on_stop = opts.on_stop end
if not opts.session_ctx.on_tool_log then opts.session_ctx.on_tool_log = opts.on_tool_log end
if not opts.session_ctx.get_history_messages then
opts.session_ctx.get_history_messages = opts.get_history_messages
end
---@cast provider AvanteProviderFunctor
local prompt_opts = M.generate_prompts(opts)
if
prompt_opts.pending_compaction_history_messages
and #prompt_opts.pending_compaction_history_messages > 0
and opts.on_memory_summarize
then
opts.on_memory_summarize(prompt_opts.pending_compaction_history_messages)
return
end
local resp_headers = {}
local function dispatch_cancel_message()
local cancelled_text = "\n*[Request cancelled by user.]*\n"
if opts.on_chunk then opts.on_chunk(cancelled_text) end
if opts.on_messages_add then
local message = History.Message:new("assistant", cancelled_text, {
just_for_display = true,
})
opts.on_messages_add({ message })
end
return opts.on_stop({ reason = "cancelled" })
end
---@type AvanteHandlerOptions
local handler_opts = {
on_messages_add = opts.on_messages_add,
on_state_change = opts.on_state_change,
update_tokens_usage = opts.update_tokens_usage,
on_start = opts.on_start,
on_chunk = opts.on_chunk,
on_stop = function(stop_opts)
if stop_opts.usage and opts.update_tokens_usage then opts.update_tokens_usage(stop_opts.usage) end
---@param tool_uses AvantePartialLLMToolUse[]
---@param tool_use_index integer
---@param tool_results AvanteLLMToolResult[]
local function handle_next_tool_use(
tool_uses,
tool_use_messages,
tool_use_index,
tool_results,
streaming_tool_use
)
if tool_use_index > #tool_uses then
---@type avante.HistoryMessage[]
local messages = {}
for _, tool_result in ipairs(tool_results) do
messages[#messages + 1] = History.Message:new("user", {
type = "tool_result",
tool_use_id = tool_result.tool_use_id,
content = tool_result.content,
is_error = tool_result.is_error,
is_user_declined = tool_result.is_user_declined,
})
end
if opts.on_messages_add then opts.on_messages_add(messages) end
local the_last_tool_use = tool_uses[#tool_uses]
if the_last_tool_use and the_last_tool_use.name == "attempt_completion" then
opts.on_stop({ reason = "complete" })
return
end
local new_opts = vim.tbl_deep_extend("force", opts, {
history_messages = opts.get_history_messages and opts.get_history_messages() or {},
})
if provider.get_rate_limit_sleep_time then
local sleep_time = provider:get_rate_limit_sleep_time(resp_headers)
if sleep_time and sleep_time > 0 then
Utils.info("Rate limit reached. Sleeping for " .. sleep_time .. " seconds ...")
vim.defer_fn(function() M._stream(new_opts) end, sleep_time * 1000)
return
end
end
if not streaming_tool_use then M._stream(new_opts) end
return
end
local partial_tool_use = tool_uses[tool_use_index]
local partial_tool_use_message = tool_use_messages[tool_use_index]
---@param result string | nil
---@param error string | nil
local function handle_tool_result(result, error)
partial_tool_use_message.is_calling = false
if opts.on_messages_add then opts.on_messages_add({ partial_tool_use_message }) end
-- Special handling for cancellation signal from tools
if error == LLMToolHelpers.CANCEL_TOKEN then
Utils.debug("Tool execution was cancelled by user")
local cancelled_text = "\n*[Request cancelled by user during tool execution.]*\n"
if opts.on_chunk then opts.on_chunk(cancelled_text) end
if opts.on_messages_add then
local message = History.Message:new("assistant", cancelled_text, {
just_for_display = true,
})
opts.on_messages_add({ message })
end
return opts.on_stop({ reason = "cancelled" })
end
local is_user_declined = error and error:match("^User declined")
local tool_result = {
tool_use_id = partial_tool_use.id,
content = error ~= nil and error or result,
is_error = error ~= nil, -- Keep this as error to prevent processing as success
is_user_declined = is_user_declined ~= nil,
}
table.insert(tool_results, tool_result)
return handle_next_tool_use(tool_uses, tool_use_messages, tool_use_index + 1, tool_results)
end
local is_edit_tool_use = Utils.is_edit_tool_use(partial_tool_use)
local support_streaming = false
local llm_tool = vim.iter(prompt_opts.tools):find(function(tool) return tool.name == partial_tool_use.name end)
if llm_tool then support_streaming = llm_tool.support_streaming == true end
---@type AvanteLLMToolFuncOpts
local tool_use_opts = {
session_ctx = opts.session_ctx,
tool_use_id = partial_tool_use.id,
streaming = partial_tool_use.state == "generating",
on_complete = function() end,
}
if partial_tool_use.state == "generating" then
if not is_edit_tool_use and not support_streaming then return end
if type(partial_tool_use.input) == "table" then
LLMTools.process_tool_use(prompt_opts.tools, partial_tool_use, tool_use_opts)
end
return
end
if streaming_tool_use then return end
partial_tool_use_message.is_calling = true
if opts.on_messages_add then opts.on_messages_add({ partial_tool_use_message }) end
-- Either on_complete handles the tool result asynchronously or we receive the result and error synchronously when either is not nil
local result, error = LLMTools.process_tool_use(prompt_opts.tools, partial_tool_use, {
session_ctx = opts.session_ctx,
on_log = opts.on_tool_log,
set_tool_use_store = opts.set_tool_use_store,
on_complete = handle_tool_result,
tool_use_id = partial_tool_use.id,
})
if result ~= nil or error ~= nil then return handle_tool_result(result, error) end
end
if stop_opts.reason == "cancelled" then dispatch_cancel_message() end
local history_messages = opts.get_history_messages and opts.get_history_messages({ all = true }) or {}
local pending_tools, pending_tool_use_messages = History.get_pending_tools(history_messages)
if stop_opts.reason == "complete" and Config.mode == "agentic" then
local completed_attempt_completion_tool_use = nil
for idx = #history_messages, 1, -1 do
local message = history_messages[idx]
if message.is_user_submission then break end
local use = History.Helpers.get_tool_use_data(message)
if use and use.name == "attempt_completion" then
completed_attempt_completion_tool_use = message
break
end
end
local unfinished_todos = {}
if opts.get_todos then
local todos = opts.get_todos()
unfinished_todos = vim.tbl_filter(
function(todo) return todo.status ~= "done" and todo.status ~= "cancelled" end,
todos
)
end
local user_reminder_count = opts.session_ctx.user_reminder_count or 0
if
not completed_attempt_completion_tool_use
and opts.on_messages_add
and (user_reminder_count < 3 or #unfinished_todos > 0)
then
opts.session_ctx.user_reminder_count = user_reminder_count + 1
Utils.debug("user reminder count", user_reminder_count)
local message
if #unfinished_todos > 0 then
message = History.Message:new(
"user",
"You should use tool calls to answer the question, for example, use write_todos if the task step is done or cancelled.",
{
visible = false,
}
)
else
message = History.Message:new(
"user",
"You should use tool calls to answer the question, for example, use attempt_completion if the job is done.",
{
visible = false,
}
)
end
opts.on_messages_add({ message })
local new_opts = vim.tbl_deep_extend("force", opts, {
history_messages = opts.get_history_messages(),
})
if provider.get_rate_limit_sleep_time then
local sleep_time = provider:get_rate_limit_sleep_time(resp_headers)
if sleep_time and sleep_time > 0 then
Utils.info("Rate limit reached. Sleeping for " .. sleep_time .. " seconds ...")
vim.defer_fn(function() M._stream(new_opts) end, sleep_time * 1000)
return
end
end
M._stream(new_opts)
return
end
end
if stop_opts.reason == "tool_use" then
opts.session_ctx.user_reminder_count = 0
return handle_next_tool_use(pending_tools, pending_tool_use_messages, 1, {}, stop_opts.streaming_tool_use)
end
if stop_opts.reason == "rate_limit" then
local message = opts.on_messages_add
and History.Message:new(
"assistant",
"", -- Actual content will be set below
{
just_for_display = true,
}
)
local retry_count = stop_opts.retry_after
Utils.info("Rate limit reached. Retrying in " .. retry_count .. " seconds", { title = "Avante" })
local function countdown()
if abort_retry_timer then
Utils.info("Retry aborted due to user requested cancellation.")
stop_retry_timer()
dispatch_cancel_message()
return
end
local msg_content = "*[Rate limit reached. Retrying in " .. retry_count .. " seconds ...]*"
if opts.on_chunk then
-- Use ANSI escape codes to clear line and move cursor up only for subsequent updates
local prefix = ""
if retry_count < stop_opts.retry_after then prefix = [[\033[1A\033[K]] end
opts.on_chunk(prefix .. "\n" .. msg_content .. "\n")
end
if opts.on_messages_add and message then
message:update_content("\n\n" .. msg_content)
opts.on_messages_add({ message })
end
if retry_count <= 0 then
stop_retry_timer()
Utils.info("Restarting stream after rate limit pause")
M._stream(opts)
else
retry_count = retry_count - 1
end
end
stop_retry_timer()
retry_timer = uv.new_timer()
if retry_timer then retry_timer:start(0, 1000, vim.schedule_wrap(function() countdown() end)) end
return
end
return opts.on_stop(stop_opts)
end,
}
return M.curl({
provider = provider,
prompt_opts = prompt_opts,
handler_opts = handler_opts,
on_response_headers = function(headers) resp_headers = headers end,
})
end
local function _merge_response(first_response, second_response, opts)
local prompt = "\n" .. Config.dual_boost.prompt
prompt = prompt
:gsub("{{[%s]*provider1_output[%s]*}}", function() return first_response end)
:gsub("{{[%s]*provider2_output[%s]*}}", function() return second_response end)
prompt = prompt .. "\n"
if opts.instructions == nil then opts.instructions = "" end
-- append this reference prompt to the prompt_opts messages at last
opts.instructions = opts.instructions .. prompt
M._stream(opts)
end
local function _collector_process_responses(collector, opts)
if not collector[1] or not collector[2] then
Utils.error("One or both responses failed to complete")
return
end
_merge_response(collector[1], collector[2], opts)
end
local function _collector_add_response(collector, index, response, opts)
collector[index] = response
collector.count = collector.count + 1
if collector.count == 2 then
collector.timer:stop()
_collector_process_responses(collector, opts)
end
end
function M._dual_boost_stream(opts, Provider1, Provider2)
Utils.debug("Starting Dual Boost Stream")
local collector = {
count = 0,
responses = {},
timer = uv.new_timer(),
timeout_ms = Config.dual_boost.timeout,
}
-- Setup timeout
collector.timer:start(
collector.timeout_ms,
0,
vim.schedule_wrap(function()
if collector.count < 2 then
Utils.warn("Dual boost stream timeout reached")
collector.timer:stop()
-- Process whatever responses we have
_collector_process_responses(collector, opts)
end
end)
)
-- Create options for both streams
local function create_stream_opts(index)
local response = ""
return vim.tbl_extend("force", opts, {
on_chunk = function(chunk)
if chunk then response = response .. chunk end
end,
on_stop = function(stop_opts)
if stop_opts.error then
Utils.error(string.format("Stream %d failed: %s", index, stop_opts.error))
return
end
Utils.debug(string.format("Response %d completed", index))
_collector_add_response(collector, index, response, opts)
end,
})
end
-- Start both streams
local success, err = xpcall(function()
local opts1 = create_stream_opts(1)
opts1.provider = Provider1
M._stream(opts1)
local opts2 = create_stream_opts(2)
opts2.provider = Provider2
M._stream(opts2)
end, function(err) return err end)
if not success then Utils.error("Failed to start dual_boost streams: " .. tostring(err)) end
end
---@param opts AvanteLLMStreamOptions
function M.stream(opts)
local is_completed = false
if opts.on_tool_log ~= nil then
local original_on_tool_log = opts.on_tool_log
opts.on_tool_log = vim.schedule_wrap(function(...)
if not original_on_tool_log then return end
return original_on_tool_log(...)
end)
end
if opts.set_tool_use_store ~= nil then
local original_set_tool_use_store = opts.set_tool_use_store
opts.set_tool_use_store = vim.schedule_wrap(function(...)
if not original_set_tool_use_store then return end
return original_set_tool_use_store(...)
end)
end
if opts.on_chunk ~= nil then
local original_on_chunk = opts.on_chunk
opts.on_chunk = vim.schedule_wrap(function(chunk)
if is_completed then return end
if original_on_chunk then return original_on_chunk(chunk) end
end)
end
if opts.on_stop ~= nil then
local original_on_stop = opts.on_stop
opts.on_stop = vim.schedule_wrap(function(stop_opts)
if is_completed then return end
if stop_opts.reason == "complete" or stop_opts.reason == "error" or stop_opts.reason == "cancelled" then
is_completed = true
end
return original_on_stop(stop_opts)
end)
end
local valid_dual_boost_modes = {
legacy = true,
}
opts.mode = opts.mode or Config.mode
abort_retry_timer = false
if Config.dual_boost.enabled and valid_dual_boost_modes[opts.mode] then
M._dual_boost_stream(
opts,
Providers[Config.dual_boost.first_provider],
Providers[Config.dual_boost.second_provider]
)
else
M._stream(opts)
end
end
function M.cancel_inflight_request()
if LLMToolHelpers.is_cancelled ~= nil then LLMToolHelpers.is_cancelled = true end
if LLMToolHelpers.confirm_popup ~= nil then
LLMToolHelpers.confirm_popup:cancel()
LLMToolHelpers.confirm_popup = nil
end
abort_retry_timer = true
api.nvim_exec_autocmds("User", { pattern = M.CANCEL_PATTERN })
end
return M