From 18d0b6e53f1af94a8c41dcaf7f7df6a7277bcca9 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Wed, 8 Apr 2026 23:40:00 +0300 Subject: [PATCH] v2: add agent-runner integration tests Poll loop end-to-end with mock provider: message pickup, batch processing, concurrent polling for late arrivals. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../agent-runner/src/integration.test.ts | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 container/agent-runner/src/integration.test.ts diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts new file mode 100644 index 0000000..63c07b7 --- /dev/null +++ b/container/agent-runner/src/integration.test.ts @@ -0,0 +1,120 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import { initTestSessionDb, closeSessionDb, getSessionDb } from './db/connection.js'; +import { getUndeliveredMessages } from './db/messages-out.js'; +import { getPendingMessages } from './db/messages-in.js'; +import { MockProvider } from './providers/mock.js'; +import { runPollLoop } from './poll-loop.js'; + +beforeEach(() => { + initTestSessionDb(); +}); + +afterEach(() => { + closeSessionDb(); +}); + +function insertMessage(id: string, content: object, opts?: { platformId?: string; channelType?: string; threadId?: string }) { + getSessionDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content) + VALUES (?, 'chat', datetime('now'), 'pending', ?, ?, ?, ?)`, + ) + .run(id, opts?.platformId ?? null, opts?.channelType ?? null, opts?.threadId ?? null, JSON.stringify(content)); +} + +describe('poll loop integration', () => { + it('should pick up a message, process it, and write a response', async () => { + // Insert a message before starting the loop + insertMessage('m1', { sender: 'Alice', text: 'What is the meaning of life?' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-1' }); + + const provider = new MockProvider(() => '42'); + + // Run the poll loop in background, abort after it processes + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); + + // Wait for processing + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + // Verify + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(JSON.parse(out[0].content).text).toBe('42'); + expect(out[0].platform_id).toBe('chan-1'); + expect(out[0].channel_type).toBe('discord'); + expect(out[0].thread_id).toBe('thread-1'); + expect(out[0].in_reply_to).toBe('m1'); + + // Input message should be completed + const pending = getPendingMessages(); + expect(pending).toHaveLength(0); + + await loopPromise.catch(() => {}); // swallow abort + }); + + it('should process multiple messages in a batch', async () => { + insertMessage('m1', { sender: 'Alice', text: 'Hello' }); + insertMessage('m2', { sender: 'Bob', text: 'World' }); + + const provider = new MockProvider(() => 'Got both messages'); + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); + + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(JSON.parse(out[0].content).text).toBe('Got both messages'); + + await loopPromise.catch(() => {}); + }); + + it('should process messages arriving after loop starts', async () => { + const provider = new MockProvider(() => 'Processed'); + const controller = new AbortController(); + const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 3000); + + // Insert message after loop has started + await sleep(200); + insertMessage('m-late', { sender: 'Charlie', text: 'Late arrival' }); + + await waitFor(() => getUndeliveredMessages().length > 0, 2000); + controller.abort(); + + const out = getUndeliveredMessages(); + expect(out.length).toBeGreaterThanOrEqual(1); + + await loopPromise.catch(() => {}); + }); +}); + +// Helper: run poll loop until aborted or timeout +async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSignal, timeoutMs: number): Promise { + return Promise.race([ + runPollLoop({ + provider, + cwd: '/tmp', + mcpServers: {}, + env: {}, + }), + new Promise((_, reject) => { + signal.addEventListener('abort', () => reject(new Error('aborted'))); + }), + new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), timeoutMs)), + ]); +} + +async function waitFor(condition: () => boolean, timeoutMs: number): Promise { + const start = Date.now(); + while (!condition()) { + if (Date.now() - start > timeoutMs) throw new Error('waitFor timeout'); + await sleep(50); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +}