diff --git a/src/host-sweep.test.ts b/src/host-sweep.test.ts index bd2e233..155b1b1 100644 --- a/src/host-sweep.test.ts +++ b/src/host-sweep.test.ts @@ -7,12 +7,7 @@ import Database from 'better-sqlite3'; import { describe, expect, it } from 'vitest'; import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js'; -import { - ABSOLUTE_CEILING_MS, - CLAIM_STUCK_MS, - _resetStuckProcessingRowsForTesting, - decideStuckAction, -} from './host-sweep.js'; +import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, resetStuckProcessingRows, decideStuckAction } from './host-sweep.js'; import type { Session } from './types.js'; const BASE = Date.parse('2026-04-20T12:00:00.000Z'); @@ -253,7 +248,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { // Sanity: the orphan claim is what would trip claim-stuck. expect(getProcessingClaims(outDb)).toHaveLength(1); - _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling'); + resetStuckProcessingRows(inDb, outDb, fakeSession(), 'absolute-ceiling', outDb); // Regression assertion: orphan claim is gone — next sweep tick will see // an empty claims list and not kill the freshly respawned container. @@ -285,7 +280,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { .run(claimedAt, future); outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt); - _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck'); + resetStuckProcessingRows(inDb, outDb, fakeSession(), 'claim-stuck', outDb); expect(getProcessingClaims(outDb)).toEqual([]); const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number }; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 09c82ac..b10ee0d 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -250,20 +250,28 @@ function enforceRunningContainerSla( resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); } -export function _resetStuckProcessingRowsForTesting( - inDb: Database.Database, - outDb: Database.Database, - session: Session, - reason: string, -): void { - resetStuckProcessingRows(inDb, outDb, session, reason); -} - -function resetStuckProcessingRows( +/** + * Reset retries on inbound rows the container claimed but never acked, and + * delete the orphan `processing_ack` rows so the next sweep tick doesn't + * see them. + * + * Safe to call only when the container that owned `outbound.db` is dead — + * production callers invoke this either in the `!alive` branch or right + * after `killContainer`. Without that guarantee, the orphan-claim delete + * would race the container's own writer. + * + * `writableOutDb` is the same handle outbound writes go through. When + * omitted (typical production path) the function reopens `outbound.db` + * read-write by session path for the delete and closes that handle on + * exit. Callers that already hold a writable handle — including tests + * using in-memory DBs — can pass it in to skip the reopen. + */ +export function resetStuckProcessingRows( inDb: Database.Database, outDb: Database.Database, session: Session, reason: string, + writableOutDb?: Database.Database, ): void { const claims = getProcessingClaims(outDb); const now = Date.now(); @@ -300,19 +308,17 @@ function resetStuckProcessingRows( // would re-read them, see the old status_changed timestamp, conclude the // freshly respawned container is stuck, and SIGKILL it before its // agent-runner has a chance to run clearStaleProcessingAcks() on startup. - // We're safe to write outbound.db here because we just killed the container - // that owned it (or it crashed and left no writer behind). - // outDb was opened readonly for reads above; reopen with write access for this delete. - let outDbRw: Database.Database | null = null; + const ownsDb = !writableOutDb; + let useDb: Database.Database | null = writableOutDb ?? null; try { - outDbRw = openOutboundDbRw(session.agent_group_id, session.id); - const cleared = deleteOrphanProcessingClaims(outDbRw); + if (!useDb) useDb = openOutboundDbRw(session.agent_group_id, session.id); + const cleared = deleteOrphanProcessingClaims(useDb); if (cleared > 0) { log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason }); } } catch (err) { log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err }); } finally { - outDbRw?.close(); + if (ownsDb) useDb?.close(); } }