From 31f2da95856f85c1d754756a4b940b05f86fa3a2 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Mon, 20 Apr 2026 11:23:47 +0300 Subject: [PATCH] fix(container): gate poll loop on trigger=1 to honor accumulate contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A warm container picks up every pending messages_in row on each poll tick and calls markProcessing → agent.query → markCompleted. Before this, that included trigger=0 rows (ignored_message_policy='accumulate' context), causing the agent to wake and potentially respond to messages the wiring had explicitly opted out of engaging on — defeating accumulate's "store as context, don't engage" contract. Gate the main poll loop with `messages.some(m => m.trigger === 1)` — mirrors host-side countDueMessages which is already gated. If the batch has no wake-eligible row, sleep and leave them pending. They ride along via the same getPendingMessages query the next time a real trigger=1 lands, which is the intended accumulate behavior. The concurrent active-turn poll (line ~290) is unchanged on purpose — once the agent has engaged, pushing in accumulate rows mid-turn as additional context is desired. initTestSessionDb was missing the trigger and series_id columns on messages_in, out of sync with the live migration. Added both so the new tests (and any future trigger-aware tests) can run. Four tests cover the data contract: trigger=0 rows are returned by getPendingMessages (so they ride along), the gate predicate correctly identifies accumulate-only batches, mixed batches pass the gate, and the schema default of 1 applies when the column is omitted. Co-Authored-By: Claude Opus 4.7 (1M context) --- container/agent-runner/src/db/connection.ts | 2 + container/agent-runner/src/poll-loop.test.ts | 53 ++++++++++++++++++-- container/agent-runner/src/poll-loop.ts | 13 +++++ 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 772f4f1..3c0fffd 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -157,7 +157,9 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } { status TEXT DEFAULT 'pending', process_after TEXT, recurrence TEXT, + series_id TEXT, tries INTEGER DEFAULT 0, + trigger INTEGER NOT NULL DEFAULT 1, platform_id TEXT, channel_type TEXT, thread_id TEXT, diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index de5fb68..356108f 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -14,13 +14,13 @@ afterEach(() => { closeSessionDb(); }); -function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string }) { +function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string; trigger?: 0 | 1 }) { getInboundDb() .prepare( - `INSERT INTO messages_in (id, kind, timestamp, status, process_after, content) - VALUES (?, ?, datetime('now'), 'pending', ?, ?)`, + `INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, content) + VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`, ) - .run(id, kind, opts?.processAfter ?? null, JSON.stringify(content)); + .run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, JSON.stringify(content)); } describe('formatter', () => { @@ -84,6 +84,51 @@ describe('formatter', () => { }); }); +describe('accumulate gate (trigger column)', () => { + it('getPendingMessages returns both trigger=0 and trigger=1 rows', () => { + // trigger=0 rides along as context, trigger=1 is the wake-eligible row. + // The poll loop's gate depends on this data contract. + insertMessage('m1', 'chat', { sender: 'A', text: 'chit chat' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'actual mention' }, { trigger: 1 }); + const messages = getPendingMessages(); + expect(messages).toHaveLength(2); + const byId = Object.fromEntries(messages.map((m) => [m.id, m])); + expect(byId.m1.trigger).toBe(0); + expect(byId.m2.trigger).toBe(1); + }); + + it('trigger=0-only batch: gate predicate `some(trigger===1)` is false', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'noise' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'more noise' }, { trigger: 0 }); + const messages = getPendingMessages(); + // This is the exact predicate the poll loop uses to skip accumulate-only + // batches — gate should be false, so the loop sleeps without waking the agent. + expect(messages.some((m) => m.trigger === 1)).toBe(false); + }); + + it('mixed batch: gate is true → loop proceeds, accumulated rows ride along', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'earlier chatter' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'the real mention' }, { trigger: 1 }); + const messages = getPendingMessages(); + expect(messages.some((m) => m.trigger === 1)).toBe(true); + // Both messages are present for the formatter → agent sees the prior context. + expect(messages.map((m) => m.id).sort()).toEqual(['m1', 'm2']); + }); + + it('trigger column defaults to 1 for legacy inserts without explicit value', () => { + // The schema default is 1 (see src/db/schema.ts INBOUND_SCHEMA) — existing + // rows / tests without the column set are effectively wake-eligible. + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, content) + VALUES ('m1', 'chat', datetime('now'), 'pending', '{"text":"hi"}')`, + ) + .run(); + const [msg] = getPendingMessages(); + expect(msg.trigger).toBe(1); + }); +}); + describe('routing', () => { it('should extract routing from messages', () => { getInboundDb() diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 8a4ec7d..3f0e364 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -72,6 +72,19 @@ export async function runPollLoop(config: PollLoopConfig): Promise { continue; } + // Accumulate gate: if the batch contains only trigger=0 rows + // (context-only, router-stored under ignored_message_policy='accumulate'), + // don't wake the agent. Leave them `pending` — they'll ride along the + // next time a real trigger=1 message lands via this same getPendingMessages + // query. Without this gate, a warm container keeps processing + // (and potentially responding to) every accumulate-only batch, defeating + // the "store as context, don't engage" contract. Host-side countDueMessages + // gates the same way for wake-from-cold (see src/db/session-db.ts). + if (!messages.some((m) => m.trigger === 1)) { + await sleep(POLL_INTERVAL_MS); + continue; + } + const ids = messages.map((m) => m.id); markProcessing(ids);