nanoclaw/src/channels/chat-sdk-bridge.ts
gavrielc 16b9499532 feat(routing): engage modes + sender scope + accumulate/drop + per-agent fan-out
Replaces the opaque trigger_rules JSON + response_scope enum on
messaging_group_agents with four explicit orthogonal columns:

    engage_mode            'pattern' | 'mention' | 'mention-sticky'
    engage_pattern         regex source; required when mode='pattern';
                           '.' is the "always" sentinel
    sender_scope           'all' | 'known'
    ignored_message_policy 'drop' | 'accumulate'

Inbound routing becomes a fan-out — every wired agent is evaluated
independently. A match gets its own session + container wake. A miss
with accumulate keeps the message as context-only (trigger=0) in that
agent's session, so when the agent does eventually engage it sees the
prior chatter.
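
A minimal sketch of how the four columns might be modeled and validated in
TypeScript. The `AgentWiring` type and `validateWiring` helper are
hypothetical names used for illustration, not identifiers from this commit;
only the column names and their allowed values come from the list above.

```typescript
// Hypothetical model of one messaging_group_agents row (illustration only).
type EngageMode = 'pattern' | 'mention' | 'mention-sticky';
type SenderScope = 'all' | 'known';
type IgnoredMessagePolicy = 'drop' | 'accumulate';

interface AgentWiring {
  engage_mode: EngageMode;
  engage_pattern: string | null; // regex source; '.' is the "always" sentinel
  sender_scope: SenderScope;
  ignored_message_policy: IgnoredMessagePolicy;
}

// Returns a list of problems; an empty list means the wiring is usable.
function validateWiring(w: AgentWiring): string[] {
  const problems: string[] = [];
  if (w.engage_mode === 'pattern') {
    if (!w.engage_pattern) {
      problems.push("engage_pattern is required when engage_mode='pattern'");
    } else {
      try {
        new RegExp(w.engage_pattern); // throws SyntaxError on a bad source
      } catch {
        problems.push(`engage_pattern is not a valid regex: ${w.engage_pattern}`);
      }
    }
  }
  return problems;
}
```

Validating at write time is one possible design; the bridge in this commit
instead fails open on an invalid regex at gate time.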

## Schema

- Migration 010 (`engage-modes`): adds the 4 new columns, backfills
  from trigger_rules.pattern + requiresTrigger + response_scope, drops
  the legacy columns.
- messages_in gains `trigger INTEGER NOT NULL DEFAULT 1` (session DB
  schema + `migrateMessagesInTable` forward-compat).
- countDueMessages gates waking on `trigger = 1`.

## Routing

- `pickAgent` (returned a single agent) → loop over all wired agents. Per agent:
  evaluate engage_mode; run access gate + sender-scope gate; on full
  match → resolveSession + writeSessionMessage(trigger=1) + wake. On
  miss with accumulate → writeSessionMessage(trigger=0), no wake. On
  miss with drop → skip.
- New `findSessionForAgent(agentGroupId, mgId, threadId)` scopes
  session lookup by agent so fan-out doesn't cross sessions.
- `messageIdForAgent` namespaces inbound message ids by agent_group_id
  so PRIMARY KEY doesn't collide across per-agent session DBs.
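
The per-agent loop above can be sketched as a pure decision function. This
is an illustration under assumptions: `decideForAgent`, `fanOut`, `Wiring`,
and `namespacedMessageId` are hypothetical names, any failed check is
treated as a miss, and the `agentGroupId:messageId` format is a guess at
what `messageIdForAgent` produces, not its confirmed implementation.

```typescript
type Outcome = 'wake' | 'accumulate' | 'drop';

interface Wiring {
  agentGroupId: string;
  ignoredMessagePolicy: 'drop' | 'accumulate';
}

// Results of the three per-agent checks named in the list above.
interface Checks {
  engaged: boolean; // engage_mode evaluation
  accessOk: boolean; // access gate
  senderOk: boolean; // sender-scope gate
}

function decideForAgent(w: Wiring, c: Checks): Outcome {
  // Full match: the message is written with trigger=1 and the container wakes.
  if (c.engaged && c.accessOk && c.senderOk) return 'wake';
  // Miss: keep as context-only (trigger=0) or skip, depending on policy.
  // ASSUMPTION: a failed gate counts as a miss just like a failed engage check.
  return w.ignoredMessagePolicy === 'accumulate' ? 'accumulate' : 'drop';
}

// Assumed id format: prefixing keeps ids distinct across per-agent session DBs.
function namespacedMessageId(agentGroupId: string, messageId: string): string {
  return `${agentGroupId}:${messageId}`;
}

// Fan-out: every wired agent is evaluated independently of the others.
function fanOut(
  wirings: Wiring[],
  messageId: string,
  check: (w: Wiring) => Checks,
): Array<{ id: string; outcome: Outcome }> {
  return wirings.map((w) => ({
    id: namespacedMessageId(w.agentGroupId, messageId),
    outcome: decideForAgent(w, check(w)),
  }));
}
```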

## Adapter layer

- `ConversationConfig` replaces `triggerPattern` + `requiresTrigger`
  with `engageMode` + `engagePattern`.
- Chat SDK bridge stores `Map<platformId, ConversationConfig[]>`
  (multiple agents per conversation) and applies union gating
  pre-onInbound:
    * onSubscribedMessage: engage if any wiring keeps firing in
      subscribed state (mention-sticky or pattern)
    * onNewMention: engage on mention; only subscribes the thread if
      at least one wiring is `mention-sticky`
    * onDirectMessage: engage per mode; sticky follows same rule
- Bridge no longer unconditionally calls `thread.subscribe()`.

## Sender scope

- Permissions module registers a second hook `setSenderScopeGate` that
  runs per-wiring after the existing access gate. `sender_scope='known'`
  requires canAccessAgentGroup(); `'all'` is a no-op. Not installed →
  no-op everywhere (default allow).
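
The hook pattern described above could look roughly like this. Only the
name `setSenderScopeGate` and the default-allow behavior come from the
commit; the gate signature, `passesSenderScope`, and the `knownUsers` set
(a stand-in for `canAccessAgentGroup()`) are assumptions for illustration.

```typescript
type SenderScope = 'all' | 'known';
// ASSUMED signature; the real hook may take different arguments.
type SenderScopeGate = (userId: string, agentGroupId: string, scope: SenderScope) => boolean;

let senderScopeGate: SenderScopeGate | null = null;

// Registration hook called by the permissions module, if it is installed.
function setSenderScopeGate(gate: SenderScopeGate | null): void {
  senderScopeGate = gate;
}

// Router-side check, run per wiring. No gate installed means default allow.
function passesSenderScope(userId: string, agentGroupId: string, scope: SenderScope): boolean {
  if (!senderScopeGate) return true;
  return senderScopeGate(userId, agentGroupId, scope);
}

// Example gate: 'all' is a no-op, 'known' requires a membership lookup.
const knownUsers = new Set<string>(['alice']); // stand-in for canAccessAgentGroup()
const exampleGate: SenderScopeGate = (userId, _agentGroupId, scope) =>
  scope === 'all' ? true : knownUsers.has(userId);
```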

## Container side

- Host passes `NANOCLAW_MAX_MESSAGES_PER_PROMPT` (reuses existing
  MAX_MESSAGES_PER_PROMPT config; was dead code from v1).
- `getPendingMessages` queries `ORDER BY seq DESC LIMIT N`, reverses to
  chronological order for the prompt — accumulated context rides along
  with trigger rows up to the cap.
- `MessageInRow` gains `trigger: number` so downstream container code
  can tell trigger rows from context-only rows (the container still
  processes both; only the host uses `trigger=0` for don't-wake).
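
A small in-memory sketch of the "newest N, then reverse" selection and the
host-side wake count. It uses plain arrays instead of SQLite, and
`latestInChronologicalOrder` and `countDue` are hypothetical stand-ins for
`getPendingMessages` and `countDueMessages`, not their actual code.

```typescript
interface MessageInRow {
  seq: number;
  content: string;
  trigger: number; // 1 = wakes the agent, 0 = context-only
}

// Equivalent of `ORDER BY seq DESC LIMIT max`, then reversed so the prompt
// reads oldest to newest.
function latestInChronologicalOrder(rows: MessageInRow[], max: number): MessageInRow[] {
  const newestFirst = [...rows].sort((a, b) => b.seq - a.seq).slice(0, max);
  return newestFirst.reverse();
}

// Host-side gate: only trigger=1 rows count toward waking a container.
function countDue(rows: MessageInRow[]): number {
  return rows.filter((r) => r.trigger === 1).length;
}
```

With four rows where only the third has `trigger=1` and a cap of three, the
selection keeps seq 2, 3, and 4 in that order, so the two context-only
neighbors ride along with the single trigger row.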

## Defaults (per ACTION-ITEMS item 1 decision)

- DM (is_group=0): `engage_mode='pattern'`, `engage_pattern='.'` (always)
- Threaded group: `engage_mode='mention-sticky'` (seed-discord)
- Non-threaded group / CLI: pattern '.' in bootstrap scripts

## Tests

- src/host-core.test.ts: 3 new cases — fan-out (2 agents, 2 sessions,
  2 wakes), accumulate (trigger=0 + no wake), drop (no session created).
- Existing 10 host-core tests still pass.
- Migration 010 verified against an empty DB (0-row path).

Closes: ACTION-ITEMS items 1, 4.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 01:30:04 +03:00


/**
* Chat SDK bridge — wraps a Chat SDK adapter + Chat instance
* to conform to the NanoClaw ChannelAdapter interface.
*
* Used by Discord, Slack, and other Chat SDK-supported platforms.
*/
import http from 'http';
import {
Chat,
Card,
CardText,
Actions,
Button,
type Adapter,
type ConcurrencyStrategy,
type Message as ChatMessage,
} from 'chat';
import { log } from '../log.js';
import { SqliteStateAdapter } from '../state-sqlite.js';
import { registerWebhookAdapter } from '../webhook-server.js';
import { getAskQuestionRender } from '../db/sessions.js';
import { normalizeOptions, type NormalizedOption } from './ask-question.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
/** Adapter with optional gateway support (e.g., Discord). */
interface GatewayAdapter extends Adapter {
startGatewayListener?(
options: { waitUntil?: (task: Promise<unknown>) => void },
durationMs?: number,
abortSignal?: AbortSignal,
webhookUrl?: string,
): Promise<Response>;
}
/** Reply context extracted from a platform's raw message. */
export interface ReplyContext {
text: string;
sender: string;
}
/** Extract reply context from a platform-specific raw message. Return null if no reply. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ReplyContextExtractor = (raw: Record<string, any>) => ReplyContext | null;
export interface ChatSdkBridgeConfig {
adapter: Adapter;
concurrency?: ConcurrencyStrategy;
/** Bot token for authenticating forwarded Gateway events (required for interaction handling). */
botToken?: string;
/** Platform-specific reply context extraction. */
extractReplyContext?: ReplyContextExtractor;
/**
* Whether this platform uses threads as the primary conversation unit.
* See `ChannelAdapter.supportsThreads`. Declared by the calling channel
* skill, not inferred, because some platforms (Discord) can be used either
* way and the default depends on installation style.
*/
supportsThreads: boolean;
/**
* Optional transform applied to outbound text/markdown before it reaches the
* adapter. Used by channels that need to sanitize for a platform-specific
* quirk (e.g. Telegram's legacy Markdown parse mode).
*/
transformOutboundText?: (text: string) => string;
}
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
const { adapter } = config;
const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t);
let chat: Chat;
let state: SqliteStateAdapter;
let setupConfig: ChannelSetup;
// Keyed by platformId. Multiple agents may be wired to the same
// conversation — this holds all their configs so the bridge can apply the
// most-permissive engage rule at gate time and only subscribe when at
// least one wiring requested 'mention-sticky'.
//
// STALENESS: populated at setup() and updateConversations(). If wirings
// change after setup, updateConversations() must be called to refresh
// (ACTION-ITEMS item 17).
let conversations: Map<string, ConversationConfig[]>;
let gatewayAbort: AbortController | null = null;
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig[]> {
const map = new Map<string, ConversationConfig[]>();
for (const conv of configs) {
const existing = map.get(conv.platformId);
if (existing) existing.push(conv);
else map.set(conv.platformId, [conv]);
}
return map;
}
/**
* Should a message from (channelId, kind) engage any of the wired agents?
*
* - `mention` — engages only when the message actually @-mentions
* the bot (the bridge already sees it here because
* Chat SDK only forwards subscribed / mentioned /
* DM messages)
* - `mention-sticky` — same as `mention` for gating, PLUS we subscribe
* the thread so later messages arrive via the
* subscribed path and fall through to an
* engage-all style treatment
* - `pattern` — regex test against message text; `.` = always
*
* We take the union across wired agents — if any one of them would engage,
* the message goes through. Per-agent filtering after that happens in the
* host router (see src/router.ts pickAgents).
*/
function shouldEngage(
channelId: string,
source: 'subscribed' | 'mention' | 'dm',
text: string,
): { engage: boolean; stickySubscribe: boolean } {
const configs = conversations.get(channelId);
// Unknown conversation — forward anyway (may be a new group that
// hasn't been registered yet; central routing will log + drop cleanly).
if (!configs || configs.length === 0) return { engage: true, stickySubscribe: false };
let engage = false;
let stickySubscribe = false;
for (const cfg of configs) {
switch (cfg.engageMode) {
case 'mention':
if (source === 'mention' || source === 'dm') engage = true;
break;
case 'mention-sticky':
if (source === 'mention' || source === 'dm') {
engage = true;
stickySubscribe = true;
} else if (source === 'subscribed') {
// Thread was already subscribed on a prior mention — treat as
// engage-all so follow-ups in the thread reach the agent.
engage = true;
}
break;
case 'pattern': {
const pattern = cfg.engagePattern ?? '.';
try {
if (pattern === '.' || new RegExp(pattern).test(text)) engage = true;
} catch {
// Invalid regex → fail open so the admin can see something and fix.
engage = true;
}
break;
}
}
if (engage && stickySubscribe) break;
}
return { engage, stickySubscribe };
}
async function messageToInbound(message: ChatMessage): Promise<InboundMessage> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const serialized = message.toJSON() as Record<string, any>;
// Download attachment data before serialization loses fetchData()
if (message.attachments && message.attachments.length > 0) {
const enriched = [];
for (const att of message.attachments) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const entry: Record<string, any> = {
type: att.type,
name: att.name,
mimeType: att.mimeType,
size: att.size,
width: (att as unknown as Record<string, unknown>).width,
height: (att as unknown as Record<string, unknown>).height,
};
if (att.fetchData) {
try {
const buffer = await att.fetchData();
entry.data = buffer.toString('base64');
} catch (err) {
log.warn('Failed to download attachment', { type: att.type, err });
}
}
enriched.push(entry);
}
serialized.attachments = enriched;
}
// Extract reply context via platform-specific hook
if (config.extractReplyContext && message.raw) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const replyTo = config.extractReplyContext(message.raw as Record<string, any>);
if (replyTo) serialized.replyTo = replyTo;
}
// Project chat-sdk's nested author into the flat sender fields the router
// expects (see src/router.ts extractAndUpsertUser). Native adapters already
// populate these directly; this brings chat-sdk adapters in line.
const author = serialized.author as { userId?: string; fullName?: string; userName?: string } | undefined;
if (author) {
const name = author.fullName ?? author.userName;
serialized.senderId = author.userId;
serialized.sender = name;
serialized.senderName = name;
}
// Drop raw to save DB space (can be very large)
serialized.raw = undefined;
return {
id: message.id,
kind: 'chat-sdk',
content: serialized,
timestamp: message.metadata.dateSent.toISOString(),
};
}
const bridge: ChannelAdapter = {
name: adapter.name,
channelType: adapter.name,
supportsThreads: config.supportsThreads,
async setup(hostConfig: ChannelSetup) {
setupConfig = hostConfig;
conversations = buildConversationMap(hostConfig.conversations);
state = new SqliteStateAdapter();
chat = new Chat({
adapters: { [adapter.name]: adapter },
userName: adapter.userName || 'NanoClaw',
concurrency: config.concurrency ?? 'concurrent',
state,
logger: 'silent',
});
// Subscribed threads — the conversation is already active (via prior
// mention-sticky engagement or admin wiring). Gate on engageMode so a
// plain 'mention' wiring doesn't keep firing after a one-off mention.
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.content === 'string' ? message.content : '';
const decision = shouldEngage(channelId, 'subscribed', text);
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
});
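// @mention in an unsubscribed thread: engage per shouldEngage (mention and
// mention-sticky wirings always engage; a pattern-only wiring must match);
// subscribe only if at least one wiring is 'mention-sticky'.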
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.content === 'string' ? message.content : '';
const decision = shouldEngage(channelId, 'mention', text);
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
if (decision.stickySubscribe) {
await thread.subscribe();
}
});
// DMs — apply engage rules too, but DMs typically default to pattern='.'
// at setup time so this is a pass-through in practice. sticky subscribe
// follows the same rule as a group mention.
//
// Thread id is passed through so sub-thread context reaches delivery
// (Slack users can open threads inside a DM). The router collapses DM
// sub-threads to one session (is_group=0 short-circuits the per-thread
// escalation).
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.content === 'string' ? message.content : '';
const decision = shouldEngage(channelId, 'dm', text);
log.info('Inbound DM received', {
adapter: adapter.name,
channelId,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown',
threadId: thread.id,
engage: decision.engage,
});
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
if (decision.stickySubscribe) {
await thread.subscribe();
}
});
// Handle button clicks (ask_user_question)
chat.onAction(async (event) => {
if (!event.actionId.startsWith('ncq:')) return;
const parts = event.actionId.split(':');
if (parts.length < 3) return;
const questionId = parts[1];
const selectedOption = event.value || '';
const userId = event.user?.userId || '';
// Resolve render metadata BEFORE dispatching onAction (which deletes the row).
const render = getAskQuestionRender(questionId);
const title = render?.title ?? '❓ Question';
const matched = render?.options.find((o) => o.value === selectedOption);
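// selectedOption is never nullish (it defaults to ''), so use || to make
// the '(clicked)' fallback reachable when the button carried no value.
const selectedLabel = matched?.selectedLabel ?? (selectedOption || '(clicked)');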
// Update the card to show the selected answer and remove buttons
try {
const tid = event.threadId;
await adapter.editMessage(tid, event.messageId, {
markdown: `${title}\n\n${selectedLabel}`,
});
} catch (err) {
log.warn('Failed to update card after action', { err });
}
setupConfig.onAction(questionId, selectedOption, userId);
});
await chat.initialize();
// Start Gateway listener for adapters that support it (e.g., Discord)
const gatewayAdapter = adapter as GatewayAdapter;
if (gatewayAdapter.startGatewayListener) {
gatewayAbort = new AbortController();
// Start local HTTP server to receive forwarded Gateway events (including interactions)
const webhookUrl = await startLocalWebhookServer(gatewayAdapter, setupConfig, config.botToken);
const startGateway = () => {
if (gatewayAbort?.signal.aborted) return;
// Capture the long-running listener promise via waitUntil
let listenerPromise: Promise<unknown> | undefined;
gatewayAdapter.startGatewayListener!(
{
waitUntil: (p: Promise<unknown>) => {
listenerPromise = p;
},
},
24 * 60 * 60 * 1000,
gatewayAbort!.signal,
webhookUrl,
).then(() => {
// startGatewayListener resolves immediately with a Response;
// the actual work is in the listenerPromise passed to waitUntil
if (listenerPromise) {
listenerPromise
.then(() => {
if (!gatewayAbort?.signal.aborted) {
log.info('Gateway listener expired, restarting', { adapter: adapter.name });
startGateway();
}
})
.catch((err) => {
if (!gatewayAbort?.signal.aborted) {
log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
setTimeout(startGateway, 5000);
}
});
}
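}).catch((err) => {
// Without this handler a rejected startGatewayListener call would surface
// as an unhandled rejection and the listener would never be restarted.
if (!gatewayAbort?.signal.aborted) {
log.error('Gateway listener failed to start, retrying in 5s', { adapter: adapter.name, err });
setTimeout(startGateway, 5000);
}
});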
};
startGateway();
log.info('Gateway listener started', { adapter: adapter.name });
} else {
// Non-gateway adapters (Slack, Teams, GitHub, etc.) — register on the shared webhook server
registerWebhookAdapter(chat, adapter.name);
}
log.info('Chat SDK bridge initialized', { adapter: adapter.name });
},
async deliver(platformId: string, threadId: string | null, message): Promise<string | undefined> {
// platformId is already in the adapter's encoded format (e.g. "telegram:6037840640",
// "discord:guildId:channelId") — use it directly as the thread ID
const tid = threadId ?? platformId;
const content = message.content as Record<string, unknown>;
if (content.operation === 'edit' && content.messageId) {
await adapter.editMessage(tid, content.messageId as string, {
markdown: transformText((content.text as string) || (content.markdown as string) || ''),
});
return;
}
if (content.operation === 'reaction' && content.messageId && content.emoji) {
await adapter.addReaction(tid, content.messageId as string, content.emoji as string);
return;
}
// Ask question card — render as Card with buttons
if (content.type === 'ask_question' && content.questionId && content.options) {
const questionId = content.questionId as string;
const title = content.title as string;
const question = content.question as string;
if (!title) {
log.error('ask_question missing required title — skipping delivery', { questionId });
return;
}
const options: NormalizedOption[] = normalizeOptions(content.options as never);
const card = Card({
title,
children: [
CardText(question),
Actions(
options.map((opt) =>
Button({ id: `ncq:${questionId}:${opt.value}`, label: opt.label, value: opt.value }),
),
),
],
});
const result = await adapter.postMessage(tid, {
card,
fallbackText: `${title}\n\n${question}\nOptions: ${options.map((o) => o.label).join(', ')}`,
});
return result?.id;
}
// Normal message
const rawText = (content.markdown as string) || (content.text as string);
const text = rawText ? transformText(rawText) : rawText;
if (text) {
// Attach files if present (FileUpload format: { data, filename })
const fileUploads = message.files?.map((f: { data: Buffer; filename: string }) => ({
data: f.data,
filename: f.filename,
}));
if (fileUploads && fileUploads.length > 0) {
const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads });
return result?.id;
} else {
const result = await adapter.postMessage(tid, { markdown: text });
return result?.id;
}
} else if (message.files && message.files.length > 0) {
// Files only, no text
const fileUploads = message.files.map((f: { data: Buffer; filename: string }) => ({
data: f.data,
filename: f.filename,
}));
const result = await adapter.postMessage(tid, { markdown: '', files: fileUploads });
return result?.id;
}
},
async setTyping(platformId: string, threadId: string | null) {
const tid = threadId ?? platformId;
await adapter.startTyping(tid);
},
async teardown() {
gatewayAbort?.abort();
await chat.shutdown();
log.info('Chat SDK bridge shut down', { adapter: adapter.name });
},
isConnected() {
return true;
},
updateConversations(configs: ConversationConfig[]) {
conversations = buildConversationMap(configs);
},
};
// Only expose openDM when the underlying Chat SDK adapter implements it.
// Delegate straight to adapter.openDM rather than going through chat.openDM:
// the latter dispatches via inferAdapterFromUserId, which only recognizes
// Discord snowflakes, Slack U-ids, Teams 29:-ids, and gChat users/-ids, and
// throws for everything else (Telegram numeric ids, iMessage, Matrix, …).
// Calling adapter.openDM directly also preserves the adapter's native
// platform_id encoding via channelIdFromThreadId (e.g. "telegram:<chatId>"),
// which matches what onInbound stores in messaging_groups — avoiding a
// duplicate-row / decode-error cascade at delivery time. See user-dm.ts for
// the direct-addressable fallback when the adapter has no openDM at all.
if (adapter.openDM) {
bridge.openDM = async (userHandle: string): Promise<string> => {
const threadId = await adapter.openDM!(userHandle);
return adapter.channelIdFromThreadId(threadId);
};
}
return bridge;
}
/**
* Start a local HTTP server to receive forwarded Gateway events.
* This is needed because the Gateway listener in webhook-forwarding mode
* sends ALL raw events (including INTERACTION_CREATE for button clicks)
* to the webhookUrl, which we handle here.
*/
function startLocalWebhookServer(
adapter: GatewayAdapter,
setupConfig: ChannelSetup,
botToken?: string,
): Promise<string> {
return new Promise((resolve) => {
const server = http.createServer((req, res) => {
const chunks: Buffer[] = [];
req.on('data', (chunk: Buffer) => chunks.push(chunk));
req.on('end', () => {
const body = Buffer.concat(chunks).toString();
handleForwardedEvent(body, adapter, setupConfig, botToken)
.then(() => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end('{"ok":true}');
})
.catch((err) => {
log.error('Webhook server error', { err });
res.writeHead(500);
res.end('{"error":"internal"}');
});
});
});
server.listen(0, '127.0.0.1', () => {
const addr = server.address() as { port: number };
const url = `http://127.0.0.1:${addr.port}/webhook`;
log.info('Local webhook server started', { port: addr.port });
resolve(url);
});
});
}
async function handleForwardedEvent(
body: string,
adapter: GatewayAdapter,
setupConfig: ChannelSetup,
botToken?: string,
): Promise<void> {
let event: { type: string; data: Record<string, unknown> };
try {
event = JSON.parse(body);
} catch {
return;
}
// Handle interaction events (button clicks) — not handled by adapter's handleForwardedGatewayEvent
if (event.type === 'GATEWAY_INTERACTION_CREATE' && event.data) {
const interaction = event.data;
// type 3 = MessageComponent (button/select)
if (interaction.type === 3) {
const customId = (interaction.data as Record<string, unknown>)?.custom_id as string;
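// Guild interactions carry the user under `member.user`; DM interactions
// carry it as a top-level `user` instead, so fall back to that.
const user = ((interaction.member as Record<string, unknown> | undefined)?.user ?? interaction.user) as
| Record<string, string>
| undefined;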
const interactionId = interaction.id as string;
const interactionToken = interaction.token as string;
// Parse the selected option from custom_id
let questionId: string | undefined;
let selectedOption: string | undefined;
if (customId?.startsWith('ncq:')) {
const colonIdx = customId.indexOf(':', 4); // after "ncq:"
if (colonIdx !== -1) {
questionId = customId.slice(4, colonIdx);
selectedOption = customId.slice(colonIdx + 1);
}
}
// Update the card to show the selected answer and remove buttons
const originalEmbeds =
((interaction.message as Record<string, unknown>)?.embeds as Array<Record<string, unknown>>) || [];
const originalDescription = (originalEmbeds[0]?.description as string) || '';
const render = questionId ? getAskQuestionRender(questionId) : undefined;
const cardTitle = render?.title ?? ((originalEmbeds[0]?.title as string) || '❓ Question');
const matchedOpt = render?.options.find((o) => o.value === selectedOption);
const selectedLabel = matchedOpt?.selectedLabel ?? selectedOption ?? customId;
try {
await fetch(`https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
type: 7, // UPDATE_MESSAGE — acknowledge + update in one call
data: {
embeds: [
{
title: cardTitle,
description: `${originalDescription}\n\n${selectedLabel}`,
},
],
components: [], // remove buttons
},
}),
});
} catch (err) {
log.error('Failed to update interaction', { err });
}
// Dispatch to host
if (questionId && selectedOption) {
setupConfig.onAction(questionId, selectedOption, user?.id || '');
}
return;
}
}
// Forward other events to the adapter's webhook handler for normal processing
const fakeRequest = new Request('http://localhost/webhook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-discord-gateway-token': botToken || '',
},
body,
});
await adapter.handleWebhook(fakeRequest, {});
}