diff --git a/lua/avante/libs/acp_client.lua b/lua/avante/libs/acp_client.lua index dc55881..85f021e 100644 --- a/lua/avante/libs/acp_client.lua +++ b/lua/avante/libs/acp_client.lua @@ -242,7 +242,6 @@ function ACPClient:new(config) }, }, debug_log_file = nil, - pending_responses = {}, callbacks = {}, transport = nil, config = config or {}, @@ -388,7 +387,7 @@ function ACPClient:_create_stdio_transport() if self.config.reconnect and self.reconnect_count < (self.config.max_reconnect_attempts or 3) then self.reconnect_count = self.reconnect_count + 1 vim.defer_fn(function() - if self.state == "disconnected" then self:connect() end + if self.state == "disconnected" then self:connect(function(_err) end) end end, 2000) -- Wait 2 seconds before reconnecting end end) @@ -493,9 +492,7 @@ end ---Send JSON-RPC request ---@param method string ---@param params table? ----@param callback? fun(result: table|nil, err: avante.acp.ACPError|nil) ----@return table|nil result ----@return avante.acp.ACPError|nil err +---@param callback fun(result: table|nil, err: avante.acp.ACPError|nil) function ACPClient:_send_request(method, params, callback) local id = self:_next_id() local message = { @@ -505,30 +502,11 @@ function ACPClient:_send_request(method, params, callback) params = params or {}, } - if callback then self.callbacks[id] = callback end + self.callbacks[id] = callback local data = vim.json.encode(message) self:_debug_log("request: " .. data .. string.rep("=", 100) .. "\n") - if not self.transport:send(data) then return nil end - - if not callback then return self:_wait_response(id) end -end - -function ACPClient:_wait_response(id) - local start_time = vim.loop.now() - local timeout = self.config.timeout or 100000 - - while vim.loop.now() - start_time < timeout do - vim.wait(10) - - if self.pending_responses[id] then - local result, err = unpack(self.pending_responses[id]) - self.pending_responses[id] = nil - return result, err - end - end - - return nil, self:_create_error(self.ERROR_CODES.TIMEOUT_ERROR, "Timeout waiting for response") + self.transport:send(data) end ---Send JSON-RPC notification @@ -584,8 +562,6 @@ function ACPClient:_handle_message(message) if callback then callback(message.result, message.error) self.callbacks[message.id] = nil - else - self.pending_responses[message.id] = { message.result, message.error } end else -- Unknown message type @@ -715,111 +691,139 @@ function ACPClient:_handle_write_text_file(message_id, params) end ---Start client -function ACPClient:connect() - if self.state ~= "disconnected" then return end +---@param callback fun(err: avante.acp.ACPError|nil) +function ACPClient:connect(callback) + callback = callback or function() end - self.transport:start(function(message) self:_handle_message(message) end) + if self.state ~= "disconnected" then + callback(nil) + return + end - self:initialize() + self.transport:start(vim.schedule_wrap(function(message) self:_handle_message(message) end)) + + self:initialize(callback) end ---Stop client function ACPClient:stop() self.transport:stop() self:_close_debug_log() - self.pending_responses = {} self.reconnect_count = 0 end ---Initialize protocol connection -function ACPClient:initialize() +---@param callback fun(err: avante.acp.ACPError|nil) +function ACPClient:initialize(callback) + callback = callback or function() end + if self.state ~= "connected" then local error = self:_create_error(self.ERROR_CODES.PROTOCOL_ERROR, "Cannot initialize: client not connected") - return error + callback(error) + return end self:_set_state("initializing") - local result = self:_send_request("initialize", { + self:_send_request("initialize", { protocolVersion = self.protocol_version, clientCapabilities = self.capabilities, - }) + }, function(result, err) + if err or not result then + self:_set_state("error") + vim.schedule(function() vim.notify("Failed to initialize", vim.log.levels.ERROR) end) + callback(err or self:_create_error(self.ERROR_CODES.PROTOCOL_ERROR, "Failed to initialize: missing result")) + return + end - if not result then - self:_set_state("error") - vim.notify("Failed to initialize", vim.log.levels.ERROR) - return - end + -- Update protocol version and capabilities + self.protocol_version = result.protocolVersion + self.agent_capabilities = result.agentCapabilities + self.auth_methods = result.authMethods or {} - -- Update protocol version and capabilities - self.protocol_version = result.protocolVersion - self.agent_capabilities = result.agentCapabilities - self.auth_methods = result.authMethods or {} + -- Check if we need to authenticate + local auth_method = self.config.auth_method - -- Check if we need to authenticate - local auth_method = self.config.auth_method - - if auth_method then - Utils.debug("Authenticating with method " .. auth_method) - self:authenticate(auth_method) - self:_set_state("ready") - else - Utils.debug("No authentication method found or specified") - self:_set_state("ready") - end + if auth_method then + Utils.debug("Authenticating with method " .. auth_method) + self:authenticate(auth_method, function(auth_err) + if auth_err then + callback(auth_err) + else + self:_set_state("ready") + callback(nil) + end + end) + else + Utils.debug("No authentication method found or specified") + self:_set_state("ready") + callback(nil) + end + end) end ---Authentication (if required) ---@param method_id string -function ACPClient:authenticate(method_id) - return self:_send_request("authenticate", { +---@param callback fun(err: avante.acp.ACPError|nil) +function ACPClient:authenticate(method_id, callback) + callback = callback or function() end + + self:_send_request("authenticate", { methodId = method_id, - }) + }, function(result, err) callback(err) end) end ---Create new session ---@param cwd string ---@param mcp_servers table[]? ----@return string|nil session_id ----@return avante.acp.ACPError|nil err -function ACPClient:create_session(cwd, mcp_servers) - local result, err = self:_send_request("session/new", { +---@param callback fun(session_id: string|nil, err: avante.acp.ACPError|nil) +function ACPClient:create_session(cwd, mcp_servers, callback) + callback = callback or function() end + + self:_send_request("session/new", { cwd = cwd, mcpServers = mcp_servers or {}, - }) - if err then - vim.notify("Failed to create session: " .. err.message, vim.log.levels.ERROR) - return nil, err - end - if not result then - err = self:_create_error(self.ERROR_CODES.PROTOCOL_ERROR, "Failed to create session: missing result") - return nil, err - end - return result.sessionId, nil + }, function(result, err) + if err then + vim.schedule(function() vim.notify("Failed to create session: " .. err.message, vim.log.levels.ERROR) end) + callback(nil, err) + return + end + if not result then + local error = self:_create_error(self.ERROR_CODES.PROTOCOL_ERROR, "Failed to create session: missing result") + callback(nil, error) + return + end + callback(result.sessionId, nil) + end) end ---Load existing session ---@param session_id string ---@param cwd string ---@param mcp_servers table[]? ----@return table|nil result -function ACPClient:load_session(session_id, cwd, mcp_servers) +---@param callback fun(result: table|nil, err: avante.acp.ACPError|nil) +function ACPClient:load_session(session_id, cwd, mcp_servers, callback) + callback = callback or function() end + if not self.agent_capabilities or not self.agent_capabilities.loadSession then - vim.notify("Agent does not support loading sessions", vim.log.levels.WARN) + vim.schedule(function() vim.notify("Agent does not support loading sessions", vim.log.levels.WARN) end) + local err = self:_create_error(self.ERROR_CODES.PROTOCOL_ERROR, "Agent does not support loading sessions") + callback(nil, err) return end - return self:_send_request("session/load", { + self:_send_request("session/load", { sessionId = session_id, cwd = cwd, mcpServers = mcp_servers or {}, - }) + }, callback) end ---Send prompt ---@param session_id string ---@param prompt table[] ----@param callback? fun(result: table|nil, err: avante.acp.ACPError|nil) +---@param callback fun(result: table|nil, err: avante.acp.ACPError|nil) function ACPClient:send_prompt(session_id, prompt, callback) local params = { sessionId = session_id, @@ -980,9 +984,10 @@ end ---Convenience method: Send simple text prompt ---@param session_id string ---@param text string -function ACPClient:send_text_prompt(session_id, text) +---@param callback fun(result: table|nil, err: avante.acp.ACPError|nil) +function ACPClient:send_text_prompt(session_id, text, callback) local prompt = { self:create_text_content(text) } - self:send_prompt(session_id, prompt) + self:send_prompt(session_id, prompt, callback) end return ACPClient diff --git a/lua/avante/llm.lua b/lua/avante/llm.lua index 1d1cdd0..cbab279 100644 --- a/lua/avante/llm.lua +++ b/lua/avante/llm.lua @@ -977,6 +977,7 @@ function M._stream_acp(opts) return message end local acp_client = opts.acp_client + local session_id = opts.acp_session_id if not acp_client then local acp_config = vim.tbl_deep_extend("force", acp_provider, { ---@type ACPHandlers @@ -1298,22 +1299,46 @@ function M._stream_acp(opts) }, }) acp_client = ACPClient:new(acp_config) - acp_client:connect() - -- Register ACP client for global cleanup on exit (Fix Issue #2749) - local client_id = "acp_" .. tostring(acp_client) .. "_" .. os.time() - local ok, Avante = pcall(require, "avante") - if ok and Avante.register_acp_client then Avante.register_acp_client(client_id, acp_client) end + acp_client:connect(function(conn_err) + if conn_err then + opts.on_stop({ reason = "error", error = conn_err }) + return + end - -- If we create a new client and it does not support sesion loading, - -- remove the old session - if not acp_client.agent_capabilities.loadSession then opts.acp_session_id = nil end - if opts.on_save_acp_client then opts.on_save_acp_client(acp_client) end + -- Register ACP client for global cleanup on exit (Fix Issue #2749) + local client_id = "acp_" .. tostring(acp_client) .. "_" .. os.time() + local ok, Avante = pcall(require, "avante") + if ok and Avante.register_acp_client then Avante.register_acp_client(client_id, acp_client) end + + -- If we create a new client and it does not support sesion loading, + -- remove the old session + if not acp_client.agent_capabilities.loadSession then opts.acp_session_id = nil end + if opts.on_save_acp_client then opts.on_save_acp_client(acp_client) end + + session_id = opts.acp_session_id + if not session_id then + M._create_acp_session_and_continue(opts, acp_client) + else + if opts.just_connect_acp_client then return end + M._continue_stream_acp(opts, acp_client, session_id) + end + end) + return + elseif not session_id then + M._create_acp_session_and_continue(opts, acp_client) + return end - local session_id = opts.acp_session_id - if not session_id then - local project_root = Utils.root.get() - local session_id_, err = acp_client:create_session(project_root, {}) + + if opts.just_connect_acp_client then return end + M._continue_stream_acp(opts, acp_client, session_id) +end + +---@param opts AvanteLLMStreamOptions +---@param acp_client avante.acp.ACPClient +function M._create_acp_session_and_continue(opts, acp_client) + local project_root = Utils.root.get() + acp_client:create_session(project_root, {}, function(session_id_, err) if err then opts.on_stop({ reason = "error", error = err }) return @@ -1322,10 +1347,18 @@ function M._stream_acp(opts) opts.on_stop({ reason = "error", error = "Failed to create session" }) return end - session_id = session_id_ - if opts.on_save_acp_session_id then opts.on_save_acp_session_id(session_id) end - end - if opts.just_connect_acp_client then return end + opts.acp_session_id = session_id_ + if opts.on_save_acp_session_id then opts.on_save_acp_session_id(session_id_) end + + if opts.just_connect_acp_client then return end + M._continue_stream_acp(opts, acp_client, session_id_) + end) +end + +---@param opts AvanteLLMStreamOptions +---@param acp_client avante.acp.ACPClient +---@param session_id string +function M._continue_stream_acp(opts, acp_client, session_id) local prompt = {} local donot_use_builtin_system_prompt = opts.history_messages ~= nil and #opts.history_messages > 0 if donot_use_builtin_system_prompt then