fix(host-sweep): regression in #2183 — orphan-claim delete missed in tests
#2183 added orphan-claim cleanup that reopens `outbound.db` by session path (`openOutboundDbRw(session.agent_group_id, session.id)`) so the delete runs against a writable handle even when callers pass a readonly one. That works for the production caller — there's a real on-disk session DB at the expected path. The test wrapper `_resetStuckProcessingRowsForTesting` (introduced in the same series, #2151) is called with in-memory DBs that have no on-disk path. The reopen creates a fresh empty file at `<DATA_DIR>/v2-sessions/ag-test/sess-test/outbound.db`, runs the delete against that, and leaves the in-memory `outDb` (which the test reads afterward) untouched. The two `resetStuckProcessingRows — orphan claim cleanup` tests assert `getProcessingClaims(outDb).toEqual([])` after the call and fail on the row that's still there. Fix: drop the `_…ForTesting` wrapper, export `resetStuckProcessingRows` directly with an optional `writableOutDb` parameter. When omitted (production), the function reopens `outbound.db` RW by session path — existing behavior, existing safety guarantee. When provided (tests, or any future caller that already holds a writable handle), the function uses it directly and skips the reopen. The optional parameter has a real meaning, not a "for tests" hack. Public API surface change: `_resetStuckProcessingRowsForTesting` is gone, `resetStuckProcessingRows` is now exported. No other callers inside the repo besides the test.
This commit is contained in:
@@ -7,12 +7,7 @@ import Database from 'better-sqlite3';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js';
|
||||
import {
|
||||
ABSOLUTE_CEILING_MS,
|
||||
CLAIM_STUCK_MS,
|
||||
_resetStuckProcessingRowsForTesting,
|
||||
decideStuckAction,
|
||||
} from './host-sweep.js';
|
||||
import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, resetStuckProcessingRows, decideStuckAction } from './host-sweep.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const BASE = Date.parse('2026-04-20T12:00:00.000Z');
|
||||
@@ -253,7 +248,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => {
|
||||
// Sanity: the orphan claim is what would trip claim-stuck.
|
||||
expect(getProcessingClaims(outDb)).toHaveLength(1);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling');
|
||||
resetStuckProcessingRows(inDb, outDb, fakeSession(), 'absolute-ceiling', outDb);
|
||||
|
||||
// Regression assertion: orphan claim is gone — next sweep tick will see
|
||||
// an empty claims list and not kill the freshly respawned container.
|
||||
@@ -285,7 +280,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => {
|
||||
.run(claimedAt, future);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck');
|
||||
resetStuckProcessingRows(inDb, outDb, fakeSession(), 'claim-stuck', outDb);
|
||||
|
||||
expect(getProcessingClaims(outDb)).toEqual([]);
|
||||
const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number };
|
||||
|
||||
@@ -250,20 +250,28 @@ function enforceRunningContainerSla(
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
|
||||
}
|
||||
|
||||
export function _resetStuckProcessingRowsForTesting(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
): void {
|
||||
resetStuckProcessingRows(inDb, outDb, session, reason);
|
||||
}
|
||||
|
||||
function resetStuckProcessingRows(
|
||||
/**
|
||||
* Reset retries on inbound rows the container claimed but never acked, and
|
||||
* delete the orphan `processing_ack` rows so the next sweep tick doesn't
|
||||
* see them.
|
||||
*
|
||||
* Safe to call only when the container that owned `outbound.db` is dead —
|
||||
* production callers invoke this either in the `!alive` branch or right
|
||||
* after `killContainer`. Without that guarantee, the orphan-claim delete
|
||||
* would race the container's own writer.
|
||||
*
|
||||
* `writableOutDb` is the same handle outbound writes go through. When
|
||||
* omitted (typical production path) the function reopens `outbound.db`
|
||||
* read-write by session path for the delete and closes that handle on
|
||||
* exit. Callers that already hold a writable handle — including tests
|
||||
* using in-memory DBs — can pass it in to skip the reopen.
|
||||
*/
|
||||
export function resetStuckProcessingRows(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
writableOutDb?: Database.Database,
|
||||
): void {
|
||||
const claims = getProcessingClaims(outDb);
|
||||
const now = Date.now();
|
||||
@@ -300,19 +308,17 @@ function resetStuckProcessingRows(
|
||||
// would re-read them, see the old status_changed timestamp, conclude the
|
||||
// freshly respawned container is stuck, and SIGKILL it before its
|
||||
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
||||
// We're safe to write outbound.db here because we just killed the container
|
||||
// that owned it (or it crashed and left no writer behind).
|
||||
// outDb was opened readonly for reads above; reopen with write access for this delete.
|
||||
let outDbRw: Database.Database | null = null;
|
||||
const ownsDb = !writableOutDb;
|
||||
let useDb: Database.Database | null = writableOutDb ?? null;
|
||||
try {
|
||||
outDbRw = openOutboundDbRw(session.agent_group_id, session.id);
|
||||
const cleared = deleteOrphanProcessingClaims(outDbRw);
|
||||
if (!useDb) useDb = openOutboundDbRw(session.agent_group_id, session.id);
|
||||
const cleared = deleteOrphanProcessingClaims(useDb);
|
||||
if (cleared > 0) {
|
||||
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err });
|
||||
} finally {
|
||||
outDbRw?.close();
|
||||
if (ownsDb) useDb?.close();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user