v2: split session DB into inbound/outbound for write isolation
Eliminates SQLite write contention across the host-container mount boundary by splitting the single session.db into two files, each with exactly one writer: inbound.db — host writes (messages_in, delivered tracking) outbound.db — container writes (messages_out, processing_ack) Key changes: - Host uses even seq numbers, container uses odd (collision-free) - Container heartbeat via file touch instead of DB UPDATE - Scheduling MCP tools now emit system actions via messages_out (host applies them to inbound.db during delivery) - Host sweep reads processing_ack + heartbeat file for stale detection - OneCLI ensureAgent() call added (was missing from v2, caused applyContainerConfig to reject unknown agent identifiers) Verified: tsc clean, 327 tests pass, real e2e through Docker works. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -162,7 +162,7 @@ describe('channel + router integration', () => {
|
||||
it('should route inbound message from adapter to session DB', async () => {
|
||||
const { routeInbound } = await import('../router.js');
|
||||
const { findSession } = await import('../db/sessions.js');
|
||||
const { sessionDbPath } = await import('../session-manager.js');
|
||||
const { inboundDbPath } = await import('../session-manager.js');
|
||||
|
||||
// Simulate what the adapter bridge does: stringify content, call routeInbound
|
||||
const inboundContent = { sender: 'TestUser', senderId: 'u1', text: 'Hello from adapter', isFromMe: false };
|
||||
@@ -183,7 +183,7 @@ describe('channel + router integration', () => {
|
||||
const session = findSession('mg-1', null);
|
||||
expect(session).toBeDefined();
|
||||
|
||||
const dbPath = sessionDbPath('ag-1', session!.id);
|
||||
const dbPath = inboundDbPath('ag-1', session!.id);
|
||||
const db = new Database(dbPath);
|
||||
const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>;
|
||||
db.close();
|
||||
|
||||
@@ -19,7 +19,6 @@ import {
|
||||
markContainerIdle,
|
||||
markContainerRunning,
|
||||
markContainerStopped,
|
||||
sessionDbPath,
|
||||
sessionDir,
|
||||
} from './session-manager.js';
|
||||
import type { AgentGroup, Session } from './types.js';
|
||||
@@ -135,7 +134,7 @@ function buildMounts(agentGroup: AgentGroup, session: Session): VolumeMount[] {
|
||||
const sessDir = sessionDir(agentGroup.id, session.id);
|
||||
const groupDir = path.resolve(GROUPS_DIR, agentGroup.folder);
|
||||
|
||||
// Session folder at /workspace (contains session.db, outbox/, .claude/)
|
||||
// Session folder at /workspace (contains inbound.db, outbound.db, outbox/, .claude/)
|
||||
mounts.push({ hostPath: sessDir, containerPath: '/workspace', readonly: false });
|
||||
|
||||
// Agent group folder at /workspace/agent
|
||||
@@ -226,7 +225,10 @@ async function buildContainerArgs(
|
||||
// Environment
|
||||
args.push('-e', `TZ=${TIMEZONE}`);
|
||||
args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`);
|
||||
args.push('-e', `SESSION_DB_PATH=/workspace/session.db`);
|
||||
// Two-DB split: container reads inbound.db, writes outbound.db
|
||||
args.push('-e', 'SESSION_INBOUND_DB_PATH=/workspace/inbound.db');
|
||||
args.push('-e', 'SESSION_OUTBOUND_DB_PATH=/workspace/outbound.db');
|
||||
args.push('-e', 'SESSION_HEARTBEAT_PATH=/workspace/.heartbeat');
|
||||
|
||||
// Pass admin user ID and assistant name from messaging group/agent group
|
||||
if (session.messaging_group_id) {
|
||||
@@ -239,10 +241,22 @@ async function buildContainerArgs(
|
||||
args.push('-e', `NANOCLAW_ASSISTANT_NAME=${agentGroup.name}`);
|
||||
}
|
||||
|
||||
// OneCLI gateway
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
|
||||
if (onecliApplied) {
|
||||
log.debug('OneCLI gateway applied', { containerName });
|
||||
// OneCLI gateway — injects HTTPS_PROXY + certs so container API calls
|
||||
// are routed through the agent vault for credential injection.
|
||||
// Must ensureAgent first for non-admin groups, otherwise applyContainerConfig
|
||||
// rejects the unknown agent identifier and returns false.
|
||||
try {
|
||||
if (agentIdentifier) {
|
||||
await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier });
|
||||
}
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
|
||||
if (onecliApplied) {
|
||||
log.info('OneCLI gateway applied', { containerName });
|
||||
} else {
|
||||
log.warn('OneCLI gateway not applied — container will have no credentials', { containerName });
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn('OneCLI gateway error — container will have no credentials', { containerName, err });
|
||||
}
|
||||
|
||||
// Host gateway
|
||||
|
||||
@@ -69,16 +69,21 @@ CREATE TABLE pending_questions (
|
||||
`;
|
||||
|
||||
/**
|
||||
* Session DB schema — created fresh by the host for each session.
|
||||
* Session DB schemas — split into two files so each has exactly one writer.
|
||||
* This eliminates SQLite write contention across the host-container mount boundary.
|
||||
*
|
||||
* inbound.db — host writes, container reads (read-only mount or open read-only)
|
||||
* outbound.db — container writes, host reads (read-only open)
|
||||
*/
|
||||
export const SESSION_SCHEMA = `
|
||||
|
||||
/** Host-owned: inbound messages + delivery tracking. */
|
||||
export const INBOUND_SCHEMA = `
|
||||
CREATE TABLE messages_in (
|
||||
id TEXT PRIMARY KEY,
|
||||
seq INTEGER UNIQUE,
|
||||
kind TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
status_changed TEXT,
|
||||
process_after TEXT,
|
||||
recurrence TEXT,
|
||||
tries INTEGER DEFAULT 0,
|
||||
@@ -88,12 +93,21 @@ CREATE TABLE messages_in (
|
||||
content TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- Host tracks which messages_out IDs have been delivered.
|
||||
-- Avoids writing to outbound.db (container-owned).
|
||||
CREATE TABLE delivered (
|
||||
message_out_id TEXT PRIMARY KEY,
|
||||
delivered_at TEXT NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
/** Container-owned: outbound messages + processing acknowledgments. */
|
||||
export const OUTBOUND_SCHEMA = `
|
||||
CREATE TABLE messages_out (
|
||||
id TEXT PRIMARY KEY,
|
||||
seq INTEGER UNIQUE,
|
||||
in_reply_to TEXT,
|
||||
timestamp TEXT NOT NULL,
|
||||
delivered INTEGER DEFAULT 0,
|
||||
deliver_after TEXT,
|
||||
recurrence TEXT,
|
||||
kind TEXT NOT NULL,
|
||||
@@ -102,4 +116,13 @@ CREATE TABLE messages_out (
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- Container tracks processing status here instead of updating messages_in.
|
||||
-- Host reads this to know which messages have been processed.
|
||||
-- On container startup, stale 'processing' entries are cleared (crash recovery).
|
||||
CREATE TABLE processing_ack (
|
||||
message_id TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL,
|
||||
status_changed TEXT NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
126
src/delivery.ts
126
src/delivery.ts
@@ -1,6 +1,11 @@
|
||||
/**
|
||||
* Outbound message delivery.
|
||||
* Polls active session DBs for undelivered messages_out, delivers through channel adapters.
|
||||
* Polls session outbound DBs for undelivered messages, delivers through channel adapters.
|
||||
*
|
||||
* Two-DB architecture:
|
||||
* - Reads messages_out from outbound.db (container-owned, opened read-only)
|
||||
* - Tracks delivery in inbound.db's `delivered` table (host-owned)
|
||||
* - Never writes to outbound.db — preserves single-writer-per-file invariant
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
@@ -9,7 +14,7 @@ import path from 'path';
|
||||
import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { log } from './log.js';
|
||||
import { openSessionDb, sessionDir } from './session-manager.js';
|
||||
import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath } from './session-manager.js';
|
||||
import { resetContainerIdleTimer } from './container-runner.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import type { Session } from './types.js';
|
||||
@@ -85,19 +90,21 @@ async function deliverSessionMessages(session: Session): Promise<void> {
|
||||
const agentGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!agentGroup) return;
|
||||
|
||||
let db: Database.Database;
|
||||
let outDb: Database.Database;
|
||||
let inDb: Database.Database;
|
||||
try {
|
||||
db = openSessionDb(agentGroup.id, session.id);
|
||||
outDb = openOutboundDb(agentGroup.id, session.id);
|
||||
inDb = openInboundDb(agentGroup.id, session.id);
|
||||
} catch {
|
||||
return; // Session DB might not exist yet
|
||||
return; // DBs might not exist yet
|
||||
}
|
||||
|
||||
try {
|
||||
const undelivered = db
|
||||
// Read all due messages from outbound.db (read-only)
|
||||
const allDue = outDb
|
||||
.prepare(
|
||||
`SELECT * FROM messages_out
|
||||
WHERE delivered = 0
|
||||
AND (deliver_after IS NULL OR deliver_after <= datetime('now'))
|
||||
WHERE (deliver_after IS NULL OR deliver_after <= datetime('now'))
|
||||
ORDER BY timestamp ASC`,
|
||||
)
|
||||
.all() as Array<{
|
||||
@@ -109,19 +116,32 @@ async function deliverSessionMessages(session: Session): Promise<void> {
|
||||
content: string;
|
||||
}>;
|
||||
|
||||
if (allDue.length === 0) return;
|
||||
|
||||
// Filter out already-delivered messages using inbound.db's delivered table
|
||||
const deliveredIds = new Set(
|
||||
(inDb.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map(
|
||||
(r) => r.message_out_id,
|
||||
),
|
||||
);
|
||||
const undelivered = allDue.filter((m) => !deliveredIds.has(m.id));
|
||||
if (undelivered.length === 0) return;
|
||||
|
||||
for (const msg of undelivered) {
|
||||
try {
|
||||
await deliverMessage(msg, session);
|
||||
db.prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(msg.id);
|
||||
await deliverMessage(msg, session, inDb);
|
||||
// Track delivery in inbound.db (host-owned) — not outbound.db
|
||||
inDb.prepare("INSERT OR IGNORE INTO delivered (message_out_id, delivered_at) VALUES (?, datetime('now'))").run(
|
||||
msg.id,
|
||||
);
|
||||
resetContainerIdleTimer(session.id);
|
||||
} catch (err) {
|
||||
log.error('Failed to deliver message', { messageId: msg.id, sessionId: session.id, err });
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
db.close();
|
||||
outDb.close();
|
||||
inDb.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +155,7 @@ async function deliverMessage(
|
||||
content: string;
|
||||
},
|
||||
session: Session,
|
||||
inDb: Database.Database,
|
||||
): Promise<void> {
|
||||
if (!deliveryAdapter) {
|
||||
log.warn('No delivery adapter configured, dropping message', { id: msg.id });
|
||||
@@ -143,10 +164,9 @@ async function deliverMessage(
|
||||
|
||||
const content = JSON.parse(msg.content);
|
||||
|
||||
// System actions — handle internally
|
||||
// System actions — handle internally (schedule_task, cancel_task, etc.)
|
||||
if (msg.kind === 'system') {
|
||||
log.info('System action from agent', { sessionId: session.id, action: content.action });
|
||||
// TODO: handle system actions (register_group, reset_session, etc.)
|
||||
await handleSystemAction(content, session, inDb);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -207,6 +227,84 @@ async function deliverMessage(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle system actions from the container agent.
|
||||
* These are written to messages_out because the container can't write to inbound.db.
|
||||
* The host applies them to inbound.db here.
|
||||
*/
|
||||
async function handleSystemAction(
|
||||
content: Record<string, unknown>,
|
||||
session: Session,
|
||||
inDb: Database.Database,
|
||||
): Promise<void> {
|
||||
const action = content.action as string;
|
||||
log.info('System action from agent', { sessionId: session.id, action });
|
||||
|
||||
switch (action) {
|
||||
case 'schedule_task': {
|
||||
const taskId = content.taskId as string;
|
||||
const prompt = content.prompt as string;
|
||||
const script = content.script as string | null;
|
||||
const processAfter = content.processAfter as string;
|
||||
const recurrence = (content.recurrence as string) || null;
|
||||
|
||||
// Compute next even seq for host-owned inbound.db
|
||||
const maxSeq = (
|
||||
inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }
|
||||
).m;
|
||||
const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2);
|
||||
|
||||
inDb
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content)
|
||||
VALUES (@id, @seq, datetime('now'), 'pending', 0, @process_after, @recurrence, 'task', @platform_id, @channel_type, @thread_id, @content)`,
|
||||
)
|
||||
.run({
|
||||
id: taskId,
|
||||
seq: nextSeq,
|
||||
process_after: processAfter,
|
||||
recurrence,
|
||||
platform_id: content.platformId ?? null,
|
||||
channel_type: content.channelType ?? null,
|
||||
thread_id: content.threadId ?? null,
|
||||
content: JSON.stringify({ prompt, script }),
|
||||
});
|
||||
log.info('Scheduled task created', { taskId, processAfter, recurrence });
|
||||
break;
|
||||
}
|
||||
|
||||
case 'cancel_task': {
|
||||
const taskId = content.taskId as string;
|
||||
inDb
|
||||
.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')")
|
||||
.run(taskId);
|
||||
log.info('Task cancelled', { taskId });
|
||||
break;
|
||||
}
|
||||
|
||||
case 'pause_task': {
|
||||
const taskId = content.taskId as string;
|
||||
inDb
|
||||
.prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? AND kind = 'task' AND status = 'pending'")
|
||||
.run(taskId);
|
||||
log.info('Task paused', { taskId });
|
||||
break;
|
||||
}
|
||||
|
||||
case 'resume_task': {
|
||||
const taskId = content.taskId as string;
|
||||
inDb
|
||||
.prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'")
|
||||
.run(taskId);
|
||||
log.info('Task resumed', { taskId });
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
log.warn('Unknown system action', { action });
|
||||
}
|
||||
}
|
||||
|
||||
export function stopDeliveryPolls(): void {
|
||||
activePolling = false;
|
||||
sweepPolling = false;
|
||||
|
||||
@@ -21,7 +21,8 @@ import {
|
||||
writeSessionMessage,
|
||||
initSessionFolder,
|
||||
sessionDir,
|
||||
sessionDbPath,
|
||||
inboundDbPath,
|
||||
outboundDbPath,
|
||||
sessionsBaseDir,
|
||||
} from './session-manager.js';
|
||||
import { getSession, findSession } from './db/sessions.js';
|
||||
@@ -84,22 +85,29 @@ describe('session manager', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('should create session folder and DB', () => {
|
||||
it('should create session folder and both DBs', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const dir = sessionDir('ag-1', 'sess-test');
|
||||
expect(fs.existsSync(dir)).toBe(true);
|
||||
expect(fs.existsSync(path.join(dir, 'outbox'))).toBe(true);
|
||||
|
||||
const dbPath = sessionDbPath('ag-1', 'sess-test');
|
||||
expect(fs.existsSync(dbPath)).toBe(true);
|
||||
// Verify inbound.db
|
||||
const inPath = inboundDbPath('ag-1', 'sess-test');
|
||||
expect(fs.existsSync(inPath)).toBe(true);
|
||||
const inDb = new Database(inPath);
|
||||
const inTables = inDb.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>;
|
||||
expect(inTables.map((t) => t.name)).toContain('messages_in');
|
||||
expect(inTables.map((t) => t.name)).toContain('delivered');
|
||||
inDb.close();
|
||||
|
||||
// Verify session DB has the right tables
|
||||
const db = new Database(dbPath);
|
||||
const tables = db.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>;
|
||||
const tableNames = tables.map((t) => t.name);
|
||||
expect(tableNames).toContain('messages_in');
|
||||
expect(tableNames).toContain('messages_out');
|
||||
db.close();
|
||||
// Verify outbound.db
|
||||
const outPath = outboundDbPath('ag-1', 'sess-test');
|
||||
expect(fs.existsSync(outPath)).toBe(true);
|
||||
const outDb = new Database(outPath);
|
||||
const outTables = outDb.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>;
|
||||
expect(outTables.map((t) => t.name)).toContain('messages_out');
|
||||
expect(outTables.map((t) => t.name)).toContain('processing_ack');
|
||||
outDb.close();
|
||||
});
|
||||
|
||||
it('should resolve to existing session (shared mode)', () => {
|
||||
@@ -124,7 +132,7 @@ describe('session manager', () => {
|
||||
expect(s2.id).toBe(s1.id);
|
||||
});
|
||||
|
||||
it('should write message to session DB', () => {
|
||||
it('should write message to inbound DB', () => {
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
|
||||
writeSessionMessage('ag-1', session.id, {
|
||||
@@ -137,8 +145,8 @@ describe('session manager', () => {
|
||||
content: JSON.stringify({ sender: 'User', text: 'Hello' }),
|
||||
});
|
||||
|
||||
// Read from the session DB
|
||||
const dbPath = sessionDbPath('ag-1', session.id);
|
||||
// Read from the inbound DB
|
||||
const dbPath = inboundDbPath('ag-1', session.id);
|
||||
const db = new Database(dbPath);
|
||||
const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{
|
||||
id: string;
|
||||
@@ -223,8 +231,8 @@ describe('router', () => {
|
||||
const session = findSession('mg-1', null);
|
||||
expect(session).toBeDefined();
|
||||
|
||||
// Verify message was written to session DB
|
||||
const dbPath = sessionDbPath('ag-1', session!.id);
|
||||
// Verify message was written to inbound DB
|
||||
const dbPath = inboundDbPath('ag-1', session!.id);
|
||||
const db = new Database(dbPath);
|
||||
const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>;
|
||||
db.close();
|
||||
@@ -239,8 +247,6 @@ describe('router', () => {
|
||||
it('should auto-create messaging group for unknown platform', async () => {
|
||||
const { routeInbound } = await import('./router.js');
|
||||
|
||||
// This platform ID isn't registered — but since there's no agent configured for it,
|
||||
// it should create the messaging group but not route (no agents configured)
|
||||
const event: InboundEvent = {
|
||||
channelType: 'slack',
|
||||
platformId: 'C-NEW-CHANNEL',
|
||||
@@ -255,7 +261,6 @@ describe('router', () => {
|
||||
|
||||
await routeInbound(event);
|
||||
|
||||
// Messaging group should be created
|
||||
const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js');
|
||||
const mg = getMessagingGroupByPlatform('slack', 'C-NEW-CHANNEL');
|
||||
expect(mg).toBeDefined();
|
||||
@@ -285,7 +290,7 @@ describe('router', () => {
|
||||
|
||||
// Both should be in the same session
|
||||
const session = findSession('mg-1', null);
|
||||
const dbPath = sessionDbPath('ag-1', session!.id);
|
||||
const dbPath = inboundDbPath('ag-1', session!.id);
|
||||
const db = new Database(dbPath);
|
||||
const rows = db.prepare('SELECT * FROM messages_in ORDER BY timestamp').all();
|
||||
db.close();
|
||||
@@ -295,7 +300,7 @@ describe('router', () => {
|
||||
});
|
||||
|
||||
describe('delivery', () => {
|
||||
it('should detect undelivered messages in session DB', () => {
|
||||
it('should detect undelivered messages in outbound DB', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
@@ -317,16 +322,15 @@ describe('delivery', () => {
|
||||
|
||||
const { session } = resolveSession('ag-1', 'mg-test', null, 'shared');
|
||||
|
||||
// Write a response to the session DB (simulating what the agent-runner does)
|
||||
const dbPath = sessionDbPath('ag-1', session.id);
|
||||
// Write a response to the outbound DB (simulating what the agent-runner does)
|
||||
const dbPath = outboundDbPath('ag-1', session.id);
|
||||
const db = new Database(dbPath);
|
||||
db.pragma('journal_mode = WAL');
|
||||
db.prepare(
|
||||
`INSERT INTO messages_out (id, timestamp, delivered, kind, platform_id, channel_type, content)
|
||||
VALUES ('out-1', datetime('now'), 0, 'chat', 'chan-123', 'discord', ?)`,
|
||||
`INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content)
|
||||
VALUES ('out-1', datetime('now'), 'chat', 'chan-123', 'discord', ?)`,
|
||||
).run(JSON.stringify({ text: 'Agent response' }));
|
||||
|
||||
const undelivered = db.prepare('SELECT * FROM messages_out WHERE delivered = 0').all() as Array<{
|
||||
const undelivered = db.prepare('SELECT * FROM messages_out').all() as Array<{
|
||||
id: string;
|
||||
content: string;
|
||||
}>;
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
/**
|
||||
* Host sweep — periodic maintenance of all session DBs.
|
||||
*
|
||||
* - Wake containers for sessions with due messages (process_after)
|
||||
* - Detect stale processing messages (container crash) → reset with backoff
|
||||
* - Insert next occurrence for recurring messages
|
||||
* - Kill idle containers past timeout
|
||||
* Two-DB architecture:
|
||||
* - Reads processing_ack from outbound.db to sync message status
|
||||
* - Writes to inbound.db (host-owned) for status updates and recurrence
|
||||
* - Uses heartbeat file mtime for stale container detection (not DB writes)
|
||||
* - Never writes to outbound.db — preserves single-writer-per-file invariant
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
@@ -12,7 +13,7 @@ import fs from 'fs';
|
||||
import { getActiveSessions, updateSession } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { log } from './log.js';
|
||||
import { openSessionDb, sessionDbPath } from './session-manager.js';
|
||||
import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { wakeContainer, isContainerRunning } from './container-runner.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
@@ -52,21 +53,31 @@ async function sweepSession(session: Session): Promise<void> {
|
||||
const agentGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!agentGroup) return;
|
||||
|
||||
const dbPath = sessionDbPath(agentGroup.id, session.id);
|
||||
if (!fs.existsSync(dbPath)) return;
|
||||
const inPath = inboundDbPath(agentGroup.id, session.id);
|
||||
if (!fs.existsSync(inPath)) return;
|
||||
|
||||
let db: Database.Database;
|
||||
let inDb: Database.Database;
|
||||
let outDb: Database.Database | null = null;
|
||||
try {
|
||||
db = new Database(dbPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.pragma('busy_timeout = 5000');
|
||||
inDb = openInboundDb(agentGroup.id, session.id);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 1. Check for due pending messages → wake container
|
||||
const dueMessages = db
|
||||
outDb = openOutboundDb(agentGroup.id, session.id);
|
||||
} catch {
|
||||
// outbound.db might not exist yet (container hasn't started)
|
||||
}
|
||||
|
||||
try {
|
||||
// 1. Sync processing_ack → messages_in status
|
||||
if (outDb) {
|
||||
syncProcessingAcks(inDb, outDb);
|
||||
}
|
||||
|
||||
// 2. Check for due pending messages → wake container
|
||||
const dueMessages = inDb
|
||||
.prepare(
|
||||
`SELECT COUNT(*) as count FROM messages_in
|
||||
WHERE status = 'pending'
|
||||
@@ -79,90 +90,134 @@ async function sweepSession(session: Session): Promise<void> {
|
||||
await wakeContainer(session);
|
||||
}
|
||||
|
||||
// 2. Detect stale processing messages
|
||||
const staleMessages = db
|
||||
.prepare(
|
||||
`SELECT id, tries FROM messages_in
|
||||
WHERE status = 'processing'
|
||||
AND status_changed < datetime('now', '-${Math.floor(STALE_THRESHOLD_MS / 1000)} seconds')`,
|
||||
)
|
||||
.all() as Array<{ id: string; tries: number }>;
|
||||
|
||||
for (const msg of staleMessages) {
|
||||
if (msg.tries >= MAX_TRIES) {
|
||||
db.prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(
|
||||
msg.id,
|
||||
);
|
||||
log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id });
|
||||
} else {
|
||||
const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries);
|
||||
const backoffSec = Math.floor(backoffMs / 1000);
|
||||
db.prepare(
|
||||
`UPDATE messages_in SET status = 'pending', status_changed = datetime('now'), process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`,
|
||||
).run(msg.id);
|
||||
log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs });
|
||||
}
|
||||
// 3. Detect stale containers via heartbeat file
|
||||
if (outDb) {
|
||||
detectStaleContainers(inDb, outDb, session, agentGroup.id);
|
||||
}
|
||||
|
||||
// 3. Handle recurrence for completed messages
|
||||
const completedRecurring = db
|
||||
.prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL")
|
||||
.all() as Array<{
|
||||
id: string;
|
||||
kind: string;
|
||||
content: string;
|
||||
recurrence: string;
|
||||
process_after: string | null;
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
}>;
|
||||
|
||||
for (const msg of completedRecurring) {
|
||||
try {
|
||||
// Dynamic import to avoid loading cron-parser at module level
|
||||
const { CronExpressionParser } = await import('cron-parser');
|
||||
const interval = CronExpressionParser.parse(msg.recurrence);
|
||||
const nextRun = interval.next().toISOString();
|
||||
const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
|
||||
// Compute next seq from both tables (same pattern as session-manager.ts)
|
||||
const nextSeq = (
|
||||
db
|
||||
.prepare(
|
||||
`SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM (
|
||||
SELECT seq FROM messages_in WHERE seq IS NOT NULL
|
||||
UNION ALL
|
||||
SELECT seq FROM messages_out WHERE seq IS NOT NULL
|
||||
)`,
|
||||
)
|
||||
.get() as { next: number }
|
||||
).next;
|
||||
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
|
||||
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
|
||||
).run(
|
||||
newId,
|
||||
nextSeq,
|
||||
msg.kind,
|
||||
nextRun,
|
||||
msg.recurrence,
|
||||
msg.platform_id,
|
||||
msg.channel_type,
|
||||
msg.thread_id,
|
||||
msg.content,
|
||||
);
|
||||
|
||||
// Remove recurrence from the completed message so it doesn't spawn again
|
||||
db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id);
|
||||
|
||||
log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun });
|
||||
} catch (err) {
|
||||
log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err });
|
||||
}
|
||||
}
|
||||
// 4. Handle recurrence for completed messages
|
||||
handleRecurrence(inDb, session);
|
||||
} finally {
|
||||
db.close();
|
||||
inDb.close();
|
||||
outDb?.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync completed/failed processing_ack entries → messages_in.status.
|
||||
* Only syncs terminal states — 'processing' is handled by stale detection.
|
||||
*/
|
||||
function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
||||
const completed = outDb
|
||||
.prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')")
|
||||
.all() as Array<{ message_id: string }>;
|
||||
|
||||
if (completed.length === 0) return;
|
||||
|
||||
// Batch-update messages_in status for completed/failed messages
|
||||
const updateStmt = inDb.prepare(
|
||||
"UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'",
|
||||
);
|
||||
inDb.transaction(() => {
|
||||
for (const { message_id } of completed) {
|
||||
updateStmt.run(message_id);
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect stale containers using heartbeat file mtime.
|
||||
* If the heartbeat is older than STALE_THRESHOLD and processing_ack has
|
||||
* 'processing' entries, the container likely crashed — reset with backoff.
|
||||
*/
|
||||
function detectStaleContainers(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
agentGroupId: string,
|
||||
): void {
|
||||
const hbPath = heartbeatPath(agentGroupId, session.id);
|
||||
let heartbeatAge = Infinity;
|
||||
try {
|
||||
const stat = fs.statSync(hbPath);
|
||||
heartbeatAge = Date.now() - stat.mtimeMs;
|
||||
} catch {
|
||||
// No heartbeat file — container may never have started, or it's very old
|
||||
}
|
||||
|
||||
if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive
|
||||
|
||||
// Heartbeat is stale — check for stuck processing entries
|
||||
const processing = outDb
|
||||
.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'")
|
||||
.all() as Array<{ message_id: string }>;
|
||||
|
||||
if (processing.length === 0) return;
|
||||
|
||||
for (const { message_id } of processing) {
|
||||
const msg = inDb
|
||||
.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?')
|
||||
.get(message_id, 'pending') as { id: string; tries: number } | undefined;
|
||||
|
||||
if (!msg) continue;
|
||||
|
||||
if (msg.tries >= MAX_TRIES) {
|
||||
inDb.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(msg.id);
|
||||
log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id });
|
||||
} else {
|
||||
const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries);
|
||||
const backoffSec = Math.floor(backoffMs / 1000);
|
||||
inDb
|
||||
.prepare(
|
||||
`UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`,
|
||||
)
|
||||
.run(msg.id);
|
||||
log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Insert next occurrence for completed recurring messages. */
|
||||
async function handleRecurrence(inDb: Database.Database, session: Session): Promise<void> {
|
||||
const completedRecurring = inDb
|
||||
.prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL")
|
||||
.all() as Array<{
|
||||
id: string;
|
||||
kind: string;
|
||||
content: string;
|
||||
recurrence: string;
|
||||
process_after: string | null;
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
}>;
|
||||
|
||||
for (const msg of completedRecurring) {
|
||||
try {
|
||||
const { CronExpressionParser } = await import('cron-parser');
|
||||
const interval = CronExpressionParser.parse(msg.recurrence);
|
||||
const nextRun = interval.next().toISOString();
|
||||
const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
|
||||
// Host uses even seq numbers
|
||||
const maxSeq = (
|
||||
inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }
|
||||
).m;
|
||||
const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2);
|
||||
|
||||
inDb
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
|
||||
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content);
|
||||
|
||||
// Remove recurrence from the completed message so it doesn't spawn again
|
||||
inDb.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id);
|
||||
|
||||
log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun });
|
||||
} catch (err) {
|
||||
log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
/**
|
||||
* Session lifecycle management.
|
||||
* Creates session folders + DBs, writes messages, manages container status.
|
||||
*
|
||||
* Two-DB architecture: each session has inbound.db (host-owned) and outbound.db
|
||||
* (container-owned). This eliminates SQLite write contention across the
|
||||
* host-container mount boundary — each file has exactly one writer.
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
@@ -9,7 +13,7 @@ import path from 'path';
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { createSession, findSession, getSession, updateSession } from './db/sessions.js';
|
||||
import { log } from './log.js';
|
||||
import { SESSION_SCHEMA } from './db/schema.js';
|
||||
import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './db/schema.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
/** Root directory for all session data. */
|
||||
@@ -22,9 +26,27 @@ export function sessionDir(agentGroupId: string, sessionId: string): string {
|
||||
return path.join(sessionsBaseDir(), agentGroupId, sessionId);
|
||||
}
|
||||
|
||||
/** Path to a session's SQLite DB. */
|
||||
/** Path to the host-owned inbound DB (messages_in + delivered). */
|
||||
export function inboundDbPath(agentGroupId: string, sessionId: string): string {
|
||||
return path.join(sessionDir(agentGroupId, sessionId), 'inbound.db');
|
||||
}
|
||||
|
||||
/** Path to the container-owned outbound DB (messages_out + processing_ack). */
|
||||
export function outboundDbPath(agentGroupId: string, sessionId: string): string {
|
||||
return path.join(sessionDir(agentGroupId, sessionId), 'outbound.db');
|
||||
}
|
||||
|
||||
/** Path to the container heartbeat file (touched instead of DB writes). */
|
||||
export function heartbeatPath(agentGroupId: string, sessionId: string): string {
|
||||
return path.join(sessionDir(agentGroupId, sessionId), '.heartbeat');
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use inboundDbPath / outboundDbPath instead.
|
||||
* Kept temporarily for test compatibility during migration.
|
||||
*/
|
||||
export function sessionDbPath(agentGroupId: string, sessionId: string): string {
|
||||
return path.join(sessionDir(agentGroupId, sessionId), 'session.db');
|
||||
return inboundDbPath(agentGroupId, sessionId);
|
||||
}
|
||||
|
||||
function generateId(): string {
|
||||
@@ -41,8 +63,6 @@ export function resolveSession(
|
||||
threadId: string | null,
|
||||
sessionMode: 'shared' | 'per-thread',
|
||||
): { session: Session; created: boolean } {
|
||||
// For shared mode, look for any active session with this messaging group (threadId ignored)
|
||||
// For per-thread mode, look for an active session with this specific thread
|
||||
const lookupThreadId = sessionMode === 'shared' ? null : threadId;
|
||||
const existing = findSession(messagingGroupId, lookupThreadId);
|
||||
|
||||
@@ -50,7 +70,6 @@ export function resolveSession(
|
||||
return { session: existing, created: false };
|
||||
}
|
||||
|
||||
// Create new session
|
||||
const id = generateId();
|
||||
const session: Session = {
|
||||
id,
|
||||
@@ -71,23 +90,32 @@ export function resolveSession(
|
||||
return { session, created: true };
|
||||
}
|
||||
|
||||
/** Create the session folder and initialize the session DB. */
|
||||
/** Create the session folder and initialize both DBs. */
|
||||
export function initSessionFolder(agentGroupId: string, sessionId: string): void {
|
||||
const dir = sessionDir(agentGroupId, sessionId);
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
fs.mkdirSync(path.join(dir, 'outbox'), { recursive: true });
|
||||
|
||||
const dbPath = sessionDbPath(agentGroupId, sessionId);
|
||||
if (!fs.existsSync(dbPath)) {
|
||||
const db = new Database(dbPath);
|
||||
const inPath = inboundDbPath(agentGroupId, sessionId);
|
||||
if (!fs.existsSync(inPath)) {
|
||||
const db = new Database(inPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.exec(SESSION_SCHEMA);
|
||||
db.exec(INBOUND_SCHEMA);
|
||||
db.close();
|
||||
log.debug('Session DB created', { dbPath });
|
||||
log.debug('Inbound DB created', { dbPath: inPath });
|
||||
}
|
||||
|
||||
const outPath = outboundDbPath(agentGroupId, sessionId);
|
||||
if (!fs.existsSync(outPath)) {
|
||||
const db = new Database(outPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.exec(OUTBOUND_SCHEMA);
|
||||
db.close();
|
||||
log.debug('Outbound DB created', { dbPath: outPath });
|
||||
}
|
||||
}
|
||||
|
||||
/** Write a message to a session's messages_in table. */
|
||||
/** Write a message to a session's inbound DB (messages_in). Host-only. */
|
||||
export function writeSessionMessage(
|
||||
agentGroupId: string,
|
||||
sessionId: string,
|
||||
@@ -103,22 +131,19 @@ export function writeSessionMessage(
|
||||
recurrence?: string | null;
|
||||
},
|
||||
): void {
|
||||
const dbPath = sessionDbPath(agentGroupId, sessionId);
|
||||
const dbPath = inboundDbPath(agentGroupId, sessionId);
|
||||
const db = new Database(dbPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.pragma('busy_timeout = 5000');
|
||||
|
||||
try {
|
||||
const nextSeq = (
|
||||
db
|
||||
.prepare(
|
||||
`SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM (
|
||||
SELECT seq FROM messages_in WHERE seq IS NOT NULL
|
||||
UNION ALL
|
||||
SELECT seq FROM messages_out WHERE seq IS NOT NULL
|
||||
)`,
|
||||
)
|
||||
.get() as { next: number }
|
||||
).next;
|
||||
// Host uses even seq numbers, container uses odd — prevents collisions
|
||||
// across the two-DB boundary without cross-DB coordination.
|
||||
const maxSeq = (
|
||||
db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }
|
||||
).m;
|
||||
const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); // next even
|
||||
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence)
|
||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`,
|
||||
@@ -138,18 +163,33 @@ export function writeSessionMessage(
|
||||
db.close();
|
||||
}
|
||||
|
||||
// Update last_active
|
||||
updateSession(sessionId, { last_active: new Date().toISOString() });
|
||||
}
|
||||
|
||||
/** Open a session DB for reading (e.g., polling messages_out). */
|
||||
export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database {
|
||||
const dbPath = sessionDbPath(agentGroupId, sessionId);
|
||||
/** Open the inbound DB for a session (host reads/writes). */
|
||||
export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database {
|
||||
const dbPath = inboundDbPath(agentGroupId, sessionId);
|
||||
const db = new Database(dbPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.pragma('busy_timeout = 5000');
|
||||
return db;
|
||||
}
|
||||
|
||||
/** Open the outbound DB for a session (host reads only). */
|
||||
export function openOutboundDb(agentGroupId: string, sessionId: string): Database.Database {
|
||||
const dbPath = outboundDbPath(agentGroupId, sessionId);
|
||||
const db = new Database(dbPath, { readonly: true });
|
||||
db.pragma('busy_timeout = 5000');
|
||||
return db;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use openInboundDb / openOutboundDb instead.
|
||||
*/
|
||||
export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database {
|
||||
return openInboundDb(agentGroupId, sessionId);
|
||||
}
|
||||
|
||||
/** Mark a container as running for a session. */
|
||||
export function markContainerRunning(sessionId: string): void {
|
||||
updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() });
|
||||
|
||||
Reference in New Issue
Block a user