/** * SQL operations on per-session inbound/outbound DBs. * * These are NOT the central app DB — they're the cross-mount SQLite files * shared between host and container. Callers own the connection lifecycle * (open-write-close per op). See session-manager.ts header for invariants. */ import Database from 'better-sqlite3'; import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './schema.js'; /** Apply the inbound or outbound schema to a DB file. Idempotent. */ export function ensureSchema(dbPath: string, schema: 'inbound' | 'outbound'): void { const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); db.exec(schema === 'inbound' ? INBOUND_SCHEMA : OUTBOUND_SCHEMA); db.close(); } /** Open the inbound DB for a session (host reads/writes). */ export function openInboundDb(dbPath: string): Database.Database { const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); db.pragma('busy_timeout = 5000'); return db; } /** Open the outbound DB for a session (host reads only). */ export function openOutboundDb(dbPath: string): Database.Database { const db = new Database(dbPath, { readonly: true }); db.pragma('busy_timeout = 5000'); return db; } /** Open the outbound DB for a session with write access. Only safe to call when no container is running. */ export function openOutboundDbRw(dbPath: string): Database.Database { const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); db.pragma('busy_timeout = 5000'); return db; } export function upsertSessionRouting( db: Database.Database, routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null }, ): void { db.prepare( `INSERT INTO session_routing (id, channel_type, platform_id, thread_id) VALUES (1, @channel_type, @platform_id, @thread_id) ON CONFLICT(id) DO UPDATE SET channel_type = excluded.channel_type, platform_id = excluded.platform_id, thread_id = excluded.thread_id`, ).run(routing); } export interface DestinationRow { name: string; display_name: string | null; type: 'channel' | 'agent'; channel_type: string | null; platform_id: string | null; agent_group_id: string | null; } export function replaceDestinations(db: Database.Database, entries: DestinationRow[]): void { const tx = db.transaction((rows: DestinationRow[]) => { db.prepare('DELETE FROM destinations').run(); const stmt = db.prepare( `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`, ); for (const row of rows) stmt.run(row); }); tx(entries); } // --------------------------------------------------------------------------- // messages_in // --------------------------------------------------------------------------- /** * Next even seq number for host-owned inbound.db. * * Exported so the scheduling module's task helpers can maintain the * host-writes-even-seq invariant without duplicating the logic. Not part of * the general public API — imported by `src/modules/scheduling/db.ts` only. */ export function nextEvenSeq(db: Database.Database): number { const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; return maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); } export function insertMessage( db: Database.Database, message: { id: string; kind: string; timestamp: string; platformId: string | null; channelType: string | null; threadId: string | null; content: string; processAfter: string | null; recurrence: string | null; /** * 1 = wake the agent (default); 0 = accumulate as context only. * Host countDueMessages gates on this; container reads everything. */ trigger?: 0 | 1; }, ): void { db.prepare( `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger) VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`, ).run({ ...message, trigger: message.trigger ?? 1, seq: nextEvenSeq(db), }); } export function countDueMessages(db: Database.Database): number { return ( db .prepare( `SELECT COUNT(*) as count FROM messages_in WHERE status = 'pending' AND trigger = 1 AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))`, ) .get() as { count: number } ).count; } export function markMessageFailed(db: Database.Database, messageId: string): void { db.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(messageId); } export function retryWithBackoff(db: Database.Database, messageId: string, backoffSec: number): void { db.prepare( `UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`, ).run(messageId); } export function getMessageForRetry( db: Database.Database, messageId: string, status: string, ): { id: string; tries: number; processAfter: string | null } | undefined { return db .prepare('SELECT id, tries, process_after as processAfter FROM messages_in WHERE id = ? AND status = ?') .get(messageId, status) as { id: string; tries: number; processAfter: string | null } | undefined; } export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { const completed = outDb .prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')") .all() as Array<{ message_id: string }>; if (completed.length === 0) return; const updateStmt = inDb.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'"); inDb.transaction(() => { for (const { message_id } of completed) { updateStmt.run(message_id); } })(); } export function getStuckProcessingIds(outDb: Database.Database): string[] { return ( outDb.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'").all() as Array<{ message_id: string; }> ).map((r) => r.message_id); } export interface ProcessingClaim { message_id: string; status_changed: string; } /** Return processing_ack rows still in 'processing' with their claim timestamps. */ export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[] { return outDb .prepare("SELECT message_id, status_changed FROM processing_ack WHERE status = 'processing'") .all() as ProcessingClaim[]; } /** * Delete orphan 'processing' rows. Called by the host after killing a * container so the leftover claim doesn't trip claim-stuck on the next sweep * tick (which would kill the freshly respawned container before its * agent-runner can run its own startup cleanup). * * Safe because the host only writes to outbound.db when no container is * running (we just killed it). Returns the number of rows deleted. */ export function deleteOrphanProcessingClaims(outDb: Database.Database): number { return outDb.prepare("DELETE FROM processing_ack WHERE status = 'processing'").run().changes; } export interface ContainerState { current_tool: string | null; tool_declared_timeout_ms: number | null; tool_started_at: string | null; } /** * Read the container's current tool-in-flight state, if any. Returns null * when either the table doesn't exist yet (older session DB) or no tool is * active. Host sweep reads this to widen stuck-detection tolerance while * Bash is running with a long declared timeout. */ export function getContainerState(outDb: Database.Database): ContainerState | null { try { const row = outDb .prepare( `SELECT current_tool, tool_declared_timeout_ms, tool_started_at FROM container_state WHERE id = 1`, ) .get() as ContainerState | undefined; return row ?? null; } catch { // Table not present on older session DBs — treat as "no tool in flight". return null; } } // --------------------------------------------------------------------------- // messages_out (read-only from host) // --------------------------------------------------------------------------- export interface OutboundMessage { id: string; kind: string; platform_id: string | null; channel_type: string | null; thread_id: string | null; content: string; } export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] { return db .prepare( `SELECT * FROM messages_out WHERE (deliver_after IS NULL OR deliver_after <= datetime('now')) ORDER BY timestamp ASC`, ) .all() as OutboundMessage[]; } // --------------------------------------------------------------------------- // delivered // --------------------------------------------------------------------------- export function getDeliveredIds(db: Database.Database): Set { return new Set( (db.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map( (r) => r.message_out_id, ), ); } export function markDelivered(db: Database.Database, messageOutId: string, platformMessageId: string | null): void { db.prepare( "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))", ).run(messageOutId, platformMessageId ?? null); } export function markDeliveryFailed(db: Database.Database, messageOutId: string): void { db.prepare( "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))", ).run(messageOutId); } /** Ensure the delivered table has columns added after initial schema. */ export function migrateDeliveredTable(db: Database.Database): void { const cols = new Set( (db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name), ); if (!cols.has('platform_message_id')) { db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run(); } if (!cols.has('status')) { db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run(); } } // Adds columns added to messages_in after the initial v2 schema to // pre-existing session DBs. No-op on fresh installs where the columns are // in the baseline schema. Backfills existing rows so invariants hold. export function migrateMessagesInTable(db: Database.Database): void { const cols = new Set( (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name), ); if (!cols.has('series_id')) { db.prepare('ALTER TABLE messages_in ADD COLUMN series_id TEXT').run(); db.prepare('UPDATE messages_in SET series_id = id WHERE series_id IS NULL').run(); db.prepare('CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id)').run(); } if (!cols.has('trigger')) { // All pre-existing rows got written with the old "every inbound wakes // the agent" semantics, so backfill 1 and default 1 for new inserts. db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run(); } }