diff --git a/src/db/session-db.ts b/src/db/session-db.ts index 48e9297..ca15276 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -180,6 +180,19 @@ export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[] .all() as ProcessingClaim[]; } +/** + * Delete orphan 'processing' rows. Called by the host after killing a + * container so the leftover claim doesn't trip claim-stuck on the next sweep + * tick (which would kill the freshly respawned container before its + * agent-runner can run its own startup cleanup). + * + * Safe because the host only writes to outbound.db when no container is + * running (we just killed it). Returns the number of rows deleted. + */ +export function deleteOrphanProcessingClaims(outDb: Database.Database): number { + return outDb.prepare("DELETE FROM processing_ack WHERE status = 'processing'").run().changes; +} + export interface ContainerState { current_tool: string | null; tool_declared_timeout_ms: number | null; diff --git a/src/host-sweep.test.ts b/src/host-sweep.test.ts index eefcc8a..bd2e233 100644 --- a/src/host-sweep.test.ts +++ b/src/host-sweep.test.ts @@ -3,9 +3,17 @@ * ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we * don't have to mock the filesystem or the container runner. */ +import Database from 'better-sqlite3'; import { describe, expect, it } from 'vitest'; -import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js'; +import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js'; +import { + ABSOLUTE_CEILING_MS, + CLAIM_STUCK_MS, + _resetStuckProcessingRowsForTesting, + decideStuckAction, +} from './host-sweep.js'; +import type { Session } from './types.js'; const BASE = Date.parse('2026-04-20T12:00:00.000Z'); @@ -144,3 +152,143 @@ describe('decideStuckAction', () => { expect(res.action).toBe('ok'); }); }); + +// ───────────────────────────────────────────────────────────────────────────── +// Orphan claim cleanup (regression test for the SIGKILL → claim-stuck loop) +// +// Repro of the production bug seen 2026-04-30: container A claimed message M +// (writes processing_ack row with status='processing'). Host kills A by +// absolute-ceiling. Old behavior: messages_in.M was reset to pending but +// processing_ack.M survived. On the next sweep tick, wakeContainer spawned B, +// the same-tick SLA check saw M's stale claim age (hours), and SIGKILL'd B +// before agent-runner could run clearStaleProcessingAcks(). Loop. The fix +// deletes processing_ack 'processing' rows when the host kills/cleans the +// container, breaking the loop atomically. +// ───────────────────────────────────────────────────────────────────────────── + +function makeSessionDbs(): { inDb: Database.Database; outDb: Database.Database } { + const inDb = new Database(':memory:'); + inDb.exec(` + CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, + seq INTEGER UNIQUE, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', + process_after TEXT, + recurrence TEXT, + series_id TEXT, + tries INTEGER DEFAULT 0, + trigger INTEGER NOT NULL DEFAULT 1, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL + ); + `); + const outDb = new Database(':memory:'); + outDb.exec(` + CREATE TABLE processing_ack ( + message_id TEXT PRIMARY KEY, + status TEXT NOT NULL, + status_changed TEXT NOT NULL + ); + `); + return { inDb, outDb }; +} + +function fakeSession(): Session { + return { + id: 'sess-test', + agent_group_id: 'ag-test', + messaging_group_id: null, + thread_id: null, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: new Date().toISOString(), + }; +} + +describe('deleteOrphanProcessingClaims', () => { + it('removes only processing rows, leaves completed/failed alone', () => { + const { outDb } = makeSessionDbs(); + const ts = new Date().toISOString(); + outDb.prepare("INSERT INTO processing_ack VALUES ('m-proc', 'processing', ?)").run(ts); + outDb.prepare("INSERT INTO processing_ack VALUES ('m-done', 'completed', ?)").run(ts); + outDb.prepare("INSERT INTO processing_ack VALUES ('m-fail', 'failed', ?)").run(ts); + + const removed = deleteOrphanProcessingClaims(outDb); + + expect(removed).toBe(1); + const remaining = outDb.prepare('SELECT message_id, status FROM processing_ack ORDER BY message_id').all(); + expect(remaining).toEqual([ + { message_id: 'm-done', status: 'completed' }, + { message_id: 'm-fail', status: 'failed' }, + ]); + }); + + it('returns 0 when nothing to clear', () => { + const { outDb } = makeSessionDbs(); + expect(deleteOrphanProcessingClaims(outDb)).toBe(0); + }); +}); + +describe('resetStuckProcessingRows — orphan claim cleanup', () => { + it('deletes orphan processing_ack rows so next sweep tick does not see them', () => { + const { inDb, outDb } = makeSessionDbs(); + const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); // 2h ago + + // messages_in.status stays 'pending' during processing — only the + // container's processing_ack moves to 'processing'. See + // src/db/schema.ts header comment on processing_ack. + inDb + .prepare( + "INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES ('m-1', 1, 'chat', ?, 'pending', '{}')", + ) + .run(claimedAt); + outDb.prepare("INSERT INTO processing_ack VALUES ('m-1', 'processing', ?)").run(claimedAt); + + // Sanity: the orphan claim is what would trip claim-stuck. + expect(getProcessingClaims(outDb)).toHaveLength(1); + + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling'); + + // Regression assertion: orphan claim is gone — next sweep tick will see + // an empty claims list and not kill the freshly respawned container. + expect(getProcessingClaims(outDb)).toEqual([]); + + // And the message itself was rescheduled with backoff (existing behavior). + const row = inDb.prepare('SELECT status, tries, process_after FROM messages_in WHERE id = ?').get('m-1') as { + status: string; + tries: number; + process_after: string | null; + }; + expect(row.status).toBe('pending'); + expect(row.tries).toBe(1); + expect(row.process_after).not.toBeNull(); + }); + + it('still clears orphan claims even when the inbound message has already been retried (skip path)', () => { + // Edge case: the inbound row was already rescheduled (process_after in + // future), so the per-message retry loop skips it. The orphan in + // processing_ack must still be removed — otherwise the bug remains. + const { inDb, outDb } = makeSessionDbs(); + const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); + const future = new Date(Date.now() + 60_000).toISOString(); + + inDb + .prepare( + "INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, tries, content) VALUES ('m-2', 2, 'chat', ?, 'pending', ?, 1, '{}')", + ) + .run(claimedAt, future); + outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt); + + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck'); + + expect(getProcessingClaims(outDb)).toEqual([]); + const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number }; + expect(row.tries).toBe(1); // not bumped, the skip path held + }); +}); diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 69a4d61..30cdc64 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -33,6 +33,7 @@ import { getActiveSessions } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { countDueMessages, + deleteOrphanProcessingClaims, getContainerState, getMessageForRetry, getProcessingClaims, @@ -249,6 +250,15 @@ function enforceRunningContainerSla( resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); } +export function _resetStuckProcessingRowsForTesting( + inDb: Database.Database, + outDb: Database.Database, + session: Session, + reason: string, +): void { + resetStuckProcessingRows(inDb, outDb, session, reason); +} + function resetStuckProcessingRows( inDb: Database.Database, outDb: Database.Database, @@ -285,4 +295,15 @@ function resetStuckProcessingRows( }); } } + + // Drop the orphan 'processing' rows. Without this, the next sweep tick + // would re-read them, see the old status_changed timestamp, conclude the + // freshly respawned container is stuck, and SIGKILL it before its + // agent-runner has a chance to run clearStaleProcessingAcks() on startup. + // We're safe to write outbound.db here because we just killed the container + // that owned it (or it crashed and left no writer behind). + const cleared = deleteOrphanProcessingClaims(outDb); + if (cleared > 0) { + log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason }); + } }