fix(agent-runner): require explicit destination addressing, fix per-destination threading
The poll loop had a bare-text routing fallback in dispatchResultText: when the agent produced text without <message to="..."> wrapping, it would auto- route to the session's originating channel (via a frozen RoutingContext) or to the single configured destination. This caused three problems: 1. Routing drift: RoutingContext was extracted once from the initial batch and never refreshed. When the initial batch was a null-routed cron task and a real chat arrived mid-query, replies were silently dropped to scratchpad because the frozen routing had all-null fields. 2. Cross-channel thread bleed: sendToDestination applied a single routing.threadId to every outbound message regardless of destination. In agent-shared sessions (multiple channels sharing one session), one channel's thread ID was stamped onto messages to a different channel. 3. Inconsistent formatting: task, webhook, and system messages had no origin metadata in their formatted output, so the agent couldn't tell which destination they came from — even when the underlying messages_in rows carried routing fields. Changes: - Remove the bare-text routing fallbacks in dispatchResultText (both the routing-based and single-destination shortcuts). All agent output must be wrapped in <message to="name">...</message>. Bare text is scratchpad. - Update buildDestinationsSection() to require explicit wrapping for all groups, including single-destination. No more "no special wrapping needed" shortcut. - Resolve thread_id per-destination via resolveDestinationThread(), which queries messages_in for the most recent message matching the target channel+platform. Falls back to null (top-level channel message) when no prior inbound exists for that destination. - Extract originAttr() helper in formatter.ts and apply it to all message types. Tasks now render as <task from="dest" time="...">, webhooks as <webhook from="dest" source="..." event="...">, system responses as <system_response from="dest" ...>. The agent always sees where a message originated. - Add a PreCompact shell hook (compact-instructions.ts) that outputs custom compaction instructions, telling the compactor to preserve recent message XML structure and routing metadata in the summary. Wired via settings.json in the .claude-shared scaffold, with a migration path (ensurePreCompactHook) for existing groups. Relation to open PRs: - #2277 (mergeRouting) becomes unnecessary — the routing fallback it patches no longer exists. Can be closed. - #2327 (post-compaction destination reminder) is complementary — it handles the post-compaction push, this handles pre-compaction instructions. Both can merge independently. - #2328 (default routing instruction) is complementary — it adds "reply to the from= destination" guidance to the multi-destination section. Compatible with the unified instruction format here. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
|
||||
import { findByName, type DestinationEntry } from './destinations.js';
|
||||
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
||||
import { writeMessageOut } from './db/messages-out.js';
|
||||
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import {
|
||||
clearContinuation,
|
||||
migrateLegacyContinuation,
|
||||
@@ -396,14 +396,10 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
|
||||
/**
|
||||
* Parse the agent's final text for <message to="name">...</message> blocks
|
||||
* and dispatch each one to its resolved destination. Text outside of blocks
|
||||
* (including <internal>...</internal>) is normally scratchpad — logged but
|
||||
* not sent.
|
||||
* (including <internal>...</internal>) is scratchpad — logged but not sent.
|
||||
*
|
||||
* Single-destination shortcut: if the agent has exactly one configured
|
||||
* destination AND the output contains zero <message> blocks, the entire
|
||||
* cleaned text (with <internal> tags stripped) is sent to that destination.
|
||||
* This preserves the simple case of one user on one channel — the agent
|
||||
* doesn't need to know about wrapping syntax at all.
|
||||
* The agent must always wrap output in <message to="name">...</message>
|
||||
* blocks, even with a single destination. Bare text is scratchpad only.
|
||||
*/
|
||||
function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g;
|
||||
@@ -436,30 +432,6 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
|
||||
const scratchpad = stripInternalTags(scratchpadParts.join(''));
|
||||
|
||||
// Single-destination shortcut: the agent wrote plain text — send to
|
||||
// the session's originating channel (from session_routing) if available,
|
||||
// otherwise fall back to the single destination.
|
||||
if (sent === 0 && scratchpad) {
|
||||
if (routing.channelType && routing.platformId) {
|
||||
// Reply to the channel/thread the message came from
|
||||
writeMessageOut({
|
||||
id: generateId(),
|
||||
in_reply_to: routing.inReplyTo,
|
||||
kind: 'chat',
|
||||
platform_id: routing.platformId,
|
||||
channel_type: routing.channelType,
|
||||
thread_id: routing.threadId,
|
||||
content: JSON.stringify({ text: scratchpad }),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const all = getAllDestinations();
|
||||
if (all.length === 1) {
|
||||
sendToDestination(all[0], scratchpad, routing);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (scratchpad) {
|
||||
log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
|
||||
}
|
||||
@@ -472,20 +444,46 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
|
||||
const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
|
||||
const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
|
||||
// Inherit thread_id from the inbound routing context so replies land in the
|
||||
// same thread the conversation is in. For non-threaded adapters the router
|
||||
// strips thread_id at ingest, so this will already be null.
|
||||
// Resolve thread_id per-destination from the most recent inbound message
|
||||
// that came from this same channel+platform. In agent-shared sessions,
|
||||
// different destinations have different thread contexts — using a single
|
||||
// routing.threadId would stamp one channel's thread onto another.
|
||||
const destRouting = resolveDestinationThread(channelType, platformId);
|
||||
writeMessageOut({
|
||||
id: generateId(),
|
||||
in_reply_to: routing.inReplyTo,
|
||||
in_reply_to: destRouting?.inReplyTo ?? routing.inReplyTo,
|
||||
kind: 'chat',
|
||||
platform_id: platformId,
|
||||
channel_type: channelType,
|
||||
thread_id: routing.threadId,
|
||||
thread_id: destRouting?.threadId ?? null,
|
||||
content: JSON.stringify({ text: body }),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the thread_id and message id from the most recent inbound message
|
||||
* matching the given channel+platform. Returns null if no match found.
|
||||
*/
|
||||
function resolveDestinationThread(
|
||||
channelType: string,
|
||||
platformId: string,
|
||||
): { threadId: string | null; inReplyTo: string | null } | null {
|
||||
try {
|
||||
const db = getInboundDb();
|
||||
const row = db
|
||||
.prepare(
|
||||
`SELECT thread_id, id FROM messages_in
|
||||
WHERE channel_type = ? AND platform_id = ?
|
||||
ORDER BY seq DESC LIMIT 1`,
|
||||
)
|
||||
.get(channelType, platformId) as { thread_id: string | null; id: string } | undefined;
|
||||
if (row) return { threadId: row.thread_id, inReplyTo: row.id };
|
||||
} catch {
|
||||
// Fall through — DB may not have these columns on older sessions
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user