fix(agent-runner): remove thread_id filter and fix processing ack on empty result
The concurrent poll in processQuery filtered out messages whose thread_id did not match the active turn's routing. This caused a deadlock when the initial batch (e.g. a host-generated welcome trigger with a null thread_id) completed but follow-ups arrived with a different thread_id (e.g. a Discord DM): the query stayed open waiting for matching-thread pushes that never came, blocking the poll loop indefinitely.

Thread routing is the router's concern — per-thread sessions already isolate threads into separate containers, and shared sessions intentionally merge everything. Removed the filter.

Also fixed processing_ack: a result event (with or without text) means the turn is done, but markCompleted only ran when event.text was truthy. When the agent responded via MCP send_message (empty result text), the initial batch stayed in 'processing' for the query's lifetime, creating false stuck signals in the host sweep. The initial batch is now marked completed on any result event.

Belt-and-suspenders: the init-first-agent welcome trigger now sets threadId to the DM platform_id instead of null.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -160,7 +160,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
const skippedSet = new Set(skipped);
|
const skippedSet = new Set(skipped);
|
||||||
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
||||||
try {
|
try {
|
||||||
const result = await processQuery(query, routing);
|
const result = await processQuery(query, routing, processingIds);
|
||||||
if (result.continuation && result.continuation !== continuation) {
|
if (result.continuation && result.continuation !== continuation) {
|
||||||
continuation = result.continuation;
|
continuation = result.continuation;
|
||||||
setStoredSessionId(continuation);
|
setStoredSessionId(continuation);
|
||||||
@@ -189,6 +189,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure completed even if processQuery ended without a result event
|
||||||
|
// (e.g. stream closed unexpectedly).
|
||||||
markCompleted(processingIds);
|
markCompleted(processingIds);
|
||||||
log(`Completed ${ids.length} message(s)`);
|
log(`Completed ${ids.length} message(s)`);
|
||||||
}
|
}
|
||||||
@@ -232,7 +234,11 @@ interface QueryResult {
|
|||||||
continuation?: string;
|
continuation?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function processQuery(query: AgentQuery, routing: RoutingContext): Promise<QueryResult> {
|
async function processQuery(
|
||||||
|
query: AgentQuery,
|
||||||
|
routing: RoutingContext,
|
||||||
|
initialBatchIds: string[],
|
||||||
|
): Promise<QueryResult> {
|
||||||
let queryContinuation: string | undefined;
|
let queryContinuation: string | undefined;
|
||||||
let done = false;
|
let done = false;
|
||||||
|
|
||||||
@@ -246,14 +252,15 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise
|
|||||||
if (done) return;
|
if (done) return;
|
||||||
|
|
||||||
// Skip system messages (MCP tool responses) and /clear (needs fresh query).
|
// Skip system messages (MCP tool responses) and /clear (needs fresh query).
|
||||||
// Also defer messages whose thread_id differs from the active turn's routing
|
// Thread routing is the router's concern — if a message landed in this
|
||||||
// — mixing threads into one streaming turn would send the reply to the wrong
|
// session, the agent should see it. Per-thread sessions already isolate
|
||||||
// thread because `routing` is captured at turn start. The next turn will pick
|
// threads into separate containers; shared sessions intentionally merge
|
||||||
// them up with fresh routing.
|
// everything. Filtering on thread_id here caused deadlocks when the
|
||||||
|
// initial batch and follow-ups had mismatched thread_ids (e.g. a
|
||||||
|
// host-generated welcome trigger with null thread vs a Discord DM reply).
|
||||||
const newMessages = getPendingMessages().filter((m) => {
|
const newMessages = getPendingMessages().filter((m) => {
|
||||||
if (m.kind === 'system') return false;
|
if (m.kind === 'system') return false;
|
||||||
if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false;
|
if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false;
|
||||||
if ((m.thread_id ?? null) !== (routing.threadId ?? null)) return false;
|
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
if (newMessages.length > 0) {
|
if (newMessages.length > 0) {
|
||||||
@@ -282,8 +289,17 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise
|
|||||||
// effectively orphaned and the next message started a blank
|
// effectively orphaned and the next message started a blank
|
||||||
// Claude session with no prior context.
|
// Claude session with no prior context.
|
||||||
setStoredSessionId(event.continuation);
|
setStoredSessionId(event.continuation);
|
||||||
} else if (event.type === 'result' && event.text) {
|
} else if (event.type === 'result') {
|
||||||
dispatchResultText(event.text, routing);
|
// A result — with or without text — means the turn is done. Mark
|
||||||
|
// the initial batch completed now so the host sweep doesn't see
|
||||||
|
// stale 'processing' claims while the query stays open for
|
||||||
|
// follow-up pushes. The agent may have responded via MCP
|
||||||
|
// (send_message) mid-turn, or the message may not need a response
|
||||||
|
// at all — either way the turn is finished.
|
||||||
|
markCompleted(initialBatchIds);
|
||||||
|
if (event.text) {
|
||||||
|
dispatchResultText(event.text, routing);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
@@ -363,7 +363,7 @@ async function sendWelcomeViaCliSocket(
|
|||||||
to: {
|
to: {
|
||||||
channelType: dmMg.channel_type,
|
channelType: dmMg.channel_type,
|
||||||
platformId: dmMg.platform_id,
|
platformId: dmMg.platform_id,
|
||||||
threadId: null,
|
threadId: dmMg.platform_id,
|
||||||
},
|
},
|
||||||
}) + '\n';
|
}) + '\n';
|
||||||
socket.write(payload, (err) => {
|
socket.write(payload, (err) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user