fix(sweep): wake before reset + idempotent retry for orphan claims
When a container exits with an unresolved processing_ack claim, the sweep's crashed-container cleanup would reset the matching inbound message with tries++ and a future process_after. dueCount then dropped to 0, so the wake step never fired — and the next sweep tick found the same orphan claim, bumped tries again, and pushed process_after further out. The message reached MAX_TRIES and was marked failed without any container ever being spawned. Two changes: 1. Reorder sweep so the wake step runs before crashed-container cleanup. A fresh container clears orphan 'processing' rows on its own startup (container/agent-runner/src/db/connection.ts), so once we get it running the claim resolves itself. 2. Make resetStuckProcessingRows idempotent: if a message already has process_after set to a future time, skip the retry bump. The wake path will pick it up when the backoff elapses. Requires returning process_after from getMessageForRetry. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -139,10 +139,10 @@ export function getMessageForRetry(
|
|||||||
db: Database.Database,
|
db: Database.Database,
|
||||||
messageId: string,
|
messageId: string,
|
||||||
status: string,
|
status: string,
|
||||||
): { id: string; tries: number } | undefined {
|
): { id: string; tries: number; processAfter: string | null } | undefined {
|
||||||
return db.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?').get(messageId, status) as
|
return db
|
||||||
| { id: string; tries: number }
|
.prepare('SELECT id, tries, process_after as processAfter FROM messages_in WHERE id = ? AND status = ?')
|
||||||
| undefined;
|
.get(messageId, status) as { id: string; tries: number; processAfter: string | null } | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
||||||
|
|||||||
@@ -159,23 +159,31 @@ async function sweepSession(session: Session): Promise<void> {
|
|||||||
syncProcessingAcks(inDb, outDb);
|
syncProcessingAcks(inDb, outDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
const alive = isContainerRunning(session.id);
|
// 2. Wake a container if work is due and nothing is running. Ordered
|
||||||
|
// before the crashed-container cleanup so a fresh container gets a chance
|
||||||
// 2. Crashed-container cleanup: processing rows left behind get retried.
|
// to clean its own orphan processing_ack rows on startup (see
|
||||||
if (!alive && outDb) {
|
// container/agent-runner/src/db/connection.ts). Otherwise the reset path
|
||||||
resetStuckProcessingRows(inDb, outDb, session, 'container not running');
|
// would keep bumping process_after into the future, dueCount would stay 0,
|
||||||
|
// and the wake would never fire.
|
||||||
|
const dueCount = countDueMessages(inDb);
|
||||||
|
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
||||||
|
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
||||||
|
await wakeContainer(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const alive = isContainerRunning(session.id);
|
||||||
|
|
||||||
// 3. Running-container SLA: absolute ceiling + per-claim stuck rules.
|
// 3. Running-container SLA: absolute ceiling + per-claim stuck rules.
|
||||||
if (alive && outDb) {
|
if (alive && outDb) {
|
||||||
enforceRunningContainerSla(inDb, outDb, session, agentGroup.id);
|
enforceRunningContainerSla(inDb, outDb, session, agentGroup.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Wake a container if new work is due and nothing is running.
|
// 4. Crashed-container cleanup: processing rows left behind get retried.
|
||||||
const dueCount = countDueMessages(inDb);
|
// Only fires when wake in step 2 didn't pick up the work (no due messages,
|
||||||
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
// or wake failed). resetStuckProcessingRows itself is idempotent — it
|
||||||
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
// skips messages already scheduled for a future retry.
|
||||||
await wakeContainer(session);
|
if (!alive && outDb) {
|
||||||
|
resetStuckProcessingRows(inDb, outDb, session, 'container not running');
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Recurrence fanout for completed recurring tasks.
|
// 5. Recurrence fanout for completed recurring tasks.
|
||||||
@@ -246,10 +254,16 @@ function resetStuckProcessingRows(
|
|||||||
reason: string,
|
reason: string,
|
||||||
): void {
|
): void {
|
||||||
const claims = getProcessingClaims(outDb);
|
const claims = getProcessingClaims(outDb);
|
||||||
|
const now = Date.now();
|
||||||
for (const { message_id } of claims) {
|
for (const { message_id } of claims) {
|
||||||
const msg = getMessageForRetry(inDb, message_id, 'pending');
|
const msg = getMessageForRetry(inDb, message_id, 'pending');
|
||||||
if (!msg) continue;
|
if (!msg) continue;
|
||||||
|
|
||||||
|
// Already rescheduled for a future retry — don't bump tries again. The
|
||||||
|
// wake path (sweep step 2) will fire when process_after elapses and a
|
||||||
|
// fresh container will clean the orphan claim on startup.
|
||||||
|
if (msg.processAfter && Date.parse(msg.processAfter) > now) continue;
|
||||||
|
|
||||||
if (msg.tries >= MAX_TRIES) {
|
if (msg.tries >= MAX_TRIES) {
|
||||||
markMessageFailed(inDb, msg.id);
|
markMessageFailed(inDb, msg.id);
|
||||||
log.warn('Message marked as failed after max retries', {
|
log.warn('Message marked as failed after max retries', {
|
||||||
|
|||||||
Reference in New Issue
Block a user