diff --git a/src/container-runner.ts b/src/container-runner.ts index 029b5fe..27b0f5c 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -58,7 +58,7 @@ const activeContainers = new Map>(); +const wakePromises = new Map>(); export function getActiveContainerCount(): number { return activeContainers.size; @@ -73,20 +73,32 @@ export function isContainerRunning(sessionId: string): boolean { * (the in-flight wake promise is reused). * * The container runs the v2 agent-runner which polls the session DB. + * + * Contract: never throws. Returns `true` on successful spawn, `false` on + * transient spawn failure (e.g. OneCLI gateway unreachable). Callers don't + * need to wrap — the inbound row stays pending and host-sweep retries on + * its next tick. Callers that care (e.g. the router's typing indicator) + * can branch on the boolean. */ -export function wakeContainer(session: Session): Promise { +export function wakeContainer(session: Session): Promise { if (activeContainers.has(session.id)) { log.debug('Container already running', { sessionId: session.id }); - return Promise.resolve(); + return Promise.resolve(true); } const existing = wakePromises.get(session.id); if (existing) { log.debug('Container wake already in-flight — joining existing promise', { sessionId: session.id }); return existing; } - const promise = spawnContainer(session).finally(() => { - wakePromises.delete(session.id); - }); + const promise = spawnContainer(session) + .then(() => true) + .catch((err) => { + log.warn('wakeContainer failed — host-sweep will retry', { sessionId: session.id, err }); + return false; + }) + .finally(() => { + wakePromises.delete(session.id); + }); wakePromises.set(session.id, promise); return promise; } @@ -435,20 +447,18 @@ async function buildContainerArgs( } // OneCLI gateway — injects HTTPS_PROXY + certs so container API calls - // are routed through the agent vault for credential injection. - try { - if (agentIdentifier) { - await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier }); - } - const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); - if (onecliApplied) { - log.info('OneCLI gateway applied', { containerName }); - } else { - log.warn('OneCLI gateway not applied — container will have no credentials', { containerName }); - } - } catch (err) { - log.warn('OneCLI gateway error — container will have no credentials', { containerName, err }); + // are routed through the agent vault for credential injection. Treated as + // a transient hard failure: if we can't wire the gateway, we don't spawn. + // The caller (router or host-sweep) catches the throw, leaves the inbound + // message pending, and the next sweep tick retries. + if (agentIdentifier) { + await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier }); } + const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); + if (!onecliApplied) { + throw new Error('OneCLI gateway not applied — refusing to spawn container without credentials'); + } + log.info('OneCLI gateway applied', { containerName }); // Host gateway args.push(...hostGatewayArgs()); diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 4dc2fb7..69a4d61 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -168,6 +168,8 @@ async function sweepSession(session: Session): Promise { const dueCount = countDueMessages(inDb); if (dueCount > 0 && !isContainerRunning(session.id)) { log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); + // wakeContainer never throws — transient spawn failures (OneCLI down, + // etc.) return false and leave messages pending for the next tick. await wakeContainer(session); } diff --git a/src/router.ts b/src/router.ts index 995496d..ad4cd56 100644 --- a/src/router.ts +++ b/src/router.ts @@ -27,7 +27,7 @@ import { getMessagingGroupWithAgentCount, } from './db/messaging-groups.js'; import { findSessionForAgent } from './db/sessions.js'; -import { startTypingRefresh } from './modules/typing/index.js'; +import { startTypingRefresh, stopTypingRefresh } from './modules/typing/index.js'; import { log } from './log.js'; import { resolveSession, writeSessionMessage, writeOutboundDirect } from './session-manager.js'; import { wakeContainer } from './container-runner.js'; @@ -457,7 +457,11 @@ async function deliverToAgent( startTypingRefresh(session.id, session.agent_group_id, event.channelType, event.platformId, event.threadId); const freshSession = getSession(session.id); if (freshSession) { - await wakeContainer(freshSession); + const woke = await wakeContainer(freshSession); + // wakeContainer never throws — it returns false on transient spawn + // failure (host-sweep retries). Stop the typing indicator we just + // started so it doesn't leak; the inbound row stays pending. + if (!woke) stopTypingRefresh(freshSession.id); } } }