fix(agent-to-agent): route A2A replies back to originating session (#2267)

Squash merge of PR #2267 by ddaniels.

When an agent group has more than one active session, A2A replies landed
in the newest session via findSessionByAgentGroup's ORDER BY created_at
DESC. The session that asked the question never saw the answer.

Adds origin-aware return-path routing with three layers:

1. Direct return-path: if the reply has in_reply_to, look up the
   triggering inbound row's source_session_id and route there.
2. Peer-affinity fallback: find the most recent A2A inbound from this
   peer and use its source_session_id.
3. Legacy fallback: newest active session (pre-migration compat).

Container-side: MCP send_message/send_file now thread the current
batch's in_reply_to through to outbound rows via current-batch.ts.

Also flips our A2A bug-documenting test (#2332) from asserting the
broken behavior to asserting the fixed behavior.

Co-Authored-By: Doug Daniels <ddaniels888@gmail.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-05-08 00:48:10 +03:00
parent 3b07c0ceaf
commit 107945f10c
12 changed files with 517 additions and 39 deletions

View File

@@ -0,0 +1,29 @@
/**
* Per-batch context the poll loop publishes for downstream consumers
* (MCP tools, etc.) that don't sit on the poll-loop's call stack.
*
* Today the only field is `inReplyTo` — the id of the first inbound
* message in the batch the agent is currently processing. MCP tools like
* `send_message` and `send_file` read this and stamp it onto the outbound
* row so the host's a2a return-path routing can correlate replies back to
* the originating session.
*
* This is module-level state on purpose: the agent-runner is single-process
* and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo`
* before invoking the provider and `clearCurrentInReplyTo` after the batch
* completes (or errors out).
*/
let currentInReplyTo: string | null = null;
export function setCurrentInReplyTo(id: string | null): void {
currentInReplyTo = id;
}
export function clearCurrentInReplyTo(): void {
currentInReplyTo = null;
}
export function getCurrentInReplyTo(): string | null {
return currentInReplyTo;
}

View File

@@ -0,0 +1,50 @@
/**
* Tests for the core MCP tools' interaction with the per-batch routing
* context. The agent-runner sets a current `inReplyTo` at the top of each
* batch in poll-loop, and outbound writes from MCP tools (send_message,
* send_file) must pick it up so a2a return-path routing on the host can
* correlate replies back to the originating session.
*/
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js';
import { getUndeliveredMessages } from '../db/messages-out.js';
import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js';
import { sendMessage } from './core.js';
beforeEach(() => {
initTestSessionDb();
// Seed a peer agent destination
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`,
)
.run();
});
afterEach(() => {
clearCurrentInReplyTo();
closeSessionDb();
});
describe('send_message MCP tool — in_reply_to plumbing', () => {
it('stamps current batch in_reply_to on outbound rows', async () => {
setCurrentInReplyTo('inbound-msg-1');
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBe('inbound-msg-1');
});
it('writes null when no batch is active', async () => {
// No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation.
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBeNull();
});
});

View File

@@ -9,6 +9,7 @@
import fs from 'fs';
import path from 'path';
import { getCurrentInReplyTo } from '../current-batch.js';
import { findByName, getAllDestinations } from '../destinations.js';
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
@@ -50,9 +51,7 @@ function destinationList(): string {
*/
function resolveRouting(
to: string | undefined,
):
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
| { error: string } {
): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } {
if (!to) {
// Default: reply to whatever thread/channel this session is bound to.
const session = getSessionRouting();
@@ -82,9 +81,7 @@ function resolveRouting(
// preserve the thread_id so replies land in the correct thread.
const session = getSessionRouting();
const threadId =
session.channel_type === dest.channelType && session.platform_id === dest.platformId
? session.thread_id
: null;
session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null;
return {
channel_type: dest.channelType!,
platform_id: dest.platformId!,
@@ -98,12 +95,14 @@ function resolveRouting(
export const sendMessage: McpToolDefinition = {
tool: {
name: 'send_message',
description:
'Send a message to a named destination. If you have only one destination, you can omit `to`.',
description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.',
inputSchema: {
type: 'object' as const,
properties: {
to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' },
to: {
type: 'string',
description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.',
},
text: { type: 'string', description: 'Message content' },
},
required: ['text'],
@@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = {
const id = generateId();
const seq = writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
@@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = {
writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,

View File

@@ -2,12 +2,17 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js';
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js';
import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js';
import {
clearContinuation,
migrateLegacyContinuation,
setContinuation,
} from './db/session-state.js';
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js';
formatMessages,
extractRouting,
categorizeMessage,
isClearCommand,
isRunnerCommand,
stripInternalTags,
type RoutingContext,
} from './formatter.js';
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
const POLL_INTERVAL_MS = 1000;
@@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
// Process the query while concurrently polling for new messages
const skippedSet = new Set(skipped);
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
// Publish the batch's in_reply_to so MCP tools (send_message, send_file)
// can stamp it on outbound rows — needed for a2a return-path routing.
setCurrentInReplyTo(routing.inReplyTo);
try {
const result = await processQuery(query, routing, processingIds, config.providerName);
if (result.continuation && result.continuation !== continuation) {
@@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
thread_id: routing.threadId,
content: JSON.stringify({ text: `Error: ${errMsg}` }),
});
} finally {
clearCurrentInReplyTo();
}
// Ensure completed even if processQuery ended without a result event
@@ -402,7 +412,9 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
break;
case 'error':
log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
log(
`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`,
);
break;
case 'progress':
log(`Progress: ${event.message}`);