decideStuckAction treated a missing heartbeat file as heartbeatAge = Infinity, which always exceeded the 30-minute ceiling. Result: every freshly spawned container was killed on the first sweep pass after spawn, because it hadn't produced an SDK event yet (the heartbeat is only touched on SDK events inside processQuery, not on boot). Fix: skip the ceiling branch when heartbeatMtimeMs === 0. Containers that never wrote a heartbeat because they died are caught by the separate "container process not running" cleanup path. Containers that boot and claim a message but then hang at the gate are caught by the claim-stuck check, which correctly fires regardless of heartbeat presence once claimAge exceeds tolerance. Updates the "absent heartbeat → kill-ceiling" test (which was encoding the bug) and adds a companion test asserting that the claim-stuck path still fires for absent-heartbeat containers with aged claims. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
273 lines
9.1 KiB
TypeScript
273 lines
9.1 KiB
TypeScript
/**
 * Host sweep — periodic maintenance of all session DBs.
 *
 * Two-DB architecture:
 *   - Reads processing_ack + container_state from outbound.db
 *   - Writes to inbound.db (host-owned) for status updates + recurrence
 *   - Uses heartbeat file mtime for liveness (never polls the DB for it)
 *   - Never writes to outbound.db — preserves the single-writer-per-file invariant
 *
 * Stuck / idle detection (replaces the old IDLE_TIMEOUT setTimeout + 10-min
 * heartbeat threshold):
 *
 *   If the container isn't running and there are 'processing' rows left over
 *   (e.g. it crashed mid-turn) → reset them to pending with backoff +
 *   tries++. Existing retry machinery does the rest.
 *
 *   If the container IS running:
 *     1. Absolute ceiling: heartbeat age > max(30 min, current_bash_timeout)
 *        → kill. Covers the "alive but silent for 30 min" case. Extended
 *        only while Bash is declared as running longer, honouring the
 *        user's own timeout directive. Kill then resets processing rows.
 *        Skipped entirely while no heartbeat file exists: a freshly spawned
 *        container has produced no SDK events yet, and treating the missing
 *        file as infinitely stale would kill it on the first sweep.
 *
 *     2. Message-scoped stuck: for each 'processing' row, tolerance =
 *        max(60s, current_bash_timeout_ms_if_Bash_running). If
 *        (claim_age > tolerance) AND (heartbeat_mtime <= status_changed)
 *        → kill, then reset every processing row with backoff + tries++.
 *        Semantics: "container claimed a message and went quiet past
 *        tolerance since the claim." A missing heartbeat counts as mtime 0,
 *        so this rule still catches a fresh container that claims a message
 *        and then hangs.
 */
|
|
import type Database from 'better-sqlite3';
|
|
import fs from 'fs';
|
|
|
|
import { getActiveSessions } from './db/sessions.js';
|
|
import { getAgentGroup } from './db/agent-groups.js';
|
|
import {
|
|
countDueMessages,
|
|
getContainerState,
|
|
getMessageForRetry,
|
|
getProcessingClaims,
|
|
markMessageFailed,
|
|
retryWithBackoff,
|
|
syncProcessingAcks,
|
|
type ContainerState,
|
|
} from './db/session-db.js';
|
|
import { log } from './log.js';
|
|
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
|
|
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
|
import type { Session } from './types.js';
|
|
|
|
/** Delay between the end of one sweep pass and the start of the next. */
const SWEEP_INTERVAL_MS = 60 * 1000;

/**
 * Absolute idle ceiling for a running container. When the heartbeat file
 * hasn't been touched for this long, the container is either stuck or doing
 * genuinely nothing, so it is killed and restarted on the next inbound.
 */
export const ABSOLUTE_CEILING_MS = 30 * 60_000;

/**
 * Per-'processing'-claim stuck tolerance: "did we see any sign of life since
 * this message was claimed?"
 */
export const CLAIM_STUCK_MS = 60_000;

/** A message already tried this many times is marked failed instead of retried. */
const MAX_TRIES = 5;

/** Retry backoff base; doubled for each previous try. */
const BACKOFF_BASE_MS = 5 * 1000;
|
|
|
|
/**
 * Verdict for one running container on one sweep tick.
 *
 * - `ok`           — leave the container alone.
 * - `kill-ceiling` — heartbeat older than the absolute ceiling (both in ms).
 * - `kill-claim`   — the claim `messageId` has been outstanding for
 *                    `claimAgeMs` (> `toleranceMs`) with no heartbeat since.
 */
export type StuckDecision =
  | { action: 'ok' }
  | {
      action: 'kill-ceiling';
      heartbeatAgeMs: number;
      ceilingMs: number;
    }
  | {
      action: 'kill-claim';
      messageId: string;
      claimAgeMs: number;
      toleranceMs: number;
    };
|
|
|
|
/**
 * Pure decision for whether a running container should be killed this sweep
 * tick. Inputs are all deterministic; filesystem + DB reads happen in the
 * caller.
 *
 * Rules, checked in order (the first match wins):
 *
 *  1. Absolute ceiling — only when a heartbeat file exists. Kill if the
 *     heartbeat is older than max(ABSOLUTE_CEILING_MS, declared Bash timeout).
 *  2. Claim stuck — for each 'processing' claim older than
 *     max(CLAIM_STUCK_MS, declared Bash timeout), kill unless the heartbeat
 *     was touched after the claim.
 *
 * @param args.now Current time in epoch milliseconds.
 * @param args.heartbeatMtimeMs Heartbeat file mtime in epoch ms; 0 when the file is absent.
 * @param args.containerState Latest container state; only consulted for a declared Bash timeout.
 * @param args.claims 'processing' claims with their `status_changed` (claim) timestamps.
 * @returns The first applicable kill decision, or `{ action: 'ok' }`.
 */
export function decideStuckAction(args: {
  now: number;
  heartbeatMtimeMs: number; // 0 when heartbeat file absent
  containerState: ContainerState | null;
  claims: Array<{ message_id: string; status_changed: string }>;
}): StuckDecision {
  const { now, heartbeatMtimeMs, containerState, claims } = args;
  const declaredBashMs = bashTimeoutMs(containerState);

  // Ceiling check only applies when we have an actual heartbeat timestamp.
  // A freshly-spawned container hasn't had any SDK activity yet so no
  // heartbeat file exists — if we treated that as infinitely stale we'd
  // kill every container within seconds of spawn. Genuinely-dead containers
  // that never wrote a heartbeat are caught by the separate "container
  // process not running" cleanup path, not here. If a fresh container is
  // hanging at the gate (claimed a message but never did anything) the
  // claim-stuck check below handles it.
  if (heartbeatMtimeMs !== 0) {
    const heartbeatAge = now - heartbeatMtimeMs;
    const ceiling = Math.max(ABSOLUTE_CEILING_MS, declaredBashMs ?? 0);
    if (heartbeatAge > ceiling) {
      return { action: 'kill-ceiling', heartbeatAgeMs: heartbeatAge, ceilingMs: ceiling };
    }
  }

  const tolerance = Math.max(CLAIM_STUCK_MS, declaredBashMs ?? 0);
  for (const claim of claims) {
    const claimedAt = parseClaimTimestamp(claim.status_changed);
    // An unparseable timestamp can't be aged; skip it rather than guess.
    if (Number.isNaN(claimedAt)) continue;

    const claimAge = now - claimedAt;
    if (claimAge <= tolerance) continue;

    // Heartbeat touched after the claim → the container showed signs of life.
    // An absent heartbeat (0) never passes this test, so a fresh container
    // that claims a message and then hangs is still caught here.
    if (heartbeatMtimeMs > claimedAt) continue;

    return { action: 'kill-claim', messageId: claim.message_id, claimAgeMs: claimAge, toleranceMs: tolerance };
  }

  return { action: 'ok' };
}

/** Zone-less "YYYY-MM-DD HH:MM:SS[.fff…]" timestamp (space or 'T' separator). */
const ZONELESS_TIMESTAMP = /^(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})(?:\.(\d+))?$/;

/**
 * Parse a claim's `status_changed` value into epoch milliseconds.
 *
 * `Date.parse` interprets a date-time string without a zone designator as
 * *local* time (and the space-separated form is outside the ECMAScript
 * date-time format entirely), so a UTC value written without a zone — e.g.
 * SQLite's `datetime('now')` output — would be skewed by the host's UTC
 * offset and break the claim-age arithmetic. Zone-less values are therefore
 * read as UTC. NOTE(review): assumes status_changed is stored in UTC; confirm
 * against the schema. Values that carry an explicit zone (e.g. `toISOString()`
 * output) go through `Date.parse` unchanged.
 *
 * @returns Epoch ms, or NaN when the value cannot be parsed.
 */
function parseClaimTimestamp(value: string): number {
  const match = ZONELESS_TIMESTAMP.exec(value.trim());
  if (!match) return Date.parse(value);
  const [, date, time, fraction = ''] = match;
  // Normalise fractional seconds to exactly three digits for a strict ISO form.
  const millis = fraction.slice(0, 3).padEnd(3, '0');
  return Date.parse(`${date}T${time}.${millis}Z`);
}
|
|
|
|
/** True between startHostSweep() and stopHostSweep(). */
let running = false;

/**
 * Bumped by every stopHostSweep(). Each loop captures the value it was started
 * with and stops rescheduling as soon as it no longer matches, so a stop→start
 * cycle can never leave two loops rescheduling themselves in parallel.
 */
let loopGeneration = 0;

/** Pending timer for the next pass; cleared on stop so nothing stays queued. */
let nextSweepTimer: ReturnType<typeof setTimeout> | null = null;

/** The pass currently executing, if any; a restarted loop waits for it to settle. */
let passInFlight: Promise<void> | null = null;

/**
 * Start the periodic sweep: one pass immediately, then one pass
 * SWEEP_INTERVAL_MS after each pass finishes. No-op if already running.
 */
export function startHostSweep(): void {
  if (running) return;
  running = true;
  const generation = loopGeneration;
  if (passInFlight) {
    // A pass from a previously stopped loop is still running — begin after it
    // so two passes never work on the same session DBs at the same time.
    void passInFlight.then(() => sweep(generation));
  } else {
    void sweep(generation);
  }
}

/**
 * Stop the periodic sweep. Cancels the pending timer; a pass already in
 * progress finishes but does not schedule another.
 */
export function stopHostSweep(): void {
  running = false;
  loopGeneration++;
  if (nextSweepTimer !== null) {
    clearTimeout(nextSweepTimer);
    nextSweepTimer = null;
  }
}

/** One tick of the loop identified by `generation`: run a pass, then reschedule. */
async function sweep(generation: number): Promise<void> {
  if (!running || generation !== loopGeneration) return;

  const pass = sweepAllSessions();
  passInFlight = pass;
  await pass;
  if (passInFlight === pass) passInFlight = null;

  // stopHostSweep() may have been called while the pass was awaiting.
  if (!running || generation !== loopGeneration) return;
  nextSweepTimer = setTimeout(() => void sweep(generation), SWEEP_INTERVAL_MS);
}

/**
 * Sweep every active session once. Never rejects: an error in one session is
 * logged with its id and the remaining sessions are still swept; an error
 * listing the sessions is logged and ends this pass.
 */
async function sweepAllSessions(): Promise<void> {
  try {
    for (const session of getActiveSessions()) {
      try {
        await sweepSession(session);
      } catch (err) {
        log.error('Host sweep error', { err, sessionId: session.id });
      }
    }
  } catch (err) {
    log.error('Host sweep error', { err });
  }
}
|
|
|
|
/**
 * Run one maintenance pass over a single session's databases.
 *
 * The session is skipped when its agent group is unknown, its inbound.db file
 * does not exist, or inbound.db fails to open. outbound.db is optional: when
 * it fails to open, the ack sync and both stuck-handling paths are skipped,
 * but the wake and recurrence steps still run. Both handles are closed in a
 * `finally` before returning.
 */
async function sweepSession(session: Session): Promise<void> {
  const group = getAgentGroup(session.agent_group_id);
  if (!group) return;
  if (!fs.existsSync(inboundDbPath(group.id, session.id))) return;

  let inDb: Database.Database;
  try {
    inDb = openInboundDb(group.id, session.id);
  } catch {
    return;
  }

  let outDb: Database.Database | null;
  try {
    outDb = openOutboundDb(group.id, session.id);
  } catch {
    // outbound.db might not exist yet (container hasn't started)
    outDb = null;
  }

  try {
    // Sync processing_ack → messages_in status.
    if (outDb) syncProcessingAcks(inDb, outDb);

    const containerAlive = isContainerRunning(session.id);
    if (outDb) {
      if (containerAlive) {
        // Running container: absolute ceiling + per-claim stuck rules.
        enforceRunningContainerSla(inDb, outDb, session, group.id);
      } else {
        // Crashed / exited container: leftover processing rows get retried.
        resetStuckProcessingRows(inDb, outDb, session, 'container not running');
      }
    }

    // Liveness is checked again here; the SLA step above may have killed the container.
    const dueCount = countDueMessages(inDb);
    if (dueCount > 0 && !isContainerRunning(session.id)) {
      log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
      await wakeContainer(session);
    }

    // Recurrence fanout for completed recurring tasks.
    // MODULE-HOOK:scheduling-recurrence:start
    const { handleRecurrence } = await import('./modules/scheduling/recurrence.js');
    await handleRecurrence(inDb, session);
    // MODULE-HOOK:scheduling-recurrence:end
  } finally {
    inDb.close();
    outDb?.close();
  }
}
|
|
|
|
/**
 * Modification time (epoch ms) of the session's heartbeat file, or 0 when
 * the file cannot be stat'ed (e.g. it does not exist yet).
 */
function heartbeatMtimeMs(agentGroupId: string, sessionId: string): number {
  const file = heartbeatPath(agentGroupId, sessionId);
  let mtime = 0;
  try {
    mtime = fs.statSync(file).mtimeMs;
  } catch {
    // Any stat failure is reported as "no heartbeat" (0).
  }
  return mtime;
}
|
|
|
|
/**
 * The declared timeout of the current tool, but only while that tool is Bash.
 * Returns null when there is no state, another tool is running, or the
 * declared timeout is not a number.
 */
function bashTimeoutMs(state: ContainerState | null): number | null {
  const declared = state?.current_tool === 'Bash' ? state.tool_declared_timeout_ms : undefined;
  return typeof declared === 'number' ? declared : null;
}
|
|
|
|
/**
 * Evaluate decideStuckAction for a running container and act on the verdict.
 *
 * On either kill verdict: log a warning with the decision's figures, kill the
 * container tagged 'absolute-ceiling' or 'claim-stuck', then reset the
 * session's processing rows with that same tag. An 'ok' verdict does nothing.
 */
function enforceRunningContainerSla(
  inDb: Database.Database,
  outDb: Database.Database,
  session: Session,
  agentGroupId: string,
): void {
  const verdict = decideStuckAction({
    now: Date.now(),
    heartbeatMtimeMs: heartbeatMtimeMs(agentGroupId, session.id),
    containerState: getContainerState(outDb),
    claims: getProcessingClaims(outDb),
  });

  switch (verdict.action) {
    case 'ok':
      return;

    case 'kill-ceiling':
      log.warn('Killing container past absolute ceiling', {
        sessionId: session.id,
        heartbeatAgeMs: verdict.heartbeatAgeMs,
        ceilingMs: verdict.ceilingMs,
      });
      killContainer(session.id, 'absolute-ceiling');
      resetStuckProcessingRows(inDb, outDb, session, 'absolute-ceiling');
      return;

    case 'kill-claim':
      log.warn('Killing container — message claimed then silent', {
        sessionId: session.id,
        messageId: verdict.messageId,
        claimAgeMs: verdict.claimAgeMs,
        toleranceMs: verdict.toleranceMs,
      });
      killContainer(session.id, 'claim-stuck');
      resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
      return;
  }
}
|
|
|
|
/**
 * Release every 'processing' claim that getProcessingClaims reports for this
 * session.
 *
 * For each claimed message that getMessageForRetry returns:
 *  - if it has already been tried MAX_TRIES times, mark it failed;
 *  - otherwise schedule a retry with exponential backoff
 *    (BACKOFF_BASE_MS * 2^tries, passed to retryWithBackoff in whole seconds).
 * Claims for which no message is returned are skipped. markMessageFailed and
 * retryWithBackoff are called on `inDb`; `outDb` is only read via
 * getProcessingClaims.
 *
 * @param reason Short tag (e.g. 'claim-stuck') included in every log line.
 */
function resetStuckProcessingRows(
  inDb: Database.Database,
  outDb: Database.Database,
  session: Session,
  reason: string,
): void {
  for (const { message_id } of getProcessingClaims(outDb)) {
    const msg = getMessageForRetry(inDb, message_id, 'pending');
    if (!msg) continue;

    if (msg.tries >= MAX_TRIES) {
      markMessageFailed(inDb, msg.id);
      log.warn('Message marked as failed after max retries', {
        messageId: msg.id,
        sessionId: session.id,
        reason,
      });
      continue;
    }

    const backoffMs = BACKOFF_BASE_MS * 2 ** msg.tries;
    retryWithBackoff(inDb, msg.id, Math.floor(backoffMs / 1000));
    // Include sessionId so retry lines correlate with the failure lines above.
    log.info('Reset stale message with backoff', {
      messageId: msg.id,
      sessionId: session.id,
      tries: msg.tries,
      backoffMs,
      reason,
    });
  }
}
|