feat(lifecycle): stuck detection + heartbeat lifecycle + SDK tool blocklist
Replaces the two overlapping old mechanisms (30-min setTimeout kill in
container-runner, 10-min heartbeat STALE_THRESHOLD reset in host-sweep)
with message-scoped stuck detection anchored to the processing_ack claim
age + an absolute 30-min ceiling that extends for long-declared Bash
tools.
Old model problems:
- IDLE_TIMEOUT setTimeout fired on plain wall-clock time; slow-but-alive
agents got killed at 30min regardless of activity
- 10-min STALE_THRESHOLD in the sweep was unreliable — the heartbeat is
only touched on SDK events, so legitimate silent tool work (sleep 30,
long WebFetch, npm install) looked identical to a hung container
- Two overlapping sources of truth for "when to let go of a container"
New model:
- Host sweep is the single source of truth.
- Container exposes a new `container_state` single-row table in outbound.db
(schema added; container writes, host reads). PreToolUse hook writes
current_tool + tool_declared_timeout_ms (read from Bash's tool_input);
PostToolUse / PostToolUseFailure clear it.
- Sweep decides with a pure helper `decideStuckAction`:
* absolute ceiling — kill if heartbeat age > max(30min, bash_timeout)
* per-claim stuck — kill if any processing_ack row has claim_age >
max(60s, bash_timeout) AND heartbeat hasn't been touched since claim
* otherwise ok
Kill paths reset leftover processing rows with exponential backoff,
reusing the existing retry machinery.
Tool blocklist expanded:
- AskUserQuestion (SDK placeholder; we have mcp__nanoclaw__ask_user_question)
- EnterPlanMode, ExitPlanMode, EnterWorktree, ExitWorktree (Claude Code UI
affordances; would hang in headless containers)
PreToolUse hook is also defense-in-depth: if a disallowed tool name slips
through, it returns `{ decision: 'block' }` so the agent sees a clear
error instead of appearing stuck.
Removed:
- container-runner.ts: IDLE_TIMEOUT setTimeout, resetIdle callback on
activeContainers entry, resetContainerIdleTimer export.
- delivery.ts: the resetContainerIdleTimer call on successful delivery.
- poll-loop.ts: IDLE_END_MS + its setInterval. Keeping the query open is
cheaper than close+reopen (no cold prompt cache). Liveness is now a
host-side concern.
- host-sweep.ts: 10-min STALE_THRESHOLD_MS + getStuckProcessingIds in the
stale-detection path (still exported for kill reset).
Tests:
- src/host-sweep.test.ts — 9 tests for decideStuckAction covering: fresh
heartbeat, absolute ceiling, absent heartbeat, Bash-timeout extension
(both ceiling and per-claim), claim age below tolerance, heartbeat
touched after claim, unparseable timestamps.
Ref: docs/v1-vs-v2/ACTION-ITEMS.md items 9, 6a, 10.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -64,10 +64,58 @@ export function getOutboundDb(): Database {
|
||||
if (!cols.has('updated_at')) {
|
||||
_outbound.exec(`ALTER TABLE session_state ADD COLUMN updated_at TEXT NOT NULL DEFAULT ''`);
|
||||
}
|
||||
// container_state: tracks the current tool in flight (if any) so the host
|
||||
// sweep can widen its stuck tolerance when Bash is running with a user-
|
||||
// declared long timeout. Forward-compat for older outbound.db files.
|
||||
_outbound.exec(`
|
||||
CREATE TABLE IF NOT EXISTS container_state (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
current_tool TEXT,
|
||||
tool_declared_timeout_ms INTEGER,
|
||||
tool_started_at TEXT,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
}
|
||||
return _outbound;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record that a tool is starting. `declaredTimeoutMs` is the tool's own
|
||||
* timeout hint when one is available (Bash exposes it in the tool_use input);
|
||||
* omit for tools with no declared timeout.
|
||||
*/
|
||||
export function setContainerToolInFlight(tool: string, declaredTimeoutMs: number | null): void {
|
||||
const now = new Date().toISOString();
|
||||
getOutboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO container_state (id, current_tool, tool_declared_timeout_ms, tool_started_at, updated_at)
|
||||
VALUES (1, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
current_tool = excluded.current_tool,
|
||||
tool_declared_timeout_ms = excluded.tool_declared_timeout_ms,
|
||||
tool_started_at = excluded.tool_started_at,
|
||||
updated_at = excluded.updated_at`,
|
||||
)
|
||||
.run(tool, declaredTimeoutMs, now, now);
|
||||
}
|
||||
|
||||
/** Clear the in-flight tool — called on PostToolUse / PostToolUseFailure. */
|
||||
export function clearContainerToolInFlight(): void {
|
||||
const now = new Date().toISOString();
|
||||
getOutboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO container_state (id, current_tool, tool_declared_timeout_ms, tool_started_at, updated_at)
|
||||
VALUES (1, NULL, NULL, NULL, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
current_tool = NULL,
|
||||
tool_declared_timeout_ms = NULL,
|
||||
tool_started_at = NULL,
|
||||
updated_at = excluded.updated_at`,
|
||||
)
|
||||
.run(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Touch the heartbeat file — replaces the old touchProcessing() DB writes.
|
||||
* The host checks this file's mtime for stale container detection.
|
||||
@@ -157,6 +205,13 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } {
|
||||
value TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
CREATE TABLE container_state (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
current_tool TEXT,
|
||||
tool_declared_timeout_ms INTEGER,
|
||||
tool_started_at TEXT,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
return { inbound: _inbound, outbound: _outbound };
|
||||
|
||||
@@ -8,7 +8,6 @@ import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types
|
||||
|
||||
const POLL_INTERVAL_MS = 1000;
|
||||
const ACTIVE_POLL_INTERVAL_MS = 500;
|
||||
const IDLE_END_MS = 20_000; // End stream after 20s with no SDK events
|
||||
|
||||
function log(msg: string): void {
|
||||
console.error(`[poll-loop] ${msg}`);
|
||||
@@ -267,9 +266,13 @@ interface QueryResult {
|
||||
async function processQuery(query: AgentQuery, routing: RoutingContext): Promise<QueryResult> {
|
||||
let queryContinuation: string | undefined;
|
||||
let done = false;
|
||||
let lastEventTime = Date.now();
|
||||
|
||||
// Concurrent polling: push follow-ups, checkpoint WAL, detect idle
|
||||
// Concurrent polling: push follow-ups into the active query as they arrive.
|
||||
// We do NOT force-end the stream on silence — keeping the query open is
|
||||
// strictly cheaper than close+reopen (no cold prompt cache, no reconnect).
|
||||
// Stream liveness is decided host-side via the heartbeat file + processing
|
||||
// claim age (see src/host-sweep.ts); if something is truly stuck, the host
|
||||
// will kill the container and messages get reset to pending.
|
||||
const pollHandle = setInterval(() => {
|
||||
if (done) return;
|
||||
|
||||
@@ -296,19 +299,11 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise
|
||||
query.push(prompt);
|
||||
|
||||
markCompleted(newIds);
|
||||
lastEventTime = Date.now(); // new input counts as activity
|
||||
}
|
||||
|
||||
// End stream when agent is idle: no SDK events and no pending messages
|
||||
if (Date.now() - lastEventTime > IDLE_END_MS) {
|
||||
log(`No SDK events for ${IDLE_END_MS / 1000}s, ending query`);
|
||||
query.end();
|
||||
}
|
||||
}, ACTIVE_POLL_INTERVAL_MS);
|
||||
|
||||
try {
|
||||
for await (const event of query.events) {
|
||||
lastEventTime = Date.now();
|
||||
handleEvent(event, routing);
|
||||
touchHeartbeat();
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import path from 'path';
|
||||
|
||||
import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from '@anthropic-ai/claude-agent-sdk';
|
||||
|
||||
import { clearContainerToolInFlight, setContainerToolInFlight } from '../db/connection.js';
|
||||
import { registerProvider } from './provider-registry.js';
|
||||
import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent, ProviderOptions, QueryInput } from './types.js';
|
||||
|
||||
@@ -10,10 +11,28 @@ function log(msg: string): void {
|
||||
console.error(`[claude-provider] ${msg}`);
|
||||
}
|
||||
|
||||
// Deferred SDK builtins that would sidestep nanoclaw's own scheduling.
|
||||
// Scheduling goes through mcp__nanoclaw__schedule_task so that tasks are
|
||||
// durable across sessions/restarts and gated by our pre-task script hook.
|
||||
const SDK_DISALLOWED_TOOLS = ['CronCreate', 'CronDelete', 'CronList', 'ScheduleWakeup'];
|
||||
// Deferred SDK builtins that either sidestep nanoclaw's own scheduling or
|
||||
// don't fit our async message-passing model (they're designed for Claude
|
||||
// Code's interactive UI and would hang here).
|
||||
//
|
||||
// - CronCreate / CronDelete / CronList / ScheduleWakeup: we have durable
|
||||
// scheduling via mcp__nanoclaw__schedule_task.
|
||||
// - AskUserQuestion: SDK returns a placeholder instead of blocking on a
|
||||
// real answer — we have mcp__nanoclaw__ask_user_question that persists
|
||||
// the question and blocks on the real reply.
|
||||
// - EnterPlanMode / ExitPlanMode / EnterWorktree / ExitWorktree: Claude
|
||||
// Code UI affordances; in a headless container they'd appear stuck.
|
||||
const SDK_DISALLOWED_TOOLS = [
|
||||
'CronCreate',
|
||||
'CronDelete',
|
||||
'CronList',
|
||||
'ScheduleWakeup',
|
||||
'AskUserQuestion',
|
||||
'EnterPlanMode',
|
||||
'ExitPlanMode',
|
||||
'EnterWorktree',
|
||||
'ExitWorktree',
|
||||
];
|
||||
|
||||
// Tool allowlist for NanoClaw agent containers
|
||||
const TOOL_ALLOWLIST = [
|
||||
@@ -122,6 +141,43 @@ function formatTranscriptMarkdown(messages: ParsedMessage[], title?: string | nu
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
/**
|
||||
* PreToolUse hook: record the current tool + its declared timeout so the host
|
||||
* sweep can widen its stuck tolerance while Bash is running a long-declared
|
||||
* script. Defense-in-depth: if SDK_DISALLOWED_TOOLS slips through somehow,
|
||||
* block the call here instead of letting the agent hang.
|
||||
*/
|
||||
const preToolUseHook: HookCallback = async (input) => {
|
||||
const i = input as { tool_name?: string; tool_input?: Record<string, unknown> };
|
||||
const toolName = i.tool_name ?? '';
|
||||
if (SDK_DISALLOWED_TOOLS.includes(toolName)) {
|
||||
return {
|
||||
decision: 'block',
|
||||
stopReason: `Tool '${toolName}' is not available in this environment — use the nanoclaw equivalent.`,
|
||||
} as unknown as ReturnType<HookCallback>;
|
||||
}
|
||||
// Bash exposes its timeout via the tool_input.timeout field (ms). Any other
|
||||
// tool: no declared timeout.
|
||||
const declaredTimeoutMs =
|
||||
toolName === 'Bash' && typeof i.tool_input?.timeout === 'number' ? (i.tool_input.timeout as number) : null;
|
||||
try {
|
||||
setContainerToolInFlight(toolName, declaredTimeoutMs);
|
||||
} catch (err) {
|
||||
log(`PreToolUse: failed to record container_state: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
return { continue: true };
|
||||
};
|
||||
|
||||
/** Clear in-flight tool on PostToolUse / PostToolUseFailure. */
|
||||
const postToolUseHook: HookCallback = async () => {
|
||||
try {
|
||||
clearContainerToolInFlight();
|
||||
} catch (err) {
|
||||
log(`PostToolUse: failed to clear container_state: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
return { continue: true };
|
||||
};
|
||||
|
||||
function createPreCompactHook(assistantName?: string): HookCallback {
|
||||
return async (input) => {
|
||||
const preCompact = input as PreCompactHookInput;
|
||||
@@ -224,6 +280,9 @@ export class ClaudeProvider implements AgentProvider {
|
||||
settingSources: ['project', 'user'],
|
||||
mcpServers: this.mcpServers,
|
||||
hooks: {
|
||||
PreToolUse: [{ hooks: [preToolUseHook] }],
|
||||
PostToolUse: [{ hooks: [postToolUseHook] }],
|
||||
PostToolUseFailure: [{ hooks: [postToolUseHook] }],
|
||||
PreCompact: [{ hooks: [createPreCompactHook(this.assistantName)] }],
|
||||
},
|
||||
},
|
||||
|
||||
@@ -10,7 +10,6 @@ import type { ChannelAdapter, ChannelSetup, InboundMessage, OutboundMessage } fr
|
||||
// Mock container runner
|
||||
vi.mock('../container-runner.js', () => ({
|
||||
wakeContainer: vi.fn().mockResolvedValue(undefined),
|
||||
resetContainerIdleTimer: vi.fn(),
|
||||
isContainerRunning: vi.fn().mockReturnValue(false),
|
||||
getActiveContainerCount: vi.fn().mockReturnValue(0),
|
||||
killContainer: vi.fn(),
|
||||
|
||||
@@ -9,7 +9,7 @@ import path from 'path';
|
||||
|
||||
import { OneCLI } from '@onecli-sh/sdk';
|
||||
|
||||
import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js';
|
||||
import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, ONECLI_URL, TIMEZONE } from './config.js';
|
||||
import { readContainerConfig, writeContainerConfig } from './container-config.js';
|
||||
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
@@ -26,12 +26,7 @@ import {
|
||||
type ProviderContainerContribution,
|
||||
type VolumeMount,
|
||||
} from './providers/provider-container-registry.js';
|
||||
import {
|
||||
markContainerRunning,
|
||||
markContainerStopped,
|
||||
sessionDir,
|
||||
writeSessionRouting,
|
||||
} from './session-manager.js';
|
||||
import { markContainerRunning, markContainerStopped, sessionDir, writeSessionRouting } from './session-manager.js';
|
||||
import type { AgentGroup, Session } from './types.js';
|
||||
|
||||
const onecli = new OneCLI({ url: ONECLI_URL });
|
||||
@@ -125,22 +120,12 @@ async function spawnContainer(session: Session): Promise<void> {
|
||||
// stdout is unused in v2 (all IO is via session DB)
|
||||
container.stdout?.on('data', () => {});
|
||||
|
||||
// Idle timeout: kill container after IDLE_TIMEOUT of no activity
|
||||
let idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT);
|
||||
|
||||
const resetIdle = () => {
|
||||
clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT);
|
||||
};
|
||||
|
||||
// Reset idle timer when the host detects new messages_out (called by delivery.ts)
|
||||
const entry = activeContainers.get(session.id);
|
||||
if (entry) {
|
||||
(entry as { resetIdle?: () => void }).resetIdle = resetIdle;
|
||||
}
|
||||
// No host-side idle timeout. Stale/stuck detection is driven by the host
|
||||
// sweep reading heartbeat mtime + processing_ack claim age + container_state
|
||||
// (see src/host-sweep.ts). This avoids killing long-running legitimate work
|
||||
// on a wall-clock timer.
|
||||
|
||||
container.on('close', (code) => {
|
||||
clearTimeout(idleTimer);
|
||||
activeContainers.delete(session.id);
|
||||
markContainerStopped(session.id);
|
||||
stopTypingRefresh(session.id);
|
||||
@@ -148,7 +133,6 @@ async function spawnContainer(session: Session): Promise<void> {
|
||||
});
|
||||
|
||||
container.on('error', (err) => {
|
||||
clearTimeout(idleTimer);
|
||||
activeContainers.delete(session.id);
|
||||
markContainerStopped(session.id);
|
||||
stopTypingRefresh(session.id);
|
||||
@@ -156,12 +140,6 @@ async function spawnContainer(session: Session): Promise<void> {
|
||||
});
|
||||
}
|
||||
|
||||
/** Reset the idle timer for a session's container (called when messages_out are delivered). */
|
||||
export function resetContainerIdleTimer(sessionId: string): void {
|
||||
const entry = activeContainers.get(sessionId) as { resetIdle?: () => void } | undefined;
|
||||
entry?.resetIdle?.();
|
||||
}
|
||||
|
||||
/** Kill a container for a session. */
|
||||
export function killContainer(sessionId: string, reason: string): void {
|
||||
const entry = activeContainers.get(sessionId);
|
||||
|
||||
@@ -213,4 +213,16 @@ CREATE TABLE IF NOT EXISTS session_state (
|
||||
value TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- Current tool-in-flight state. Single-row table (id=1). Container writes on
|
||||
-- PreToolUse and clears on PostToolUse / PostToolUseFailure. Host reads in the
|
||||
-- sweep to extend the stuck-tolerance window when Bash is running with a
|
||||
-- declared timeout > 60s (long-running scripts shouldn't be flagged as stuck).
|
||||
CREATE TABLE IF NOT EXISTS container_state (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
current_tool TEXT,
|
||||
tool_declared_timeout_ms INTEGER,
|
||||
tool_started_at TEXT,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
@@ -161,6 +161,47 @@ export function getStuckProcessingIds(outDb: Database.Database): string[] {
|
||||
).map((r) => r.message_id);
|
||||
}
|
||||
|
||||
export interface ProcessingClaim {
|
||||
message_id: string;
|
||||
status_changed: string;
|
||||
}
|
||||
|
||||
/** Return processing_ack rows still in 'processing' with their claim timestamps. */
|
||||
export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[] {
|
||||
return outDb
|
||||
.prepare(
|
||||
"SELECT message_id, status_changed FROM processing_ack WHERE status = 'processing'",
|
||||
)
|
||||
.all() as ProcessingClaim[];
|
||||
}
|
||||
|
||||
export interface ContainerState {
|
||||
current_tool: string | null;
|
||||
tool_declared_timeout_ms: number | null;
|
||||
tool_started_at: string | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the container's current tool-in-flight state, if any. Returns null
|
||||
* when either the table doesn't exist yet (older session DB) or no tool is
|
||||
* active. Host sweep reads this to widen stuck-detection tolerance while
|
||||
* Bash is running with a long declared timeout.
|
||||
*/
|
||||
export function getContainerState(outDb: Database.Database): ContainerState | null {
|
||||
try {
|
||||
const row = outDb
|
||||
.prepare(
|
||||
`SELECT current_tool, tool_declared_timeout_ms, tool_started_at
|
||||
FROM container_state WHERE id = 1`,
|
||||
)
|
||||
.get() as ContainerState | undefined;
|
||||
return row ?? null;
|
||||
} catch {
|
||||
// Table not present on older session DBs — treat as "no tool in flight".
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// messages_out (read-only from host)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -14,7 +14,6 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
|
||||
vi.mock('./container-runner.js', () => ({
|
||||
wakeContainer: vi.fn().mockResolvedValue(undefined),
|
||||
resetContainerIdleTimer: vi.fn(),
|
||||
isContainerRunning: vi.fn().mockReturnValue(false),
|
||||
killContainer: vi.fn(),
|
||||
buildAgentGroupImage: vi.fn().mockResolvedValue(undefined),
|
||||
|
||||
@@ -23,7 +23,6 @@ import {
|
||||
import { log } from './log.js';
|
||||
import { normalizeOptions } from './channels/ask-question.js';
|
||||
import { clearOutbox, openInboundDb, openOutboundDb, readOutboxFiles } from './session-manager.js';
|
||||
import { resetContainerIdleTimer } from './container-runner.js';
|
||||
import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import type { Session } from './types.js';
|
||||
@@ -193,7 +192,6 @@ async function drainSession(session: Session): Promise<void> {
|
||||
const platformMsgId = await deliverMessage(msg, session, inDb);
|
||||
markDelivered(inDb, msg.id, platformMsgId ?? null);
|
||||
deliveryAttempts.delete(msg.id);
|
||||
resetContainerIdleTimer(session.id);
|
||||
|
||||
// Pause the typing indicator after a real user-facing message
|
||||
// lands on the user's screen, so the client has time to visually
|
||||
|
||||
@@ -30,7 +30,6 @@ import type { InboundEvent } from './router.js';
|
||||
// Mock container runner to prevent actual Docker spawning
|
||||
vi.mock('./container-runner.js', () => ({
|
||||
wakeContainer: vi.fn().mockResolvedValue(undefined),
|
||||
resetContainerIdleTimer: vi.fn(),
|
||||
isContainerRunning: vi.fn().mockReturnValue(false),
|
||||
getActiveContainerCount: vi.fn().mockReturnValue(0),
|
||||
killContainer: vi.fn(),
|
||||
|
||||
128
src/host-sweep.test.ts
Normal file
128
src/host-sweep.test.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
/**
|
||||
* Unit tests for the stuck-container decision logic introduced by
|
||||
* ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we
|
||||
* don't have to mock the filesystem or the container runner.
|
||||
*/
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js';
|
||||
|
||||
const BASE = Date.parse('2026-04-20T12:00:00.000Z');
|
||||
|
||||
function claim(id: string, offsetMs: number) {
|
||||
return { message_id: id, status_changed: new Date(BASE - offsetMs).toISOString() };
|
||||
}
|
||||
|
||||
describe('decideStuckAction', () => {
|
||||
it('returns ok when heartbeat is fresh and no claims', () => {
|
||||
expect(
|
||||
decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: BASE - 5_000,
|
||||
containerState: null,
|
||||
claims: [],
|
||||
}),
|
||||
).toEqual({ action: 'ok' });
|
||||
});
|
||||
|
||||
it('returns kill-ceiling when heartbeat older than 30 min', () => {
|
||||
const heartbeatMtimeMs = BASE - ABSOLUTE_CEILING_MS - 1_000;
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs,
|
||||
containerState: null,
|
||||
claims: [],
|
||||
});
|
||||
expect(res.action).toBe('kill-ceiling');
|
||||
if (res.action !== 'kill-ceiling') return;
|
||||
expect(res.ceilingMs).toBe(ABSOLUTE_CEILING_MS);
|
||||
expect(res.heartbeatAgeMs).toBeGreaterThan(ABSOLUTE_CEILING_MS);
|
||||
});
|
||||
|
||||
it('treats an absent heartbeat file as infinitely stale', () => {
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: 0,
|
||||
containerState: null,
|
||||
claims: [],
|
||||
});
|
||||
expect(res.action).toBe('kill-ceiling');
|
||||
});
|
||||
|
||||
it('extends the ceiling when Bash has a declared timeout longer than 30 min', () => {
|
||||
const twoHrMs = 2 * 60 * 60 * 1000;
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
// 45 min — over the default ceiling, but under the Bash timeout
|
||||
heartbeatMtimeMs: BASE - 45 * 60 * 1000,
|
||||
containerState: {
|
||||
current_tool: 'Bash',
|
||||
tool_declared_timeout_ms: twoHrMs,
|
||||
tool_started_at: new Date(BASE - 45 * 60 * 1000).toISOString(),
|
||||
},
|
||||
claims: [],
|
||||
});
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
|
||||
it('returns kill-claim when a claim is past 60s and heartbeat has not moved', () => {
|
||||
const claimedAgeMs = CLAIM_STUCK_MS + 10_000;
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: BASE - claimedAgeMs - 5_000, // older than the claim
|
||||
containerState: null,
|
||||
claims: [claim('msg-1', claimedAgeMs)],
|
||||
});
|
||||
expect(res.action).toBe('kill-claim');
|
||||
if (res.action !== 'kill-claim') return;
|
||||
expect(res.messageId).toBe('msg-1');
|
||||
expect(res.toleranceMs).toBe(CLAIM_STUCK_MS);
|
||||
});
|
||||
|
||||
it('does not kill when heartbeat has been touched since the claim', () => {
|
||||
const claimedAgeMs = CLAIM_STUCK_MS + 10_000;
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: BASE - 2_000, // fresh, updated after the claim
|
||||
containerState: null,
|
||||
claims: [claim('msg-1', claimedAgeMs)],
|
||||
});
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
|
||||
it('does not kill when claim age is below tolerance', () => {
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: BASE - CLAIM_STUCK_MS - 10_000, // old, but claim is recent
|
||||
containerState: null,
|
||||
claims: [claim('msg-1', 5_000)],
|
||||
});
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
|
||||
it('widens per-claim tolerance for a running Bash with long timeout', () => {
|
||||
const tenMinMs = 10 * 60 * 1000;
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
// 5 min since claim, over the 60s default but under the declared Bash timeout
|
||||
heartbeatMtimeMs: BASE - (5 * 60 * 1000) - 5_000,
|
||||
containerState: {
|
||||
current_tool: 'Bash',
|
||||
tool_declared_timeout_ms: tenMinMs,
|
||||
tool_started_at: new Date(BASE - 5 * 60 * 1000).toISOString(),
|
||||
},
|
||||
claims: [claim('msg-1', 5 * 60 * 1000)],
|
||||
});
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
|
||||
it('ignores claims with unparseable timestamps', () => {
|
||||
const res = decideStuckAction({
|
||||
now: BASE,
|
||||
heartbeatMtimeMs: BASE - 5_000,
|
||||
containerState: null,
|
||||
claims: [{ message_id: 'x', status_changed: 'not-a-date' }],
|
||||
});
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
});
|
||||
@@ -2,10 +2,29 @@
|
||||
* Host sweep — periodic maintenance of all session DBs.
|
||||
*
|
||||
* Two-DB architecture:
|
||||
* - Reads processing_ack from outbound.db to sync message status
|
||||
* - Writes to inbound.db (host-owned) for status updates and recurrence
|
||||
* - Uses heartbeat file mtime for stale container detection (not DB writes)
|
||||
* - Reads processing_ack + container_state from outbound.db
|
||||
* - Writes to inbound.db (host-owned) for status updates + recurrence
|
||||
* - Uses heartbeat file mtime for liveness (never polls DB for it)
|
||||
* - Never writes to outbound.db — preserves single-writer-per-file invariant
|
||||
*
|
||||
* Stuck / idle detection (replaces the old IDLE_TIMEOUT setTimeout + 10-min
|
||||
* heartbeat threshold):
|
||||
*
|
||||
* If the container isn't running and there are 'processing' rows left over
|
||||
* (e.g. it crashed mid-turn) → reset them to pending with backoff +
|
||||
* tries++. Existing retry machinery does the rest.
|
||||
*
|
||||
* If the container IS running:
|
||||
* 1. Absolute ceiling: heartbeat age > max(30 min, current_bash_timeout)
|
||||
* → kill. Covers the "alive but silent for 30 min" case. Extended
|
||||
* only while Bash is declared as running longer, honouring the
|
||||
* user's own timeout directive. Kill then resets processing rows.
|
||||
*
|
||||
* 2. Message-scoped stuck: for each 'processing' row, tolerance =
|
||||
* max(60s, current_bash_timeout_ms_if_Bash_running). If
|
||||
* (claim_age > tolerance) AND (heartbeat_mtime <= status_changed)
|
||||
* → kill + reset this message + tries++. Semantics: "container
|
||||
* claimed a message and went quiet past tolerance since the claim."
|
||||
*/
|
||||
import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
@@ -14,22 +33,68 @@ import { getActiveSessions } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import {
|
||||
countDueMessages,
|
||||
syncProcessingAcks,
|
||||
getStuckProcessingIds,
|
||||
getContainerState,
|
||||
getMessageForRetry,
|
||||
getProcessingClaims,
|
||||
markMessageFailed,
|
||||
retryWithBackoff,
|
||||
syncProcessingAcks,
|
||||
type ContainerState,
|
||||
} from './db/session-db.js';
|
||||
import { log } from './log.js';
|
||||
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { wakeContainer, isContainerRunning } from './container-runner.js';
|
||||
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const SWEEP_INTERVAL_MS = 60_000;
|
||||
const STALE_THRESHOLD_MS = 10 * 60 * 1000; // 10 minutes
|
||||
// Absolute idle ceiling for a running container. If the heartbeat file hasn't
|
||||
// been touched in this long, the container is either stuck or doing genuinely
|
||||
// nothing — kill and restart on the next inbound.
|
||||
export const ABSOLUTE_CEILING_MS = 30 * 60 * 1000;
|
||||
// Stuck tolerance window applied per 'processing' claim — "did we see any
|
||||
// signs of life since this message was claimed?"
|
||||
export const CLAIM_STUCK_MS = 60 * 1000;
|
||||
const MAX_TRIES = 5;
|
||||
const BACKOFF_BASE_MS = 5000;
|
||||
|
||||
export type StuckDecision =
|
||||
| { action: 'ok' }
|
||||
| { action: 'kill-ceiling'; heartbeatAgeMs: number; ceilingMs: number }
|
||||
| { action: 'kill-claim'; messageId: string; claimAgeMs: number; toleranceMs: number };
|
||||
|
||||
/**
|
||||
* Pure decision for whether a running container should be killed this sweep
|
||||
* tick. Inputs are all deterministic; filesystem + DB reads happen in the
|
||||
* caller.
|
||||
*/
|
||||
export function decideStuckAction(args: {
|
||||
now: number;
|
||||
heartbeatMtimeMs: number; // 0 when heartbeat file absent
|
||||
containerState: ContainerState | null;
|
||||
claims: Array<{ message_id: string; status_changed: string }>;
|
||||
}): StuckDecision {
|
||||
const { now, heartbeatMtimeMs, containerState, claims } = args;
|
||||
const declaredBashMs = bashTimeoutMs(containerState);
|
||||
const heartbeatAge = heartbeatMtimeMs === 0 ? Infinity : now - heartbeatMtimeMs;
|
||||
|
||||
const ceiling = Math.max(ABSOLUTE_CEILING_MS, declaredBashMs ?? 0);
|
||||
if (heartbeatAge > ceiling) {
|
||||
return { action: 'kill-ceiling', heartbeatAgeMs: heartbeatAge, ceilingMs: ceiling };
|
||||
}
|
||||
|
||||
const tolerance = Math.max(CLAIM_STUCK_MS, declaredBashMs ?? 0);
|
||||
for (const claim of claims) {
|
||||
const claimedAt = Date.parse(claim.status_changed);
|
||||
if (Number.isNaN(claimedAt)) continue;
|
||||
const claimAge = now - claimedAt;
|
||||
if (claimAge <= tolerance) continue;
|
||||
if (heartbeatMtimeMs > claimedAt) continue;
|
||||
return { action: 'kill-claim', messageId: claim.message_id, claimAgeMs: claimAge, toleranceMs: tolerance };
|
||||
}
|
||||
|
||||
return { action: 'ok' };
|
||||
}
|
||||
|
||||
let running = false;
|
||||
|
||||
export function startHostSweep(): void {
|
||||
@@ -84,20 +149,26 @@ async function sweepSession(session: Session): Promise<void> {
|
||||
syncProcessingAcks(inDb, outDb);
|
||||
}
|
||||
|
||||
// 2. Check for due pending messages → wake container
|
||||
const dueCount = countDueMessages(inDb);
|
||||
const alive = isContainerRunning(session.id);
|
||||
|
||||
// 2. Crashed-container cleanup: processing rows left behind get retried.
|
||||
if (!alive && outDb) {
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'container not running');
|
||||
}
|
||||
|
||||
// 3. Running-container SLA: absolute ceiling + per-claim stuck rules.
|
||||
if (alive && outDb) {
|
||||
enforceRunningContainerSla(inDb, outDb, session, agentGroup.id);
|
||||
}
|
||||
|
||||
// 4. Wake a container if new work is due and nothing is running.
|
||||
const dueCount = countDueMessages(inDb);
|
||||
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
||||
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
||||
await wakeContainer(session);
|
||||
}
|
||||
|
||||
// 3. Detect stale containers via heartbeat file
|
||||
if (outDb) {
|
||||
detectStaleContainers(inDb, outDb, session, agentGroup.id);
|
||||
}
|
||||
|
||||
// 4. Handle recurrence for completed messages.
|
||||
// 5. Recurrence fanout for completed recurring tasks.
|
||||
// MODULE-HOOK:scheduling-recurrence:start
|
||||
const { handleRecurrence } = await import('./modules/scheduling/recurrence.js');
|
||||
await handleRecurrence(inDb, session);
|
||||
@@ -108,45 +179,84 @@ async function sweepSession(session: Session): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect stale containers using heartbeat file mtime.
|
||||
* If the heartbeat is older than STALE_THRESHOLD and processing_ack has
|
||||
* 'processing' entries, the container likely crashed — reset with backoff.
|
||||
*/
|
||||
function detectStaleContainers(
|
||||
function heartbeatMtimeMs(agentGroupId: string, sessionId: string): number {
|
||||
const hbPath = heartbeatPath(agentGroupId, sessionId);
|
||||
try {
|
||||
return fs.statSync(hbPath).mtimeMs;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
function bashTimeoutMs(state: ContainerState | null): number | null {
|
||||
if (!state || state.current_tool !== 'Bash') return null;
|
||||
return typeof state.tool_declared_timeout_ms === 'number' ? state.tool_declared_timeout_ms : null;
|
||||
}
|
||||
|
||||
function enforceRunningContainerSla(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
agentGroupId: string,
|
||||
): void {
|
||||
const hbPath = heartbeatPath(agentGroupId, session.id);
|
||||
let heartbeatAge = Infinity;
|
||||
try {
|
||||
const stat = fs.statSync(hbPath);
|
||||
heartbeatAge = Date.now() - stat.mtimeMs;
|
||||
} catch {
|
||||
// No heartbeat file — container may never have started, or it's very old
|
||||
const decision = decideStuckAction({
|
||||
now: Date.now(),
|
||||
heartbeatMtimeMs: heartbeatMtimeMs(agentGroupId, session.id),
|
||||
containerState: getContainerState(outDb),
|
||||
claims: getProcessingClaims(outDb),
|
||||
});
|
||||
|
||||
if (decision.action === 'ok') return;
|
||||
|
||||
if (decision.action === 'kill-ceiling') {
|
||||
log.warn('Killing container past absolute ceiling', {
|
||||
sessionId: session.id,
|
||||
heartbeatAgeMs: decision.heartbeatAgeMs,
|
||||
ceilingMs: decision.ceilingMs,
|
||||
});
|
||||
killContainer(session.id, 'absolute-ceiling');
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'absolute-ceiling');
|
||||
return;
|
||||
}
|
||||
|
||||
if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive
|
||||
log.warn('Killing container — message claimed then silent', {
|
||||
sessionId: session.id,
|
||||
messageId: decision.messageId,
|
||||
claimAgeMs: decision.claimAgeMs,
|
||||
toleranceMs: decision.toleranceMs,
|
||||
});
|
||||
killContainer(session.id, 'claim-stuck');
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
|
||||
}
|
||||
|
||||
// Heartbeat is stale — check for stuck processing entries
|
||||
const processingIds = getStuckProcessingIds(outDb);
|
||||
if (processingIds.length === 0) return;
|
||||
|
||||
for (const messageId of processingIds) {
|
||||
const msg = getMessageForRetry(inDb, messageId, 'pending');
|
||||
function resetStuckProcessingRows(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
): void {
|
||||
const claims = getProcessingClaims(outDb);
|
||||
for (const { message_id } of claims) {
|
||||
const msg = getMessageForRetry(inDb, message_id, 'pending');
|
||||
if (!msg) continue;
|
||||
|
||||
if (msg.tries >= MAX_TRIES) {
|
||||
markMessageFailed(inDb, msg.id);
|
||||
log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id });
|
||||
log.warn('Message marked as failed after max retries', {
|
||||
messageId: msg.id,
|
||||
sessionId: session.id,
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries);
|
||||
const backoffSec = Math.floor(backoffMs / 1000);
|
||||
retryWithBackoff(inDb, msg.id, backoffSec);
|
||||
log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs });
|
||||
log.info('Reset stale message with backoff', {
|
||||
messageId: msg.id,
|
||||
tries: msg.tries,
|
||||
backoffMs,
|
||||
reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user