diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 772f4f1..3c0fffd 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -157,7 +157,9 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } { status TEXT DEFAULT 'pending', process_after TEXT, recurrence TEXT, + series_id TEXT, tries INTEGER DEFAULT 0, + trigger INTEGER NOT NULL DEFAULT 1, platform_id TEXT, channel_type TEXT, thread_id TEXT, diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index de5fb68..356108f 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -14,13 +14,13 @@ afterEach(() => { closeSessionDb(); }); -function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string }) { +function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string; trigger?: 0 | 1 }) { getInboundDb() .prepare( - `INSERT INTO messages_in (id, kind, timestamp, status, process_after, content) - VALUES (?, ?, datetime('now'), 'pending', ?, ?)`, + `INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, content) + VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`, ) - .run(id, kind, opts?.processAfter ?? null, JSON.stringify(content)); + .run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, JSON.stringify(content)); } describe('formatter', () => { @@ -84,6 +84,51 @@ describe('formatter', () => { }); }); +describe('accumulate gate (trigger column)', () => { + it('getPendingMessages returns both trigger=0 and trigger=1 rows', () => { + // trigger=0 rides along as context, trigger=1 is the wake-eligible row. + // The poll loop's gate depends on this data contract. + insertMessage('m1', 'chat', { sender: 'A', text: 'chit chat' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'actual mention' }, { trigger: 1 }); + const messages = getPendingMessages(); + expect(messages).toHaveLength(2); + const byId = Object.fromEntries(messages.map((m) => [m.id, m])); + expect(byId.m1.trigger).toBe(0); + expect(byId.m2.trigger).toBe(1); + }); + + it('trigger=0-only batch: gate predicate `some(trigger===1)` is false', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'noise' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'more noise' }, { trigger: 0 }); + const messages = getPendingMessages(); + // This is the exact predicate the poll loop uses to skip accumulate-only + // batches — gate should be false, so the loop sleeps without waking the agent. + expect(messages.some((m) => m.trigger === 1)).toBe(false); + }); + + it('mixed batch: gate is true → loop proceeds, accumulated rows ride along', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'earlier chatter' }, { trigger: 0 }); + insertMessage('m2', 'chat', { sender: 'B', text: 'the real mention' }, { trigger: 1 }); + const messages = getPendingMessages(); + expect(messages.some((m) => m.trigger === 1)).toBe(true); + // Both messages are present for the formatter → agent sees the prior context. + expect(messages.map((m) => m.id).sort()).toEqual(['m1', 'm2']); + }); + + it('trigger column defaults to 1 for legacy inserts without explicit value', () => { + // The schema default is 1 (see src/db/schema.ts INBOUND_SCHEMA) — existing + // rows / tests without the column set are effectively wake-eligible. + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, content) + VALUES ('m1', 'chat', datetime('now'), 'pending', '{"text":"hi"}')`, + ) + .run(); + const [msg] = getPendingMessages(); + expect(msg.trigger).toBe(1); + }); +}); + describe('routing', () => { it('should extract routing from messages', () => { getInboundDb() diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 8a4ec7d..3f0e364 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -72,6 +72,19 @@ export async function runPollLoop(config: PollLoopConfig): Promise { continue; } + // Accumulate gate: if the batch contains only trigger=0 rows + // (context-only, router-stored under ignored_message_policy='accumulate'), + // don't wake the agent. Leave them `pending` — they'll ride along the + // next time a real trigger=1 message lands via this same getPendingMessages + // query. Without this gate, a warm container keeps processing + // (and potentially responding to) every accumulate-only batch, defeating + // the "store as context, don't engage" contract. Host-side countDueMessages + // gates the same way for wake-from-cold (see src/db/session-db.ts). + if (!messages.some((m) => m.trigger === 1)) { + await sleep(POLL_INTERVAL_MS); + continue; + } + const ids = messages.map((m) => m.id); markProcessing(ids);