/** * Host sweep — periodic maintenance of all session DBs. * * Two-DB architecture: * - Reads processing_ack + container_state from outbound.db * - Writes to inbound.db (host-owned) for status updates + recurrence * - Uses heartbeat file mtime for liveness (never polls DB for it) * - Never writes to outbound.db — preserves single-writer-per-file invariant * * Stuck / idle detection (replaces the old IDLE_TIMEOUT setTimeout + 10-min * heartbeat threshold): * * If the container isn't running and there are 'processing' rows left over * (e.g. it crashed mid-turn) → reset them to pending with backoff + * tries++. Existing retry machinery does the rest. * * If the container IS running: * 1. Absolute ceiling: heartbeat age > max(30 min, current_bash_timeout) * → kill. Covers the "alive but silent for 30 min" case. Extended * only while Bash is declared as running longer, honouring the * user's own timeout directive. Kill then resets processing rows. * * 2. Message-scoped stuck: for each 'processing' row, tolerance = * max(60s, current_bash_timeout_ms_if_Bash_running). If * (claim_age > tolerance) AND (heartbeat_mtime <= status_changed) * → kill + reset this message + tries++. Semantics: "container * claimed a message and went quiet past tolerance since the claim." */ import type Database from 'better-sqlite3'; import fs from 'fs'; import { getActiveSessions } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { countDueMessages, deleteOrphanProcessingClaims, getContainerState, getMessageForRetry, getProcessingClaims, markMessageFailed, retryWithBackoff, syncProcessingAcks, type ContainerState, } from './db/session-db.js'; import { log } from './log.js'; import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js'; import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js'; import type { Session } from './types.js'; const SWEEP_INTERVAL_MS = 60_000; // Absolute idle ceiling for a running container. If the heartbeat file hasn't // been touched in this long, the container is either stuck or doing genuinely // nothing — kill and restart on the next inbound. export const ABSOLUTE_CEILING_MS = 30 * 60 * 1000; // Stuck tolerance window applied per 'processing' claim — "did we see any // signs of life since this message was claimed?" export const CLAIM_STUCK_MS = 60 * 1000; const MAX_TRIES = 5; const BACKOFF_BASE_MS = 5000; export type StuckDecision = | { action: 'ok' } | { action: 'kill-ceiling'; heartbeatAgeMs: number; ceilingMs: number } | { action: 'kill-claim'; messageId: string; claimAgeMs: number; toleranceMs: number }; /** * Pure decision for whether a running container should be killed this sweep * tick. Inputs are all deterministic; filesystem + DB reads happen in the * caller. */ export function decideStuckAction(args: { now: number; heartbeatMtimeMs: number; // 0 when heartbeat file absent containerState: ContainerState | null; claims: Array<{ message_id: string; status_changed: string }>; }): StuckDecision { const { now, heartbeatMtimeMs, containerState, claims } = args; const declaredBashMs = bashTimeoutMs(containerState); // Ceiling check only applies when we have an actual heartbeat timestamp. // A freshly-spawned container hasn't had any SDK activity yet so no // heartbeat file exists — if we treated that as infinitely stale we'd // kill every container within seconds of spawn. Genuinely-dead containers // that never wrote a heartbeat are caught by the separate "container // process not running" cleanup path, not here. If a fresh container is // hanging at the gate (claimed a message but never did anything) the // claim-stuck check below handles it. if (heartbeatMtimeMs !== 0) { const heartbeatAge = now - heartbeatMtimeMs; const ceiling = Math.max(ABSOLUTE_CEILING_MS, declaredBashMs ?? 0); if (heartbeatAge > ceiling) { return { action: 'kill-ceiling', heartbeatAgeMs: heartbeatAge, ceilingMs: ceiling }; } } const tolerance = Math.max(CLAIM_STUCK_MS, declaredBashMs ?? 0); for (const claim of claims) { const claimedAt = Date.parse(claim.status_changed); if (Number.isNaN(claimedAt)) continue; const claimAge = now - claimedAt; if (claimAge <= tolerance) continue; if (heartbeatMtimeMs > claimedAt) continue; return { action: 'kill-claim', messageId: claim.message_id, claimAgeMs: claimAge, toleranceMs: tolerance }; } return { action: 'ok' }; } let running = false; export function startHostSweep(): void { if (running) return; running = true; sweep(); } export function stopHostSweep(): void { running = false; } async function sweep(): Promise { if (!running) return; try { const sessions = getActiveSessions(); for (const session of sessions) { await sweepSession(session); } } catch (err) { log.error('Host sweep error', { err }); } setTimeout(sweep, SWEEP_INTERVAL_MS); } async function sweepSession(session: Session): Promise { const agentGroup = getAgentGroup(session.agent_group_id); if (!agentGroup) return; const inPath = inboundDbPath(agentGroup.id, session.id); if (!fs.existsSync(inPath)) return; let inDb: Database.Database; let outDb: Database.Database | null = null; try { inDb = openInboundDb(agentGroup.id, session.id); } catch { return; } try { outDb = openOutboundDb(agentGroup.id, session.id); } catch { // outbound.db might not exist yet (container hasn't started) } try { // 1. Sync processing_ack → messages_in status if (outDb) { syncProcessingAcks(inDb, outDb); } // 2. Wake a container if work is due and nothing is running. Ordered // before the crashed-container cleanup so a fresh container gets a chance // to clean its own orphan processing_ack rows on startup (see // container/agent-runner/src/db/connection.ts). Otherwise the reset path // would keep bumping process_after into the future, dueCount would stay 0, // and the wake would never fire. const dueCount = countDueMessages(inDb); if (dueCount > 0 && !isContainerRunning(session.id)) { log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); // wakeContainer never throws — transient spawn failures (OneCLI down, // etc.) return false and leave messages pending for the next tick. await wakeContainer(session); } const alive = isContainerRunning(session.id); // 3. Running-container SLA: absolute ceiling + per-claim stuck rules. if (alive && outDb) { enforceRunningContainerSla(inDb, outDb, session, agentGroup.id); } // 4. Crashed-container cleanup: processing rows left behind get retried. // Only fires when wake in step 2 didn't pick up the work (no due messages, // or wake failed). resetStuckProcessingRows itself is idempotent — it // skips messages already scheduled for a future retry. if (!alive && outDb) { resetStuckProcessingRows(inDb, outDb, session, 'container not running'); } // 5. Recurrence fanout for completed recurring tasks. // MODULE-HOOK:scheduling-recurrence:start const { handleRecurrence } = await import('./modules/scheduling/recurrence.js'); await handleRecurrence(inDb, session); // MODULE-HOOK:scheduling-recurrence:end } finally { inDb.close(); outDb?.close(); } } function heartbeatMtimeMs(agentGroupId: string, sessionId: string): number { const hbPath = heartbeatPath(agentGroupId, sessionId); try { return fs.statSync(hbPath).mtimeMs; } catch { return 0; } } function bashTimeoutMs(state: ContainerState | null): number | null { if (!state || state.current_tool !== 'Bash') return null; return typeof state.tool_declared_timeout_ms === 'number' ? state.tool_declared_timeout_ms : null; } function enforceRunningContainerSla( inDb: Database.Database, outDb: Database.Database, session: Session, agentGroupId: string, ): void { const decision = decideStuckAction({ now: Date.now(), heartbeatMtimeMs: heartbeatMtimeMs(agentGroupId, session.id), containerState: getContainerState(outDb), claims: getProcessingClaims(outDb), }); if (decision.action === 'ok') return; if (decision.action === 'kill-ceiling') { log.warn('Killing container past absolute ceiling', { sessionId: session.id, heartbeatAgeMs: decision.heartbeatAgeMs, ceilingMs: decision.ceilingMs, }); killContainer(session.id, 'absolute-ceiling'); resetStuckProcessingRows(inDb, outDb, session, 'absolute-ceiling'); return; } log.warn('Killing container — message claimed then silent', { sessionId: session.id, messageId: decision.messageId, claimAgeMs: decision.claimAgeMs, toleranceMs: decision.toleranceMs, }); killContainer(session.id, 'claim-stuck'); resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); } export function _resetStuckProcessingRowsForTesting( inDb: Database.Database, outDb: Database.Database, session: Session, reason: string, ): void { resetStuckProcessingRows(inDb, outDb, session, reason); } function resetStuckProcessingRows( inDb: Database.Database, outDb: Database.Database, session: Session, reason: string, ): void { const claims = getProcessingClaims(outDb); const now = Date.now(); for (const { message_id } of claims) { const msg = getMessageForRetry(inDb, message_id, 'pending'); if (!msg) continue; // Already rescheduled for a future retry — don't bump tries again. The // wake path (sweep step 2) will fire when process_after elapses and a // fresh container will clean the orphan claim on startup. if (msg.processAfter && Date.parse(msg.processAfter) > now) continue; if (msg.tries >= MAX_TRIES) { markMessageFailed(inDb, msg.id); log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id, reason, }); } else { const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); const backoffSec = Math.floor(backoffMs / 1000); retryWithBackoff(inDb, msg.id, backoffSec); log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs, reason, }); } } // Drop the orphan 'processing' rows. Without this, the next sweep tick // would re-read them, see the old status_changed timestamp, conclude the // freshly respawned container is stuck, and SIGKILL it before its // agent-runner has a chance to run clearStaleProcessingAcks() on startup. // We're safe to write outbound.db here because we just killed the container // that owned it (or it crashed and left no writer behind). const cleared = deleteOrphanProcessingClaims(outDb); if (cleared > 0) { log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason }); } }