fix(agent-runner): open inbound.db fresh per messages_in read

Cached singleton can return stale rows on virtiofs/NFS mounts,
causing follow-up messages to silently never be polled. Add
openInboundDb() with mmap_size=0 and switch the three messages_in
readers to it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claw
2026-04-30 15:06:01 -04:00
parent 941a75f65d
commit ccfdf2dd75
2 changed files with 71 additions and 32 deletions

View File

@@ -28,11 +28,37 @@ let _inbound: Database | null = null;
let _outbound: Database | null = null; let _outbound: Database | null = null;
let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH; let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH;
/** Inbound DB — container opens read-only (host is the sole writer). */ /**
* Avoid all cached db reads; open inbound.db read-only with mmap and page cache disabled.
*
* Use this (not getInboundDb) for readers that need to see host-written rows
* promptly — e.g. messages_in polling. Caller must .close() the returned
* connection (try/finally).
*
* Needed for mounts where host writes don't reliably invalidate
* SQLite's caches: virtiofs (Colima, Lima, Podman Machine, Apple
* Container), NFS.
*
* Cost is microseconds per query, so safe for universal use.
*/
export function openInboundDb(): Database {
const db = new Database(DEFAULT_INBOUND_PATH, { readonly: true });
db.exec('PRAGMA busy_timeout = 5000');
db.exec('PRAGMA mmap_size = 0');
return db;
}
/**
* Inbound DB — long-lived singleton, OK for tables the host writes once
* at spawn and never again (destinations, session_routing). For
* messages_in polling — where the host writes continuously and a stale
* view causes the pollHandle hang — use `openInboundDb()` instead.
*/
export function getInboundDb(): Database { export function getInboundDb(): Database {
if (!_inbound) { if (!_inbound) {
_inbound = new Database(DEFAULT_INBOUND_PATH, { readonly: true }); _inbound = new Database(DEFAULT_INBOUND_PATH, { readonly: true });
_inbound.exec('PRAGMA busy_timeout = 5000'); _inbound.exec('PRAGMA busy_timeout = 5000');
_inbound.exec('PRAGMA mmap_size = 0');
} }
return _inbound; return _inbound;
} }

View File

@@ -8,7 +8,7 @@
* processing_ack. The host reads processing_ack to sync message lifecycle. * processing_ack. The host reads processing_ack to sync message lifecycle.
*/ */
import { getConfig } from '../config.js'; import { getConfig } from '../config.js';
import { getInboundDb, getOutboundDb } from './connection.js'; import { openInboundDb, getOutboundDb } from './connection.js';
export interface MessageInRow { export interface MessageInRow {
id: string; id: string;
@@ -50,31 +50,35 @@ function getMaxMessagesPerPrompt(): number {
* trigger=1 separately (see src/db/session-db.ts). * trigger=1 separately (see src/db/session-db.ts).
*/ */
export function getPendingMessages(): MessageInRow[] { export function getPendingMessages(): MessageInRow[] {
const inbound = getInboundDb(); const inbound = openInboundDb();
const outbound = getOutboundDb(); const outbound = getOutboundDb();
const pending = inbound try {
.prepare( const pending = inbound
`SELECT * FROM messages_in .prepare(
WHERE status = 'pending' `SELECT * FROM messages_in
AND (process_after IS NULL OR datetime(process_after) <= datetime('now')) WHERE status = 'pending'
ORDER BY seq DESC AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))
LIMIT ?`, ORDER BY seq DESC
) LIMIT ?`,
.all(getMaxMessagesPerPrompt()) as MessageInRow[]; )
.all(getMaxMessagesPerPrompt()) as MessageInRow[];
if (pending.length === 0) return []; if (pending.length === 0) return [];
// Filter out messages already acknowledged in outbound.db // Filter out messages already acknowledged in outbound.db
const ackedIds = new Set( const ackedIds = new Set(
(outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map( (outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map(
(r) => r.message_id, (r) => r.message_id,
), ),
); );
// Reverse: we fetched DESC to take the most recent N, but the agent // Reverse: we fetched DESC to take the most recent N, but the agent
// should see them in chronological order (oldest first). // should see them in chronological order (oldest first).
return pending.filter((m) => !ackedIds.has(m.id)).reverse(); return pending.filter((m) => !ackedIds.has(m.id)).reverse();
} finally {
inbound.close();
}
} }
/** Mark messages as processing — writes to processing_ack in outbound.db. */ /** Mark messages as processing — writes to processing_ack in outbound.db. */
@@ -112,7 +116,12 @@ export function markFailed(id: string): void {
/** Get a message by ID (read from inbound.db). */ /** Get a message by ID (read from inbound.db). */
export function getMessageIn(id: string): MessageInRow | undefined { export function getMessageIn(id: string): MessageInRow | undefined {
return getInboundDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; const inbound = openInboundDb();
try {
return inbound.prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined;
} finally {
inbound.close();
}
} }
/** /**
@@ -120,19 +129,23 @@ export function getMessageIn(id: string): MessageInRow | undefined {
* Reads from inbound.db, checks processing_ack to skip already-handled responses. * Reads from inbound.db, checks processing_ack to skip already-handled responses.
*/ */
export function findQuestionResponse(questionId: string): MessageInRow | undefined { export function findQuestionResponse(questionId: string): MessageInRow | undefined {
const inbound = getInboundDb(); const inbound = openInboundDb();
const outbound = getOutboundDb(); const outbound = getOutboundDb();
const response = inbound try {
.prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?") const response = inbound
.get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined; .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?")
.get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined;
if (!response) return undefined; if (!response) return undefined;
// Check it hasn't been acked already // Check it hasn't been acked already
const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id); const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
if (acked) return undefined; if (acked) return undefined;
return response; return response;
} finally {
inbound.close();
}
} }