v2: fix agent-runner lifecycle and session DB reliability

- Use DELETE journal mode for session DBs instead of WAL. WAL doesn't
  sync reliably across Docker volume mounts (VirtioFS), causing dropped
  writes and duplicate deliveries.
- Add 20s idle detection to end the query stream. The concurrent poll
  tracks SDK activity via a new 'activity' provider event. When no SDK
  events arrive for 20s and no messages are pending, the stream ends
  and the poll loop continues.
- Add touchProcessing heartbeat so the host can distinguish active
  agents from idle ones by checking status_changed recency.
- Catch query errors in the poll loop and write error responses to
  messages_out instead of crashing the process.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-09 01:34:59 +03:00
parent 7201fe5032
commit 6f2a7314d0
8 changed files with 64 additions and 28 deletions

View File

@@ -7,7 +7,7 @@ let _db: Database.Database | null = null;
export function getSessionDb(): Database.Database { export function getSessionDb(): Database.Database {
if (!_db) { if (!_db) {
_db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH); _db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH);
_db.pragma('journal_mode = WAL'); _db.pragma('journal_mode = DELETE');
_db.pragma('foreign_keys = ON'); _db.pragma('foreign_keys = ON');
} }
return _db; return _db;

View File

@@ -47,6 +47,14 @@ export function markCompleted(ids: string[]): void {
})(); })();
} }
/**
 * Heartbeat for host idle detection: refresh `status_changed` on messages that
 * are still in the 'processing' state.
 *
 * Rows whose status is no longer 'processing' are left untouched by the WHERE
 * clause. All updates run inside a single transaction so a batch is committed
 * once (and atomically) instead of once per id.
 *
 * @param ids - Ids of the messages_in rows to touch; an empty list is a no-op.
 */
export function touchProcessing(ids: string[]): void {
  if (ids.length === 0) return;
  const db = getSessionDb();
  const stmt = db.prepare("UPDATE messages_in SET status_changed = datetime('now') WHERE id = ? AND status = 'processing'");
  // One transaction for the whole batch: avoids a journal create/delete cycle per id.
  const touchAll = db.transaction((batch: string[]) => {
    for (const id of batch) stmt.run(id);
  });
  touchAll(ids);
}
/** Mark a single message as failed. */ /** Mark a single message as failed. */
export function markFailed(id: string): void { export function markFailed(id: string): void {
getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id); getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id);

View File

@@ -120,10 +120,11 @@ describe('mock provider', () => {
events.push(event); events.push(event);
} }
expect(events.length).toBeGreaterThanOrEqual(2); const typed = events.filter((e) => e.type !== 'activity');
expect(events[0].type).toBe('init'); expect(typed.length).toBeGreaterThanOrEqual(2);
expect(events[1].type).toBe('result'); expect(typed[0].type).toBe('init');
expect((events[1] as { text: string }).text).toBe('Echo: Hello'); expect(typed[1].type).toBe('result');
expect((typed[1] as { text: string }).text).toBe('Echo: Hello');
}); });
it('should handle push() during active query', async () => { it('should handle push() during active query', async () => {

View File

@@ -1,10 +1,11 @@
import { getPendingMessages, markProcessing, markCompleted } from './db/messages-in.js'; import { getPendingMessages, markProcessing, markCompleted, touchProcessing } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js'; import { writeMessageOut } from './db/messages-out.js';
import { formatMessages, extractRouting, type RoutingContext } from './formatter.js'; import { formatMessages, extractRouting, type RoutingContext } from './formatter.js';
import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js';
const POLL_INTERVAL_MS = 1000; const POLL_INTERVAL_MS = 1000;
const ACTIVE_POLL_INTERVAL_MS = 500; const ACTIVE_POLL_INTERVAL_MS = 500;
const IDLE_END_MS = 20_000; // End stream after 20s with no SDK events
function log(msg: string): void { function log(msg: string): void {
console.error(`[poll-loop] ${msg}`); console.error(`[poll-loop] ${msg}`);
@@ -68,10 +69,22 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
}); });
// Process the query while concurrently polling for new messages // Process the query while concurrently polling for new messages
const result = await processQuery(query, routing, config); try {
const result = await processQuery(query, routing, config, ids);
if (result.sessionId) sessionId = result.sessionId; if (result.sessionId) sessionId = result.sessionId;
if (result.resumeAt) resumeAt = result.resumeAt; if (result.resumeAt) resumeAt = result.resumeAt;
} catch (err) {
log(`Query error: ${err instanceof Error ? err.message : String(err)}`);
// Write error response so the user knows something went wrong
writeMessageOut({
id: generateId(),
kind: 'chat',
platform_id: routing.platformId,
channel_type: routing.channelType,
thread_id: routing.threadId,
content: JSON.stringify({ text: `Error: ${err instanceof Error ? err.message : String(err)}` }),
});
}
markCompleted(ids); markCompleted(ids);
log(`Completed ${ids.length} message(s)`); log(`Completed ${ids.length} message(s)`);
@@ -83,16 +96,17 @@ interface QueryResult {
resumeAt?: string; resumeAt?: string;
} }
async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig): Promise<QueryResult> { async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig, processingIds: string[]): Promise<QueryResult> {
let querySessionId: string | undefined; let querySessionId: string | undefined;
let done = false; let done = false;
let lastEventTime = Date.now();
// Concurrent polling: push new messages into the active query // Concurrent polling: push follow-ups into the active query and detect idle
const pollHandle = setInterval(() => { const pollHandle = setInterval(() => {
if (done) return; if (done) return;
const newMessages = getPendingMessages();
if (newMessages.length === 0) return;
const newMessages = getPendingMessages();
if (newMessages.length > 0) {
const newIds = newMessages.map((m) => m.id); const newIds = newMessages.map((m) => m.id);
markProcessing(newIds); markProcessing(newIds);
@@ -100,17 +114,25 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config:
log(`Pushing ${newMessages.length} follow-up message(s) into active query`); log(`Pushing ${newMessages.length} follow-up message(s) into active query`);
query.push(prompt); query.push(prompt);
// Update routing env for MCP tools with latest message context
const newRouting = extractRouting(newMessages); const newRouting = extractRouting(newMessages);
setRoutingEnv(newRouting, config.env); setRoutingEnv(newRouting, config.env);
// Mark these completed immediately (they've been pushed to the provider)
markCompleted(newIds); markCompleted(newIds);
lastEventTime = Date.now(); // new input counts as activity
}
// End stream when agent is idle: no SDK events and no pending messages
if (Date.now() - lastEventTime > IDLE_END_MS) {
log(`No SDK events for ${IDLE_END_MS / 1000}s, ending query`);
query.end();
}
}, ACTIVE_POLL_INTERVAL_MS); }, ACTIVE_POLL_INTERVAL_MS);
try { try {
for await (const event of query.events) { for await (const event of query.events) {
lastEventTime = Date.now();
handleEvent(event, routing); handleEvent(event, routing);
touchProcessing(processingIds);
if (event.type === 'init') { if (event.type === 'init') {
querySessionId = event.sessionId; querySessionId = event.sessionId;

View File

@@ -200,6 +200,9 @@ export class ClaudeProvider implements AgentProvider {
if (aborted) return; if (aborted) return;
messageCount++; messageCount++;
// Yield activity for every SDK event so the poll loop knows the agent is working
yield { type: 'activity' };
if (message.type === 'system' && message.subtype === 'init') { if (message.type === 'system' && message.subtype === 'init') {
yield { type: 'init', sessionId: message.session_id }; yield { type: 'init', sessionId: message.session_id };
} else if (message.type === 'result') { } else if (message.type === 'result') {
@@ -213,7 +216,6 @@ export class ClaudeProvider implements AgentProvider {
const tn = message as { summary?: string }; const tn = message as { summary?: string };
yield { type: 'progress', message: tn.summary || 'Task notification' }; yield { type: 'progress', message: tn.summary || 'Task notification' };
} }
// All other message types are logged but not emitted
} }
log(`Query completed after ${messageCount} SDK messages`); log(`Query completed after ${messageCount} SDK messages`);
} }

View File

@@ -20,9 +20,11 @@ export class MockProvider implements AgentProvider {
const events: AsyncIterable<ProviderEvent> = { const events: AsyncIterable<ProviderEvent> = {
async *[Symbol.asyncIterator]() { async *[Symbol.asyncIterator]() {
yield { type: 'activity' };
yield { type: 'init', sessionId: `mock-session-${Date.now()}` }; yield { type: 'init', sessionId: `mock-session-${Date.now()}` };
// Process initial prompt // Process initial prompt
yield { type: 'activity' };
yield { type: 'result', text: responseFactory(input.prompt) }; yield { type: 'result', text: responseFactory(input.prompt) };
// Process any pushed follow-ups // Process any pushed follow-ups

View File

@@ -53,4 +53,5 @@ export type ProviderEvent =
| { type: 'init'; sessionId: string } | { type: 'init'; sessionId: string }
| { type: 'result'; text: string | null } | { type: 'result'; text: string | null }
| { type: 'error'; message: string; retryable: boolean; classification?: string } | { type: 'error'; message: string; retryable: boolean; classification?: string }
| { type: 'progress'; message: string }; | { type: 'progress'; message: string }
| { type: 'activity' };

View File

@@ -80,7 +80,7 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void
const dbPath = sessionDbPath(agentGroupId, sessionId); const dbPath = sessionDbPath(agentGroupId, sessionId);
if (!fs.existsSync(dbPath)) { if (!fs.existsSync(dbPath)) {
const db = new Database(dbPath); const db = new Database(dbPath);
db.pragma('journal_mode = WAL'); db.pragma('journal_mode = DELETE');
db.exec(SESSION_SCHEMA); db.exec(SESSION_SCHEMA);
db.close(); db.close();
log.debug('Session DB created', { dbPath }); log.debug('Session DB created', { dbPath });
@@ -105,7 +105,7 @@ export function writeSessionMessage(
): void { ): void {
const dbPath = sessionDbPath(agentGroupId, sessionId); const dbPath = sessionDbPath(agentGroupId, sessionId);
const db = new Database(dbPath); const db = new Database(dbPath);
db.pragma('journal_mode = WAL'); db.pragma('journal_mode = DELETE');
try { try {
db.prepare( db.prepare(
@@ -134,7 +134,7 @@ export function writeSessionMessage(
export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database { export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database {
const dbPath = sessionDbPath(agentGroupId, sessionId); const dbPath = sessionDbPath(agentGroupId, sessionId);
const db = new Database(dbPath); const db = new Database(dbPath);
db.pragma('journal_mode = WAL'); db.pragma('journal_mode = DELETE');
return db; return db;
} }