From ccfdf2dd7576603ba9832a5eeed430ee43e2e35a Mon Sep 17 00:00:00 2001 From: Claw <728255-_ky@users.noreply.gitlab.com> Date: Thu, 30 Apr 2026 15:06:01 -0400 Subject: [PATCH] fix(agent-runner): open inbound.db fresh per messages_in read Cached singleton can return stale rows on virtiofs/NFS mounts, causing follow-up messages to silently never be polled. Add openInboundDb() with mmap_size=0 and switch the three messages_in readers to it. Also set mmap_size=0 on the cached getInboundDb() singleton. Co-Authored-By: Claude Opus 4.7 (1M context) --- container/agent-runner/src/db/connection.ts | 28 +++++++- container/agent-runner/src/db/messages-in.ts | 75 ++++++++++++-------- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 3f0e73b..3ca44a8 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -28,11 +28,37 @@ let _inbound: Database | null = null; let _outbound: Database | null = null; let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH; -/** Inbound DB — container opens read-only (host is the sole writer). */ +/** + * Open a fresh read-only connection to inbound.db with mmap disabled, so reads are not served from a long-lived connection's cache. + * + * Use this (not getInboundDb) for readers that need to see host-written rows + * promptly — e.g. messages_in polling. Caller must .close() the returned + * connection (try/finally). + * + * Needed for mounts where host writes don't reliably invalidate + * SQLite's caches: virtiofs (Colima, Lima, Podman Machine, Apple + * Container), NFS. + * + * Cost is microseconds per query, so safe for universal use. + */ +export function openInboundDb(): Database { + const db = new Database(DEFAULT_INBOUND_PATH, { readonly: true }); + db.exec('PRAGMA busy_timeout = 5000'); + db.exec('PRAGMA mmap_size = 0'); + return db; +} + +/** + * Inbound DB — long-lived singleton, OK for tables the host writes once + * at spawn and never again (destinations, session_routing). 
For + * messages_in polling — where the host writes continuously and a stale + * view causes the pollHandle hang — use `openInboundDb()` instead. + */ export function getInboundDb(): Database { if (!_inbound) { _inbound = new Database(DEFAULT_INBOUND_PATH, { readonly: true }); _inbound.exec('PRAGMA busy_timeout = 5000'); + _inbound.exec('PRAGMA mmap_size = 0'); } return _inbound; } diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts index 4ecf818..88906ed 100644 --- a/container/agent-runner/src/db/messages-in.ts +++ b/container/agent-runner/src/db/messages-in.ts @@ -8,7 +8,7 @@ * processing_ack. The host reads processing_ack to sync message lifecycle. */ import { getConfig } from '../config.js'; -import { getInboundDb, getOutboundDb } from './connection.js'; +import { openInboundDb, getOutboundDb } from './connection.js'; export interface MessageInRow { id: string; @@ -50,31 +50,35 @@ function getMaxMessagesPerPrompt(): number { * trigger=1 separately (see src/db/session-db.ts). 
*/ export function getPendingMessages(): MessageInRow[] { - const inbound = getInboundDb(); + const inbound = openInboundDb(); const outbound = getOutboundDb(); - const pending = inbound - .prepare( - `SELECT * FROM messages_in - WHERE status = 'pending' - AND (process_after IS NULL OR datetime(process_after) <= datetime('now')) - ORDER BY seq DESC - LIMIT ?`, - ) - .all(getMaxMessagesPerPrompt()) as MessageInRow[]; + try { + const pending = inbound + .prepare( + `SELECT * FROM messages_in + WHERE status = 'pending' + AND (process_after IS NULL OR datetime(process_after) <= datetime('now')) + ORDER BY seq DESC + LIMIT ?`, + ) + .all(getMaxMessagesPerPrompt()) as MessageInRow[]; - if (pending.length === 0) return []; + if (pending.length === 0) return []; - // Filter out messages already acknowledged in outbound.db - const ackedIds = new Set( - (outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map( - (r) => r.message_id, - ), - ); + // Filter out messages already acknowledged in outbound.db + const ackedIds = new Set( + (outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map( + (r) => r.message_id, + ), + ); - // Reverse: we fetched DESC to take the most recent N, but the agent - // should see them in chronological order (oldest first). - return pending.filter((m) => !ackedIds.has(m.id)).reverse(); + // Reverse: we fetched DESC to take the most recent N, but the agent + // should see them in chronological order (oldest first). + return pending.filter((m) => !ackedIds.has(m.id)).reverse(); + } finally { + inbound.close(); + } } /** Mark messages as processing — writes to processing_ack in outbound.db. */ @@ -112,7 +116,12 @@ export function markFailed(id: string): void { /** Get a message by ID (read from inbound.db). 
*/ export function getMessageIn(id: string): MessageInRow | undefined { - return getInboundDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; + const inbound = openInboundDb(); + try { + return inbound.prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; + } finally { + inbound.close(); + } } /** @@ -120,19 +129,23 @@ export function getMessageIn(id: string): MessageInRow | undefined { * Reads from inbound.db, checks processing_ack to skip already-handled responses. */ export function findQuestionResponse(questionId: string): MessageInRow | undefined { - const inbound = getInboundDb(); + const inbound = openInboundDb(); const outbound = getOutboundDb(); - const response = inbound - .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?") - .get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined; + try { + const response = inbound + .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?") + .get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined; - if (!response) return undefined; + if (!response) return undefined; - // Check it hasn't been acked already - const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id); - if (acked) return undefined; + // Check it hasn't been acked already + const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id); + if (acked) return undefined; - return response; + return response; + } finally { + inbound.close(); + } }