From 6f2a7314d01966fc7ceb31f1c479b9b8246a303b Mon Sep 17 00:00:00 2001 From: gavrielc Date: Thu, 9 Apr 2026 01:34:59 +0300 Subject: [PATCH] v2: fix agent-runner lifecycle and session DB reliability - Use DELETE journal mode for session DBs instead of WAL. WAL doesn't sync reliably across Docker volume mounts (VirtioFS), causing dropped writes and duplicate deliveries. - Add 20s idle detection to end the query stream. The concurrent poll tracks SDK activity via a new 'activity' provider event. When no SDK events arrive for 20s and no messages are pending, the stream ends and the poll loop continues. - Add touchProcessing heartbeat so the host can distinguish active agents from idle ones by checking status_changed recency. - Catch query errors in the poll loop and write error responses to messages_out instead of crashing the process. Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/db/connection.ts | 2 +- container/agent-runner/src/db/messages-in.ts | 8 +++ container/agent-runner/src/poll-loop.test.ts | 9 +-- container/agent-runner/src/poll-loop.ts | 58 +++++++++++++------ .../agent-runner/src/providers/claude.ts | 4 +- container/agent-runner/src/providers/mock.ts | 2 + container/agent-runner/src/providers/types.ts | 3 +- src/session-manager.ts | 6 +- 8 files changed, 64 insertions(+), 28 deletions(-) diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 9e71e58..a59d731 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -7,7 +7,7 @@ let _db: Database.Database | null = null; export function getSessionDb(): Database.Database { if (!_db) { _db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH); - _db.pragma('journal_mode = WAL'); + _db.pragma('journal_mode = DELETE'); _db.pragma('foreign_keys = ON'); } return _db; diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts index a68071b..d97a4ba 100644 --- a/container/agent-runner/src/db/messages-in.ts +++ b/container/agent-runner/src/db/messages-in.ts @@ -47,6 +47,14 @@ export function markCompleted(ids: string[]): void { })(); } +/** Update status_changed on processing messages (heartbeat for host idle detection). */ +export function touchProcessing(ids: string[]): void { + if (ids.length === 0) return; + const db = getSessionDb(); + const stmt = db.prepare("UPDATE messages_in SET status_changed = datetime('now') WHERE id = ? AND status = 'processing'"); + for (const id of ids) stmt.run(id); +} + /** Mark a single message as failed. */ export function markFailed(id: string): void { getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id); diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index 7cc3074..03fc0c7 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -120,10 +120,11 @@ describe('mock provider', () => { events.push(event); } - expect(events.length).toBeGreaterThanOrEqual(2); - expect(events[0].type).toBe('init'); - expect(events[1].type).toBe('result'); - expect((events[1] as { text: string }).text).toBe('Echo: Hello'); + const typed = events.filter((e) => e.type !== 'activity'); + expect(typed.length).toBeGreaterThanOrEqual(2); + expect(typed[0].type).toBe('init'); + expect(typed[1].type).toBe('result'); + expect((typed[1] as { text: string }).text).toBe('Echo: Hello'); }); it('should handle push() during active query', async () => { diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index e2712a5..8ae1238 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -1,10 +1,11 @@ -import { getPendingMessages, markProcessing, markCompleted } from './db/messages-in.js'; +import { getPendingMessages, markProcessing, markCompleted, touchProcessing } from './db/messages-in.js'; import { writeMessageOut } from './db/messages-out.js'; import { formatMessages, extractRouting, type RoutingContext } from './formatter.js'; import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; const ACTIVE_POLL_INTERVAL_MS = 500; +const IDLE_END_MS = 20_000; // End stream after 20s with no SDK events function log(msg: string): void { console.error(`[poll-loop] ${msg}`); @@ -68,10 +69,22 @@ export async function runPollLoop(config: PollLoopConfig): Promise { }); // Process the query while concurrently polling for new messages - const result = await processQuery(query, routing, config); - - if (result.sessionId) sessionId = result.sessionId; - if (result.resumeAt) resumeAt = result.resumeAt; + try { + const result = await processQuery(query, routing, config, ids); + if (result.sessionId) sessionId = result.sessionId; + if (result.resumeAt) resumeAt = result.resumeAt; + } catch (err) { + log(`Query error: ${err instanceof Error ? err.message : String(err)}`); + // Write error response so the user knows something went wrong + writeMessageOut({ + id: generateId(), + kind: 'chat', + platform_id: routing.platformId, + channel_type: routing.channelType, + thread_id: routing.threadId, + content: JSON.stringify({ text: `Error: ${err instanceof Error ? err.message : String(err)}` }), + }); + } markCompleted(ids); log(`Completed ${ids.length} message(s)`); @@ -83,34 +96,43 @@ interface QueryResult { resumeAt?: string; } -async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig): Promise { +async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig, processingIds: string[]): Promise { let querySessionId: string | undefined; let done = false; + let lastEventTime = Date.now(); - // Concurrent polling: push new messages into the active query + // Concurrent polling: push follow-ups, checkpoint WAL, detect idle const pollHandle = setInterval(() => { if (done) return; + const newMessages = getPendingMessages(); - if (newMessages.length === 0) return; + if (newMessages.length > 0) { + const newIds = newMessages.map((m) => m.id); + markProcessing(newIds); - const newIds = newMessages.map((m) => m.id); - markProcessing(newIds); + const prompt = formatMessages(newMessages); + log(`Pushing ${newMessages.length} follow-up message(s) into active query`); + query.push(prompt); - const prompt = formatMessages(newMessages); - log(`Pushing ${newMessages.length} follow-up message(s) into active query`); - query.push(prompt); + const newRouting = extractRouting(newMessages); + setRoutingEnv(newRouting, config.env); - // Update routing env for MCP tools with latest message context - const newRouting = extractRouting(newMessages); - setRoutingEnv(newRouting, config.env); + markCompleted(newIds); + lastEventTime = Date.now(); // new input counts as activity + } - // Mark these completed immediately (they've been pushed to the provider) - markCompleted(newIds); + // End stream when agent is idle: no SDK events and no pending messages + if (Date.now() - lastEventTime > IDLE_END_MS) { + log(`No SDK events for ${IDLE_END_MS / 1000}s, ending query`); + query.end(); + } }, ACTIVE_POLL_INTERVAL_MS); try { for await (const event of query.events) { + lastEventTime = Date.now(); handleEvent(event, routing); + touchProcessing(processingIds); if (event.type === 'init') { querySessionId = event.sessionId; diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts index c25ff37..e17c5c5 100644 --- a/container/agent-runner/src/providers/claude.ts +++ b/container/agent-runner/src/providers/claude.ts @@ -200,6 +200,9 @@ export class ClaudeProvider implements AgentProvider { if (aborted) return; messageCount++; + // Yield activity for every SDK event so the poll loop knows the agent is working + yield { type: 'activity' }; + if (message.type === 'system' && message.subtype === 'init') { yield { type: 'init', sessionId: message.session_id }; } else if (message.type === 'result') { @@ -213,7 +216,6 @@ export class ClaudeProvider implements AgentProvider { const tn = message as { summary?: string }; yield { type: 'progress', message: tn.summary || 'Task notification' }; } - // All other message types are logged but not emitted } log(`Query completed after ${messageCount} SDK messages`); } diff --git a/container/agent-runner/src/providers/mock.ts b/container/agent-runner/src/providers/mock.ts index ed5cad1..0794557 100644 --- a/container/agent-runner/src/providers/mock.ts +++ b/container/agent-runner/src/providers/mock.ts @@ -20,9 +20,11 @@ export class MockProvider implements AgentProvider { const events: AsyncIterable = { async *[Symbol.asyncIterator]() { + yield { type: 'activity' }; yield { type: 'init', sessionId: `mock-session-${Date.now()}` }; // Process initial prompt + yield { type: 'activity' }; yield { type: 'result', text: responseFactory(input.prompt) }; // Process any pushed follow-ups diff --git a/container/agent-runner/src/providers/types.ts b/container/agent-runner/src/providers/types.ts index 6e43f3b..b0ad4da 100644 --- a/container/agent-runner/src/providers/types.ts +++ b/container/agent-runner/src/providers/types.ts @@ -53,4 +53,5 @@ export type ProviderEvent = | { type: 'init'; sessionId: string } | { type: 'result'; text: string | null } | { type: 'error'; message: string; retryable: boolean; classification?: string } - | { type: 'progress'; message: string }; + | { type: 'progress'; message: string } + | { type: 'activity' }; diff --git a/src/session-manager.ts b/src/session-manager.ts index 361c198..4048cfb 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -80,7 +80,7 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void const dbPath = sessionDbPath(agentGroupId, sessionId); if (!fs.existsSync(dbPath)) { const db = new Database(dbPath); - db.pragma('journal_mode = WAL'); + db.pragma('journal_mode = DELETE'); db.exec(SESSION_SCHEMA); db.close(); log.debug('Session DB created', { dbPath }); @@ -105,7 +105,7 @@ export function writeSessionMessage( ): void { const dbPath = sessionDbPath(agentGroupId, sessionId); const db = new Database(dbPath); - db.pragma('journal_mode = WAL'); + db.pragma('journal_mode = DELETE'); try { db.prepare( @@ -134,7 +134,7 @@ export function writeSessionMessage( export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database { const dbPath = sessionDbPath(agentGroupId, sessionId); const db = new Database(dbPath); - db.pragma('journal_mode = WAL'); + db.pragma('journal_mode = DELETE'); return db; }