test: add host-side routing and session resolution tests
Host-side (vitest): - Routed message preserves platformId/channelType/threadId on messages_in - Fan-out gives each agent correct per-agent routing - writeSessionRouting populates session_routing from messaging group - writeSessionRouting writes null routing for agent-shared sessions - Per-thread session includes thread_id in session_routing - Agent-shared resolves to same session on repeated calls - Agent-shared session has null messaging_group_id - findSessionByAgentGroup returns channel-bound session (documents #2332) - Skip: agent-shared/channel-bound coexistence (blocked on #2332 fix) Container-side (bun:test): - Internal tags stripped between message blocks - Mixed task + chat batch with correct routing The agent-shared tests uncovered the exact bug from #2332: findSessionByAgentGroup doesn't distinguish agent-shared from channel-bound sessions, so A2A resolution reuses a channel session when one exists. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -249,6 +249,51 @@ describe('poll loop integration', () => {
|
|||||||
await loopPromise.catch(() => {});
|
await loopPromise.catch(() => {});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('internal tags between message blocks are stripped from scratchpad', async () => {
|
||||||
|
insertMessage('m1', { sender: 'Alice', text: 'hi' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||||
|
|
||||||
|
const provider = new MockProvider(
|
||||||
|
{},
|
||||||
|
() => '<internal>thinking about this...</internal><message to="discord-test">answer</message><internal>done thinking</internal>',
|
||||||
|
);
|
||||||
|
const controller = new AbortController();
|
||||||
|
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||||
|
|
||||||
|
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
const out = getUndeliveredMessages();
|
||||||
|
expect(out).toHaveLength(1);
|
||||||
|
expect(JSON.parse(out[0].content).text).toBe('answer');
|
||||||
|
|
||||||
|
await loopPromise.catch(() => {});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles mixed task + chat batch with correct origin metadata', async () => {
|
||||||
|
// Seed destination for routing lookup
|
||||||
|
insertMessage('m-chat', { sender: 'Alice', text: 'check this' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||||
|
// Task with same routing — simulates a scheduled task in a channel session
|
||||||
|
getInboundDb()
|
||||||
|
.prepare(
|
||||||
|
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
|
||||||
|
VALUES ('t-task', 'task', datetime('now'), 'pending', 'chan-1', 'discord', ?)`,
|
||||||
|
)
|
||||||
|
.run(JSON.stringify({ prompt: 'daily check' }));
|
||||||
|
|
||||||
|
const provider = new MockProvider({}, () => '<message to="discord-test">done</message>');
|
||||||
|
const controller = new AbortController();
|
||||||
|
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||||
|
|
||||||
|
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
const out = getUndeliveredMessages();
|
||||||
|
expect(out).toHaveLength(1);
|
||||||
|
expect(out[0].platform_id).toBe('chan-1');
|
||||||
|
|
||||||
|
await loopPromise.catch(() => {});
|
||||||
|
});
|
||||||
|
|
||||||
it('should inject destination reminder after a compacted event', async () => {
|
it('should inject destination reminder after a compacted event', async () => {
|
||||||
// Two destinations — required for the reminder to fire (single-destination
|
// Two destinations — required for the reminder to fire (single-destination
|
||||||
// groups have a fallback path that works without <message to="…"> wrapping).
|
// groups have a fallback path that works without <message to="…"> wrapping).
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import {
|
|||||||
import {
|
import {
|
||||||
resolveSession,
|
resolveSession,
|
||||||
writeSessionMessage,
|
writeSessionMessage,
|
||||||
|
writeSessionRouting,
|
||||||
initSessionFolder,
|
initSessionFolder,
|
||||||
sessionDir,
|
sessionDir,
|
||||||
inboundDbPath,
|
inboundDbPath,
|
||||||
@@ -26,7 +27,7 @@ import {
|
|||||||
readOutboxFiles,
|
readOutboxFiles,
|
||||||
clearOutbox,
|
clearOutbox,
|
||||||
} from './session-manager.js';
|
} from './session-manager.js';
|
||||||
import { getSession, findSession } from './db/sessions.js';
|
import { getSession, findSession, findSessionByAgentGroup } from './db/sessions.js';
|
||||||
import type { InboundEvent } from './channels/adapter.js';
|
import type { InboundEvent } from './channels/adapter.js';
|
||||||
|
|
||||||
// Mock container runner to prevent actual Docker spawning
|
// Mock container runner to prevent actual Docker spawning
|
||||||
@@ -595,6 +596,304 @@ describe('router', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('routing metadata preservation', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Test Agent',
|
||||||
|
folder: 'test-agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-1',
|
||||||
|
channel_type: 'discord',
|
||||||
|
platform_id: 'chan-123',
|
||||||
|
name: 'General',
|
||||||
|
is_group: 1,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroupAgent({
|
||||||
|
id: 'mga-1',
|
||||||
|
messaging_group_id: 'mg-1',
|
||||||
|
agent_group_id: 'ag-1',
|
||||||
|
engage_mode: 'pattern',
|
||||||
|
engage_pattern: '.',
|
||||||
|
sender_scope: 'all',
|
||||||
|
ignored_message_policy: 'drop',
|
||||||
|
session_mode: 'shared',
|
||||||
|
priority: 0,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('routed message carries platformId, channelType, threadId on the messages_in row', async () => {
|
||||||
|
const { routeInbound } = await import('./router.js');
|
||||||
|
|
||||||
|
await routeInbound({
|
||||||
|
channelType: 'discord',
|
||||||
|
platformId: 'chan-123',
|
||||||
|
threadId: 'thread-42',
|
||||||
|
message: { id: 'msg-r1', kind: 'chat', content: JSON.stringify({ sender: 'A', text: 'hi' }), timestamp: now() },
|
||||||
|
});
|
||||||
|
|
||||||
|
const session = findSession('mg-1', null);
|
||||||
|
const db = new Database(inboundDbPath('ag-1', session!.id));
|
||||||
|
const row = db.prepare('SELECT platform_id, channel_type, thread_id FROM messages_in WHERE id LIKE ?').get('msg-r1%') as {
|
||||||
|
platform_id: string | null;
|
||||||
|
channel_type: string | null;
|
||||||
|
thread_id: string | null;
|
||||||
|
};
|
||||||
|
db.close();
|
||||||
|
|
||||||
|
expect(row.platform_id).toBe('chan-123');
|
||||||
|
expect(row.channel_type).toBe('discord');
|
||||||
|
expect(row.thread_id).toBe('thread-42');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fan-out gives each agent its own routing, not leaked from sibling', async () => {
|
||||||
|
const { routeInbound } = await import('./router.js');
|
||||||
|
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-2',
|
||||||
|
name: 'Agent Two',
|
||||||
|
folder: 'agent-two',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroupAgent({
|
||||||
|
id: 'mga-2',
|
||||||
|
messaging_group_id: 'mg-1',
|
||||||
|
agent_group_id: 'ag-2',
|
||||||
|
engage_mode: 'pattern',
|
||||||
|
engage_pattern: '.',
|
||||||
|
sender_scope: 'all',
|
||||||
|
ignored_message_policy: 'drop',
|
||||||
|
session_mode: 'shared',
|
||||||
|
priority: 0,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
await routeInbound({
|
||||||
|
channelType: 'discord',
|
||||||
|
platformId: 'chan-123',
|
||||||
|
threadId: 'thread-fanout',
|
||||||
|
message: { id: 'msg-fo', kind: 'chat', content: JSON.stringify({ text: 'fan' }), timestamp: now() },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Both agents should have the message with correct routing
|
||||||
|
const { getSessionsByAgentGroup } = await import('./db/sessions.js');
|
||||||
|
for (const agId of ['ag-1', 'ag-2']) {
|
||||||
|
const sessions = getSessionsByAgentGroup(agId);
|
||||||
|
expect(sessions).toHaveLength(1);
|
||||||
|
const db = new Database(inboundDbPath(agId, sessions[0].id));
|
||||||
|
const row = db.prepare('SELECT platform_id, channel_type, thread_id FROM messages_in LIMIT 1').get() as {
|
||||||
|
platform_id: string | null;
|
||||||
|
channel_type: string | null;
|
||||||
|
thread_id: string | null;
|
||||||
|
};
|
||||||
|
db.close();
|
||||||
|
expect(row.platform_id).toBe('chan-123');
|
||||||
|
expect(row.channel_type).toBe('discord');
|
||||||
|
expect(row.thread_id).toBe('thread-fanout');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('writeSessionRouting', () => {
|
||||||
|
it('populates session_routing from the messaging group', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-1',
|
||||||
|
channel_type: 'telegram',
|
||||||
|
platform_id: 'tg:12345',
|
||||||
|
name: 'Chat',
|
||||||
|
is_group: 0,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
writeSessionRouting('ag-1', session.id);
|
||||||
|
|
||||||
|
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||||
|
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as {
|
||||||
|
channel_type: string | null;
|
||||||
|
platform_id: string | null;
|
||||||
|
thread_id: string | null;
|
||||||
|
} | undefined;
|
||||||
|
db.close();
|
||||||
|
|
||||||
|
expect(row).toBeDefined();
|
||||||
|
expect(row!.channel_type).toBe('telegram');
|
||||||
|
expect(row!.platform_id).toBe('tg:12345');
|
||||||
|
expect(row!.thread_id).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('writes null routing for agent-shared session (no messaging group)', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||||
|
writeSessionRouting('ag-1', session.id);
|
||||||
|
|
||||||
|
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||||
|
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as {
|
||||||
|
channel_type: string | null;
|
||||||
|
platform_id: string | null;
|
||||||
|
thread_id: string | null;
|
||||||
|
} | undefined;
|
||||||
|
db.close();
|
||||||
|
|
||||||
|
expect(row).toBeDefined();
|
||||||
|
expect(row!.channel_type).toBeNull();
|
||||||
|
expect(row!.platform_id).toBeNull();
|
||||||
|
expect(row!.thread_id).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('includes thread_id from per-thread session', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-1',
|
||||||
|
channel_type: 'discord',
|
||||||
|
platform_id: 'chan-123',
|
||||||
|
name: 'General',
|
||||||
|
is_group: 1,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session } = resolveSession('ag-1', 'mg-1', 'thread-77', 'per-thread');
|
||||||
|
writeSessionRouting('ag-1', session.id);
|
||||||
|
|
||||||
|
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||||
|
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as {
|
||||||
|
channel_type: string | null;
|
||||||
|
platform_id: string | null;
|
||||||
|
thread_id: string | null;
|
||||||
|
} | undefined;
|
||||||
|
db.close();
|
||||||
|
|
||||||
|
expect(row).toBeDefined();
|
||||||
|
expect(row!.channel_type).toBe('discord');
|
||||||
|
expect(row!.platform_id).toBe('chan-123');
|
||||||
|
expect(row!.thread_id).toBe('thread-77');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('agent-shared session resolution', () => {
|
||||||
|
it('resolves to the same session on repeated calls', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session: s1, created: c1 } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||||
|
const { session: s2, created: c2 } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||||
|
|
||||||
|
expect(c1).toBe(true);
|
||||||
|
expect(c2).toBe(false);
|
||||||
|
expect(s1.id).toBe(s2.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('agent-shared session has null messaging_group_id', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||||
|
expect(session.messaging_group_id).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
// BUG (#2332): agent-shared resolveSession reuses an existing channel-bound
|
||||||
|
// session via findSessionByAgentGroup instead of creating a dedicated
|
||||||
|
// agent-shared session. The two cannot coexist today — the agent-shared
|
||||||
|
// call finds the channel session and returns it. This test documents the
|
||||||
|
// current (broken) behavior; fixing #2332 should make it pass as written.
|
||||||
|
it.skip('agent-shared and channel-bound sessions coexist for the same agent group', () => {
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-1',
|
||||||
|
channel_type: 'discord',
|
||||||
|
platform_id: 'chan-123',
|
||||||
|
name: 'General',
|
||||||
|
is_group: 1,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session: shared } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
const { session: agentShared } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||||
|
|
||||||
|
expect(shared.id).not.toBe(agentShared.id);
|
||||||
|
expect(shared.messaging_group_id).toBe('mg-1');
|
||||||
|
expect(agentShared.messaging_group_id).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('findSessionByAgentGroup returns existing channel-bound session (bug #2332)', () => {
|
||||||
|
// Documents the current behavior: findSessionByAgentGroup doesn't
|
||||||
|
// distinguish agent-shared from channel-bound. When a channel session
|
||||||
|
// exists, agent-shared resolution reuses it instead of creating a
|
||||||
|
// separate session. This is the root cause of A2A misrouting.
|
||||||
|
createAgentGroup({
|
||||||
|
id: 'ag-1',
|
||||||
|
name: 'Agent',
|
||||||
|
folder: 'agent',
|
||||||
|
agent_provider: null,
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
createMessagingGroup({
|
||||||
|
id: 'mg-1',
|
||||||
|
channel_type: 'discord',
|
||||||
|
platform_id: 'chan-123',
|
||||||
|
name: 'General',
|
||||||
|
is_group: 1,
|
||||||
|
unknown_sender_policy: 'public',
|
||||||
|
created_at: now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const { session: channelSession } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||||
|
const found = findSessionByAgentGroup('ag-1');
|
||||||
|
|
||||||
|
// Bug: picks the channel session — an agent-shared call would get this
|
||||||
|
// instead of a dedicated session.
|
||||||
|
expect(found).toBeDefined();
|
||||||
|
expect(found!.id).toBe(channelSession.id);
|
||||||
|
expect(found!.messaging_group_id).toBe('mg-1'); // should be null for agent-shared
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('delivery', () => {
|
describe('delivery', () => {
|
||||||
it('should detect undelivered messages in outbound DB', () => {
|
it('should detect undelivered messages in outbound DB', () => {
|
||||||
createAgentGroup({
|
createAgentGroup({
|
||||||
|
|||||||
Reference in New Issue
Block a user