diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 5ccb2e4..d93bdd3 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -160,7 +160,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise { const skippedSet = new Set(skipped); const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id)); try { - const result = await processQuery(query, routing); + const result = await processQuery(query, routing, processingIds); if (result.continuation && result.continuation !== continuation) { continuation = result.continuation; setStoredSessionId(continuation); @@ -189,6 +189,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise { }); } + // Ensure completed even if processQuery ended without a result event + // (e.g. stream closed unexpectedly). markCompleted(processingIds); log(`Completed ${ids.length} message(s)`); } @@ -232,7 +234,11 @@ interface QueryResult { continuation?: string; } -async function processQuery(query: AgentQuery, routing: RoutingContext): Promise { +async function processQuery( + query: AgentQuery, + routing: RoutingContext, + initialBatchIds: string[], +): Promise { let queryContinuation: string | undefined; let done = false; @@ -246,14 +252,15 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise if (done) return; // Skip system messages (MCP tool responses) and /clear (needs fresh query). - // Also defer messages whose thread_id differs from the active turn's routing - // — mixing threads into one streaming turn would send the reply to the wrong - // thread because `routing` is captured at turn start. The next turn will pick - // them up with fresh routing. + // Thread routing is the router's concern — if a message landed in this + // session, the agent should see it. Per-thread sessions already isolate + // threads into separate containers; shared sessions intentionally merge + // everything. Filtering on thread_id here caused deadlocks when the + // initial batch and follow-ups had mismatched thread_ids (e.g. a + // host-generated welcome trigger with null thread vs a Discord DM reply). const newMessages = getPendingMessages().filter((m) => { if (m.kind === 'system') return false; if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false; - if ((m.thread_id ?? null) !== (routing.threadId ?? null)) return false; return true; }); if (newMessages.length > 0) { @@ -282,8 +289,17 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise // effectively orphaned and the next message started a blank // Claude session with no prior context. setStoredSessionId(event.continuation); - } else if (event.type === 'result' && event.text) { - dispatchResultText(event.text, routing); + } else if (event.type === 'result') { + // A result — with or without text — means the turn is done. Mark + // the initial batch completed now so the host sweep doesn't see + // stale 'processing' claims while the query stays open for + // follow-up pushes. The agent may have responded via MCP + // (send_message) mid-turn, or the message may not need a response + // at all — either way the turn is finished. + markCompleted(initialBatchIds); + if (event.text) { + dispatchResultText(event.text, routing); + } } } } finally { diff --git a/scripts/init-first-agent.ts b/scripts/init-first-agent.ts index b3d7bd0..dcb99b5 100644 --- a/scripts/init-first-agent.ts +++ b/scripts/init-first-agent.ts @@ -363,7 +363,7 @@ async function sendWelcomeViaCliSocket( to: { channelType: dmMg.channel_type, platformId: dmMg.platform_id, - threadId: null, + threadId: dmMg.platform_id, }, }) + '\n'; socket.write(payload, (err) => {