/** * Chat SDK bridge — wraps a Chat SDK adapter + Chat instance * to conform to the NanoClaw ChannelAdapter interface. * * Used by Discord, Slack, and other Chat SDK-supported platforms. */ import http from 'http'; import { Chat, Card, CardText, Actions, Button, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage, } from 'chat'; import { log } from '../log.js'; import { SqliteStateAdapter } from '../state-sqlite.js'; import { registerWebhookAdapter } from '../webhook-server.js'; import { getAskQuestionRender } from '../db/sessions.js'; import { normalizeOptions, type NormalizedOption } from './ask-question.js'; import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js'; /** Adapter with optional gateway support (e.g., Discord). */ interface GatewayAdapter extends Adapter { startGatewayListener?( options: { waitUntil?: (task: Promise) => void }, durationMs?: number, abortSignal?: AbortSignal, webhookUrl?: string, ): Promise; } /** Reply context extracted from a platform's raw message. */ export interface ReplyContext { text: string; sender: string; } /** Extract reply context from a platform-specific raw message. Return null if no reply. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export type ReplyContextExtractor = (raw: Record) => ReplyContext | null; export interface ChatSdkBridgeConfig { adapter: Adapter; concurrency?: ConcurrencyStrategy; /** Bot token for authenticating forwarded Gateway events (required for interaction handling). */ botToken?: string; /** Platform-specific reply context extraction. */ extractReplyContext?: ReplyContextExtractor; /** * Whether this platform uses threads as the primary conversation unit. * See `ChannelAdapter.supportsThreads`. Declared by the calling channel * skill, not inferred, because some platforms (Discord) can be used either * way and the default depends on installation style. */ supportsThreads: boolean; /** * Optional transform applied to outbound text/markdown before it reaches the * adapter. Used by channels that need to sanitize for a platform-specific * quirk (e.g. Telegram's legacy Markdown parse mode). */ transformOutboundText?: (text: string) => string; /** * Maximum text length the underlying adapter accepts in a single message. * When set, the bridge splits outbound text longer than this on paragraph * → line → hard-char boundaries and posts multiple messages. Without this, * adapters like Discord (2000) and Telegram (4096) silently truncate * mid-response. The returned id is the first chunk's id so subsequent edits * and reactions still target the head of the reply. */ maxTextLength?: number; } /** * Split `text` into chunks no larger than `limit`, preferring paragraph * breaks, then line breaks, then a hard character cut as a last resort. * Preserves code fences only structurally — a fenced block that straddles a * chunk boundary will render as two independent blocks on the receiving * platform, which is the same behavior as manually re-opening a fence. */ /** * Decode the actual option value from a button callback. Buttons are encoded * with an integer index (to keep under Telegram's 64-byte callback_data cap), * and the real value is looked up via `getAskQuestionRender(questionId)`. * Falls back to treating the tail as a literal value so old in-flight cards * (encoded before this shortening landed) still resolve. */ function resolveSelectedOption( render: { options: NormalizedOption[] } | undefined, eventValue: string | undefined, tail: string | undefined, ): string { const candidate = eventValue ?? tail ?? ''; if (render && /^\d+$/.test(candidate)) { const idx = Number(candidate); if (render.options[idx]) return render.options[idx].value; } return candidate; } export function splitForLimit(text: string, limit: number): string[] { if (text.length <= limit) return [text]; const chunks: string[] = []; let remaining = text; while (remaining.length > limit) { let cut = remaining.lastIndexOf('\n\n', limit); if (cut <= 0) cut = remaining.lastIndexOf('\n', limit); if (cut <= 0) cut = remaining.lastIndexOf(' ', limit); if (cut <= 0) cut = limit; chunks.push(remaining.slice(0, cut).trimEnd()); remaining = remaining.slice(cut).trimStart(); } if (remaining.length > 0) chunks.push(remaining); return chunks; } export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { const { adapter } = config; const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t); let chat: Chat; let state: SqliteStateAdapter; let setupConfig: ChannelSetup; let gatewayAbort: AbortController | null = null; async function messageToInbound(message: ChatMessage, isMention: boolean): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any const serialized = message.toJSON() as Record; // Download attachment data before serialization loses fetchData() if (message.attachments && message.attachments.length > 0) { const enriched = []; for (const att of message.attachments) { // eslint-disable-next-line @typescript-eslint/no-explicit-any const entry: Record = { type: att.type, name: att.name, mimeType: att.mimeType, size: att.size, width: (att as unknown as Record).width, height: (att as unknown as Record).height, }; if (att.fetchData) { try { const buffer = await att.fetchData(); entry.data = buffer.toString('base64'); } catch (err) { log.warn('Failed to download attachment', { type: att.type, err }); } } enriched.push(entry); } serialized.attachments = enriched; } // Extract reply context via platform-specific hook if (config.extractReplyContext && message.raw) { // eslint-disable-next-line @typescript-eslint/no-explicit-any const replyTo = config.extractReplyContext(message.raw as Record); if (replyTo) serialized.replyTo = replyTo; } // Project chat-sdk's nested author into the flat sender fields the router // expects (see src/router.ts extractAndUpsertUser). Native adapters already // populate these directly; this brings chat-sdk adapters in line. const author = serialized.author as { userId?: string; fullName?: string; userName?: string } | undefined; if (author) { const name = author.fullName ?? author.userName; serialized.senderId = author.userId; serialized.sender = name; serialized.senderName = name; } // Drop raw to save DB space (can be very large) serialized.raw = undefined; return { id: message.id, kind: 'chat-sdk', content: serialized, timestamp: message.metadata.dateSent.toISOString(), isMention, }; } const bridge: ChannelAdapter = { name: adapter.name, channelType: adapter.name, supportsThreads: config.supportsThreads, async setup(hostConfig: ChannelSetup) { setupConfig = hostConfig; state = new SqliteStateAdapter(); chat = new Chat({ adapters: { [adapter.name]: adapter }, userName: adapter.userName || 'NanoClaw', concurrency: config.concurrency ?? 'concurrent', state, logger: 'silent', }); // Four SDK dispatch paths — bridge just forwards. All per-wiring // engage / accumulate / drop / subscribe decisions live in the host // router (src/router.ts routeInbound / evaluateEngage). The bridge // only resolves channel ids and sets the platform-confirmed isMention // flag that routeInbound evaluates; the router calls back into // bridge.subscribe(...) when a mention-sticky wiring engages. // Subscribed threads — every message in a thread we've previously // engaged. Carry the SDK's `message.isMention` through so mention-mode // wirings still fire on in-thread mentions. chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, message.isMention === true)); }); // @mention in an unsubscribed thread — SDK-confirmed bot mention. chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); }); // DMs — by definition addressed to the bot. Thread id flows through // so sub-thread context reaches delivery (Slack users can open threads // inside a DM). Router collapses DM sub-threads to one session via // is_group=0 short-circuit. chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); log.info('Inbound DM received', { adapter: adapter.name, channelId, sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown', threadId: thread.id, }); await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); }); // Plain messages in unsubscribed threads. // // Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is // exclusive: subscribed → onSubscribedMessage; unsubscribed+mention → // onNewMention; unsubscribed+pattern-match → onNewMessage. Registering // with `/./` lets the router see every plain message on every // unsubscribed thread the bot can see. The router short-circuits via // getMessagingGroupWithAgentCount (~1 DB read) for unwired channels, // so forwarding every one is cheap enough to not need a bridge-side // flood gate. chat.onNewMessage(/./, async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false)); }); // Handle button clicks (ask_user_question) chat.onAction(async (event) => { if (!event.actionId.startsWith('ncq:')) return; const parts = event.actionId.split(':'); if (parts.length < 3) return; const questionId = parts[1]; const tail = parts.slice(2).join(':'); const userId = event.user?.userId || ''; // Resolve render metadata BEFORE dispatching onAction (which deletes the row). const render = getAskQuestionRender(questionId); // New format: button id/value is an integer index into options (kept // short to fit Telegram's 64-byte callback_data cap). Old format: // the full value is embedded in actionId/value directly. const selectedOption = resolveSelectedOption(render, event.value, tail); const title = render?.title ?? '❓ Question'; const matched = render?.options.find((o) => o.value === selectedOption); const selectedLabel = matched?.selectedLabel ?? selectedOption ?? '(clicked)'; // Update the card to show the selected answer and remove buttons try { const tid = event.threadId; await adapter.editMessage(tid, event.messageId, { markdown: `${title}\n\n${selectedLabel}`, }); } catch (err) { log.warn('Failed to update card after action', { err }); } setupConfig.onAction(questionId, selectedOption, userId); }); await chat.initialize(); // Start Gateway listener for adapters that support it (e.g., Discord) const gatewayAdapter = adapter as GatewayAdapter; if (gatewayAdapter.startGatewayListener) { gatewayAbort = new AbortController(); // Start local HTTP server to receive forwarded Gateway events (including interactions) const webhookUrl = await startLocalWebhookServer(gatewayAdapter, setupConfig, config.botToken); const startGateway = () => { if (gatewayAbort?.signal.aborted) return; // Capture the long-running listener promise via waitUntil let listenerPromise: Promise | undefined; gatewayAdapter.startGatewayListener!( { waitUntil: (p: Promise) => { listenerPromise = p; }, }, 24 * 60 * 60 * 1000, gatewayAbort!.signal, webhookUrl, ).then(() => { // startGatewayListener resolves immediately with a Response; // the actual work is in the listenerPromise passed to waitUntil if (listenerPromise) { listenerPromise .then(() => { if (!gatewayAbort?.signal.aborted) { log.info('Gateway listener expired, restarting', { adapter: adapter.name }); startGateway(); } }) .catch((err) => { if (!gatewayAbort?.signal.aborted) { log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err }); setTimeout(startGateway, 5000); } }); } }); }; startGateway(); log.info('Gateway listener started', { adapter: adapter.name }); } else { // Non-gateway adapters (Slack, Teams, GitHub, etc.) — register on the shared webhook server registerWebhookAdapter(chat, adapter.name); } log.info('Chat SDK bridge initialized', { adapter: adapter.name }); }, async deliver(platformId: string, threadId: string | null, message): Promise { // platformId is already in the adapter's encoded format (e.g. "telegram:6037840640", // "discord:guildId:channelId") — use it directly as the thread ID const tid = threadId ?? platformId; const content = message.content as Record; if (content.operation === 'edit' && content.messageId) { await adapter.editMessage(tid, content.messageId as string, { markdown: transformText((content.text as string) || (content.markdown as string) || ''), }); return; } if (content.operation === 'reaction' && content.messageId && content.emoji) { await adapter.addReaction(tid, content.messageId as string, content.emoji as string); return; } // Ask question card — render as Card with buttons if (content.type === 'ask_question' && content.questionId && content.options) { const questionId = content.questionId as string; const title = content.title as string; const question = content.question as string; if (!title) { log.error('ask_question missing required title — skipping delivery', { questionId }); return; } const options: NormalizedOption[] = normalizeOptions(content.options as never); const card = Card({ title, children: [ CardText(question), Actions( // Encode button id/value with the option index rather than the // full value. Telegram caps callback_data at 64 bytes, and // long values (e.g. ISO datetimes, URLs) push the JSON payload // well past that. The onAction handlers resolve the index back // to the real value via getAskQuestionRender(questionId). options.map((opt, idx) => Button({ id: `ncq:${questionId}:${idx}`, label: opt.label, value: String(idx) }), ), ), ], }); const result = await adapter.postMessage(tid, { card, fallbackText: `${title}\n\n${question}\nOptions: ${options.map((o) => o.label).join(', ')}`, }); return result?.id; } // Normal message const rawText = (content.markdown as string) || (content.text as string); const text = rawText ? transformText(rawText) : rawText; if (text) { // Attach files if present (FileUpload format: { data, filename }) const fileUploads = message.files?.map((f: { data: Buffer; filename: string }) => ({ data: f.data, filename: f.filename, })); // Split if over the adapter's max length. Files ride on the first // chunk so the head of the reply still carries them. const chunks = config.maxTextLength && text.length > config.maxTextLength ? splitForLimit(text, config.maxTextLength) : [text]; let firstId: string | undefined; for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const attachFiles = i === 0 && fileUploads && fileUploads.length > 0; const result = await adapter.postMessage( tid, attachFiles ? { markdown: chunk, files: fileUploads } : { markdown: chunk }, ); if (i === 0) firstId = result?.id; } return firstId; } else if (message.files && message.files.length > 0) { // Files only, no text const fileUploads = message.files.map((f: { data: Buffer; filename: string }) => ({ data: f.data, filename: f.filename, })); const result = await adapter.postMessage(tid, { markdown: '', files: fileUploads }); return result?.id; } }, async setTyping(platformId: string, threadId: string | null) { const tid = threadId ?? platformId; await adapter.startTyping(tid); }, async teardown() { gatewayAbort?.abort(); await chat.shutdown(); log.info('Chat SDK bridge shut down', { adapter: adapter.name }); }, isConnected() { return true; }, async subscribe(_platformId: string, threadId: string) { // Chat SDK's subscription state lives on the StateAdapter (not on the // Chat instance itself). SqliteStateAdapter.subscribe is idempotent — // a second call on an already-subscribed thread is a no-op. threadId // is the SDK's thread id, which is what the router already has from // the original inbound event. await state.subscribe(threadId); }, }; // Only expose openDM when the underlying Chat SDK adapter implements it. // Delegate straight to adapter.openDM rather than going through chat.openDM: // the latter dispatches via inferAdapterFromUserId, which only recognizes // Discord snowflakes, Slack U-ids, Teams 29:-ids, and gChat users/-ids, and // throws for everything else (Telegram numeric ids, iMessage, Matrix, …). // Calling adapter.openDM directly also preserves the adapter's native // platform_id encoding via channelIdFromThreadId (e.g. "telegram:"), // which matches what onInbound stores in messaging_groups — avoiding a // duplicate-row / decode-error cascade at delivery time. See user-dm.ts for // the direct-addressable fallback when the adapter has no openDM at all. if (adapter.openDM) { bridge.openDM = async (userHandle: string): Promise => { const threadId = await adapter.openDM!(userHandle); return adapter.channelIdFromThreadId(threadId); }; } return bridge; } /** * Start a local HTTP server to receive forwarded Gateway events. * This is needed because the Gateway listener in webhook-forwarding mode * sends ALL raw events (including INTERACTION_CREATE for button clicks) * to the webhookUrl, which we handle here. */ function startLocalWebhookServer( adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string, ): Promise { return new Promise((resolve) => { const server = http.createServer((req, res) => { const chunks: Buffer[] = []; req.on('data', (chunk: Buffer) => chunks.push(chunk)); req.on('end', () => { const body = Buffer.concat(chunks).toString(); handleForwardedEvent(body, adapter, setupConfig, botToken) .then(() => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end('{"ok":true}'); }) .catch((err) => { log.error('Webhook server error', { err }); res.writeHead(500); res.end('{"error":"internal"}'); }); }); }); server.listen(0, '127.0.0.1', () => { const addr = server.address() as { port: number }; const url = `http://127.0.0.1:${addr.port}/webhook`; log.info('Local webhook server started', { port: addr.port }); resolve(url); }); }); } async function handleForwardedEvent( body: string, adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string, ): Promise { let event: { type: string; data: Record }; try { event = JSON.parse(body); } catch { return; } // Handle interaction events (button clicks) — not handled by adapter's handleForwardedGatewayEvent if (event.type === 'GATEWAY_INTERACTION_CREATE' && event.data) { const interaction = event.data; // type 3 = MessageComponent (button/select) if (interaction.type === 3) { const customId = (interaction.data as Record)?.custom_id as string; const user = (interaction.member as Record)?.user as Record | undefined; const interactionId = interaction.id as string; const interactionToken = interaction.token as string; // Parse the selected option from custom_id let questionId: string | undefined; let tail: string | undefined; if (customId?.startsWith('ncq:')) { const colonIdx = customId.indexOf(':', 4); // after "ncq:" if (colonIdx !== -1) { questionId = customId.slice(4, colonIdx); tail = customId.slice(colonIdx + 1); } } // Update the card to show the selected answer and remove buttons const originalEmbeds = ((interaction.message as Record)?.embeds as Array>) || []; const originalDescription = (originalEmbeds[0]?.description as string) || ''; const render = questionId ? getAskQuestionRender(questionId) : undefined; // Discord custom_id mirrors the new index-based encoding (see Button // construction). Decode back to the real option value for downstream. const selectedOption = resolveSelectedOption(render, tail, tail); const cardTitle = render?.title ?? ((originalEmbeds[0]?.title as string) || '❓ Question'); const matchedOpt = render?.options.find((o) => o.value === selectedOption); const selectedLabel = matchedOpt?.selectedLabel ?? selectedOption ?? customId; try { await fetch(`https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ type: 7, // UPDATE_MESSAGE — acknowledge + update in one call data: { embeds: [ { title: cardTitle, description: `${originalDescription}\n\n${selectedLabel}`, }, ], components: [], // remove buttons }, }), }); } catch (err) { log.error('Failed to update interaction', { err }); } // Dispatch to host if (questionId && selectedOption) { setupConfig.onAction(questionId, selectedOption, user?.id || ''); } return; } } // Forward other events to the adapter's webhook handler for normal processing const fakeRequest = new Request('http://localhost/webhook', { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-discord-gateway-token': botToken || '', }, body, }); await adapter.handleWebhook(fakeRequest, {}); }