refactor(v2): extract session DB operations into src/db/session-db.ts

Move all raw SQL out of session-manager, delivery, and host-sweep into
a dedicated DB module. Make session schemas idempotent (IF NOT EXISTS)
so initSessionFolder always applies them. Revert the markdown
plain-text retry from 4c477ac.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-12 16:50:03 +03:00
parent 2376c88aaf
commit 669a8444ef
6 changed files with 376 additions and 270 deletions

View File

@@ -7,11 +7,22 @@
* - Uses heartbeat file mtime for stale container detection (not DB writes)
* - Never writes to outbound.db — preserves single-writer-per-file invariant
*/
import Database from 'better-sqlite3';
import type Database from 'better-sqlite3';
import fs from 'fs';
import { getActiveSessions, updateSession } from './db/sessions.js';
import { getAgentGroup } from './db/agent-groups.js';
import {
countDueMessages,
syncProcessingAcks,
getStuckProcessingIds,
getMessageForRetry,
markMessageFailed,
retryWithBackoff,
getCompletedRecurring,
insertRecurrence,
clearRecurrence,
} from './db/session-db.js';
import { log } from './log.js';
import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js';
import { wakeContainer, isContainerRunning } from './container-runner.js';
@@ -77,16 +88,10 @@ async function sweepSession(session: Session): Promise<void> {
}
// 2. Check for due pending messages → wake container
const dueMessages = inDb
.prepare(
`SELECT COUNT(*) as count FROM messages_in
WHERE status = 'pending'
AND (process_after IS NULL OR process_after <= datetime('now'))`,
)
.get() as { count: number };
const dueCount = countDueMessages(inDb);
if (dueMessages.count > 0 && !isContainerRunning(session.id)) {
log.info('Waking container for due messages', { sessionId: session.id, count: dueMessages.count });
if (dueCount > 0 && !isContainerRunning(session.id)) {
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
await wakeContainer(session);
}
@@ -103,26 +108,6 @@ async function sweepSession(session: Session): Promise<void> {
}
}
/**
* Sync completed/failed processing_ack entries → messages_in.status.
* Only syncs terminal states — 'processing' is handled by stale detection.
*/
function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
const completed = outDb
.prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')")
.all() as Array<{ message_id: string }>;
if (completed.length === 0) return;
// Batch-update messages_in status for completed/failed messages
const updateStmt = inDb.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'");
inDb.transaction(() => {
for (const { message_id } of completed) {
updateStmt.run(message_id);
}
})();
}
/**
* Detect stale containers using heartbeat file mtime.
* If the heartbeat is older than STALE_THRESHOLD and processing_ack has
@@ -146,30 +131,20 @@ function detectStaleContainers(
if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive
// Heartbeat is stale — check for stuck processing entries
const processing = outDb.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'").all() as Array<{
message_id: string;
}>;
if (processing.length === 0) return;
for (const { message_id } of processing) {
const msg = inDb
.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?')
.get(message_id, 'pending') as { id: string; tries: number } | undefined;
const processingIds = getStuckProcessingIds(outDb);
if (processingIds.length === 0) return;
for (const messageId of processingIds) {
const msg = getMessageForRetry(inDb, messageId, 'pending');
if (!msg) continue;
if (msg.tries >= MAX_TRIES) {
inDb.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(msg.id);
markMessageFailed(inDb, msg.id);
log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id });
} else {
const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries);
const backoffSec = Math.floor(backoffMs / 1000);
inDb
.prepare(
`UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`,
)
.run(msg.id);
retryWithBackoff(inDb, msg.id, backoffSec);
log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs });
}
}
@@ -177,49 +152,17 @@ function detectStaleContainers(
/** Insert next occurrence for completed recurring messages. */
async function handleRecurrence(inDb: Database.Database, session: Session): Promise<void> {
const completedRecurring = inDb
.prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL")
.all() as Array<{
id: string;
kind: string;
content: string;
recurrence: string;
process_after: string | null;
platform_id: string | null;
channel_type: string | null;
thread_id: string | null;
}>;
const recurring = getCompletedRecurring(inDb);
for (const msg of completedRecurring) {
for (const msg of recurring) {
try {
const { CronExpressionParser } = await import('cron-parser');
const interval = CronExpressionParser.parse(msg.recurrence);
const nextRun = interval.next().toISOString();
const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
// Host uses even seq numbers
const maxSeq = (inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m;
const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2);
inDb
.prepare(
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
)
.run(
newId,
nextSeq,
msg.kind,
nextRun,
msg.recurrence,
msg.platform_id,
msg.channel_type,
msg.thread_id,
msg.content,
);
// Remove recurrence from the completed message so it doesn't spawn again
inDb.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id);
insertRecurrence(inDb, msg, newId, nextRun);
clearRecurrence(inDb, msg.id);
log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun });
} catch (err) {