Merge pull request #2114 from robbyczgw-cla/fix/poll-loop-prescripts-on-followups

fix(poll-loop): apply pre-task scripts to follow-up injections too
This commit is contained in:
gavrielc
2026-05-01 00:57:34 +03:00
committed by GitHub

View File

@@ -260,31 +260,69 @@ async function processQuery(
// Stream liveness is decided host-side via the heartbeat file + processing // Stream liveness is decided host-side via the heartbeat file + processing
// claim age (see src/host-sweep.ts); if something is truly stuck, the host // claim age (see src/host-sweep.ts); if something is truly stuck, the host
// will kill the container and messages get reset to pending. // will kill the container and messages get reset to pending.
let pollInFlight = false;
const pollHandle = setInterval(() => { const pollHandle = setInterval(() => {
if (done) return; if (done || pollInFlight) return;
pollInFlight = true;
// Skip system messages (MCP tool responses) and /clear (needs fresh query). void (async () => {
// Thread routing is the router's concern — if a message landed in this try {
// session, the agent should see it. Per-thread sessions already isolate // Skip system messages (MCP tool responses) and /clear (needs fresh query).
// threads into separate containers; shared sessions intentionally merge // Thread routing is the router's concern — if a message landed in this
// everything. Filtering on thread_id here caused deadlocks when the // session, the agent should see it. Per-thread sessions already isolate
// initial batch and follow-ups had mismatched thread_ids (e.g. a // threads into separate containers; shared sessions intentionally merge
// host-generated welcome trigger with null thread vs a Discord DM reply). // everything. Filtering on thread_id here caused deadlocks when the
const newMessages = getPendingMessages().filter((m) => { // initial batch and follow-ups had mismatched thread_ids (e.g. a
if (m.kind === 'system') return false; // host-generated welcome trigger with null thread vs a Discord DM reply).
if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false; const newMessages = getPendingMessages().filter((m) => {
return true; if (m.kind === 'system') return false;
}); if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false;
if (newMessages.length > 0) { return true;
const newIds = newMessages.map((m) => m.id); });
markProcessing(newIds); if (newMessages.length === 0) return;
const prompt = formatMessages(newMessages); const newIds = newMessages.map((m) => m.id);
log(`Pushing ${newMessages.length} follow-up message(s) into active query`); markProcessing(newIds);
query.push(prompt);
markCompleted(newIds); // Run pre-task scripts on follow-ups too — without this, a task that
} // arrives during an active query (e.g. a */10 monitoring cron) bypasses
// its script gate and always wakes the agent, defeating the gate.
// Mirrors the initial-batch hook above.
let keep = newMessages;
let skipped: string[] = [];
// MODULE-HOOK:scheduling-pre-task-followup:start
const { applyPreTaskScripts } = await import('./scheduling/task-script.js');
const preTask = await applyPreTaskScripts(newMessages);
keep = preTask.keep;
skipped = preTask.skipped;
if (skipped.length > 0) {
markCompleted(skipped);
log(`Pre-task script skipped ${skipped.length} follow-up task(s): ${skipped.join(', ')}`);
}
// MODULE-HOOK:scheduling-pre-task-followup:end
if (keep.length === 0) return;
// Re-check done — the outer query may have finished while the script
// was awaited. Pushing into a closed stream is wasted work; the
// claimed messages get released by the host's processing-claim sweep.
if (done) return;
const keptIds = keep.map((m) => m.id);
const prompt = formatMessages(keep);
log(`Pushing ${keep.length} follow-up message(s) into active query`);
query.push(prompt);
markCompleted(keptIds);
} catch (err) {
// Without this catch the rejection escapes the void IIFE and Node
// terminates the container on unhandled-rejection. The initial-batch
// path is wrapped by processQuery's outer try/catch; the follow-up
// path is not, so it needs its own.
const errMsg = err instanceof Error ? err.message : String(err);
log(`Follow-up poll error: ${errMsg}`);
} finally {
pollInFlight = false;
}
})();
}, ACTIVE_POLL_INTERVAL_MS); }, ACTIVE_POLL_INTERVAL_MS);
try { try {