From 85850874ab4ad854bafd8a733306da8925c76f0a Mon Sep 17 00:00:00 2001 From: gavrielc Date: Fri, 8 May 2026 15:24:37 +0300 Subject: [PATCH] test: add delivery retry, permission check, and poll-loop error recovery coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Delivery: - Retry exhaustion: adapter fails 3x → markDeliveryFailed - Retry recovery: transient failure then success clears counter - Permission check: unauthorized channel destination blocked Poll-loop (container): - Provider error: error written to outbound, loop continues - Stale session: isSessionInvalid → continuation cleared - /clear command: session wiped, confirmation written Co-Authored-By: Claude Opus 4.6 (1M context) --- .../agent-runner/src/integration.test.ts | 140 ++++++++++++++++++ src/delivery.test.ts | 120 ++++++++++++++- 2 files changed, 258 insertions(+), 2 deletions(-) diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts index 4a2b806..7396cfe 100644 --- a/container/agent-runner/src/integration.test.ts +++ b/container/agent-runner/src/integration.test.ts @@ -3,6 +3,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'bun:test'; import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js'; import { getUndeliveredMessages } from './db/messages-out.js'; import { getPendingMessages } from './db/messages-in.js'; +import { getContinuation, setContinuation } from './db/session-state.js'; import { MockProvider } from './providers/mock.js'; import { runPollLoop } from './poll-loop.js'; @@ -429,3 +430,142 @@ async function waitFor(condition: () => boolean, timeoutMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } + +describe('poll loop — provider error recovery', () => { + it('writes error to outbound and continues loop on provider throw', async () => { + insertMessage('m1', { sender: 'Alice', text: 'trigger error' }, { platformId: 'chan-1', channelType: 'discord' }); + + const provider = new ThrowingProvider('API rate limit exceeded'); + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000); + + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(JSON.parse(out[0].content).text).toContain('Error:'); + expect(JSON.parse(out[0].content).text).toContain('API rate limit exceeded'); + + // Input message should be marked completed despite the error + const pending = getPendingMessages(); + expect(pending).toHaveLength(0); + + await loopPromise.catch(() => {}); + }); +}); + +describe('poll loop — stale session recovery', () => { + it('clears continuation when provider reports session invalid', async () => { + // Pre-seed a continuation so the local variable in runPollLoop is set. + // Without this, the `if (continuation && isSessionInvalid)` check skips. + setContinuation('mock', 'pre-existing-session'); + + insertMessage('m1', { sender: 'Alice', text: 'stale session' }, { platformId: 'chan-1', channelType: 'discord' }); + + const provider = new InvalidSessionProvider(); + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000); + + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + // Error was written to outbound + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(JSON.parse(out[0].content).text).toContain('Error:'); + + // Continuation was cleared (isSessionInvalid returned true) + expect(getContinuation('mock')).toBeUndefined(); + + await loopPromise.catch(() => {}); + }); +}); + +describe('poll loop — /clear command', () => { + it('clears session, writes confirmation, skips query', async () => { + // Seed a continuation so we can verify it gets cleared + setContinuation('mock', 'existing-session-id'); + expect(getContinuation('mock')).toBe('existing-session-id'); + + // Insert a /clear command + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content) + VALUES ('m-clear', 'chat', datetime('now'), 'pending', 'chan-1', 'discord', ?)`, + ) + .run(JSON.stringify({ text: '/clear' })); + + const provider = new MockProvider({}, () => 'should not run'); + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); + + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(JSON.parse(out[0].content).text).toBe('Session cleared.'); + + // Continuation was cleared + expect(getContinuation('mock')).toBeUndefined(); + + // Command message was completed + const pending = getPendingMessages(); + expect(pending).toHaveLength(0); + + await loopPromise.catch(() => {}); + }); +}); + +/** + * Provider that throws on every query, simulating API failures. + */ +class ThrowingProvider { + readonly supportsNativeSlashCommands = false; + private errorMessage: string; + + constructor(errorMessage: string) { + this.errorMessage = errorMessage; + } + + isSessionInvalid(): boolean { + return false; + } + + query(_input: { prompt: string; cwd: string }) { + const errorMessage = this.errorMessage; + return { + push() {}, + end() {}, + abort() {}, + events: (async function* () { + throw new Error(errorMessage); + })(), + }; + } +} + +/** + * Provider that throws with an error that triggers isSessionInvalid. + * First emits an init event (setting continuation), then throws. + */ +class InvalidSessionProvider { + readonly supportsNativeSlashCommands = false; + + isSessionInvalid(): boolean { + return true; + } + + query(_input: { prompt: string; cwd: string }) { + return { + push() {}, + end() {}, + abort() {}, + events: (async function* () { + yield { type: 'init' as const, continuation: 'doomed-session' }; + throw new Error('session not found'); + })(), + }; + } +} diff --git a/src/delivery.test.ts b/src/delivery.test.ts index a5e1efd..5d23536 100644 --- a/src/delivery.test.ts +++ b/src/delivery.test.ts @@ -26,8 +26,9 @@ vi.mock('./config.js', async () => { const TEST_DIR = '/tmp/nanoclaw-test-delivery'; -import { initTestDb, closeDb, runMigrations, createAgentGroup, createMessagingGroup } from './db/index.js'; -import { resolveSession, outboundDbPath } from './session-manager.js'; +import { initTestDb, closeDb, runMigrations, createAgentGroup, createMessagingGroup, createMessagingGroupAgent } from './db/index.js'; +import { getDeliveredIds } from './db/session-db.js'; +import { resolveSession, outboundDbPath, openInboundDb } from './session-manager.js'; import { deliverSessionMessages, setDeliveryAdapter } from './delivery.js'; function now(): string { @@ -146,3 +147,118 @@ describe('deliverSessionMessages — concurrent invocations', () => { expect(callCount).toBe(1); }); }); + +describe('deliverSessionMessages — retry and permanent failure', () => { + it('retries on adapter failure and marks failed after MAX_DELIVERY_ATTEMPTS (3)', async () => { + seedAgentAndChannel(); + const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); + insertOutbound('ag-1', session.id, 'out-flaky'); + + let callCount = 0; + setDeliveryAdapter({ + async deliver() { + callCount++; + throw new Error('network timeout'); + }, + }); + + // Attempt 1 + await deliverSessionMessages(session); + expect(callCount).toBe(1); + + // Attempt 2 + await deliverSessionMessages(session); + expect(callCount).toBe(2); + + // Attempt 3 — should mark as permanently failed + await deliverSessionMessages(session); + expect(callCount).toBe(3); + + // Attempt 4 — message is now in delivered (as failed), adapter not called + await deliverSessionMessages(session); + expect(callCount).toBe(3); + + // Verify the message is in the delivered table with 'failed' status + const inDb = openInboundDb('ag-1', session.id); + const delivered = getDeliveredIds(inDb); + inDb.close(); + expect(delivered.has('out-flaky')).toBe(true); + }); + + it('clears attempt counter on successful delivery', async () => { + seedAgentAndChannel(); + const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); + insertOutbound('ag-1', session.id, 'out-retry-ok'); + + let callCount = 0; + setDeliveryAdapter({ + async deliver() { + callCount++; + if (callCount === 1) throw new Error('transient'); + return 'plat-ok'; + }, + }); + + // Attempt 1 — fails + await deliverSessionMessages(session); + expect(callCount).toBe(1); + + // Attempt 2 — succeeds + await deliverSessionMessages(session); + expect(callCount).toBe(2); + + // Attempt 3 — not called, message already delivered + await deliverSessionMessages(session); + expect(callCount).toBe(2); + }); +}); + +describe('deliverSessionMessages — permission check', () => { + it('rejects delivery to an unauthorized channel destination', async () => { + seedAgentAndChannel(); + + // Create a second messaging group that the agent is NOT wired to + createMessagingGroup({ + id: 'mg-2', + channel_type: 'discord', + platform_id: 'discord:456', + name: 'Unauthorized Chat', + is_group: 0, + unknown_sender_policy: 'public', + created_at: now(), + }); + + // Session is on mg-1 (telegram) + const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); + + // Insert an outbound message targeting mg-2 (discord) — not the origin chat + const outDb = new Database(outboundDbPath('ag-1', session.id)); + outDb.prepare( + `INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content) + VALUES (?, datetime('now'), 'chat', 'discord:456', 'discord', ?)`, + ).run('out-unauth', JSON.stringify({ text: 'sneaky' })); + outDb.close(); + + const calls: string[] = []; + setDeliveryAdapter({ + async deliver(_ct, _pid, _tid, _kind, content) { + calls.push(content); + return 'plat-msg'; + }, + }); + + // Deliver 3 times to exhaust retries + await deliverSessionMessages(session); + await deliverSessionMessages(session); + await deliverSessionMessages(session); + + // Adapter never called — permission check throws before reaching it + expect(calls).toHaveLength(0); + + // Message is marked as permanently failed + const inDb = openInboundDb('ag-1', session.id); + const delivered = getDeliveredIds(inDb); + inDb.close(); + expect(delivered.has('out-unauth')).toBe(true); + }); +});