Merge remote-tracking branch 'origin/main' into nc-cli

This commit is contained in:
gavrielc
2026-05-08 15:35:03 +03:00
62 changed files with 2919 additions and 254 deletions

View File

@@ -0,0 +1,34 @@
/**
* PreCompact hook script — outputs custom compaction instructions to stdout.
*
* Claude Code captures the stdout of PreCompact shell hooks and passes it
* as `customInstructions` to the compaction prompt. This ensures the
* compaction summary preserves message routing context that the agent needs
* to correctly address responses.
*
* Invoked by the PreCompact hook in .claude-shared/settings.json:
* "command": "bun /app/src/compact-instructions.ts"
*/
import { getAllDestinations } from './destinations.js';
const destinations = getAllDestinations();
const names = destinations.map((d) => d.name);
const instructions = [
'Preserve the following in the compaction summary:',
'',
'1. For recent messages, keep the full XML structure including all attributes:',
' - <message from="..." sender="..." time="..."> for chat messages',
' - <task from="..." time="..."> for scheduled tasks',
' - <webhook from="..." source="..." event="..."> for webhooks',
' The message content can be summarized if long, but the XML tags and attributes must remain.',
'',
'2. Preserve the chronological message/reply sequence of recent exchanges.',
' The agent needs to see: who said what, in what order, and from which destination.',
'',
'3. The `from` attribute identifies which destination sent the message.',
' The agent MUST wrap all responses in <message to="name">...</message> blocks.',
` Available destinations: ${names.length > 0 ? names.map((n) => `\`${n}\``).join(', ') : '(none)'}`,
];
console.log(instructions.join('\n'));

View File

@@ -0,0 +1,29 @@
/**
* Per-batch context the poll loop publishes for downstream consumers
* (MCP tools, etc.) that don't sit on the poll-loop's call stack.
*
* Today the only field is `inReplyTo` — the id of the first inbound
* message in the batch the agent is currently processing. MCP tools like
* `send_message` and `send_file` read this and stamp it onto the outbound
* row so the host's a2a return-path routing can correlate replies back to
* the originating session.
*
* This is module-level state on purpose: the agent-runner is single-process
* and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo`
* before invoking the provider and `clearCurrentInReplyTo` after the batch
* completes (or errors out).
*/
let currentInReplyTo: string | null = null;
export function setCurrentInReplyTo(id: string | null): void {
currentInReplyTo = id;
}
export function clearCurrentInReplyTo(): void {
currentInReplyTo = null;
}
export function getCurrentInReplyTo(): string | null {
return currentInReplyTo;
}

View File

@@ -0,0 +1,63 @@
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import { closeSessionDb, getInboundDb, initTestSessionDb } from './db/connection.js';
import { buildSystemPromptAddendum } from './destinations.js';
beforeEach(() => {
initTestSessionDb();
});
afterEach(() => {
closeSessionDb();
});
function seedDestination(name: string, displayName: string, channelType: string, platformId: string): void {
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES (?, ?, 'channel', ?, ?, NULL)`,
)
.run(name, displayName, channelType, platformId);
}
describe('buildSystemPromptAddendum — multi-destination routing guidance', () => {
it('includes default-routing nudge when there are >1 destinations', () => {
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
seedDestination('whatsapp-mg-17780', 'whatsapp-mg-17780', 'whatsapp', 'phone-2@s.whatsapp.net');
const prompt = buildSystemPromptAddendum('Casa');
expect(prompt).toContain('Default routing');
expect(prompt).toContain('from="name"');
expect(prompt).toContain('`casa`');
expect(prompt).toContain('`whatsapp-mg-17780`');
});
it('requires explicit wrapping even for a single destination', () => {
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
const prompt = buildSystemPromptAddendum('Casa');
expect(prompt).toContain('Every response must be wrapped');
expect(prompt).toContain('<message to="name">');
expect(prompt).toContain('`casa`');
});
it('handles the no-destination case without crashing', () => {
const prompt = buildSystemPromptAddendum('Casa');
expect(prompt).toContain('no configured destinations');
expect(prompt).not.toContain('Default routing');
});
it('includes default-routing and wrapping instructions for single destination', () => {
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
const prompt = buildSystemPromptAddendum('Casa');
expect(prompt).toContain('Every response must be wrapped');
expect(prompt).toContain('<message to="name">');
expect(prompt).toContain('Default routing');
expect(prompt).toContain('`casa`');
});
});

View File

@@ -102,32 +102,28 @@ function buildDestinationsSection(): string {
].join('\n');
}
// Single-destination shortcut: the agent just writes its response normally.
const lines = ['## Sending messages', ''];
if (all.length === 1) {
const d = all[0];
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
return [
'## Sending messages',
'',
`Your messages are delivered to \`${d.name}\`${label}. Just write your response directly — no special wrapping needed.`,
'',
'To mark something as scratchpad (logged but not sent), wrap it in `<internal>...</internal>`.',
'',
'To send a message mid-response (e.g., an acknowledgment before a long task), call the `send_message` MCP tool.',
].join('\n');
}
const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', ''];
for (const d of all) {
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
lines.push(`- \`${d.name}\`${label}`);
lines.push(`Your destination is \`${d.name}\`${label}.`);
} else {
lines.push('You can send messages to the following destinations:', '');
for (const d of all) {
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
lines.push(`- \`${d.name}\`${label}`);
}
}
lines.push('');
lines.push('To send a message, wrap it in a `<message to="name">...</message>` block.');
lines.push('**Every response must be wrapped** in a `<message to="name">...</message>` block.');
lines.push('You can include multiple `<message>` blocks in one response to send to multiple destinations.');
lines.push('Text outside of `<message>` blocks is scratchpad — logged but not sent anywhere.');
lines.push('Use `<internal>...</internal>` to make scratchpad intent explicit.');
lines.push('');
lines.push(
'**Default routing**: when replying to an incoming message, address the same destination the message came `from` — every inbound `<message>` tag carries a `from="name"` attribute that names the origin destination. Only address a different destination when the request itself asks you to (e.g., "tell Laura that…").',
);
lines.push('');
lines.push(
'To send a message mid-response (e.g., an acknowledgment before a long task), call the `send_message` MCP tool with the `to` parameter set to a destination name.',
);

View File

@@ -177,40 +177,49 @@ function formatSingleChat(msg: MessageInRow): string {
const replyPrefix = formatReplyContext(content.replyTo);
const attachmentsSuffix = formatAttachments(content.attachments);
// Look up the destination name for the origin (reverse map lookup).
// If not found, fall back to a raw channel:platform_id marker so nothing
// gets silently dropped — this should only happen if the destination was
// removed between when the message was received and when it's being processed.
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
const fromAttr = fromDest
? ` from="${escapeXml(fromDest.name)}"`
: msg.channel_type || msg.platform_id
? ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`
: '';
const fromAttr = originAttr(msg);
return `<message${idAttr}${fromAttr} sender="${escapeXml(sender)}" time="${escapeXml(time)}"${replyAttr}>${replyPrefix}${escapeXml(text)}${attachmentsSuffix}</message>`;
}
/**
* Build a ` from="destination_name"` attribute string from a message's routing
* fields. Shared by all formatters so the agent always knows where a message
* originated — critical for explicit addressing.
*/
function originAttr(msg: MessageInRow): string {
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
if (fromDest) return ` from="${escapeXml(fromDest.name)}"`;
if (msg.channel_type || msg.platform_id) {
return ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`;
}
return '';
}
function formatTaskMessage(msg: MessageInRow): string {
const content = parseContent(msg.content);
const parts = ['[SCHEDULED TASK]'];
const from = originAttr(msg);
const time = formatLocalTime(msg.timestamp, TIMEZONE);
const parts: string[] = [];
if (content.scriptOutput) {
parts.push('', 'Script output:', JSON.stringify(content.scriptOutput, null, 2));
parts.push('Script output:', JSON.stringify(content.scriptOutput, null, 2), '');
}
parts.push('', 'Instructions:', content.prompt || '');
return parts.join('\n');
parts.push('Instructions:', content.prompt || '');
return `<task${from} time="${escapeXml(time)}">${parts.join('\n')}</task>`;
}
function formatWebhookMessage(msg: MessageInRow): string {
const content = parseContent(msg.content);
const source = content.source || 'unknown';
const event = content.event || 'unknown';
return `[WEBHOOK: ${source}/${event}]\n\n${JSON.stringify(content.payload || content, null, 2)}`;
const from = originAttr(msg);
return `<webhook${from} source="${escapeXml(source)}" event="${escapeXml(event)}">${JSON.stringify(content.payload || content, null, 2)}</webhook>`;
}
function formatSystemMessage(msg: MessageInRow): string {
const content = parseContent(msg.content);
return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`;
const from = originAttr(msg);
return `<system_response${from} action="${escapeXml(content.action || 'unknown')}" status="${escapeXml(content.status || 'unknown')}">${JSON.stringify(content.result || null)}</system_response>`;
}
/**

View File

@@ -3,6 +3,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js';
import { getUndeliveredMessages } from './db/messages-out.js';
import { getPendingMessages } from './db/messages-in.js';
import { getContinuation, setContinuation } from './db/session-state.js';
import { MockProvider } from './providers/mock.js';
import { runPollLoop } from './poll-loop.js';
@@ -74,6 +75,163 @@ describe('poll loop integration', () => {
await loopPromise.catch(() => {});
});
it('should resolve thread_id per-destination, not from global routing', async () => {
// Seed a second destination
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('slack-test', 'Slack Test', 'channel', 'slack', 'chan-2', NULL)`,
)
.run();
// Insert messages from each destination with distinct thread IDs
insertMessage('m-discord', { sender: 'Alice', text: 'from discord' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'discord-thread-1' });
insertMessage('m-slack', { sender: 'Bob', text: 'from slack' }, { platformId: 'chan-2', channelType: 'slack', threadId: 'slack-thread-99' });
// Agent replies to both destinations
const provider = new MockProvider({}, () =>
'<message to="discord-test">reply-d</message><message to="slack-test">reply-s</message>',
);
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length >= 2, 2000);
controller.abort();
const out = getUndeliveredMessages();
const discordOut = out.find((m) => m.platform_id === 'chan-1');
const slackOut = out.find((m) => m.platform_id === 'chan-2');
expect(discordOut).toBeDefined();
expect(discordOut!.thread_id).toBe('discord-thread-1');
expect(discordOut!.in_reply_to).toBe('m-discord');
expect(slackOut).toBeDefined();
expect(slackOut!.thread_id).toBe('slack-thread-99');
expect(slackOut!.in_reply_to).toBe('m-slack');
await loopPromise.catch(() => {});
});
it('bare text produces no outbound messages (scratchpad only)', async () => {
insertMessage('m1', { sender: 'Alice', text: 'hello' }, { platformId: 'chan-1', channelType: 'discord' });
// Agent responds with bare text — no <message to="..."> wrapping
const provider = new MockProvider({}, () => 'I am thinking about this...');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
// Wait long enough for the poll loop to process
await sleep(1000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(0);
await loopPromise.catch(() => {});
});
it('unknown destination is dropped, valid destination is sent', async () => {
insertMessage('m1', { sender: 'Alice', text: 'hi' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new MockProvider(
{},
() => '<message to="nonexistent">dropped</message><message to="discord-test">delivered</message>',
);
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
// Only the valid destination should produce output
expect(out).toHaveLength(1);
expect(JSON.parse(out[0].content).text).toBe('delivered');
expect(out[0].platform_id).toBe('chan-1');
await loopPromise.catch(() => {});
});
it('multiple <message> blocks each produce an outbound message', async () => {
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('slack-test', 'Slack Test', 'channel', 'slack', 'chan-2', NULL)`,
)
.run();
insertMessage('m1', { sender: 'Alice', text: 'broadcast' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new MockProvider(
{},
() => '<message to="discord-test">for discord</message><message to="slack-test">for slack</message>',
);
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length >= 2, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(2);
const discord = out.find((m) => m.platform_id === 'chan-1');
const slack = out.find((m) => m.platform_id === 'chan-2');
expect(discord).toBeDefined();
expect(JSON.parse(discord!.content).text).toBe('for discord');
expect(slack).toBeDefined();
expect(JSON.parse(slack!.content).text).toBe('for slack');
await loopPromise.catch(() => {});
});
it('sends null thread_id when no prior inbound from destination', async () => {
// Seed a second destination that has NO inbound messages
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('slack-new', 'Slack New', 'channel', 'slack', 'chan-new', NULL)`,
)
.run();
// Only insert a message from discord — slack-new has never sent anything
insertMessage('m1', { sender: 'Alice', text: 'tell slack' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'discord-thread' });
const provider = new MockProvider({}, () => '<message to="slack-new">hello slack</message>');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].platform_id).toBe('chan-new');
expect(out[0].thread_id).toBeNull();
await loopPromise.catch(() => {});
});
it('resolves most recent thread_id when destination has multiple inbound messages', async () => {
// Two messages from same destination, different threads
insertMessage('m-old', { sender: 'Alice', text: 'old' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-old' });
insertMessage('m-new', { sender: 'Alice', text: 'new' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-new' });
const provider = new MockProvider({}, () => '<message to="discord-test">reply</message>');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].thread_id).toBe('thread-new');
expect(out[0].in_reply_to).toBe('m-new');
await loopPromise.catch(() => {});
});
it('should process messages arriving after loop starts', async () => {
const provider = new MockProvider({}, () => '<message to="discord-test">Processed</message>');
const controller = new AbortController();
@@ -91,8 +249,161 @@ describe('poll loop integration', () => {
await loopPromise.catch(() => {});
});
it('internal tags between message blocks are stripped from scratchpad', async () => {
insertMessage('m1', { sender: 'Alice', text: 'hi' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new MockProvider(
{},
() => '<internal>thinking about this...</internal><message to="discord-test">answer</message><internal>done thinking</internal>',
);
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(JSON.parse(out[0].content).text).toBe('answer');
await loopPromise.catch(() => {});
});
it('handles mixed task + chat batch with correct origin metadata', async () => {
// Seed destination for routing lookup
insertMessage('m-chat', { sender: 'Alice', text: 'check this' }, { platformId: 'chan-1', channelType: 'discord' });
// Task with same routing — simulates a scheduled task in a channel session
getInboundDb()
.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
VALUES ('t-task', 'task', datetime('now'), 'pending', 'chan-1', 'discord', ?)`,
)
.run(JSON.stringify({ prompt: 'daily check' }));
const provider = new MockProvider({}, () => '<message to="discord-test">done</message>');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].platform_id).toBe('chan-1');
await loopPromise.catch(() => {});
});
it('should inject destination reminder after a compacted event', async () => {
// Two destinations — required for the reminder to fire (single-destination
// groups have a fallback path that works without <message to="…"> wrapping).
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('discord-second', 'Discord Second', 'channel', 'discord', 'chan-2', NULL)`,
)
.run();
insertMessage('m1', { sender: 'Alice', text: 'First message' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new CompactingProvider();
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2500);
await waitFor(() => getUndeliveredMessages().length > 0, 2500);
controller.abort();
expect(provider.pushes.length).toBeGreaterThanOrEqual(1);
const reminder = provider.pushes.find((p) => p.includes('Context was just compacted'));
expect(reminder).toBeDefined();
expect(reminder).toContain('2 destinations');
expect(reminder).toContain('discord-test');
expect(reminder).toContain('discord-second');
expect(reminder).toContain('<message to="name">');
await loopPromise.catch(() => {});
});
it('should NOT inject destination reminder with a single destination', async () => {
insertMessage('m1', { sender: 'Alice', text: 'First message' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new CompactingProvider();
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2500);
await waitFor(() => getUndeliveredMessages().length > 0, 2500);
controller.abort();
// Only the original prompt push (if any) — no reminder, since beforeEach
// seeds exactly one destination.
const reminders = provider.pushes.filter((p) => p.includes('Context was just compacted'));
expect(reminders).toHaveLength(0);
await loopPromise.catch(() => {});
});
});
/**
* Provider that emits a single compacted event mid-stream, then returns a
* result. Captures every push() call so tests can assert on the injected
* reminder content.
*/
class CompactingProvider {
readonly supportsNativeSlashCommands = false;
readonly pushes: string[] = [];
isSessionInvalid(): boolean {
return false;
}
query(_input: { prompt: string; cwd: string }) {
const pushes = this.pushes;
let ended = false;
let aborted = false;
let resolveWaiter: (() => void) | null = null;
async function* events() {
yield { type: 'activity' as const };
yield { type: 'init' as const, continuation: 'compaction-test-session' };
yield { type: 'activity' as const };
yield { type: 'compacted' as const, text: 'Context compacted (50,000 tokens compacted).' };
// Wait for poll-loop to push the reminder (or end / abort)
await new Promise<void>((resolve) => {
resolveWaiter = resolve;
// Belt-and-braces: don't hang forever if the reminder never arrives
setTimeout(resolve, 200);
});
yield { type: 'activity' as const };
yield { type: 'result' as const, text: '<message to="discord-test">ack</message>' };
while (!ended && !aborted) {
await new Promise<void>((resolve) => {
resolveWaiter = resolve;
setTimeout(resolve, 50);
});
}
}
return {
push(message: string) {
pushes.push(message);
resolveWaiter?.();
},
end() {
ended = true;
resolveWaiter?.();
},
abort() {
aborted = true;
resolveWaiter?.();
},
events: events(),
};
}
}
// Helper: run poll loop until aborted or timeout
async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSignal, timeoutMs: number): Promise<void> {
return Promise.race([
@@ -119,3 +430,142 @@ async function waitFor(condition: () => boolean, timeoutMs: number): Promise<voi
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
describe('poll loop — provider error recovery', () => {
it('writes error to outbound and continues loop on provider throw', async () => {
insertMessage('m1', { sender: 'Alice', text: 'trigger error' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new ThrowingProvider('API rate limit exceeded');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(JSON.parse(out[0].content).text).toContain('Error:');
expect(JSON.parse(out[0].content).text).toContain('API rate limit exceeded');
// Input message should be marked completed despite the error
const pending = getPendingMessages();
expect(pending).toHaveLength(0);
await loopPromise.catch(() => {});
});
});
describe('poll loop — stale session recovery', () => {
it('clears continuation when provider reports session invalid', async () => {
// Pre-seed a continuation so the local variable in runPollLoop is set.
// Without this, the `if (continuation && isSessionInvalid)` check skips.
setContinuation('mock', 'pre-existing-session');
insertMessage('m1', { sender: 'Alice', text: 'stale session' }, { platformId: 'chan-1', channelType: 'discord' });
const provider = new InvalidSessionProvider();
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
// Error was written to outbound
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(JSON.parse(out[0].content).text).toContain('Error:');
// Continuation was cleared (isSessionInvalid returned true)
expect(getContinuation('mock')).toBeUndefined();
await loopPromise.catch(() => {});
});
});
describe('poll loop — /clear command', () => {
it('clears session, writes confirmation, skips query', async () => {
// Seed a continuation so we can verify it gets cleared
setContinuation('mock', 'existing-session-id');
expect(getContinuation('mock')).toBe('existing-session-id');
// Insert a /clear command
getInboundDb()
.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
VALUES ('m-clear', 'chat', datetime('now'), 'pending', 'chan-1', 'discord', ?)`,
)
.run(JSON.stringify({ text: '/clear' }));
const provider = new MockProvider({}, () => '<message to="discord-test">should not run</message>');
const controller = new AbortController();
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
controller.abort();
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(JSON.parse(out[0].content).text).toBe('Session cleared.');
// Continuation was cleared
expect(getContinuation('mock')).toBeUndefined();
// Command message was completed
const pending = getPendingMessages();
expect(pending).toHaveLength(0);
await loopPromise.catch(() => {});
});
});
/**
* Provider that throws on every query, simulating API failures.
*/
class ThrowingProvider {
readonly supportsNativeSlashCommands = false;
private errorMessage: string;
constructor(errorMessage: string) {
this.errorMessage = errorMessage;
}
isSessionInvalid(): boolean {
return false;
}
query(_input: { prompt: string; cwd: string }) {
const errorMessage = this.errorMessage;
return {
push() {},
end() {},
abort() {},
events: (async function* () {
throw new Error(errorMessage);
})(),
};
}
}
/**
* Provider that throws with an error that triggers isSessionInvalid.
* First emits an init event (setting continuation), then throws.
*/
class InvalidSessionProvider {
readonly supportsNativeSlashCommands = false;
isSessionInvalid(): boolean {
return true;
}
query(_input: { prompt: string; cwd: string }) {
return {
push() {},
end() {},
abort() {},
events: (async function* () {
yield { type: 'init' as const, continuation: 'doomed-session' };
throw new Error('session not found');
})(),
};
}
}

View File

@@ -0,0 +1,50 @@
/**
* Tests for the core MCP tools' interaction with the per-batch routing
* context. The agent-runner sets a current `inReplyTo` at the top of each
* batch in poll-loop, and outbound writes from MCP tools (send_message,
* send_file) must pick it up so a2a return-path routing on the host can
* correlate replies back to the originating session.
*/
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js';
import { getUndeliveredMessages } from '../db/messages-out.js';
import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js';
import { sendMessage } from './core.js';
beforeEach(() => {
initTestSessionDb();
// Seed a peer agent destination
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`,
)
.run();
});
afterEach(() => {
clearCurrentInReplyTo();
closeSessionDb();
});
describe('send_message MCP tool — in_reply_to plumbing', () => {
it('stamps current batch in_reply_to on outbound rows', async () => {
setCurrentInReplyTo('inbound-msg-1');
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBe('inbound-msg-1');
});
it('writes null when no batch is active', async () => {
// No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation.
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBeNull();
});
});

View File

@@ -9,6 +9,7 @@
import fs from 'fs';
import path from 'path';
import { getCurrentInReplyTo } from '../current-batch.js';
import { findByName, getAllDestinations } from '../destinations.js';
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
@@ -50,9 +51,7 @@ function destinationList(): string {
*/
function resolveRouting(
to: string | undefined,
):
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
| { error: string } {
): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } {
if (!to) {
// Default: reply to whatever thread/channel this session is bound to.
const session = getSessionRouting();
@@ -82,9 +81,7 @@ function resolveRouting(
// preserve the thread_id so replies land in the correct thread.
const session = getSessionRouting();
const threadId =
session.channel_type === dest.channelType && session.platform_id === dest.platformId
? session.thread_id
: null;
session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null;
return {
channel_type: dest.channelType!,
platform_id: dest.platformId!,
@@ -98,12 +95,14 @@ function resolveRouting(
export const sendMessage: McpToolDefinition = {
tool: {
name: 'send_message',
description:
'Send a message to a named destination. If you have only one destination, you can omit `to`.',
description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.',
inputSchema: {
type: 'object' as const,
properties: {
to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' },
to: {
type: 'string',
description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.',
},
text: { type: 'string', description: 'Message content' },
},
required: ['text'],
@@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = {
const id = generateId();
const seq = writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
@@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = {
writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,

View File

@@ -47,7 +47,7 @@ describe('formatter', () => {
insertMessage('m1', 'task', { prompt: 'Review open PRs' });
const messages = getPendingMessages();
const prompt = formatMessages(messages);
expect(prompt).toContain('[SCHEDULED TASK]');
expect(prompt).toContain('<task');
expect(prompt).toContain('Review open PRs');
});
@@ -55,15 +55,17 @@ describe('formatter', () => {
insertMessage('m1', 'webhook', { source: 'github', event: 'push', payload: { ref: 'main' } });
const messages = getPendingMessages();
const prompt = formatMessages(messages);
expect(prompt).toContain('[WEBHOOK: github/push]');
expect(prompt).toContain('<webhook');
expect(prompt).toContain('source="github"');
expect(prompt).toContain('event="push"');
});
it('should format system messages', () => {
insertMessage('m1', 'system', { action: 'register_group', status: 'success', result: { id: 'ag-1' } });
const messages = getPendingMessages();
const prompt = formatMessages(messages);
expect(prompt).toContain('[SYSTEM RESPONSE]');
expect(prompt).toContain('register_group');
expect(prompt).toContain('<system_response');
expect(prompt).toContain('action="register_group"');
});
it('should handle mixed kinds', () => {
@@ -72,7 +74,7 @@ describe('formatter', () => {
const messages = getPendingMessages();
const prompt = formatMessages(messages);
expect(prompt).toContain('sender="John"');
expect(prompt).toContain('[SYSTEM RESPONSE]');
expect(prompt).toContain('<system_response');
});
it('should escape XML in content', () => {
@@ -147,6 +149,76 @@ describe('routing', () => {
});
});
describe('origin metadata (from= attribute)', () => {
function seedDestination(name: string, channelType: string, platformId: string): void {
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES (?, ?, 'channel', ?, ?, NULL)`,
)
.run(name, name, channelType, platformId);
}
function insertWithRouting(id: string, kind: string, content: object, channelType: string | null, platformId: string | null): void {
getInboundDb()
.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`,
)
.run(id, kind, platformId, channelType, JSON.stringify(content));
}
it('chat message includes from= when destination matches', () => {
seedDestination('discord-main', 'discord', 'chan-1');
insertWithRouting('m1', 'chat', { sender: 'Alice', text: 'hi' }, 'discord', 'chan-1');
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('from="discord-main"');
});
it('chat message falls back to raw routing when no destination matches', () => {
insertWithRouting('m1', 'chat', { sender: 'Alice', text: 'hi' }, 'telegram', 'chat-999');
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('from="unknown:telegram:chat-999"');
});
it('chat message omits from= when routing is null', () => {
insertMessage('m1', 'chat', { sender: 'Alice', text: 'hi' });
const prompt = formatMessages(getPendingMessages());
expect(prompt).not.toContain('from=');
});
it('task message includes from= when destination matches', () => {
seedDestination('slack-ops', 'slack', 'C-OPS');
insertWithRouting('t1', 'task', { prompt: 'check status' }, 'slack', 'C-OPS');
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('<task');
expect(prompt).toContain('from="slack-ops"');
});
it('task message omits from= when routing is null', () => {
insertMessage('t1', 'task', { prompt: 'check status' });
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('<task');
expect(prompt).not.toContain('from=');
});
it('webhook message includes from= when destination matches', () => {
seedDestination('github-ch', 'github', 'repo-1');
insertWithRouting('w1', 'webhook', { source: 'github', event: 'push', payload: {} }, 'github', 'repo-1');
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('<webhook');
expect(prompt).toContain('from="github-ch"');
});
it('system message includes from= when destination matches', () => {
seedDestination('discord-main', 'discord', 'chan-1');
insertWithRouting('s1', 'system', { action: 'test', status: 'ok', result: null }, 'discord', 'chan-1');
const prompt = formatMessages(getPendingMessages());
expect(prompt).toContain('<system_response');
expect(prompt).toContain('from="discord-main"');
});
});
describe('mock provider', () => {
it('should produce init + result events', async () => {
const provider = new MockProvider({}, (prompt) => `Echo: ${prompt}`);

View File

@@ -1,13 +1,18 @@
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js';
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js';
import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js';
import {
clearContinuation,
migrateLegacyContinuation,
setContinuation,
} from './db/session-state.js';
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js';
formatMessages,
extractRouting,
categorizeMessage,
isClearCommand,
isRunnerCommand,
stripInternalTags,
type RoutingContext,
} from './formatter.js';
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
const POLL_INTERVAL_MS = 1000;
@@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
// Process the query while concurrently polling for new messages
const skippedSet = new Set(skipped);
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
// Publish the batch's in_reply_to so MCP tools (send_message, send_file)
// can stamp it on outbound rows — needed for a2a return-path routing.
setCurrentInReplyTo(routing.inReplyTo);
try {
const result = await processQuery(query, routing, processingIds, config.providerName);
if (result.continuation && result.continuation !== continuation) {
@@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
thread_id: routing.threadId,
content: JSON.stringify({ text: `Error: ${errMsg}` }),
});
} finally {
clearCurrentInReplyTo();
}
// Ensure completed even if processQuery ended without a result event
@@ -366,6 +376,23 @@ async function processQuery(
if (event.text) {
dispatchResultText(event.text, routing);
}
} else if (event.type === 'compacted') {
// The SDK auto-compacted the conversation. After compaction the
// model often drops the learned `<message to="…">` wrapping
// discipline (the destinations are still in the system prompt,
// but the behavioral pattern is summarized away). Inject a
// reminder back into the live query so the next turn re-anchors
// on the destination model. Only do this when there's >1
// destination — single-destination groups have a fallback that
// works without wrapping. See qwibitai/nanoclaw#2325.
const destinations = getAllDestinations();
if (destinations.length > 1) {
const names = destinations.map((d) => d.name).join(', ');
query.push(
`[system] Context was just compacted. Reminder: you have ${destinations.length} destinations (${names}). ` +
`Use <message to="name"> blocks to address them. Bare text goes to the scratchpad fallback only.`,
);
}
}
}
} finally {
@@ -385,25 +412,26 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
break;
case 'error':
log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
log(
`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`,
);
break;
case 'progress':
log(`Progress: ${event.message}`);
break;
case 'compacted':
log(`Compacted: ${event.text}`);
break;
}
}
/**
* Parse the agent's final text for <message to="name">...</message> blocks
* and dispatch each one to its resolved destination. Text outside of blocks
* (including <internal>...</internal>) is normally scratchpad — logged but
* not sent.
* (including <internal>...</internal>) is scratchpad — logged but not sent.
*
* Single-destination shortcut: if the agent has exactly one configured
* destination AND the output contains zero <message> blocks, the entire
* cleaned text (with <internal> tags stripped) is sent to that destination.
* This preserves the simple case of one user on one channel — the agent
* doesn't need to know about wrapping syntax at all.
* The agent must always wrap output in <message to="name">...</message>
* blocks, even with a single destination. Bare text is scratchpad only.
*/
function dispatchResultText(text: string, routing: RoutingContext): void {
const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g;
@@ -436,30 +464,6 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
const scratchpad = stripInternalTags(scratchpadParts.join(''));
// Single-destination shortcut: the agent wrote plain text — send to
// the session's originating channel (from session_routing) if available,
// otherwise fall back to the single destination.
if (sent === 0 && scratchpad) {
if (routing.channelType && routing.platformId) {
// Reply to the channel/thread the message came from
writeMessageOut({
id: generateId(),
in_reply_to: routing.inReplyTo,
kind: 'chat',
platform_id: routing.platformId,
channel_type: routing.channelType,
thread_id: routing.threadId,
content: JSON.stringify({ text: scratchpad }),
});
return;
}
const all = getAllDestinations();
if (all.length === 1) {
sendToDestination(all[0], scratchpad, routing);
return;
}
}
if (scratchpad) {
log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
}
@@ -472,20 +476,46 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
// Inherit thread_id from the inbound routing context so replies land in the
// same thread the conversation is in. For non-threaded adapters the router
// strips thread_id at ingest, so this will already be null.
// Resolve thread_id per-destination from the most recent inbound message
// that came from this same channel+platform. In agent-shared sessions,
// different destinations have different thread contexts — using a single
// routing.threadId would stamp one channel's thread onto another.
const destRouting = resolveDestinationThread(channelType, platformId);
writeMessageOut({
id: generateId(),
in_reply_to: routing.inReplyTo,
in_reply_to: destRouting?.inReplyTo ?? routing.inReplyTo,
kind: 'chat',
platform_id: platformId,
channel_type: channelType,
thread_id: routing.threadId,
thread_id: destRouting?.threadId ?? null,
content: JSON.stringify({ text: body }),
});
}
/**
* Find the thread_id and message id from the most recent inbound message
* matching the given channel+platform. Returns null if no match found.
*/
function resolveDestinationThread(
channelType: string,
platformId: string,
): { threadId: string | null; inReplyTo: string | null } | null {
try {
const db = getInboundDb();
const row = db
.prepare(
`SELECT thread_id, id FROM messages_in
WHERE channel_type = ? AND platform_id = ?
ORDER BY seq DESC LIMIT 1`,
)
.get(channelType, platformId) as { thread_id: string | null; id: string } | undefined;
if (row) return { threadId: row.thread_id, inReplyTo: row.id };
} catch (err) {
log(`resolveDestinationThread error: ${err instanceof Error ? err.message : String(err)}`);
}
return null;
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@@ -329,7 +329,7 @@ export class ClaudeProvider implements AgentProvider {
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'compact_boundary') {
const meta = (message as { compact_metadata?: { pre_tokens?: number } }).compact_metadata;
const detail = meta?.pre_tokens ? ` (${meta.pre_tokens.toLocaleString()} tokens compacted)` : '';
yield { type: 'result', text: `Context compacted${detail}.` };
yield { type: 'compacted', text: `Context compacted${detail}.` };
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') {
const tn = message as { summary?: string };
yield { type: 'progress', message: tn.summary || 'Task notification' };

View File

@@ -79,4 +79,12 @@ export type ProviderEvent =
* event (tool call, thinking, partial message, anything) so the
* poll-loop's idle timer stays honest during long tool runs.
*/
| { type: 'activity' };
| { type: 'activity' }
/**
* The provider's underlying SDK auto-compacted the conversation context.
* The poll-loop reacts by injecting a destination reminder back into
* the live query so the agent doesn't drop `<message to="…">` wrapping
* after compaction. Distinct from `result` so it doesn't mark the turn
* completed or get dispatched as a chat message. See qwibitai/nanoclaw#2325.
*/
| { type: 'compacted'; text: string };