feat(router,cli): replyTo override + CLI admin-transport flows

- InboundEvent gains an optional replyTo; router stamps the row's address
  fields from it when set, so replies can route to a different channel than
  the one the inbound came in on.
- ChannelSetup adds onInboundEvent for admin-transport adapters that build
  the full event themselves.
- CLI wire format accepts {text, to, reply_to}. Routed messages go through
  onInboundEvent and do not evict an active chat client.
- init-first-agent hands the DM welcome to the running service via
  data/cli.sock — synchronous wake, no sweep wait. Fails loudly if the
  service is down; no silent fallback.
- Split the CLI scratch-agent bootstrap into scripts/init-cli-agent.ts;
  init-first-agent is DM-only.

Agents cannot set replyTo: it lives only on the inbound/router seam and is
consumed once when writing messages_in.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-20 23:30:47 +03:00
parent dadf258136
commit 6c26c0413a
15 changed files with 503 additions and 213 deletions

View File

@@ -10,6 +10,14 @@ export interface ChannelSetup {
/** Called when an inbound message arrives from the platform. */
onInbound(platformId: string, threadId: string | null, message: InboundMessage): void | Promise<void>;
/**
* Called by admin-transport adapters (CLI) that want to route a message to
* an arbitrary channel/platform and optionally redirect replies elsewhere.
* Regular chat adapters should use `onInbound`; `onInboundEvent` skips the
* adapter-channel-type injection so the caller can target any wired mg.
*/
onInboundEvent(event: InboundEvent): void | Promise<void>;
/** Called when the adapter discovers metadata about a conversation. */
onMetadata(platformId: string, name?: string, isGroup?: boolean): void;
@@ -17,6 +25,41 @@ export interface ChannelSetup {
onAction(questionId: string, selectedOption: string, userId: string): void;
}
/** Delivery address used for reply-to overrides and (normally) the inbound's own origin. */
export interface DeliveryAddress {
channelType: string;
platformId: string;
threadId: string | null;
}
/**
* Full inbound event handed to the router.
*
* `channelType` + `platformId` + `threadId` identify which messaging group /
* session receives the message. `replyTo`, when set, overrides where the
* agent's reply is delivered — used by the CLI admin transport when the
* operator wants a message routed to one channel but replies echoed back to
* their terminal. Agents cannot set `replyTo`; it is a router-layer concept
* set only by external adapters carrying operator intent.
*/
export interface InboundEvent {
channelType: string;
platformId: string;
threadId: string | null;
message: {
id: string;
kind: 'chat' | 'chat-sdk';
content: string; // JSON blob
timestamp: string;
/**
* Platform-confirmed bot-mention signal forwarded from the adapter.
* See InboundMessage.isMention for the full explanation.
*/
isMention?: boolean;
};
replyTo?: DeliveryAddress;
}
/** Inbound message from adapter to host. */
export interface InboundMessage {
id: string;

View File

@@ -105,6 +105,7 @@ describe('channel registry', () => {
await initChannelAdapters(() => ({
conversations: [],
onInbound: () => {},
onInboundEvent: () => {},
onMetadata: () => {},
onAction: () => {},
}));
@@ -208,6 +209,7 @@ describe('channel + router integration', () => {
await initChannelAdapters(() => ({
conversations: [],
onInbound: () => {},
onInboundEvent: () => {},
onMetadata: () => {},
onAction: () => {},
}));

View File

@@ -7,19 +7,31 @@
* the normal router/delivery path like any other adapter — `/clear` and
* other session-level commands work identically.
*
* MVP shape:
* - One hardcoded messaging_group: `cli/local`. Wired to one agent via
* the setup flow (see `scripts/init-first-agent.ts`). Multi-agent
* support can add per-agent messaging_groups later without breaking
* the wire protocol.
* - Single connected client at a time. A second connection closes the
* first with a "superseded" notice.
* - Wire format: one JSON object per line.
* Client → server: { "text": "user message" }
* Server → client: { "text": "agent reply" }
* - deliver() silently no-ops when no client is connected. The outbound
* row is already in outbound.db, so the message isn't lost — it just
* doesn't reach this run's terminal. Reconnect to see subsequent replies.
* Wire format: one JSON object per line.
*
* Client → server:
* { "text": "user message" } # default — talk to cli/local
* { "text": "...", "to": {"channelType": "discord",
* "platformId": "discord:@me:149...",
* "threadId": null} } # route to a specific mg
* { "text": "...", "to": {...}, "reply_to": {...} } # + redirect replies
* Server → client:
* { "text": "agent reply" }
*
* The `to` and `reply_to` addressing is how admin transports (the bootstrap
* script) inject messages targeting any wired channel. `reply_to` is a
* router-layer concept — agents cannot set it; it is carried only on
* inbound events from CLI clients that hold operator privilege (the socket
* is chmod 0600, so "connected to this socket" ≈ "is the owner").
*
* Single-client chat semantics: one connected terminal at a time. A second
* "chat" connection closes the first with a "superseded" notice. Admin
* route-opcode connections (`to` set) are one-shot and do NOT evict an
* active chat client.
*
* deliver() silently no-ops when no client is connected. The outbound row
* is already in outbound.db, so the message isn't lost — it just doesn't
* reach this run's terminal. Reconnect to see subsequent replies.
*/
import fs from 'fs';
import net from 'net';
@@ -30,7 +42,8 @@ import { log } from '../log.js';
import type {
ChannelAdapter,
ChannelSetup,
InboundMessage,
DeliveryAddress,
InboundEvent,
OutboundMessage,
} from './adapter.js';
import { registerChannelAdapter } from './channel-registry.js';
@@ -129,16 +142,25 @@ function createAdapter(): ChannelAdapter {
};
function handleConnection(socket: net.Socket, config: ChannelSetup): void {
if (client) {
try {
client.write(JSON.stringify({ text: '[superseded by a newer client]' }) + '\n');
client.end();
} catch {
// swallow
// Defer the chat-slot swap until we see the first line — if it turns out
// to be a routed (`to`-bearing) one-shot, we leave the existing chat
// client in place. Only plain chat connections participate in supersede.
let claimedChatSlot = false;
const claimChatSlot = () => {
if (claimedChatSlot) return;
claimedChatSlot = true;
if (client && client !== socket) {
try {
client.write(JSON.stringify({ text: '[superseded by a newer client]' }) + '\n');
client.end();
} catch {
// swallow
}
}
}
client = socket;
log.info('CLI client connected');
client = socket;
log.info('CLI client connected');
};
let buffer = '';
socket.on('data', (chunk) => {
@@ -148,13 +170,13 @@ function createAdapter(): ChannelAdapter {
const line = buffer.slice(0, idx).trim();
buffer = buffer.slice(idx + 1);
if (!line) continue;
void handleLine(line, config);
void handleLine(line, config, claimChatSlot);
}
});
socket.on('close', () => {
if (client === socket) client = null;
log.info('CLI client disconnected');
if (claimedChatSlot) log.info('CLI client disconnected');
});
socket.on('error', (err) => {
@@ -162,8 +184,16 @@ function createAdapter(): ChannelAdapter {
});
}
async function handleLine(line: string, config: ChannelSetup): Promise<void> {
let payload: { text?: unknown };
async function handleLine(
line: string,
config: ChannelSetup,
claimChatSlot: () => void,
): Promise<void> {
let payload: {
text?: unknown;
to?: unknown;
reply_to?: unknown;
};
try {
payload = JSON.parse(line);
} catch (err) {
@@ -172,23 +202,73 @@ function createAdapter(): ChannelAdapter {
}
if (typeof payload.text !== 'string' || payload.text.length === 0) return;
const inbound: InboundMessage = {
id: `cli-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'chat',
timestamp: new Date().toISOString(),
content: {
text: payload.text,
sender: 'cli',
senderId: `cli:${PLATFORM_ID}`,
},
};
const to = parseAddress(payload.to);
const replyTo = parseAddress(payload.reply_to);
if (to) {
// Routed message — admin transport. Build a full InboundEvent targeting
// `to`'s channel/platform, and let `reply_to` (if any) redirect replies.
// Does NOT claim the chat slot, so an active terminal chat isn't evicted.
const event: InboundEvent = {
channelType: to.channelType,
platformId: to.platformId,
threadId: to.threadId,
message: {
id: `cli-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'chat',
timestamp: new Date().toISOString(),
content: JSON.stringify({
text: payload.text,
sender: 'cli',
senderId: `cli:${PLATFORM_ID}`,
}),
},
replyTo: replyTo ?? undefined,
};
try {
await config.onInboundEvent(event);
} catch (err) {
log.error('CLI: onInboundEvent threw', { err });
}
return;
}
// Plain chat — claim the slot (evicting any prior client) and route via
// the standard onInbound path (adapter injects its own channelType).
claimChatSlot();
try {
await config.onInbound(PLATFORM_ID, null, inbound);
await config.onInbound(PLATFORM_ID, null, {
id: `cli-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'chat',
timestamp: new Date().toISOString(),
content: {
text: payload.text,
sender: 'cli',
senderId: `cli:${PLATFORM_ID}`,
},
});
} catch (err) {
log.error('CLI: onInbound threw', { err });
}
}
function parseAddress(raw: unknown): DeliveryAddress | null {
if (!raw || typeof raw !== 'object') return null;
const obj = raw as Record<string, unknown>;
if (typeof obj.channelType !== 'string' || typeof obj.platformId !== 'string') return null;
const threadId =
obj.threadId === null || obj.threadId === undefined
? null
: typeof obj.threadId === 'string'
? obj.threadId
: null;
return {
channelType: obj.channelType,
platformId: obj.platformId,
threadId,
};
}
return adapter;
}

View File

@@ -25,7 +25,7 @@ import {
outboundDbPath,
} from './session-manager.js';
import { getSession, findSession } from './db/sessions.js';
import type { InboundEvent } from './router.js';
import type { InboundEvent } from './channels/adapter.js';
// Mock container runner to prevent actual Docker spawning
vi.mock('./container-runner.js', () => ({

View File

@@ -86,6 +86,15 @@ async function main(): Promise<void> {
log.error('Failed to route inbound message', { channelType: adapter.channelType, err });
});
},
onInboundEvent(event) {
routeInbound(event).catch((err) => {
log.error('Failed to route inbound event', {
sourceAdapter: adapter.channelType,
targetChannelType: event.channelType,
err,
});
});
},
onMetadata(platformId, name, isGroup) {
log.info('Channel metadata discovered', {
channelType: adapter.channelType,

View File

@@ -57,6 +57,7 @@ async function mountMockAdapter(
await initChannelAdapters(() => ({
conversations: [],
onInbound: () => {},
onInboundEvent: () => {},
onMetadata: () => {},
onAction: () => {},
}));

View File

@@ -41,7 +41,7 @@ import { getAllAgentGroups } from '../../db/agent-groups.js';
import { getMessagingGroup } from '../../db/messaging-groups.js';
import { getDeliveryAdapter } from '../../delivery.js';
import { log } from '../../log.js';
import type { InboundEvent } from '../../router.js';
import type { InboundEvent } from '../../channels/adapter.js';
import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js';
import { createPendingChannelApproval, hasInFlightChannelApproval } from './db/pending-channel-approvals.js';

View File

@@ -27,8 +27,8 @@ import {
setSenderResolver,
setSenderScopeGate,
type AccessGateResult,
type InboundEvent,
} from '../../router.js';
import type { InboundEvent } from '../../channels/adapter.js';
import { registerResponseHandler, type ResponsePayload } from '../../response-registry.js';
import { log } from '../../log.js';
import type { MessagingGroup, MessagingGroupAgent } from '../../types.js';

View File

@@ -60,6 +60,7 @@ async function mountMockAdapter(
await initChannelAdapters(() => ({
conversations: [],
onInbound: () => {},
onInboundEvent: () => {},
onMetadata: () => {},
onAction: () => {},
}));

View File

@@ -30,7 +30,7 @@ import { normalizeOptions, type RawOption } from '../../channels/ask-question.js
import { getMessagingGroup } from '../../db/messaging-groups.js';
import { getDeliveryAdapter } from '../../delivery.js';
import { log } from '../../log.js';
import type { InboundEvent } from '../../router.js';
import type { InboundEvent } from '../../channels/adapter.js';
import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js';
import { createPendingSenderApproval, hasInFlightSenderApproval } from './db/pending-sender-approvals.js';

View File

@@ -32,32 +32,12 @@ import { resolveSession, writeSessionMessage } from './session-manager.js';
import { wakeContainer } from './container-runner.js';
import { getSession } from './db/sessions.js';
import type { AgentGroup, MessagingGroup, MessagingGroupAgent } from './types.js';
import type { InboundEvent } from './channels/adapter.js';
function generateId(): string {
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
export interface InboundEvent {
channelType: string;
platformId: string;
threadId: string | null;
message: {
id: string;
kind: 'chat' | 'chat-sdk';
content: string; // JSON blob
timestamp: string;
/**
* Platform-confirmed bot-mention signal forwarded from the adapter.
* When defined, it's authoritative — use this instead of text-matching
* agent_group_name, which breaks on platforms where the mention token
* is the bot's platform username (e.g. Telegram). undefined means the
* adapter doesn't provide the signal; evaluateEngage falls back to
* agent-name regex.
*/
isMention?: boolean;
};
}
/**
* Sender-resolver hook. Runs before agent resolution.
*
@@ -408,13 +388,23 @@ async function deliverToAgent(
const { session, created } = resolveSession(agent.agent_group_id, mg.id, event.threadId, effectiveSessionMode);
// The inbound row's (channel_type, platform_id, thread_id) is the address
// the agent's reply will be delivered to. Normally it mirrors the source
// (stamped from the event). When the caller supplied `replyTo` (CLI admin
// transport acting on operator intent), the reply is redirected there.
const deliveryAddr = event.replyTo ?? {
channelType: event.channelType,
platformId: event.platformId,
threadId: event.threadId,
};
writeSessionMessage(session.agent_group_id, session.id, {
id: messageIdForAgent(event.message.id, agent.agent_group_id),
kind: event.message.kind,
timestamp: event.message.timestamp,
platformId: event.platformId,
channelType: event.channelType,
threadId: event.threadId,
platformId: deliveryAddr.platformId,
channelType: deliveryAddr.channelType,
threadId: deliveryAddr.threadId,
content: event.message.content,
trigger: wake ? 1 : 0,
});