The router hardcoded is_group=0 when auto-creating messaging groups, causing channel mentions to be misclassified as DMs. The Chat SDK bridge knows which handler fired (onDirectMessage vs onNewMention) so thread the signal through InboundMessage → InboundEvent → router. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
468 lines
18 KiB
TypeScript
468 lines
18 KiB
TypeScript
/**
|
|
* Inbound message routing.
|
|
*
|
|
* Channel adapter event → resolve messaging group → sender resolver →
|
|
* resolve/pick agent → access gate → resolve/create session → write
|
|
* messages_in → wake container.
|
|
*
|
|
* Two module hooks (registered by the permissions module):
|
|
* - `setSenderResolver` runs BEFORE agent resolution so user rows get
|
|
* upserted even if the message ends up dropped by agent wiring.
|
|
* Without the module, userId is null and downstream code tolerates it.
|
|
* - `setAccessGate` runs AFTER agent resolution so policy decisions can
|
|
* branch on the target agent group. Without the module, access is
|
|
* allow-all.
|
|
*
|
|
* `dropped_messages` is core audit infra. Core writes rows for structural
|
|
* drops (no agent wired, no trigger match); the access gate writes rows
|
|
* for policy refusals.
|
|
*/
|
|
import { getChannelAdapter } from './channels/channel-registry.js';
|
|
import { gateCommand } from './command-gate.js';
|
|
import { getAgentGroup } from './db/agent-groups.js';
|
|
import { recordDroppedMessage } from './db/dropped-messages.js';
|
|
import {
|
|
createMessagingGroup,
|
|
getMessagingGroupAgents,
|
|
getMessagingGroupWithAgentCount,
|
|
} from './db/messaging-groups.js';
|
|
import { findSessionForAgent } from './db/sessions.js';
|
|
import { startTypingRefresh } from './modules/typing/index.js';
|
|
import { log } from './log.js';
|
|
import { resolveSession, writeSessionMessage, writeOutboundDirect } from './session-manager.js';
|
|
import { wakeContainer } from './container-runner.js';
|
|
import { getSession } from './db/sessions.js';
|
|
import type { AgentGroup, MessagingGroup, MessagingGroupAgent } from './types.js';
|
|
import type { InboundEvent } from './channels/adapter.js';
|
|
|
|
function generateId(): string {
|
|
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
|
}
|
|
|
|
/**
|
|
* Sender-resolver hook. Runs before agent resolution.
|
|
*
|
|
* The permissions module registers this to extract the sender's namespaced
|
|
* user id and upsert the users row. Returns null when the payload doesn't
|
|
* carry enough info to identify a sender. Without the hook, every message
|
|
* arrives at the gate with userId=null.
|
|
*/
|
|
export type SenderResolverFn = (event: InboundEvent) => string | null;
|
|
|
|
let senderResolver: SenderResolverFn | null = null;
|
|
|
|
export function setSenderResolver(fn: SenderResolverFn): void {
|
|
if (senderResolver) {
|
|
log.warn('Sender resolver overwritten');
|
|
}
|
|
senderResolver = fn;
|
|
}
|
|
|
|
/**
|
|
* Access-gate hook. Runs after agent resolution.
|
|
*
|
|
* The permissions module registers this; without it, core defaults to
|
|
* allow-all. The gate receives the raw event so it can extract the sender
|
|
* name for audit-trail purposes, and it is responsible for recording its
|
|
* own `dropped_messages` row on refusal (structural drops are already
|
|
* recorded by core before the gate runs).
|
|
*/
|
|
export type AccessGateResult = { allowed: true } | { allowed: false; reason: string };
|
|
|
|
export type AccessGateFn = (
|
|
event: InboundEvent,
|
|
userId: string | null,
|
|
mg: MessagingGroup,
|
|
agentGroupId: string,
|
|
) => AccessGateResult;
|
|
|
|
let accessGate: AccessGateFn | null = null;
|
|
|
|
export function setAccessGate(fn: AccessGateFn): void {
|
|
if (accessGate) {
|
|
log.warn('Access gate overwritten');
|
|
}
|
|
accessGate = fn;
|
|
}
|
|
|
|
/**
|
|
* Per-wiring sender-scope hook. Runs alongside the access gate for each
|
|
* agent that would otherwise engage — lets the permissions module enforce
|
|
* `sender_scope='known'` on wirings that are stricter than the messaging
|
|
* group's `unknown_sender_policy`. When the hook isn't registered (module
|
|
* not installed), sender_scope is a no-op.
|
|
*/
|
|
export type SenderScopeGateFn = (
|
|
event: InboundEvent,
|
|
userId: string | null,
|
|
mg: MessagingGroup,
|
|
agent: MessagingGroupAgent,
|
|
) => AccessGateResult;
|
|
|
|
let senderScopeGate: SenderScopeGateFn | null = null;
|
|
|
|
export function setSenderScopeGate(fn: SenderScopeGateFn): void {
|
|
if (senderScopeGate) {
|
|
log.warn('Sender-scope gate overwritten');
|
|
}
|
|
senderScopeGate = fn;
|
|
}
|
|
|
|
/**
|
|
* Channel-registration hook. Runs when the router sees a mention/DM on a
|
|
* messaging group that has no wirings AND hasn't been denied. The hook is
|
|
* expected to escalate to an owner (card, etc.) and arrange for future
|
|
* replay via routeInbound after approval. Fire-and-forget from the
|
|
* router's perspective.
|
|
*
|
|
* Registered by the permissions module. Without the module the router
|
|
* silently records the drop with reason='no_agent_wired' and moves on.
|
|
*/
|
|
export type ChannelRequestGateFn = (mg: MessagingGroup, event: InboundEvent) => Promise<void>;
|
|
|
|
let channelRequestGate: ChannelRequestGateFn | null = null;
|
|
|
|
export function setChannelRequestGate(fn: ChannelRequestGateFn): void {
|
|
if (channelRequestGate) {
|
|
log.warn('Channel-request gate overwritten');
|
|
}
|
|
channelRequestGate = fn;
|
|
}
|
|
|
|
function safeParseContent(raw: string): { text?: string; sender?: string; senderId?: string } {
|
|
try {
|
|
return JSON.parse(raw);
|
|
} catch {
|
|
return { text: raw };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Route an inbound message from a channel adapter to the correct session.
|
|
* Creates messaging group + session if they don't exist yet.
|
|
*/
|
|
export async function routeInbound(event: InboundEvent): Promise<void> {
|
|
// 0. Apply the adapter's thread policy. Non-threaded adapters (Telegram,
|
|
// WhatsApp, iMessage, email) collapse threads to the channel.
|
|
const adapter = getChannelAdapter(event.channelType);
|
|
if (adapter && !adapter.supportsThreads) {
|
|
event = { ...event, threadId: null };
|
|
}
|
|
|
|
const isMention = event.message.isMention === true;
|
|
|
|
// 1. Combined lookup: messaging_group row + count of wired agents in a
|
|
// single query. Cheap short-circuit for the common "unwired channel"
|
|
// case — one DB read and we're out, no auto-create, no sender
|
|
// resolution, no log spam.
|
|
const found = getMessagingGroupWithAgentCount(event.channelType, event.platformId);
|
|
|
|
let mg: MessagingGroup;
|
|
let agentCount: number;
|
|
if (!found) {
|
|
// No messaging_groups row. Auto-create only when the message warrants
|
|
// attention (the bot was addressed — @mention or DM). Plain chatter in
|
|
// channels we merely sit in stays silent — no row, no DB writes.
|
|
if (!isMention) return;
|
|
const mgId = `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
|
mg = {
|
|
id: mgId,
|
|
channel_type: event.channelType,
|
|
platform_id: event.platformId,
|
|
name: null,
|
|
is_group: event.message.isGroup ? 1 : 0,
|
|
unknown_sender_policy: 'request_approval',
|
|
denied_at: null,
|
|
created_at: new Date().toISOString(),
|
|
};
|
|
createMessagingGroup(mg);
|
|
log.info('Auto-created messaging group', {
|
|
id: mgId,
|
|
channelType: event.channelType,
|
|
platformId: event.platformId,
|
|
});
|
|
agentCount = 0;
|
|
} else {
|
|
mg = found.mg;
|
|
agentCount = found.agentCount;
|
|
}
|
|
|
|
// 1b. No wirings — either silent drop (plain chatter / denied channel) or
|
|
// escalate to owner for channel-registration approval.
|
|
if (agentCount === 0) {
|
|
if (!isMention) return;
|
|
if (mg.denied_at) {
|
|
log.debug('Message dropped — channel was denied by owner', {
|
|
messagingGroupId: mg.id,
|
|
deniedAt: mg.denied_at,
|
|
});
|
|
return;
|
|
}
|
|
|
|
const parsed = safeParseContent(event.message.content);
|
|
recordDroppedMessage({
|
|
channel_type: event.channelType,
|
|
platform_id: event.platformId,
|
|
user_id: null,
|
|
sender_name: parsed.sender ?? null,
|
|
reason: 'no_agent_wired',
|
|
messaging_group_id: mg.id,
|
|
agent_group_id: null,
|
|
});
|
|
|
|
if (channelRequestGate) {
|
|
// Fire-and-forget escalation. The gate is expected to build a card,
|
|
// persist pending_channel_approvals, and replay the event via
|
|
// routeInbound after approval. Errors are logged internally — the
|
|
// user's message still stays dropped here either way.
|
|
void channelRequestGate(mg, event).catch((err) =>
|
|
log.error('Channel-request gate threw', { messagingGroupId: mg.id, err }),
|
|
);
|
|
} else {
|
|
log.warn('MESSAGE DROPPED — no agent groups wired and no channel-request gate registered', {
|
|
messagingGroupId: mg.id,
|
|
channelType: event.channelType,
|
|
platformId: event.platformId,
|
|
});
|
|
}
|
|
return;
|
|
}
|
|
|
|
// 2. Sender resolution (permissions module upserts the users row as a
|
|
// side effect so later role/access lookups find a real record).
|
|
// Without the module, userId is null — downstream tolerates it.
|
|
const userId: string | null = senderResolver ? senderResolver(event) : null;
|
|
|
|
// 3. Fetch wired agents in full (we already know the count is > 0; now
|
|
// we need their actual rows for fan-out).
|
|
const agents = getMessagingGroupAgents(mg.id);
|
|
|
|
// 4. Fan-out: evaluate each wired agent independently against engage_mode,
|
|
// sender_scope, and access gate. An agent that engages gets its own
|
|
// session and container wake. An agent that declines but has
|
|
// ignored_message_policy='accumulate' still gets the message stored in
|
|
// its session (trigger=0) so the context is available when it does
|
|
// engage later. Drop policy = skip silently.
|
|
//
|
|
// Subscribe (for mention-sticky wirings on threaded platforms) fires
|
|
// once per message from this loop — the first engaging mention-sticky
|
|
// wiring triggers adapter.subscribe(...); subsequent wirings don't
|
|
// re-subscribe (chat.subscribe is idempotent anyway, but the flag
|
|
// avoids the extra await).
|
|
const parsed = safeParseContent(event.message.content);
|
|
const messageText = parsed.text ?? '';
|
|
|
|
let engagedCount = 0;
|
|
let accumulatedCount = 0;
|
|
let subscribed = false;
|
|
|
|
for (const agent of agents) {
|
|
const agentGroup = getAgentGroup(agent.agent_group_id);
|
|
if (!agentGroup) continue;
|
|
|
|
const engages = evaluateEngage(agent, messageText, isMention, mg, event.threadId);
|
|
|
|
const accessOk = engages && (!accessGate || accessGate(event, userId, mg, agent.agent_group_id).allowed);
|
|
const scopeOk = engages && (!senderScopeGate || senderScopeGate(event, userId, mg, agent).allowed);
|
|
|
|
if (engages && accessOk && scopeOk) {
|
|
await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, true);
|
|
engagedCount++;
|
|
|
|
// Mention-sticky: ask the adapter to subscribe the thread so the
|
|
// platform's subscribed-message path carries follow-ups without
|
|
// requiring another @mention. Threaded-adapter only; DMs and
|
|
// non-threaded platforms skip.
|
|
if (
|
|
!subscribed &&
|
|
agent.engage_mode === 'mention-sticky' &&
|
|
adapter?.supportsThreads &&
|
|
adapter.subscribe &&
|
|
event.threadId !== null &&
|
|
mg.is_group !== 0
|
|
) {
|
|
subscribed = true;
|
|
// Fire-and-forget — subscribe is platform-side bookkeeping and
|
|
// shouldn't block message routing. Errors are logged inside the
|
|
// adapter (or by the promise rejection handler below).
|
|
void adapter.subscribe(event.platformId, event.threadId).catch((err) => {
|
|
log.warn('adapter.subscribe failed', { channelType: event.channelType, threadId: event.threadId, err });
|
|
});
|
|
}
|
|
} else if (agent.ignored_message_policy === 'accumulate') {
|
|
await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, false);
|
|
accumulatedCount++;
|
|
} else {
|
|
log.debug('Message not engaged for agent (drop policy)', {
|
|
agentGroupId: agent.agent_group_id,
|
|
engage_mode: agent.engage_mode,
|
|
engages,
|
|
accessOk,
|
|
scopeOk,
|
|
});
|
|
}
|
|
}
|
|
|
|
if (engagedCount + accumulatedCount === 0) {
|
|
recordDroppedMessage({
|
|
channel_type: event.channelType,
|
|
platform_id: event.platformId,
|
|
user_id: userId,
|
|
sender_name: parsed.sender ?? null,
|
|
reason: 'no_agent_engaged',
|
|
messaging_group_id: mg.id,
|
|
agent_group_id: null,
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Decide whether a given wired agent should engage on this message.
|
|
*
|
|
* 'pattern' — regex test on text; '.' = always
|
|
* 'mention' — bot must be mentioned on the platform. Resolved by
|
|
* the adapter (SDK-level) and forwarded as
|
|
* `event.message.isMention`. Agent display name
|
|
* (`agent_group.name`) is irrelevant — users address
|
|
* the bot via its platform username (@botname on
|
|
* Telegram, user-id mention on Slack/Discord), not
|
|
* via the agent's NanoClaw-side display name. If a
|
|
* user wants to disambiguate between multiple agents
|
|
* wired to one chat, use engage_mode='pattern' with
|
|
* the disambiguator as the regex.
|
|
* 'mention-sticky' — platform mention OR an active per-thread session
|
|
* already exists for this (agent, mg, thread). The
|
|
* session existence IS our subscription state; once
|
|
* a thread has engaged us once, follow-ups arrive
|
|
* with no mention and should still fire.
|
|
*/
|
|
function evaluateEngage(
|
|
agent: MessagingGroupAgent,
|
|
text: string,
|
|
isMention: boolean,
|
|
mg: MessagingGroup,
|
|
threadId: string | null,
|
|
): boolean {
|
|
switch (agent.engage_mode) {
|
|
case 'pattern': {
|
|
const pat = agent.engage_pattern ?? '.';
|
|
if (pat === '.') return true;
|
|
try {
|
|
return new RegExp(pat).test(text);
|
|
} catch {
|
|
// Bad regex: fail open so admin sees the agent responding + can fix.
|
|
return true;
|
|
}
|
|
}
|
|
case 'mention':
|
|
return isMention;
|
|
case 'mention-sticky': {
|
|
if (isMention) return true;
|
|
// Sticky follow-up: session already exists for this (agent, mg, thread)
|
|
// — the thread was activated before, keep firing.
|
|
if (mg.is_group === 0) return false; // DMs never use mention-sticky sensibly
|
|
const existing = findSessionForAgent(agent.agent_group_id, mg.id, threadId);
|
|
return existing !== undefined;
|
|
}
|
|
default:
|
|
return false;
|
|
}
|
|
}
|
|
|
|
async function deliverToAgent(
|
|
agent: MessagingGroupAgent,
|
|
agentGroup: AgentGroup,
|
|
mg: MessagingGroup,
|
|
event: InboundEvent,
|
|
userId: string | null,
|
|
adapterSupportsThreads: boolean,
|
|
wake: boolean,
|
|
): Promise<void> {
|
|
// Apply the adapter thread policy: threaded adapter in a group chat →
|
|
// per-thread session regardless of wiring. agent-shared preserved (it's
|
|
// a cross-channel directive the adapter doesn't know about). DMs collapse
|
|
// sub-threads to one session (is_group=0 short-circuit).
|
|
let effectiveSessionMode = agent.session_mode;
|
|
if (adapterSupportsThreads && effectiveSessionMode !== 'agent-shared' && mg.is_group !== 0) {
|
|
effectiveSessionMode = 'per-thread';
|
|
}
|
|
|
|
const { session, created } = resolveSession(agent.agent_group_id, mg.id, event.threadId, effectiveSessionMode);
|
|
|
|
// The inbound row's (channel_type, platform_id, thread_id) is the address
|
|
// the agent's reply will be delivered to. Normally it mirrors the source
|
|
// (stamped from the event). When the caller supplied `replyTo` (CLI admin
|
|
// transport acting on operator intent), the reply is redirected there.
|
|
const deliveryAddr = event.replyTo ?? {
|
|
channelType: event.channelType,
|
|
platformId: event.platformId,
|
|
threadId: event.threadId,
|
|
};
|
|
|
|
// Command gate: classify slash commands before they reach the container.
|
|
// Filtered commands are dropped silently. Denied admin commands get a
|
|
// permission-denied response written directly to messages_out.
|
|
if (event.message.kind === 'chat' || event.message.kind === 'chat-sdk') {
|
|
const gate = gateCommand(event.message.content, userId, agent.agent_group_id);
|
|
if (gate.action === 'filter') {
|
|
log.debug('Filtered command dropped by gate', { agentGroupId: agent.agent_group_id });
|
|
return;
|
|
}
|
|
if (gate.action === 'deny') {
|
|
writeOutboundDirect(session.agent_group_id, session.id, {
|
|
id: `deny-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
|
kind: 'chat',
|
|
platformId: deliveryAddr.platformId,
|
|
channelType: deliveryAddr.channelType,
|
|
threadId: deliveryAddr.threadId,
|
|
content: JSON.stringify({ text: `Permission denied: ${gate.command} requires admin access.` }),
|
|
});
|
|
log.info('Admin command denied by gate', { command: gate.command, userId, agentGroupId: agent.agent_group_id });
|
|
return;
|
|
}
|
|
}
|
|
|
|
writeSessionMessage(session.agent_group_id, session.id, {
|
|
id: messageIdForAgent(event.message.id, agent.agent_group_id),
|
|
kind: event.message.kind,
|
|
timestamp: event.message.timestamp,
|
|
platformId: deliveryAddr.platformId,
|
|
channelType: deliveryAddr.channelType,
|
|
threadId: deliveryAddr.threadId,
|
|
content: event.message.content,
|
|
trigger: wake ? 1 : 0,
|
|
});
|
|
|
|
log.info('Message routed', {
|
|
sessionId: session.id,
|
|
agentGroup: agent.agent_group_id,
|
|
engage_mode: agent.engage_mode,
|
|
kind: event.message.kind,
|
|
userId,
|
|
wake,
|
|
created,
|
|
agentGroupName: agentGroup.name,
|
|
});
|
|
|
|
if (wake) {
|
|
// Typing indicator + wake are only for the engaged branch; accumulated
|
|
// messages sit silently until a real trigger fires.
|
|
startTypingRefresh(session.id, session.agent_group_id, event.channelType, event.platformId, event.threadId);
|
|
const freshSession = getSession(session.id);
|
|
if (freshSession) {
|
|
await wakeContainer(freshSession);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* When fanning out, the same inbound message lands in multiple per-agent
|
|
* session DBs. messages_in.id is PRIMARY KEY, so reuse of the raw id would
|
|
* collide across sessions (or, more subtly, within one session if re-routed
|
|
* after a retry). Namespace by agent_group_id to keep ids unique per session.
|
|
*/
|
|
function messageIdForAgent(baseId: string | undefined, agentGroupId: string): string {
|
|
const id = baseId && baseId.length > 0 ? baseId : generateId();
|
|
return `${id}:${agentGroupId}`;
|
|
}
|