fix(host-sweep): clear orphan processing_ack on kill to prevent claim-stuck loop
When the host kills a container (absolute-ceiling, claim-stuck, or crashed), resetStuckProcessingRows reset messages_in but left orphan rows in processing_ack. The next sweep tick spawned a fresh container and, on the same tick, ran enforceRunningContainerSla against an outbound.db that still contained the previous container's claim with an hours-old status_changed timestamp — an instant claim-stuck kill, before the agent-runner could open outbound.db to run its own clearStaleProcessingAcks(). This loop repeated until tries hit MAX_TRIES. Add deleteOrphanProcessingClaims() in session-db and call it at the end of resetStuckProcessingRows. It is safe to write outbound.db here because the host only enters this path after killContainer (or when no container is running). Tests in host-sweep.test.ts cover the helper plus the regression: an orphan claim from a 2h-old kill is now removed together with the messages_in reset, so the next sweep tick sees an empty claims list and the freshly respawned container survives long enough to start its agent-runner. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -180,6 +180,19 @@ export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[]
|
||||
.all() as ProcessingClaim[];
|
||||
}
|
||||
|
||||
/**
 * Purge every `processing_ack` row whose status is still 'processing'.
 *
 * The host calls this after killing a container. A claim left behind by the
 * dead container would otherwise trip the claim-stuck check on the next sweep
 * tick, and the freshly respawned container would be killed before its
 * agent-runner gets to run its own startup cleanup.
 *
 * Writing to outbound.db is safe at this point because the host only does so
 * when no container is running (it was just killed).
 *
 * @param outDb - Handle to the outbound database holding `processing_ack`.
 * @returns The number of rows deleted.
 */
export function deleteOrphanProcessingClaims(outDb: Database.Database): number {
  const purge = outDb.prepare("DELETE FROM processing_ack WHERE status = 'processing'");
  const { changes } = purge.run();
  return changes;
}
|
||||
|
||||
export interface ContainerState {
|
||||
current_tool: string | null;
|
||||
tool_declared_timeout_ms: number | null;
|
||||
|
||||
@@ -3,9 +3,17 @@
|
||||
* ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we
|
||||
* don't have to mock the filesystem or the container runner.
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js';
|
||||
import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js';
|
||||
import {
|
||||
ABSOLUTE_CEILING_MS,
|
||||
CLAIM_STUCK_MS,
|
||||
_resetStuckProcessingRowsForTesting,
|
||||
decideStuckAction,
|
||||
} from './host-sweep.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const BASE = Date.parse('2026-04-20T12:00:00.000Z');
|
||||
|
||||
@@ -144,3 +152,143 @@ describe('decideStuckAction', () => {
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
});
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Orphan claim cleanup (regression test for the SIGKILL → claim-stuck loop)
|
||||
//
|
||||
// Repro of the production bug seen 2026-04-30: container A claimed message M
|
||||
// (writes processing_ack row with status='processing'). Host kills A by
|
||||
// absolute-ceiling. Old behavior: messages_in.M was reset to pending but
|
||||
// processing_ack.M survived. On the next sweep tick, wakeContainer spawned B,
|
||||
// the same-tick SLA check saw M's stale claim age (hours), and SIGKILL'd B
|
||||
// before agent-runner could run clearStaleProcessingAcks(). Loop. The fix
|
||||
// deletes processing_ack 'processing' rows when the host kills/cleans the
|
||||
// container, breaking the loop atomically.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Build a throwaway pair of in-memory SQLite databases for the sweep tests.
 *
 * `inDb` gets a `messages_in` table and `outDb` gets a `processing_ack`
 * table; both start empty.
 *
 * @returns The two database handles, inbound first.
 */
function makeSessionDbs(): { inDb: Database.Database; outDb: Database.Database } {
  // Open a fresh in-memory database and apply the given schema to it.
  const openWithSchema = (ddl: string): Database.Database => {
    const db = new Database(':memory:');
    db.exec(ddl);
    return db;
  };

  const inDb = openWithSchema(`
    CREATE TABLE messages_in (
      id TEXT PRIMARY KEY,
      seq INTEGER UNIQUE,
      kind TEXT NOT NULL,
      timestamp TEXT NOT NULL,
      status TEXT DEFAULT 'pending',
      process_after TEXT,
      recurrence TEXT,
      series_id TEXT,
      tries INTEGER DEFAULT 0,
      trigger INTEGER NOT NULL DEFAULT 1,
      platform_id TEXT,
      channel_type TEXT,
      thread_id TEXT,
      content TEXT NOT NULL
    );
  `);

  const outDb = openWithSchema(`
    CREATE TABLE processing_ack (
      message_id TEXT PRIMARY KEY,
      status TEXT NOT NULL,
      status_changed TEXT NOT NULL
    );
  `);

  return { inDb, outDb };
}
|
||||
|
||||
/**
 * Produce a minimal `Session` fixture for the sweep tests: an active session
 * with a stopped container, no messaging group / thread / provider, and a
 * `created_at` stamped at call time.
 */
function fakeSession(): Session {
  const createdAt = new Date().toISOString();
  const session: Session = {
    id: 'sess-test',
    agent_group_id: 'ag-test',
    messaging_group_id: null,
    thread_id: null,
    agent_provider: null,
    status: 'active',
    container_status: 'stopped',
    last_active: null,
    created_at: createdAt,
  };
  return session;
}
|
||||
|
||||
describe('deleteOrphanProcessingClaims', () => {
  it('removes only processing rows, leaves completed/failed alone', () => {
    const { outDb } = makeSessionDbs();
    const stamp = new Date().toISOString();

    // Seed one row per status; only the 'processing' one should be purged.
    const seedStatements = [
      "INSERT INTO processing_ack VALUES ('m-proc', 'processing', ?)",
      "INSERT INTO processing_ack VALUES ('m-done', 'completed', ?)",
      "INSERT INTO processing_ack VALUES ('m-fail', 'failed', ?)",
    ];
    for (const sql of seedStatements) {
      outDb.prepare(sql).run(stamp);
    }

    expect(deleteOrphanProcessingClaims(outDb)).toBe(1);

    const survivors = outDb.prepare('SELECT message_id, status FROM processing_ack ORDER BY message_id').all();
    expect(survivors).toEqual([
      { message_id: 'm-done', status: 'completed' },
      { message_id: 'm-fail', status: 'failed' },
    ]);
  });

  it('returns 0 when nothing to clear', () => {
    const { outDb } = makeSessionDbs();
    const removed = deleteOrphanProcessingClaims(outDb);
    expect(removed).toBe(0);
  });
});
|
||||
|
||||
describe('resetStuckProcessingRows — orphan claim cleanup', () => {
  it('deletes orphan processing_ack rows so next sweep tick does not see them', () => {
    const { inDb, outDb } = makeSessionDbs();
    // The claim was taken two hours ago.
    const staleClaimTs = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();

    // While a message is being processed, messages_in.status remains
    // 'pending'; only the container's processing_ack row flips to
    // 'processing'. See the header comment on processing_ack in
    // src/db/schema.ts.
    const seedInbound = inDb.prepare(
      "INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES ('m-1', 1, 'chat', ?, 'pending', '{}')",
    );
    seedInbound.run(staleClaimTs);
    const seedClaim = outDb.prepare("INSERT INTO processing_ack VALUES ('m-1', 'processing', ?)");
    seedClaim.run(staleClaimTs);

    // Sanity check: this orphan claim is what would trip claim-stuck.
    expect(getProcessingClaims(outDb)).toHaveLength(1);

    _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling');

    // Regression assertion: with the orphan gone, the next sweep tick sees an
    // empty claims list and leaves the freshly respawned container alone.
    expect(getProcessingClaims(outDb)).toEqual([]);

    // Existing behavior: the message itself is rescheduled with backoff.
    const rescheduled = inDb
      .prepare('SELECT status, tries, process_after FROM messages_in WHERE id = ?')
      .get('m-1') as {
      status: string;
      tries: number;
      process_after: string | null;
    };
    expect(rescheduled.status).toBe('pending');
    expect(rescheduled.tries).toBe(1);
    expect(rescheduled.process_after).not.toBeNull();
  });

  it('still clears orphan claims even when the inbound message has already been retried (skip path)', () => {
    // Edge case: the inbound row already carries a process_after in the
    // future, so the per-message retry loop skips it. The orphan row in
    // processing_ack still has to go, otherwise the bug remains.
    const { inDb, outDb } = makeSessionDbs();
    const staleClaimTs = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
    const retryAt = new Date(Date.now() + 60_000).toISOString();

    const seedInbound = inDb.prepare(
      "INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, tries, content) VALUES ('m-2', 2, 'chat', ?, 'pending', ?, 1, '{}')",
    );
    seedInbound.run(staleClaimTs, retryAt);
    const seedClaim = outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)");
    seedClaim.run(staleClaimTs);

    _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck');

    expect(getProcessingClaims(outDb)).toEqual([]);
    const inbound = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number };
    expect(inbound.tries).toBe(1); // not bumped, the skip path held
  });
});
|
||||
|
||||
@@ -33,6 +33,7 @@ import { getActiveSessions } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import {
|
||||
countDueMessages,
|
||||
deleteOrphanProcessingClaims,
|
||||
getContainerState,
|
||||
getMessageForRetry,
|
||||
getProcessingClaims,
|
||||
@@ -249,6 +250,15 @@ function enforceRunningContainerSla(
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
|
||||
}
|
||||
|
||||
/**
 * Test-only entry point: forwards its four arguments unchanged to the
 * module-private `resetStuckProcessingRows`, so tests can drive that function
 * directly with their own database handles.
 *
 * @param inDb - Inbound database handle passed through as-is.
 * @param outDb - Outbound database handle passed through as-is.
 * @param session - Session passed through as-is.
 * @param reason - Reason string passed through as-is (callers in this file use
 *   values such as 'claim-stuck' and 'absolute-ceiling').
 */
export function _resetStuckProcessingRowsForTesting(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
): void {
|
||||
resetStuckProcessingRows(inDb, outDb, session, reason);
|
||||
}
|
||||
|
||||
function resetStuckProcessingRows(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
@@ -285,4 +295,15 @@ function resetStuckProcessingRows(
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the orphan 'processing' rows. Without this, the next sweep tick
|
||||
// would re-read them, see the old status_changed timestamp, conclude the
|
||||
// freshly respawned container is stuck, and SIGKILL it before its
|
||||
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
||||
// We're safe to write outbound.db here because we just killed the container
|
||||
// that owned it (or it crashed and left no writer behind).
|
||||
const cleared = deleteOrphanProcessingClaims(outDb);
|
||||
if (cleared > 0) {
|
||||
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user