test: add delivery retry, permission check, and poll-loop error recovery coverage
Delivery: - Retry exhaustion: adapter fails 3x → markDeliveryFailed - Retry recovery: transient failure then success clears counter - Permission check: unauthorized channel destination blocked Poll-loop (container): - Provider error: error written to outbound, loop continues - Stale session: isSessionInvalid → continuation cleared - /clear command: session wiped, confirmation written Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
|||||||
import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js';
|
import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js';
|
||||||
import { getUndeliveredMessages } from './db/messages-out.js';
|
import { getUndeliveredMessages } from './db/messages-out.js';
|
||||||
import { getPendingMessages } from './db/messages-in.js';
|
import { getPendingMessages } from './db/messages-in.js';
|
||||||
|
import { getContinuation, setContinuation } from './db/session-state.js';
|
||||||
import { MockProvider } from './providers/mock.js';
|
import { MockProvider } from './providers/mock.js';
|
||||||
import { runPollLoop } from './poll-loop.js';
|
import { runPollLoop } from './poll-loop.js';
|
||||||
|
|
||||||
@@ -429,3 +430,142 @@ async function waitFor(condition: () => boolean, timeoutMs: number): Promise<voi
|
|||||||
function sleep(ms: number): Promise<void> {
|
function sleep(ms: number): Promise<void> {
|
||||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describe('poll loop — provider error recovery', () => {
|
||||||
|
it('writes error to outbound and continues loop on provider throw', async () => {
|
||||||
|
insertMessage('m1', { sender: 'Alice', text: 'trigger error' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||||
|
|
||||||
|
const provider = new ThrowingProvider('API rate limit exceeded');
|
||||||
|
const controller = new AbortController();
|
||||||
|
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000);
|
||||||
|
|
||||||
|
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
const out = getUndeliveredMessages();
|
||||||
|
expect(out).toHaveLength(1);
|
||||||
|
expect(JSON.parse(out[0].content).text).toContain('Error:');
|
||||||
|
expect(JSON.parse(out[0].content).text).toContain('API rate limit exceeded');
|
||||||
|
|
||||||
|
// Input message should be marked completed despite the error
|
||||||
|
const pending = getPendingMessages();
|
||||||
|
expect(pending).toHaveLength(0);
|
||||||
|
|
||||||
|
await loopPromise.catch(() => {});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('poll loop — stale session recovery', () => {
|
||||||
|
it('clears continuation when provider reports session invalid', async () => {
|
||||||
|
// Pre-seed a continuation so the local variable in runPollLoop is set.
|
||||||
|
// Without this, the `if (continuation && isSessionInvalid)` check skips.
|
||||||
|
setContinuation('mock', 'pre-existing-session');
|
||||||
|
|
||||||
|
insertMessage('m1', { sender: 'Alice', text: 'stale session' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||||
|
|
||||||
|
const provider = new InvalidSessionProvider();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000);
|
||||||
|
|
||||||
|
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
// Error was written to outbound
|
||||||
|
const out = getUndeliveredMessages();
|
||||||
|
expect(out).toHaveLength(1);
|
||||||
|
expect(JSON.parse(out[0].content).text).toContain('Error:');
|
||||||
|
|
||||||
|
// Continuation was cleared (isSessionInvalid returned true)
|
||||||
|
expect(getContinuation('mock')).toBeUndefined();
|
||||||
|
|
||||||
|
await loopPromise.catch(() => {});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('poll loop — /clear command', () => {
|
||||||
|
it('clears session, writes confirmation, skips query', async () => {
|
||||||
|
// Seed a continuation so we can verify it gets cleared
|
||||||
|
setContinuation('mock', 'existing-session-id');
|
||||||
|
expect(getContinuation('mock')).toBe('existing-session-id');
|
||||||
|
|
||||||
|
// Insert a /clear command
|
||||||
|
getInboundDb()
|
||||||
|
.prepare(
|
||||||
|
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
|
||||||
|
VALUES ('m-clear', 'chat', datetime('now'), 'pending', 'chan-1', 'discord', ?)`,
|
||||||
|
)
|
||||||
|
.run(JSON.stringify({ text: '/clear' }));
|
||||||
|
|
||||||
|
const provider = new MockProvider({}, () => '<message to="discord-test">should not run</message>');
|
||||||
|
const controller = new AbortController();
|
||||||
|
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||||
|
|
||||||
|
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
const out = getUndeliveredMessages();
|
||||||
|
expect(out).toHaveLength(1);
|
||||||
|
expect(JSON.parse(out[0].content).text).toBe('Session cleared.');
|
||||||
|
|
||||||
|
// Continuation was cleared
|
||||||
|
expect(getContinuation('mock')).toBeUndefined();
|
||||||
|
|
||||||
|
// Command message was completed
|
||||||
|
const pending = getPendingMessages();
|
||||||
|
expect(pending).toHaveLength(0);
|
||||||
|
|
||||||
|
await loopPromise.catch(() => {});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provider that throws on every query, simulating API failures.
|
||||||
|
*/
|
||||||
|
class ThrowingProvider {
|
||||||
|
readonly supportsNativeSlashCommands = false;
|
||||||
|
private errorMessage: string;
|
||||||
|
|
||||||
|
constructor(errorMessage: string) {
|
||||||
|
this.errorMessage = errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
isSessionInvalid(): boolean {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
query(_input: { prompt: string; cwd: string }) {
|
||||||
|
const errorMessage = this.errorMessage;
|
||||||
|
return {
|
||||||
|
push() {},
|
||||||
|
end() {},
|
||||||
|
abort() {},
|
||||||
|
events: (async function* () {
|
||||||
|
throw new Error(errorMessage);
|
||||||
|
})(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provider that throws with an error that triggers isSessionInvalid.
|
||||||
|
* First emits an init event (setting continuation), then throws.
|
||||||
|
*/
|
||||||
|
class InvalidSessionProvider {
|
||||||
|
readonly supportsNativeSlashCommands = false;
|
||||||
|
|
||||||
|
isSessionInvalid(): boolean {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
query(_input: { prompt: string; cwd: string }) {
|
||||||
|
return {
|
||||||
|
push() {},
|
||||||
|
end() {},
|
||||||
|
abort() {},
|
||||||
|
events: (async function* () {
|
||||||
|
yield { type: 'init' as const, continuation: 'doomed-session' };
|
||||||
|
throw new Error('session not found');
|
||||||
|
})(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -26,8 +26,9 @@ vi.mock('./config.js', async () => {
|
|||||||
|
|
||||||
const TEST_DIR = '/tmp/nanoclaw-test-delivery';
|
const TEST_DIR = '/tmp/nanoclaw-test-delivery';
|
||||||
|
|
||||||
import { initTestDb, closeDb, runMigrations, createAgentGroup, createMessagingGroup } from './db/index.js';
|
import { initTestDb, closeDb, runMigrations, createAgentGroup, createMessagingGroup, createMessagingGroupAgent } from './db/index.js';
|
||||||
import { resolveSession, outboundDbPath } from './session-manager.js';
|
import { getDeliveredIds } from './db/session-db.js';
|
||||||
|
import { resolveSession, outboundDbPath, openInboundDb } from './session-manager.js';
|
||||||
import { deliverSessionMessages, setDeliveryAdapter } from './delivery.js';
|
import { deliverSessionMessages, setDeliveryAdapter } from './delivery.js';
|
||||||
|
|
||||||
function now(): string {
|
function now(): string {
|
||||||
@@ -146,3 +147,118 @@ describe('deliverSessionMessages — concurrent invocations', () => {
|
|||||||
expect(callCount).toBe(1);
|
expect(callCount).toBe(1);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('deliverSessionMessages — retry and permanent failure', () => {
|
||||||
|
it('retries on adapter failure and marks failed after MAX_DELIVERY_ATTEMPTS (3)', async () => {
|
||||||
|
seedAgentAndChannel();
|
||||||
|
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
insertOutbound('ag-1', session.id, 'out-flaky');
|
||||||
|
|
||||||
|
let callCount = 0;
|
||||||
|
setDeliveryAdapter({
|
||||||
|
async deliver() {
|
||||||
|
callCount++;
|
||||||
|
throw new Error('network timeout');
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Attempt 1
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(1);
|
||||||
|
|
||||||
|
// Attempt 2
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(2);
|
||||||
|
|
||||||
|
// Attempt 3 — should mark as permanently failed
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(3);
|
||||||
|
|
||||||
|
// Attempt 4 — message is now in delivered (as failed), adapter not called
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(3);
|
||||||
|
|
||||||
|
// Verify the message is in the delivered table with 'failed' status
|
||||||
|
const inDb = openInboundDb('ag-1', session.id);
|
||||||
|
const delivered = getDeliveredIds(inDb);
|
||||||
|
inDb.close();
|
||||||
|
expect(delivered.has('out-flaky')).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('clears attempt counter on successful delivery', async () => {
|
||||||
|
seedAgentAndChannel();
|
||||||
|
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
insertOutbound('ag-1', session.id, 'out-retry-ok');
|
||||||
|
|
||||||
|
let callCount = 0;
|
||||||
|
setDeliveryAdapter({
|
||||||
|
async deliver() {
|
||||||
|
callCount++;
|
||||||
|
if (callCount === 1) throw new Error('transient');
|
||||||
|
return 'plat-ok';
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Attempt 1 — fails
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(1);
|
||||||
|
|
||||||
|
// Attempt 2 — succeeds
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(2);
|
||||||
|
|
||||||
|
// Attempt 3 — not called, message already delivered
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
expect(callCount).toBe(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('deliverSessionMessages — permission check', () => {
|
||||||
|
it('rejects delivery to an unauthorized channel destination', async () => {
|
||||||
|
seedAgentAndChannel();
|
||||||
|
|
||||||
|
// Create a second messaging group that the agent is NOT wired to
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-2',
|
||||||
|
channel_type: 'discord',
|
||||||
|
platform_id: 'discord:456',
|
||||||
|
name: 'Unauthorized Chat',
|
||||||
|
is_group: 0,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Session is on mg-1 (telegram)
|
||||||
|
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
|
||||||
|
// Insert an outbound message targeting mg-2 (discord) — not the origin chat
|
||||||
|
const outDb = new Database(outboundDbPath('ag-1', session.id));
|
||||||
|
outDb.prepare(
|
||||||
|
`INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content)
|
||||||
|
VALUES (?, datetime('now'), 'chat', 'discord:456', 'discord', ?)`,
|
||||||
|
).run('out-unauth', JSON.stringify({ text: 'sneaky' }));
|
||||||
|
outDb.close();
|
||||||
|
|
||||||
|
const calls: string[] = [];
|
||||||
|
setDeliveryAdapter({
|
||||||
|
async deliver(_ct, _pid, _tid, _kind, content) {
|
||||||
|
calls.push(content);
|
||||||
|
return 'plat-msg';
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Deliver 3 times to exhaust retries
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
await deliverSessionMessages(session);
|
||||||
|
|
||||||
|
// Adapter never called — permission check throws before reaching it
|
||||||
|
expect(calls).toHaveLength(0);
|
||||||
|
|
||||||
|
// Message is marked as permanently failed
|
||||||
|
const inDb = openInboundDb('ag-1', session.id);
|
||||||
|
const delivered = getDeliveredIds(inDb);
|
||||||
|
inDb.close();
|
||||||
|
expect(delivered.has('out-unauth')).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user