diff --git a/src/db/session-db.ts b/src/db/session-db.ts index aea255d..48e9297 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -139,10 +139,10 @@ export function getMessageForRetry( db: Database.Database, messageId: string, status: string, -): { id: string; tries: number } | undefined { - return db.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?').get(messageId, status) as - | { id: string; tries: number } - | undefined; +): { id: string; tries: number; processAfter: string | null } | undefined { + return db + .prepare('SELECT id, tries, process_after as processAfter FROM messages_in WHERE id = ? AND status = ?') + .get(messageId, status) as { id: string; tries: number; processAfter: string | null } | undefined; } export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 1a2901c..4dc2fb7 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -159,23 +159,31 @@ async function sweepSession(session: Session): Promise { syncProcessingAcks(inDb, outDb); } - const alive = isContainerRunning(session.id); - - // 2. Crashed-container cleanup: processing rows left behind get retried. - if (!alive && outDb) { - resetStuckProcessingRows(inDb, outDb, session, 'container not running'); + // 2. Wake a container if work is due and nothing is running. Ordered + // before the crashed-container cleanup so a fresh container gets a chance + // to clean its own orphan processing_ack rows on startup (see + // container/agent-runner/src/db/connection.ts). Otherwise the reset path + // would keep bumping process_after into the future, dueCount would stay 0, + // and the wake would never fire. + const dueCount = countDueMessages(inDb); + if (dueCount > 0 && !isContainerRunning(session.id)) { + log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); + await wakeContainer(session); } + const alive = isContainerRunning(session.id); + // 3. Running-container SLA: absolute ceiling + per-claim stuck rules. if (alive && outDb) { enforceRunningContainerSla(inDb, outDb, session, agentGroup.id); } - // 4. Wake a container if new work is due and nothing is running. - const dueCount = countDueMessages(inDb); - if (dueCount > 0 && !isContainerRunning(session.id)) { - log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); - await wakeContainer(session); + // 4. Crashed-container cleanup: processing rows left behind get retried. + // Only fires when wake in step 2 didn't pick up the work (no due messages, + // or wake failed). resetStuckProcessingRows itself is idempotent — it + // skips messages already scheduled for a future retry. + if (!alive && outDb) { + resetStuckProcessingRows(inDb, outDb, session, 'container not running'); } // 5. Recurrence fanout for completed recurring tasks. @@ -246,10 +254,16 @@ function resetStuckProcessingRows( reason: string, ): void { const claims = getProcessingClaims(outDb); + const now = Date.now(); for (const { message_id } of claims) { const msg = getMessageForRetry(inDb, message_id, 'pending'); if (!msg) continue; + // Already rescheduled for a future retry — don't bump tries again. The + // wake path (sweep step 2) will fire when process_after elapses and a + // fresh container will clean the orphan claim on startup. + if (msg.processAfter && Date.parse(msg.processAfter) > now) continue; + if (msg.tries >= MAX_TRIES) { markMessageFailed(inDb, msg.id); log.warn('Message marked as failed after max retries', {