Merge pull request #1941 from qwibitai/fix/container-restart-recovery
fix: container restart recovery — stale heartbeat + orphan claim loop
This commit is contained in:
@@ -36,7 +36,13 @@ import {
|
|||||||
type ProviderContainerContribution,
|
type ProviderContainerContribution,
|
||||||
type VolumeMount,
|
type VolumeMount,
|
||||||
} from './providers/provider-container-registry.js';
|
} from './providers/provider-container-registry.js';
|
||||||
import { markContainerRunning, markContainerStopped, sessionDir, writeSessionRouting } from './session-manager.js';
|
import {
|
||||||
|
heartbeatPath,
|
||||||
|
markContainerRunning,
|
||||||
|
markContainerStopped,
|
||||||
|
sessionDir,
|
||||||
|
writeSessionRouting,
|
||||||
|
} from './session-manager.js';
|
||||||
import type { AgentGroup, Session } from './types.js';
|
import type { AgentGroup, Session } from './types.js';
|
||||||
|
|
||||||
const onecli = new OneCLI({ url: ONECLI_URL, apiKey: ONECLI_API_KEY });
|
const onecli = new OneCLI({ url: ONECLI_URL, apiKey: ONECLI_API_KEY });
|
||||||
@@ -131,6 +137,12 @@ async function spawnContainer(session: Session): Promise<void> {
|
|||||||
|
|
||||||
log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName });
|
log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName });
|
||||||
|
|
||||||
|
// Clear any orphan heartbeat from a previous container instance — the
|
||||||
|
// sweep's ceiling check treats a missing file as "fresh spawn, give grace"
|
||||||
|
// (host-sweep.ts line 87). Without this, the stale mtime can trigger an
|
||||||
|
// immediate kill before the new container touches the file itself.
|
||||||
|
fs.rmSync(heartbeatPath(agentGroup.id, session.id), { force: true });
|
||||||
|
|
||||||
const container = spawn(CONTAINER_RUNTIME_BIN, args, { stdio: ['ignore', 'pipe', 'pipe'] });
|
const container = spawn(CONTAINER_RUNTIME_BIN, args, { stdio: ['ignore', 'pipe', 'pipe'] });
|
||||||
|
|
||||||
activeContainers.set(session.id, { process: container, containerName });
|
activeContainers.set(session.id, { process: container, containerName });
|
||||||
|
|||||||
@@ -139,10 +139,10 @@ export function getMessageForRetry(
|
|||||||
db: Database.Database,
|
db: Database.Database,
|
||||||
messageId: string,
|
messageId: string,
|
||||||
status: string,
|
status: string,
|
||||||
): { id: string; tries: number } | undefined {
|
): { id: string; tries: number; processAfter: string | null } | undefined {
|
||||||
return db.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?').get(messageId, status) as
|
return db
|
||||||
| { id: string; tries: number }
|
.prepare('SELECT id, tries, process_after as processAfter FROM messages_in WHERE id = ? AND status = ?')
|
||||||
| undefined;
|
.get(messageId, status) as { id: string; tries: number; processAfter: string | null } | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
||||||
|
|||||||
@@ -159,23 +159,31 @@ async function sweepSession(session: Session): Promise<void> {
|
|||||||
syncProcessingAcks(inDb, outDb);
|
syncProcessingAcks(inDb, outDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
const alive = isContainerRunning(session.id);
|
// 2. Wake a container if work is due and nothing is running. Ordered
|
||||||
|
// before the crashed-container cleanup so a fresh container gets a chance
|
||||||
// 2. Crashed-container cleanup: processing rows left behind get retried.
|
// to clean its own orphan processing_ack rows on startup (see
|
||||||
if (!alive && outDb) {
|
// container/agent-runner/src/db/connection.ts). Otherwise the reset path
|
||||||
resetStuckProcessingRows(inDb, outDb, session, 'container not running');
|
// would keep bumping process_after into the future, dueCount would stay 0,
|
||||||
|
// and the wake would never fire.
|
||||||
|
const dueCount = countDueMessages(inDb);
|
||||||
|
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
||||||
|
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
||||||
|
await wakeContainer(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const alive = isContainerRunning(session.id);
|
||||||
|
|
||||||
// 3. Running-container SLA: absolute ceiling + per-claim stuck rules.
|
// 3. Running-container SLA: absolute ceiling + per-claim stuck rules.
|
||||||
if (alive && outDb) {
|
if (alive && outDb) {
|
||||||
enforceRunningContainerSla(inDb, outDb, session, agentGroup.id);
|
enforceRunningContainerSla(inDb, outDb, session, agentGroup.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Wake a container if new work is due and nothing is running.
|
// 4. Crashed-container cleanup: processing rows left behind get retried.
|
||||||
const dueCount = countDueMessages(inDb);
|
// Only fires when wake in step 2 didn't pick up the work (no due messages,
|
||||||
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
// or wake failed). resetStuckProcessingRows itself is idempotent — it
|
||||||
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
// skips messages already scheduled for a future retry.
|
||||||
await wakeContainer(session);
|
if (!alive && outDb) {
|
||||||
|
resetStuckProcessingRows(inDb, outDb, session, 'container not running');
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Recurrence fanout for completed recurring tasks.
|
// 5. Recurrence fanout for completed recurring tasks.
|
||||||
@@ -246,10 +254,16 @@ function resetStuckProcessingRows(
|
|||||||
reason: string,
|
reason: string,
|
||||||
): void {
|
): void {
|
||||||
const claims = getProcessingClaims(outDb);
|
const claims = getProcessingClaims(outDb);
|
||||||
|
const now = Date.now();
|
||||||
for (const { message_id } of claims) {
|
for (const { message_id } of claims) {
|
||||||
const msg = getMessageForRetry(inDb, message_id, 'pending');
|
const msg = getMessageForRetry(inDb, message_id, 'pending');
|
||||||
if (!msg) continue;
|
if (!msg) continue;
|
||||||
|
|
||||||
|
// Already rescheduled for a future retry — don't bump tries again. The
|
||||||
|
// wake path (sweep step 2) will fire when process_after elapses and a
|
||||||
|
// fresh container will clean the orphan claim on startup.
|
||||||
|
if (msg.processAfter && Date.parse(msg.processAfter) > now) continue;
|
||||||
|
|
||||||
if (msg.tries >= MAX_TRIES) {
|
if (msg.tries >= MAX_TRIES) {
|
||||||
markMessageFailed(inDb, msg.id);
|
markMessageFailed(inDb, msg.id);
|
||||||
log.warn('Message marked as failed after max retries', {
|
log.warn('Message marked as failed after max retries', {
|
||||||
|
|||||||
Reference in New Issue
Block a user