diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 9bf2551..772f4f1 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -64,10 +64,58 @@ export function getOutboundDb(): Database { if (!cols.has('updated_at')) { _outbound.exec(`ALTER TABLE session_state ADD COLUMN updated_at TEXT NOT NULL DEFAULT ''`); } + // container_state: tracks the current tool in flight (if any) so the host + // sweep can widen its stuck tolerance when Bash is running with a user- + // declared long timeout. Forward-compat for older outbound.db files. + _outbound.exec(` + CREATE TABLE IF NOT EXISTS container_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + current_tool TEXT, + tool_declared_timeout_ms INTEGER, + tool_started_at TEXT, + updated_at TEXT NOT NULL + ); + `); } return _outbound; } +/** + * Record that a tool is starting. `declaredTimeoutMs` is the tool's own + * timeout hint when one is available (Bash exposes it in the tool_use input); + * omit for tools with no declared timeout. + */ +export function setContainerToolInFlight(tool: string, declaredTimeoutMs: number | null): void { + const now = new Date().toISOString(); + getOutboundDb() + .prepare( + `INSERT INTO container_state (id, current_tool, tool_declared_timeout_ms, tool_started_at, updated_at) + VALUES (1, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + current_tool = excluded.current_tool, + tool_declared_timeout_ms = excluded.tool_declared_timeout_ms, + tool_started_at = excluded.tool_started_at, + updated_at = excluded.updated_at`, + ) + .run(tool, declaredTimeoutMs, now, now); +} + +/** Clear the in-flight tool — called on PostToolUse / PostToolUseFailure. */ +export function clearContainerToolInFlight(): void { + const now = new Date().toISOString(); + getOutboundDb() + .prepare( + `INSERT INTO container_state (id, current_tool, tool_declared_timeout_ms, tool_started_at, updated_at) + VALUES (1, NULL, NULL, NULL, ?) + ON CONFLICT(id) DO UPDATE SET + current_tool = NULL, + tool_declared_timeout_ms = NULL, + tool_started_at = NULL, + updated_at = excluded.updated_at`, + ) + .run(now); +} + /** * Touch the heartbeat file — replaces the old touchProcessing() DB writes. * The host checks this file's mtime for stale container detection. @@ -157,6 +205,13 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } { value TEXT NOT NULL, updated_at TEXT NOT NULL ); + CREATE TABLE container_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + current_tool TEXT, + tool_declared_timeout_ms INTEGER, + tool_started_at TEXT, + updated_at TEXT NOT NULL + ); `); return { inbound: _inbound, outbound: _outbound }; diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 742de14..8a4ec7d 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -8,7 +8,6 @@ import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types const POLL_INTERVAL_MS = 1000; const ACTIVE_POLL_INTERVAL_MS = 500; -const IDLE_END_MS = 20_000; // End stream after 20s with no SDK events function log(msg: string): void { console.error(`[poll-loop] ${msg}`); @@ -267,9 +266,13 @@ interface QueryResult { async function processQuery(query: AgentQuery, routing: RoutingContext): Promise { let queryContinuation: string | undefined; let done = false; - let lastEventTime = Date.now(); - // Concurrent polling: push follow-ups, checkpoint WAL, detect idle + // Concurrent polling: push follow-ups into the active query as they arrive. + // We do NOT force-end the stream on silence — keeping the query open is + // strictly cheaper than close+reopen (no cold prompt cache, no reconnect). + // Stream liveness is decided host-side via the heartbeat file + processing + // claim age (see src/host-sweep.ts); if something is truly stuck, the host + // will kill the container and messages get reset to pending. const pollHandle = setInterval(() => { if (done) return; @@ -296,19 +299,11 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise query.push(prompt); markCompleted(newIds); - lastEventTime = Date.now(); // new input counts as activity - } - - // End stream when agent is idle: no SDK events and no pending messages - if (Date.now() - lastEventTime > IDLE_END_MS) { - log(`No SDK events for ${IDLE_END_MS / 1000}s, ending query`); - query.end(); } }, ACTIVE_POLL_INTERVAL_MS); try { for await (const event of query.events) { - lastEventTime = Date.now(); handleEvent(event, routing); touchHeartbeat(); diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts index 97fe44a..a797f06 100644 --- a/container/agent-runner/src/providers/claude.ts +++ b/container/agent-runner/src/providers/claude.ts @@ -3,6 +3,7 @@ import path from 'path'; import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from '@anthropic-ai/claude-agent-sdk'; +import { clearContainerToolInFlight, setContainerToolInFlight } from '../db/connection.js'; import { registerProvider } from './provider-registry.js'; import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; @@ -10,10 +11,28 @@ function log(msg: string): void { console.error(`[claude-provider] ${msg}`); } -// Deferred SDK builtins that would sidestep nanoclaw's own scheduling. -// Scheduling goes through mcp__nanoclaw__schedule_task so that tasks are -// durable across sessions/restarts and gated by our pre-task script hook. -const SDK_DISALLOWED_TOOLS = ['CronCreate', 'CronDelete', 'CronList', 'ScheduleWakeup']; +// Deferred SDK builtins that either sidestep nanoclaw's own scheduling or +// don't fit our async message-passing model (they're designed for Claude +// Code's interactive UI and would hang here). +// +// - CronCreate / CronDelete / CronList / ScheduleWakeup: we have durable +// scheduling via mcp__nanoclaw__schedule_task. +// - AskUserQuestion: SDK returns a placeholder instead of blocking on a +// real answer — we have mcp__nanoclaw__ask_user_question that persists +// the question and blocks on the real reply. +// - EnterPlanMode / ExitPlanMode / EnterWorktree / ExitWorktree: Claude +// Code UI affordances; in a headless container they'd appear stuck. +const SDK_DISALLOWED_TOOLS = [ + 'CronCreate', + 'CronDelete', + 'CronList', + 'ScheduleWakeup', + 'AskUserQuestion', + 'EnterPlanMode', + 'ExitPlanMode', + 'EnterWorktree', + 'ExitWorktree', +]; // Tool allowlist for NanoClaw agent containers const TOOL_ALLOWLIST = [ @@ -122,6 +141,43 @@ function formatTranscriptMarkdown(messages: ParsedMessage[], title?: string | nu return lines.join('\n'); } +/** + * PreToolUse hook: record the current tool + its declared timeout so the host + * sweep can widen its stuck tolerance while Bash is running a long-declared + * script. Defense-in-depth: if SDK_DISALLOWED_TOOLS slips through somehow, + * block the call here instead of letting the agent hang. + */ +const preToolUseHook: HookCallback = async (input) => { + const i = input as { tool_name?: string; tool_input?: Record }; + const toolName = i.tool_name ?? ''; + if (SDK_DISALLOWED_TOOLS.includes(toolName)) { + return { + decision: 'block', + stopReason: `Tool '${toolName}' is not available in this environment — use the nanoclaw equivalent.`, + } as unknown as ReturnType; + } + // Bash exposes its timeout via the tool_input.timeout field (ms). Any other + // tool: no declared timeout. + const declaredTimeoutMs = + toolName === 'Bash' && typeof i.tool_input?.timeout === 'number' ? (i.tool_input.timeout as number) : null; + try { + setContainerToolInFlight(toolName, declaredTimeoutMs); + } catch (err) { + log(`PreToolUse: failed to record container_state: ${err instanceof Error ? err.message : String(err)}`); + } + return { continue: true }; +}; + +/** Clear in-flight tool on PostToolUse / PostToolUseFailure. */ +const postToolUseHook: HookCallback = async () => { + try { + clearContainerToolInFlight(); + } catch (err) { + log(`PostToolUse: failed to clear container_state: ${err instanceof Error ? err.message : String(err)}`); + } + return { continue: true }; +}; + function createPreCompactHook(assistantName?: string): HookCallback { return async (input) => { const preCompact = input as PreCompactHookInput; @@ -224,6 +280,9 @@ export class ClaudeProvider implements AgentProvider { settingSources: ['project', 'user'], mcpServers: this.mcpServers, hooks: { + PreToolUse: [{ hooks: [preToolUseHook] }], + PostToolUse: [{ hooks: [postToolUseHook] }], + PostToolUseFailure: [{ hooks: [postToolUseHook] }], PreCompact: [{ hooks: [createPreCompactHook(this.assistantName)] }], }, }, diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index 0abbf9d..0e856f6 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -10,7 +10,6 @@ import type { ChannelAdapter, ChannelSetup, InboundMessage, OutboundMessage } fr // Mock container runner vi.mock('../container-runner.js', () => ({ wakeContainer: vi.fn().mockResolvedValue(undefined), - resetContainerIdleTimer: vi.fn(), isContainerRunning: vi.fn().mockReturnValue(false), getActiveContainerCount: vi.fn().mockReturnValue(0), killContainer: vi.fn(), diff --git a/src/container-runner.ts b/src/container-runner.ts index c3fb24f..9764126 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -9,7 +9,7 @@ import path from 'path'; import { OneCLI } from '@onecli-sh/sdk'; -import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js'; +import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, ONECLI_URL, TIMEZONE } from './config.js'; import { readContainerConfig, writeContainerConfig } from './container-config.js'; import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js'; import { getAgentGroup } from './db/agent-groups.js'; @@ -26,12 +26,7 @@ import { type ProviderContainerContribution, type VolumeMount, } from './providers/provider-container-registry.js'; -import { - markContainerRunning, - markContainerStopped, - sessionDir, - writeSessionRouting, -} from './session-manager.js'; +import { markContainerRunning, markContainerStopped, sessionDir, writeSessionRouting } from './session-manager.js'; import type { AgentGroup, Session } from './types.js'; const onecli = new OneCLI({ url: ONECLI_URL }); @@ -125,22 +120,12 @@ async function spawnContainer(session: Session): Promise { // stdout is unused in v2 (all IO is via session DB) container.stdout?.on('data', () => {}); - // Idle timeout: kill container after IDLE_TIMEOUT of no activity - let idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT); - - const resetIdle = () => { - clearTimeout(idleTimer); - idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT); - }; - - // Reset idle timer when the host detects new messages_out (called by delivery.ts) - const entry = activeContainers.get(session.id); - if (entry) { - (entry as { resetIdle?: () => void }).resetIdle = resetIdle; - } + // No host-side idle timeout. Stale/stuck detection is driven by the host + // sweep reading heartbeat mtime + processing_ack claim age + container_state + // (see src/host-sweep.ts). This avoids killing long-running legitimate work + // on a wall-clock timer. container.on('close', (code) => { - clearTimeout(idleTimer); activeContainers.delete(session.id); markContainerStopped(session.id); stopTypingRefresh(session.id); @@ -148,7 +133,6 @@ async function spawnContainer(session: Session): Promise { }); container.on('error', (err) => { - clearTimeout(idleTimer); activeContainers.delete(session.id); markContainerStopped(session.id); stopTypingRefresh(session.id); @@ -156,12 +140,6 @@ async function spawnContainer(session: Session): Promise { }); } -/** Reset the idle timer for a session's container (called when messages_out are delivered). */ -export function resetContainerIdleTimer(sessionId: string): void { - const entry = activeContainers.get(sessionId) as { resetIdle?: () => void } | undefined; - entry?.resetIdle?.(); -} - /** Kill a container for a session. */ export function killContainer(sessionId: string, reason: string): void { const entry = activeContainers.get(sessionId); diff --git a/src/db/schema.ts b/src/db/schema.ts index 044d717..47d4c9f 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -213,4 +213,16 @@ CREATE TABLE IF NOT EXISTS session_state ( value TEXT NOT NULL, updated_at TEXT NOT NULL ); + +-- Current tool-in-flight state. Single-row table (id=1). Container writes on +-- PreToolUse and clears on PostToolUse / PostToolUseFailure. Host reads in the +-- sweep to extend the stuck-tolerance window when Bash is running with a +-- declared timeout > 60s (long-running scripts shouldn't be flagged as stuck). +CREATE TABLE IF NOT EXISTS container_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + current_tool TEXT, + tool_declared_timeout_ms INTEGER, + tool_started_at TEXT, + updated_at TEXT NOT NULL +); `; diff --git a/src/db/session-db.ts b/src/db/session-db.ts index 05104cf..a73ca5c 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -161,6 +161,47 @@ export function getStuckProcessingIds(outDb: Database.Database): string[] { ).map((r) => r.message_id); } +export interface ProcessingClaim { + message_id: string; + status_changed: string; +} + +/** Return processing_ack rows still in 'processing' with their claim timestamps. */ +export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[] { + return outDb + .prepare( + "SELECT message_id, status_changed FROM processing_ack WHERE status = 'processing'", + ) + .all() as ProcessingClaim[]; +} + +export interface ContainerState { + current_tool: string | null; + tool_declared_timeout_ms: number | null; + tool_started_at: string | null; +} + +/** + * Read the container's current tool-in-flight state, if any. Returns null + * when either the table doesn't exist yet (older session DB) or no tool is + * active. Host sweep reads this to widen stuck-detection tolerance while + * Bash is running with a long declared timeout. + */ +export function getContainerState(outDb: Database.Database): ContainerState | null { + try { + const row = outDb + .prepare( + `SELECT current_tool, tool_declared_timeout_ms, tool_started_at + FROM container_state WHERE id = 1`, + ) + .get() as ContainerState | undefined; + return row ?? null; + } catch { + // Table not present on older session DBs — treat as "no tool in flight". + return null; + } +} + // --------------------------------------------------------------------------- // messages_out (read-only from host) // --------------------------------------------------------------------------- diff --git a/src/delivery.test.ts b/src/delivery.test.ts index d631836..a5e1efd 100644 --- a/src/delivery.test.ts +++ b/src/delivery.test.ts @@ -14,7 +14,6 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; vi.mock('./container-runner.js', () => ({ wakeContainer: vi.fn().mockResolvedValue(undefined), - resetContainerIdleTimer: vi.fn(), isContainerRunning: vi.fn().mockReturnValue(false), killContainer: vi.fn(), buildAgentGroupImage: vi.fn().mockResolvedValue(undefined), diff --git a/src/delivery.ts b/src/delivery.ts index 7b1ee7d..2e193d4 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -23,7 +23,6 @@ import { import { log } from './log.js'; import { normalizeOptions } from './channels/ask-question.js'; import { clearOutbox, openInboundDb, openOutboundDb, readOutboxFiles } from './session-manager.js'; -import { resetContainerIdleTimer } from './container-runner.js'; import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js'; import type { OutboundFile } from './channels/adapter.js'; import type { Session } from './types.js'; @@ -193,7 +192,6 @@ async function drainSession(session: Session): Promise { const platformMsgId = await deliverMessage(msg, session, inDb); markDelivered(inDb, msg.id, platformMsgId ?? null); deliveryAttempts.delete(msg.id); - resetContainerIdleTimer(session.id); // Pause the typing indicator after a real user-facing message // lands on the user's screen, so the client has time to visually diff --git a/src/host-core.test.ts b/src/host-core.test.ts index a8b4684..7269164 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -30,7 +30,6 @@ import type { InboundEvent } from './router.js'; // Mock container runner to prevent actual Docker spawning vi.mock('./container-runner.js', () => ({ wakeContainer: vi.fn().mockResolvedValue(undefined), - resetContainerIdleTimer: vi.fn(), isContainerRunning: vi.fn().mockReturnValue(false), getActiveContainerCount: vi.fn().mockReturnValue(0), killContainer: vi.fn(), diff --git a/src/host-sweep.test.ts b/src/host-sweep.test.ts new file mode 100644 index 0000000..d9505a4 --- /dev/null +++ b/src/host-sweep.test.ts @@ -0,0 +1,128 @@ +/** + * Unit tests for the stuck-container decision logic introduced by + * ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we + * don't have to mock the filesystem or the container runner. + */ +import { describe, expect, it } from 'vitest'; + +import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js'; + +const BASE = Date.parse('2026-04-20T12:00:00.000Z'); + +function claim(id: string, offsetMs: number) { + return { message_id: id, status_changed: new Date(BASE - offsetMs).toISOString() }; +} + +describe('decideStuckAction', () => { + it('returns ok when heartbeat is fresh and no claims', () => { + expect( + decideStuckAction({ + now: BASE, + heartbeatMtimeMs: BASE - 5_000, + containerState: null, + claims: [], + }), + ).toEqual({ action: 'ok' }); + }); + + it('returns kill-ceiling when heartbeat older than 30 min', () => { + const heartbeatMtimeMs = BASE - ABSOLUTE_CEILING_MS - 1_000; + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs, + containerState: null, + claims: [], + }); + expect(res.action).toBe('kill-ceiling'); + if (res.action !== 'kill-ceiling') return; + expect(res.ceilingMs).toBe(ABSOLUTE_CEILING_MS); + expect(res.heartbeatAgeMs).toBeGreaterThan(ABSOLUTE_CEILING_MS); + }); + + it('treats an absent heartbeat file as infinitely stale', () => { + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs: 0, + containerState: null, + claims: [], + }); + expect(res.action).toBe('kill-ceiling'); + }); + + it('extends the ceiling when Bash has a declared timeout longer than 30 min', () => { + const twoHrMs = 2 * 60 * 60 * 1000; + const res = decideStuckAction({ + now: BASE, + // 45 min — over the default ceiling, but under the Bash timeout + heartbeatMtimeMs: BASE - 45 * 60 * 1000, + containerState: { + current_tool: 'Bash', + tool_declared_timeout_ms: twoHrMs, + tool_started_at: new Date(BASE - 45 * 60 * 1000).toISOString(), + }, + claims: [], + }); + expect(res.action).toBe('ok'); + }); + + it('returns kill-claim when a claim is past 60s and heartbeat has not moved', () => { + const claimedAgeMs = CLAIM_STUCK_MS + 10_000; + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs: BASE - claimedAgeMs - 5_000, // older than the claim + containerState: null, + claims: [claim('msg-1', claimedAgeMs)], + }); + expect(res.action).toBe('kill-claim'); + if (res.action !== 'kill-claim') return; + expect(res.messageId).toBe('msg-1'); + expect(res.toleranceMs).toBe(CLAIM_STUCK_MS); + }); + + it('does not kill when heartbeat has been touched since the claim', () => { + const claimedAgeMs = CLAIM_STUCK_MS + 10_000; + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs: BASE - 2_000, // fresh, updated after the claim + containerState: null, + claims: [claim('msg-1', claimedAgeMs)], + }); + expect(res.action).toBe('ok'); + }); + + it('does not kill when claim age is below tolerance', () => { + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs: BASE - CLAIM_STUCK_MS - 10_000, // old, but claim is recent + containerState: null, + claims: [claim('msg-1', 5_000)], + }); + expect(res.action).toBe('ok'); + }); + + it('widens per-claim tolerance for a running Bash with long timeout', () => { + const tenMinMs = 10 * 60 * 1000; + const res = decideStuckAction({ + now: BASE, + // 5 min since claim, over the 60s default but under the declared Bash timeout + heartbeatMtimeMs: BASE - (5 * 60 * 1000) - 5_000, + containerState: { + current_tool: 'Bash', + tool_declared_timeout_ms: tenMinMs, + tool_started_at: new Date(BASE - 5 * 60 * 1000).toISOString(), + }, + claims: [claim('msg-1', 5 * 60 * 1000)], + }); + expect(res.action).toBe('ok'); + }); + + it('ignores claims with unparseable timestamps', () => { + const res = decideStuckAction({ + now: BASE, + heartbeatMtimeMs: BASE - 5_000, + containerState: null, + claims: [{ message_id: 'x', status_changed: 'not-a-date' }], + }); + expect(res.action).toBe('ok'); + }); +}); diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 7a7688f..0f8365c 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -2,10 +2,29 @@ * Host sweep — periodic maintenance of all session DBs. * * Two-DB architecture: - * - Reads processing_ack from outbound.db to sync message status - * - Writes to inbound.db (host-owned) for status updates and recurrence - * - Uses heartbeat file mtime for stale container detection (not DB writes) + * - Reads processing_ack + container_state from outbound.db + * - Writes to inbound.db (host-owned) for status updates + recurrence + * - Uses heartbeat file mtime for liveness (never polls DB for it) * - Never writes to outbound.db — preserves single-writer-per-file invariant + * + * Stuck / idle detection (replaces the old IDLE_TIMEOUT setTimeout + 10-min + * heartbeat threshold): + * + * If the container isn't running and there are 'processing' rows left over + * (e.g. it crashed mid-turn) → reset them to pending with backoff + + * tries++. Existing retry machinery does the rest. + * + * If the container IS running: + * 1. Absolute ceiling: heartbeat age > max(30 min, current_bash_timeout) + * → kill. Covers the "alive but silent for 30 min" case. Extended + * only while Bash is declared as running longer, honouring the + * user's own timeout directive. Kill then resets processing rows. + * + * 2. Message-scoped stuck: for each 'processing' row, tolerance = + * max(60s, current_bash_timeout_ms_if_Bash_running). If + * (claim_age > tolerance) AND (heartbeat_mtime <= status_changed) + * → kill + reset this message + tries++. Semantics: "container + * claimed a message and went quiet past tolerance since the claim." */ import type Database from 'better-sqlite3'; import fs from 'fs'; @@ -14,22 +33,68 @@ import { getActiveSessions } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { countDueMessages, - syncProcessingAcks, - getStuckProcessingIds, + getContainerState, getMessageForRetry, + getProcessingClaims, markMessageFailed, retryWithBackoff, + syncProcessingAcks, + type ContainerState, } from './db/session-db.js'; import { log } from './log.js'; import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js'; -import { wakeContainer, isContainerRunning } from './container-runner.js'; +import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js'; import type { Session } from './types.js'; const SWEEP_INTERVAL_MS = 60_000; -const STALE_THRESHOLD_MS = 10 * 60 * 1000; // 10 minutes +// Absolute idle ceiling for a running container. If the heartbeat file hasn't +// been touched in this long, the container is either stuck or doing genuinely +// nothing — kill and restart on the next inbound. +export const ABSOLUTE_CEILING_MS = 30 * 60 * 1000; +// Stuck tolerance window applied per 'processing' claim — "did we see any +// signs of life since this message was claimed?" +export const CLAIM_STUCK_MS = 60 * 1000; const MAX_TRIES = 5; const BACKOFF_BASE_MS = 5000; +export type StuckDecision = + | { action: 'ok' } + | { action: 'kill-ceiling'; heartbeatAgeMs: number; ceilingMs: number } + | { action: 'kill-claim'; messageId: string; claimAgeMs: number; toleranceMs: number }; + +/** + * Pure decision for whether a running container should be killed this sweep + * tick. Inputs are all deterministic; filesystem + DB reads happen in the + * caller. + */ +export function decideStuckAction(args: { + now: number; + heartbeatMtimeMs: number; // 0 when heartbeat file absent + containerState: ContainerState | null; + claims: Array<{ message_id: string; status_changed: string }>; +}): StuckDecision { + const { now, heartbeatMtimeMs, containerState, claims } = args; + const declaredBashMs = bashTimeoutMs(containerState); + const heartbeatAge = heartbeatMtimeMs === 0 ? Infinity : now - heartbeatMtimeMs; + + const ceiling = Math.max(ABSOLUTE_CEILING_MS, declaredBashMs ?? 0); + if (heartbeatAge > ceiling) { + return { action: 'kill-ceiling', heartbeatAgeMs: heartbeatAge, ceilingMs: ceiling }; + } + + const tolerance = Math.max(CLAIM_STUCK_MS, declaredBashMs ?? 0); + for (const claim of claims) { + const claimedAt = Date.parse(claim.status_changed); + if (Number.isNaN(claimedAt)) continue; + const claimAge = now - claimedAt; + if (claimAge <= tolerance) continue; + if (heartbeatMtimeMs > claimedAt) continue; + return { action: 'kill-claim', messageId: claim.message_id, claimAgeMs: claimAge, toleranceMs: tolerance }; + } + + return { action: 'ok' }; +} + let running = false; export function startHostSweep(): void { @@ -84,20 +149,26 @@ async function sweepSession(session: Session): Promise { syncProcessingAcks(inDb, outDb); } - // 2. Check for due pending messages → wake container - const dueCount = countDueMessages(inDb); + const alive = isContainerRunning(session.id); + // 2. Crashed-container cleanup: processing rows left behind get retried. + if (!alive && outDb) { + resetStuckProcessingRows(inDb, outDb, session, 'container not running'); + } + + // 3. Running-container SLA: absolute ceiling + per-claim stuck rules. + if (alive && outDb) { + enforceRunningContainerSla(inDb, outDb, session, agentGroup.id); + } + + // 4. Wake a container if new work is due and nothing is running. + const dueCount = countDueMessages(inDb); if (dueCount > 0 && !isContainerRunning(session.id)) { log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); await wakeContainer(session); } - // 3. Detect stale containers via heartbeat file - if (outDb) { - detectStaleContainers(inDb, outDb, session, agentGroup.id); - } - - // 4. Handle recurrence for completed messages. + // 5. Recurrence fanout for completed recurring tasks. // MODULE-HOOK:scheduling-recurrence:start const { handleRecurrence } = await import('./modules/scheduling/recurrence.js'); await handleRecurrence(inDb, session); @@ -108,45 +179,84 @@ async function sweepSession(session: Session): Promise { } } -/** - * Detect stale containers using heartbeat file mtime. - * If the heartbeat is older than STALE_THRESHOLD and processing_ack has - * 'processing' entries, the container likely crashed — reset with backoff. - */ -function detectStaleContainers( +function heartbeatMtimeMs(agentGroupId: string, sessionId: string): number { + const hbPath = heartbeatPath(agentGroupId, sessionId); + try { + return fs.statSync(hbPath).mtimeMs; + } catch { + return 0; + } +} + +function bashTimeoutMs(state: ContainerState | null): number | null { + if (!state || state.current_tool !== 'Bash') return null; + return typeof state.tool_declared_timeout_ms === 'number' ? state.tool_declared_timeout_ms : null; +} + +function enforceRunningContainerSla( inDb: Database.Database, outDb: Database.Database, session: Session, agentGroupId: string, ): void { - const hbPath = heartbeatPath(agentGroupId, session.id); - let heartbeatAge = Infinity; - try { - const stat = fs.statSync(hbPath); - heartbeatAge = Date.now() - stat.mtimeMs; - } catch { - // No heartbeat file — container may never have started, or it's very old + const decision = decideStuckAction({ + now: Date.now(), + heartbeatMtimeMs: heartbeatMtimeMs(agentGroupId, session.id), + containerState: getContainerState(outDb), + claims: getProcessingClaims(outDb), + }); + + if (decision.action === 'ok') return; + + if (decision.action === 'kill-ceiling') { + log.warn('Killing container past absolute ceiling', { + sessionId: session.id, + heartbeatAgeMs: decision.heartbeatAgeMs, + ceilingMs: decision.ceilingMs, + }); + killContainer(session.id, 'absolute-ceiling'); + resetStuckProcessingRows(inDb, outDb, session, 'absolute-ceiling'); + return; } - if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive + log.warn('Killing container — message claimed then silent', { + sessionId: session.id, + messageId: decision.messageId, + claimAgeMs: decision.claimAgeMs, + toleranceMs: decision.toleranceMs, + }); + killContainer(session.id, 'claim-stuck'); + resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); +} - // Heartbeat is stale — check for stuck processing entries - const processingIds = getStuckProcessingIds(outDb); - if (processingIds.length === 0) return; - - for (const messageId of processingIds) { - const msg = getMessageForRetry(inDb, messageId, 'pending'); +function resetStuckProcessingRows( + inDb: Database.Database, + outDb: Database.Database, + session: Session, + reason: string, +): void { + const claims = getProcessingClaims(outDb); + for (const { message_id } of claims) { + const msg = getMessageForRetry(inDb, message_id, 'pending'); if (!msg) continue; if (msg.tries >= MAX_TRIES) { markMessageFailed(inDb, msg.id); - log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id }); + log.warn('Message marked as failed after max retries', { + messageId: msg.id, + sessionId: session.id, + reason, + }); } else { const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); const backoffSec = Math.floor(backoffMs / 1000); retryWithBackoff(inDb, msg.id, backoffSec); - log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs }); + log.info('Reset stale message with backoff', { + messageId: msg.id, + tries: msg.tries, + backoffMs, + reason, + }); } } } -