Merge pull request #2183 from cfis/fix/host-sweep-outbound-db-rw
fix(host-sweep): reopen outbound DB as writable for orphan claim cleanup
This commit is contained in:
@@ -32,6 +32,14 @@ export function openOutboundDb(dbPath: string): Database.Database {
|
|||||||
return db;
|
return db;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
|
||||||
|
export function openOutboundDbRw(dbPath: string): Database.Database {
|
||||||
|
const db = new Database(dbPath);
|
||||||
|
db.pragma('journal_mode = DELETE');
|
||||||
|
db.pragma('busy_timeout = 5000');
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
export function upsertSessionRouting(
|
export function upsertSessionRouting(
|
||||||
db: Database.Database,
|
db: Database.Database,
|
||||||
routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null },
|
routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null },
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ import {
|
|||||||
type ContainerState,
|
type ContainerState,
|
||||||
} from './db/session-db.js';
|
} from './db/session-db.js';
|
||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
|
import { openInboundDb, openOutboundDb, openOutboundDbRw, inboundDbPath, heartbeatPath } from './session-manager.js';
|
||||||
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
||||||
import type { Session } from './types.js';
|
import type { Session } from './types.js';
|
||||||
|
|
||||||
@@ -302,8 +302,17 @@ function resetStuckProcessingRows(
|
|||||||
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
||||||
// We're safe to write outbound.db here because we just killed the container
|
// We're safe to write outbound.db here because we just killed the container
|
||||||
// that owned it (or it crashed and left no writer behind).
|
// that owned it (or it crashed and left no writer behind).
|
||||||
const cleared = deleteOrphanProcessingClaims(outDb);
|
// outDb was opened readonly for reads above; reopen with write access for this delete.
|
||||||
if (cleared > 0) {
|
let outDbRw: Database.Database | null = null;
|
||||||
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
try {
|
||||||
|
outDbRw = openOutboundDbRw(session.agent_group_id, session.id);
|
||||||
|
const cleared = deleteOrphanProcessingClaims(outDbRw);
|
||||||
|
if (cleared > 0) {
|
||||||
|
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err });
|
||||||
|
} finally {
|
||||||
|
outDbRw?.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import {
|
|||||||
ensureSchema,
|
ensureSchema,
|
||||||
openInboundDb as openInboundDbRaw,
|
openInboundDb as openInboundDbRaw,
|
||||||
openOutboundDb as openOutboundDbRaw,
|
openOutboundDb as openOutboundDbRaw,
|
||||||
|
openOutboundDbRw as openOutboundDbRwRaw,
|
||||||
upsertSessionRouting,
|
upsertSessionRouting,
|
||||||
insertMessage,
|
insertMessage,
|
||||||
migrateMessagesInTable,
|
migrateMessagesInTable,
|
||||||
@@ -355,6 +356,11 @@ export function openOutboundDb(agentGroupId: string, sessionId: string): Databas
|
|||||||
return openOutboundDbRaw(outboundDbPath(agentGroupId, sessionId));
|
return openOutboundDbRaw(outboundDbPath(agentGroupId, sessionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
|
||||||
|
export function openOutboundDbRw(agentGroupId: string, sessionId: string): Database.Database {
|
||||||
|
return openOutboundDbRwRaw(outboundDbPath(agentGroupId, sessionId));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write a message directly to a session's outbound DB so the host delivery
|
* Write a message directly to a session's outbound DB so the host delivery
|
||||||
* loop picks it up. Used by the command gate to send denial responses
|
* loop picks it up. Used by the command gate to send denial responses
|
||||||
|
|||||||
Reference in New Issue
Block a user