/** * Delivery race tests. * * The active poll (1s, running sessions) and the sweep poll (60s, all * active sessions) both call deliverSessionMessages. A running session * sits in both result sets, so the two timer chains can race on the same * outbound row — read-undelivered → call channel API → markDelivered. The * INSERT OR IGNORE in markDelivered makes the DB write idempotent, but * the channel API has already fired twice → user sees the message twice. */ import Database from 'better-sqlite3'; import fs from 'fs'; import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; vi.mock('./container-runner.js', () => ({ wakeContainer: vi.fn().mockResolvedValue(undefined), isContainerRunning: vi.fn().mockReturnValue(false), killContainer: vi.fn(), buildAgentGroupImage: vi.fn().mockResolvedValue(undefined), })); vi.mock('./config.js', async () => { const actual = await vi.importActual('./config.js'); return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-delivery' }; }); const TEST_DIR = '/tmp/nanoclaw-test-delivery'; import { initTestDb, closeDb, runMigrations, createAgentGroup, createMessagingGroup, createMessagingGroupAgent, } from './db/index.js'; import { getDeliveredIds } from './db/session-db.js'; import { resolveSession, outboundDbPath, openInboundDb } from './session-manager.js'; import { deliverSessionMessages, setDeliveryAdapter } from './delivery.js'; function now(): string { return new Date().toISOString(); } function seedAgentAndChannel(): void { createAgentGroup({ id: 'ag-1', name: 'Test Agent', folder: 'test-agent', agent_provider: null, created_at: now(), }); createMessagingGroup({ id: 'mg-1', channel_type: 'telegram', platform_id: 'telegram:123', name: 'Test Chat', is_group: 0, unknown_sender_policy: 'public', created_at: now(), }); } function insertOutbound(agentGroupId: string, sessionId: string, msgId: string): void { const db = new Database(outboundDbPath(agentGroupId, sessionId)); db.prepare( `INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content) VALUES (?, datetime('now'), 'chat', 'telegram:123', 'telegram', ?)`, ).run(msgId, JSON.stringify({ text: 'hello' })); db.close(); } beforeEach(() => { if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); fs.mkdirSync(TEST_DIR, { recursive: true }); const db = initTestDb(); runMigrations(db); }); afterEach(() => { closeDb(); if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); }); describe('deliverSessionMessages — concurrent invocations', () => { it('delivers a message exactly once when active and sweep polls overlap', async () => { seedAgentAndChannel(); const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); insertOutbound('ag-1', session.id, 'out-1'); const calls: string[] = []; setDeliveryAdapter({ async deliver(_channelType, _platformId, _threadId, _kind, content) { calls.push(content); // Hold long enough that the second concurrent caller can race the // read-undelivered → markDelivered window. await new Promise((r) => setTimeout(r, 100)); return 'plat-msg-1'; }, }); // Two concurrent calls — simulating active (1s) and sweep (60s) polls // hitting the same running session at the same moment. await Promise.all([deliverSessionMessages(session), deliverSessionMessages(session)]); expect(calls).toHaveLength(1); }); it('still delivers on a subsequent call after the first finishes', async () => { seedAgentAndChannel(); const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); insertOutbound('ag-1', session.id, 'out-first'); const calls: string[] = []; setDeliveryAdapter({ async deliver(_channelType, _platformId, _threadId, _kind, content) { calls.push(content); return 'plat-msg-id'; }, }); await deliverSessionMessages(session); expect(calls).toHaveLength(1); // Insert a second outbound message and deliver again — the lock from // the first call must have been released. insertOutbound('ag-1', session.id, 'out-second'); await deliverSessionMessages(session); expect(calls).toHaveLength(2); }); it('does not re-deliver when retried after a successful send (cleanup-after-send safety)', async () => { // If something post-send throws (e.g. outbox cleanup), the message has // still landed on the user's screen — the catch path must not trigger // a re-send. We simulate by having the adapter succeed on the first // call and recording how many times it's invoked across two attempts. seedAgentAndChannel(); const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); insertOutbound('ag-1', session.id, 'out-once'); let callCount = 0; setDeliveryAdapter({ async deliver() { callCount++; return 'plat-msg-id'; }, }); await deliverSessionMessages(session); // Re-invoke — should be idempotent because the message is now in the // delivered table; the channel adapter must not be called again. await deliverSessionMessages(session); expect(callCount).toBe(1); }); }); describe('deliverSessionMessages — retry and permanent failure', () => { it('retries on adapter failure and marks failed after MAX_DELIVERY_ATTEMPTS (3)', async () => { seedAgentAndChannel(); const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); insertOutbound('ag-1', session.id, 'out-flaky'); let callCount = 0; setDeliveryAdapter({ async deliver() { callCount++; throw new Error('network timeout'); }, }); // Attempt 1 await deliverSessionMessages(session); expect(callCount).toBe(1); // Attempt 2 await deliverSessionMessages(session); expect(callCount).toBe(2); // Attempt 3 — should mark as permanently failed await deliverSessionMessages(session); expect(callCount).toBe(3); // Attempt 4 — message is now in delivered (as failed), adapter not called await deliverSessionMessages(session); expect(callCount).toBe(3); // Verify the message is in the delivered table with 'failed' status const inDb = openInboundDb('ag-1', session.id); const delivered = getDeliveredIds(inDb); inDb.close(); expect(delivered.has('out-flaky')).toBe(true); }); it('clears attempt counter on successful delivery', async () => { seedAgentAndChannel(); const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); insertOutbound('ag-1', session.id, 'out-retry-ok'); let callCount = 0; setDeliveryAdapter({ async deliver() { callCount++; if (callCount === 1) throw new Error('transient'); return 'plat-ok'; }, }); // Attempt 1 — fails await deliverSessionMessages(session); expect(callCount).toBe(1); // Attempt 2 — succeeds await deliverSessionMessages(session); expect(callCount).toBe(2); // Attempt 3 — not called, message already delivered await deliverSessionMessages(session); expect(callCount).toBe(2); }); }); describe('deliverSessionMessages — permission check', () => { it('rejects delivery to an unauthorized channel destination', async () => { seedAgentAndChannel(); // Create a second messaging group that the agent is NOT wired to createMessagingGroup({ id: 'mg-2', channel_type: 'discord', platform_id: 'discord:456', name: 'Unauthorized Chat', is_group: 0, unknown_sender_policy: 'public', created_at: now(), }); // Session is on mg-1 (telegram) const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); // Insert an outbound message targeting mg-2 (discord) — not the origin chat const outDb = new Database(outboundDbPath('ag-1', session.id)); outDb .prepare( `INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content) VALUES (?, datetime('now'), 'chat', 'discord:456', 'discord', ?)`, ) .run('out-unauth', JSON.stringify({ text: 'sneaky' })); outDb.close(); const calls: string[] = []; setDeliveryAdapter({ async deliver(_ct, _pid, _tid, _kind, content) { calls.push(content); return 'plat-msg'; }, }); // Deliver 3 times to exhaust retries await deliverSessionMessages(session); await deliverSessionMessages(session); await deliverSessionMessages(session); // Adapter never called — permission check throws before reaching it expect(calls).toHaveLength(0); // Message is marked as permanently failed const inDb = openInboundDb('ag-1', session.id); const delivered = getDeliveredIds(inDb); inDb.close(); expect(delivered.has('out-unauth')).toBe(true); }); });