Files
nanoclaw/src/channels/chat-sdk-bridge.ts
Gabi Simons 9476a80ab0 feat(v2): shared webhook server for webhook-based channel adapters
Adds a shared HTTP server (port 3000, configurable via WEBHOOK_PORT)
that routes incoming webhooks to the correct Chat SDK adapter by path
(e.g. /api/webhooks/slack, /api/webhooks/teams). Required by Slack,
Teams, GitHub, Linear, and other non-gateway adapters.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 12:33:45 +00:00

611 lines
23 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Chat SDK bridge — wraps a Chat SDK adapter + Chat instance
* to conform to the NanoClaw ChannelAdapter interface.
*
* Used by Discord, Slack, and other Chat SDK-supported platforms.
*/
import http from 'http';
import {
Chat,
Card,
CardText,
Actions,
Button,
Modal,
TextInput,
markdownToPlainText,
type Adapter,
type ConcurrencyStrategy,
type Message as ChatMessage,
} from 'chat';
import { ValidationError } from '@chat-adapter/shared';
import { log } from '../log.js';
import { SqliteStateAdapter } from '../state-sqlite.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
/**
 * Adapter with optional gateway support (e.g., Discord).
 *
 * The bridge checks for `startGatewayListener` during setup: adapters that
 * define it get a gateway listener plus a local webhook server, all other
 * adapters are registered on the shared webhook server instead.
 */
interface GatewayAdapter extends Adapter {
  /**
   * Start the gateway listener.
   *
   * @param options.waitUntil - Callback the adapter may use to hand back the
   *   long-running listener promise; the bridge watches that promise to decide
   *   when to restart the listener.
   * @param durationMs - Duration passed by the bridge (24 hours); presumably
   *   the listener lifetime before it expires — confirm against the adapter.
   * @param abortSignal - Signal the bridge aborts on teardown.
   * @param webhookUrl - URL of the local webhook server that receives the
   *   forwarded gateway events.
   * @returns A `Response`; the bridge ignores its value and only waits for it.
   */
  startGatewayListener?(
    options: { waitUntil?: (task: Promise<unknown>) => void },
    durationMs?: number,
    abortSignal?: AbortSignal,
    webhookUrl?: string,
  ): Promise<Response>;
}
/**
 * Reply context extracted from a platform's raw message.
 *
 * When an extractor returns one of these, the bridge stores it as `replyTo`
 * on the serialized inbound message.
 */
export interface ReplyContext {
  /** Text of the replied-to message (as reported by the extractor). */
  text: string;
  /** Sender of the replied-to message (as reported by the extractor). */
  sender: string;
}
/**
 * Extract reply context from a platform-specific raw message.
 *
 * Called with `message.raw` for every inbound message that has a raw payload.
 * Return null if the message is not a reply.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ReplyContextExtractor = (raw: Record<string, any>) => ReplyContext | null;
/** Options accepted by `createChatSdkBridge`. */
export interface ChatSdkBridgeConfig {
  /** Chat SDK adapter to wrap; its `name` becomes the channel name and channel type. */
  adapter: Adapter;
  /** Concurrency strategy handed to the `Chat` instance; defaults to `'concurrent'`. */
  concurrency?: ConcurrencyStrategy;
  /**
   * Bot token for authenticating forwarded Gateway events (required for interaction handling).
   * Sent as the `x-discord-gateway-token` header when forwarded events are
   * replayed into the adapter's webhook handler.
   */
  botToken?: string;
  /** Platform-specific reply context extraction. */
  extractReplyContext?: ReplyContextExtractor;
  /**
   * Whether this platform uses threads as the primary conversation unit.
   * See `ChannelAdapter.supportsThreads`. Declared by the calling channel
   * skill, not inferred, because some platforms (Discord) can be used either
   * way and the default depends on installation style.
   */
  supportsThreads: boolean;
}
/**
 * Wrap a Chat SDK adapter in the NanoClaw `ChannelAdapter` interface.
 *
 * The returned adapter:
 * - `setup` — creates the `Chat` instance, wires message / action / modal
 *   handlers to the host callbacks, then either starts the gateway listener
 *   (adapters exposing `startGatewayListener`) or registers the adapter on the
 *   shared webhook server under `/api/webhooks/<adapter.name>`.
 * - `deliver` — sends edits, reactions, question cards, credential cards,
 *   markdown messages and file uploads through the adapter.
 * - `setTyping`, `teardown`, `isConnected`, `updateConversations` — thin
 *   lifecycle helpers.
 *
 * @param config - Adapter plus bridge options (see `ChatSdkBridgeConfig`).
 * @returns A `ChannelAdapter` named after `config.adapter.name`.
 */
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
  const { adapter } = config;
  let chat: Chat;
  let state: SqliteStateAdapter;
  let setupConfig: ChannelSetup;
  // Refreshed by setup() and updateConversations(); nothing in this function reads it yet.
  let conversations: Map<string, ConversationConfig>;
  let gatewayAbort: AbortController | null = null;

  /** Index conversation configs by their `platformId`. */
  function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig> {
    const map = new Map<string, ConversationConfig>();
    for (const conv of configs) {
      map.set(conv.platformId, conv);
    }
    return map;
  }

  /**
   * Convert a Chat SDK message into the host's `InboundMessage` shape.
   * Attachments are downloaded and inlined as base64, reply context is added
   * when the platform hook finds one, and the raw payload is dropped.
   */
  async function messageToInbound(message: ChatMessage): Promise<InboundMessage> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const serialized = message.toJSON() as Record<string, any>;

    // Download attachment data before serialization loses fetchData()
    if (message.attachments && message.attachments.length > 0) {
      const enriched = [];
      for (const att of message.attachments) {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const entry: Record<string, any> = {
          type: att.type,
          name: att.name,
          mimeType: att.mimeType,
          size: att.size,
          width: (att as unknown as Record<string, unknown>).width,
          height: (att as unknown as Record<string, unknown>).height,
        };
        if (att.fetchData) {
          try {
            const buffer = await att.fetchData();
            entry.data = buffer.toString('base64');
          } catch (err) {
            // A failed download keeps the attachment metadata but omits `data`.
            log.warn('Failed to download attachment', { type: att.type, err });
          }
        }
        enriched.push(entry);
      }
      serialized.attachments = enriched;
    }

    // Extract reply context via platform-specific hook
    if (config.extractReplyContext && message.raw) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const replyTo = config.extractReplyContext(message.raw as Record<string, any>);
      if (replyTo) serialized.replyTo = replyTo;
    }

    // Drop raw to save DB space (can be very large)
    serialized.raw = undefined;

    return {
      id: message.id,
      kind: 'chat-sdk',
      content: serialized,
      timestamp: message.metadata.dateSent.toISOString(),
    };
  }

  return {
    name: adapter.name,
    channelType: adapter.name,
    supportsThreads: config.supportsThreads,

    /** Create the Chat instance, register all handlers and start receiving events. */
    async setup(hostConfig: ChannelSetup) {
      setupConfig = hostConfig;
      conversations = buildConversationMap(hostConfig.conversations);
      state = new SqliteStateAdapter();
      chat = new Chat({
        adapters: { [adapter.name]: adapter },
        userName: adapter.userName || 'NanoClaw',
        concurrency: config.concurrency ?? 'concurrent',
        state,
        logger: 'silent',
      });

      // Subscribed threads — forward all messages
      chat.onSubscribedMessage(async (thread, message) => {
        const channelId = adapter.channelIdFromThreadId(thread.id);
        setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
      });

      // @mention in unsubscribed thread — forward + subscribe
      chat.onNewMention(async (thread, message) => {
        const channelId = adapter.channelIdFromThreadId(thread.id);
        setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
        await thread.subscribe();
      });

      // DMs — always forward + subscribe (no thread ID is passed to the host)
      chat.onDirectMessage(async (thread, message) => {
        const channelId = adapter.channelIdFromThreadId(thread.id);
        setupConfig.onInbound(channelId, null, await messageToInbound(message));
        await thread.subscribe();
      });

      // Handle button clicks (ask_user_question, credential card)
      chat.onAction(async (event) => {
        // Credential card actions: nccr:<credentialId>:<enter|reject>
        if (event.actionId.startsWith('nccr:')) {
          const [, credentialId, subAction] = event.actionId.split(':');
          if (!credentialId || !subAction) return;

          if (subAction === 'reject') {
            try {
              await adapter.editMessage(event.threadId, event.messageId, {
                markdown: `🔑 Credential request\n\n❌ Rejected`,
              });
            } catch (err) {
              log.warn('Failed to update credential card after reject', { err });
            }
            // The host is notified even if the card could not be updated.
            setupConfig.onCredentialReject?.(credentialId);
            return;
          }

          if (subAction === 'enter') {
            const pending = setupConfig.getCredentialForModal?.(credentialId);
            if (!pending) {
              log.warn('Credential card clicked but row not pending', { credentialId });
              return;
            }
            try {
              const modalChildren = [
                CardText(pending.description ?? `Enter the value for ${pending.name} (host: ${pending.hostPattern}).`),
                TextInput({
                  id: 'value',
                  label: pending.name,
                  placeholder: 'Paste your credential value',
                }),
              ];
              // Modal children include a text element for context; the SDK
              // accepts TextElement in ModalChild so this is valid.
              const modal = Modal({
                callbackId: `nccm:${credentialId}`,
                title: 'Enter credential',
                submitLabel: 'Save',
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                children: modalChildren as any,
              });
              const result = await event.openModal(modal);
              if (!result) {
                log.warn('openModal returned undefined — channel unsupported', { credentialId });
                setupConfig.onCredentialChannelUnsupported?.(credentialId);
                try {
                  await adapter.editMessage(event.threadId, event.messageId, {
                    markdown: `🔑 Credential request\n\n⚠ This channel does not support modals.`,
                  });
                } catch {
                  // best effort
                }
              }
            } catch (err) {
              log.error('Failed to open credential modal', { credentialId, err });
              setupConfig.onCredentialChannelUnsupported?.(credentialId);
            }
            return;
          }

          // Unknown credential sub-action: ignore.
          return;
        }

        // Question card actions: ncq:<questionId>:<option>
        if (!event.actionId.startsWith('ncq:')) return;
        const parts = event.actionId.split(':');
        if (parts.length < 3) return;
        const questionId = parts[1];
        const selectedOption = event.value || '';
        const userId = event.user?.userId || '';

        // Update the card to show the selected answer and remove buttons
        try {
          const tid = event.threadId;
          await adapter.editMessage(tid, event.messageId, {
            markdown: `❓ **Question**\n\n${selectedOption ? `✅ **${selectedOption}**` : '(clicked)'}`,
          });
        } catch (err) {
          log.warn('Failed to update card after action', { err });
        }

        setupConfig.onAction(questionId, selectedOption, userId);
      });

      // Modal submissions for credential collection
      chat.onModalSubmit(async (event) => {
        if (!event.callbackId.startsWith('nccm:')) return;
        const credentialId = event.callbackId.slice('nccm:'.length);
        const value = event.values?.value ?? '';
        if (!value) {
          log.warn('Credential modal submitted with empty value', { credentialId });
          return;
        }
        setupConfig.onCredentialSubmit?.(credentialId, value);
      });

      await chat.initialize();

      // Start Gateway listener for adapters that support it (e.g., Discord)
      const gatewayAdapter = adapter as GatewayAdapter;
      if (gatewayAdapter.startGatewayListener) {
        gatewayAbort = new AbortController();

        // Start local HTTP server to receive forwarded Gateway events (including interactions)
        const webhookUrl = await startLocalWebhookServer(gatewayAdapter, setupConfig, config.botToken);

        const startGateway = (): void => {
          if (gatewayAbort?.signal.aborted) return;

          // Shared failure path: log and retry after 5s unless we are shutting down.
          const restartAfterError = (err: unknown): void => {
            if (gatewayAbort?.signal.aborted) return;
            log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
            setTimeout(startGateway, 5000);
          };

          // Capture the long-running listener promise via waitUntil
          let listenerPromise: Promise<unknown> | undefined;
          gatewayAdapter.startGatewayListener!(
            {
              waitUntil: (p: Promise<unknown>) => {
                listenerPromise = p;
              },
            },
            24 * 60 * 60 * 1000,
            gatewayAbort!.signal,
            webhookUrl,
          )
            .then(() => {
              // startGatewayListener resolves immediately with a Response;
              // the actual work is in the listenerPromise passed to waitUntil
              if (!listenerPromise) return;
              listenerPromise
                .then(() => {
                  if (!gatewayAbort?.signal.aborted) {
                    log.info('Gateway listener expired, restarting', { adapter: adapter.name });
                    startGateway();
                  }
                })
                .catch(restartAfterError);
            })
            // Without this handler a rejected start call would surface as an
            // unhandled rejection and the listener would never be restarted.
            .catch(restartAfterError);
        };

        startGateway();
        log.info('Gateway listener started', { adapter: adapter.name });
      } else {
        // Non-gateway adapters (Slack, Teams, GitHub, etc.) — register on the shared webhook server
        const webhookPath = `/api/webhooks/${adapter.name}`;
        registerWebhookAdapter(webhookPath, adapter);
        log.info('Webhook adapter registered', { adapter: adapter.name, path: webhookPath });
      }

      log.info('Chat SDK bridge initialized', { adapter: adapter.name });
    },

    /**
     * Deliver an outbound message through the adapter.
     *
     * @returns The ID of the newly posted message, or `undefined` for edits,
     *   reactions, and messages with neither text nor files.
     */
    async deliver(platformId: string, threadId: string | null, message): Promise<string | undefined> {
      // platformId is already in the adapter's encoded format (e.g. "telegram:6037840640",
      // "discord:guildId:channelId") — use it directly as the thread ID
      const tid = threadId ?? platformId;
      const content = message.content as Record<string, unknown>;

      if (content.operation === 'edit' && content.messageId) {
        await adapter.editMessage(tid, content.messageId as string, {
          markdown: (content.text as string) || (content.markdown as string) || '',
        });
        return;
      }

      if (content.operation === 'reaction' && content.messageId && content.emoji) {
        await adapter.addReaction(tid, content.messageId as string, content.emoji as string);
        return;
      }

      // Ask question card — render as Card with buttons
      if (content.type === 'ask_question' && content.questionId && content.options) {
        const questionId = content.questionId as string;
        const options = content.options as string[];
        const card = Card({
          title: '❓ Question',
          children: [
            CardText(content.question as string),
            Actions(options.map((opt) => Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }))),
          ],
        });
        const result = await adapter.postMessage(tid, {
          card,
          fallbackText: `${content.question}\nOptions: ${options.join(', ')}`,
        });
        return result?.id;
      }

      // Credential request card — buttons open a modal for secure input
      if (content.type === 'credential_request' && content.credentialId) {
        const credentialId = content.credentialId as string;
        const card = Card({
          title: '🔑 Credential request',
          children: [
            CardText(content.question as string),
            Actions([
              Button({ id: `nccr:${credentialId}:enter`, label: 'Enter credential', value: 'enter' }),
              Button({ id: `nccr:${credentialId}:reject`, label: 'Reject', value: 'reject' }),
            ]),
          ],
        });
        const result = await adapter.postMessage(tid, {
          card,
          fallbackText: `Credential request — open in a channel that supports modals.`,
        });
        return result?.id;
      }

      // Normal message
      const text = (content.markdown as string) || (content.text as string);
      if (text) {
        // Attach files if present (FileUpload format: { data, filename })
        const fileUploads = message.files?.map((f: { data: Buffer; filename: string }) => ({
          data: f.data,
          filename: f.filename,
        }));
        try {
          if (fileUploads && fileUploads.length > 0) {
            const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads });
            return result?.id;
          } else {
            const result = await adapter.postMessage(tid, { markdown: text });
            return result?.id;
          }
        } catch (err) {
          // Permanent formatting failure (e.g. Telegram MarkdownV2 entity parse error):
          // retry once as plain text so the queue isn't blocked forever.
          if (err instanceof ValidationError) {
            log.warn('Markdown rejected by adapter, retrying as plain text', {
              adapter: adapter.name,
              err: err.message,
            });
            const plain = markdownToPlainText(text);
            const result = await adapter.postMessage(tid, plain);
            // Files go out in a separate follow-up message; the returned ID is the text message's.
            if (fileUploads && fileUploads.length > 0) {
              await adapter.postMessage(tid, { markdown: '', files: fileUploads });
            }
            return result?.id;
          }
          throw err;
        }
      } else if (message.files && message.files.length > 0) {
        // Files only, no text
        const fileUploads = message.files.map((f: { data: Buffer; filename: string }) => ({
          data: f.data,
          filename: f.filename,
        }));
        const result = await adapter.postMessage(tid, { markdown: '', files: fileUploads });
        return result?.id;
      }
    },

    /** Show a typing indicator in the thread (or in the channel when there is no thread). */
    async setTyping(platformId: string, threadId: string | null) {
      const tid = threadId ?? platformId;
      await adapter.startTyping(tid);
    },

    /** Abort the gateway listener (if any) and shut the Chat instance down. */
    async teardown() {
      gatewayAbort?.abort();
      await chat.shutdown();
      log.info('Chat SDK bridge shut down', { adapter: adapter.name });
    },

    /** Always reports connected; no connection state is tracked here. */
    isConnected() {
      return true;
    },

    /** Replace the conversation index with a fresh set of configs. */
    updateConversations(configs: ConversationConfig[]) {
      conversations = buildConversationMap(configs);
    },
  };
}
/**
 * Start a local HTTP server to receive forwarded Gateway events.
 * This is needed because the Gateway listener in webhook-forwarding mode
 * sends ALL raw events (including INTERACTION_CREATE for button clicks)
 * to the webhookUrl, which we handle here.
 *
 * The server binds to 127.0.0.1 on an OS-assigned port. Every request body,
 * regardless of method or path, is passed to `handleForwardedEvent`.
 *
 * @param adapter - Gateway adapter that receives events not handled locally.
 * @param setupConfig - Host callbacks used when dispatching interactions.
 * @param botToken - Optional token forwarded with events replayed to the adapter.
 * @returns The webhook URL (`http://127.0.0.1:<port>/webhook`) once listening;
 *   rejects if the server fails to start listening.
 */
function startLocalWebhookServer(
  adapter: GatewayAdapter,
  setupConfig: ChannelSetup,
  botToken?: string,
): Promise<string> {
  return new Promise((resolve, reject) => {
    const server = http.createServer((req, res) => {
      const chunks: Buffer[] = [];
      req.on('data', (chunk: Buffer) => chunks.push(chunk));
      req.on('end', () => {
        const body = Buffer.concat(chunks).toString();
        handleForwardedEvent(body, adapter, setupConfig, botToken)
          .then(() => {
            res.writeHead(200, { 'Content-Type': 'application/json' });
            res.end('{"ok":true}');
          })
          .catch((err) => {
            log.error('Webhook server error', { err });
            // Headers may already be out if the failure happened while writing the 200.
            if (!res.headersSent) {
              res.writeHead(500, { 'Content-Type': 'application/json' });
            }
            res.end('{"error":"internal"}');
          });
      });
    });

    // A listen failure would otherwise be thrown as an uncaught 'error' event
    // and leave this promise pending forever.
    server.once('error', reject);

    server.listen(0, '127.0.0.1', () => {
      // Startup succeeded: later server errors are logged rather than thrown.
      server.off('error', reject);
      server.on('error', (err) => {
        log.error('Local webhook server error', { err });
      });

      const addr = server.address() as { port: number };
      const url = `http://127.0.0.1:${addr.port}/webhook`;
      log.info('Local webhook server started', { port: addr.port });
      resolve(url);
    });
  });
}
/**
 * Handle one forwarded Gateway event.
 *
 * Message-component interactions (button clicks) are acknowledged against the
 * Discord interaction callback endpoint and, for `ncq:<questionId>:<option>`
 * buttons, dispatched to `setupConfig.onAction`. Every other event is replayed
 * into the adapter's `handleWebhook`.
 *
 * @param body - Raw request body; bodies that are not a JSON object are ignored.
 * @param adapter - Adapter that receives non-interaction events.
 * @param setupConfig - Host callbacks.
 * @param botToken - Sent as `x-discord-gateway-token` on replayed events.
 */
async function handleForwardedEvent(
  body: string,
  adapter: GatewayAdapter,
  setupConfig: ChannelSetup,
  botToken?: string,
): Promise<void> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(body);
  } catch {
    return;
  }
  // Valid JSON that is not an object (`null`, numbers, strings) carries no
  // event; reading `.type` off `null` would throw.
  if (typeof parsed !== 'object' || parsed === null) return;
  const event = parsed as { type?: unknown; data?: Record<string, unknown> };

  // Handle interaction events (button clicks) — not handled by adapter's handleForwardedGatewayEvent
  if (event.type === 'GATEWAY_INTERACTION_CREATE' && event.data) {
    const interaction = event.data;
    // type 3 = MessageComponent (button/select)
    if (interaction.type === 3) {
      const customId = (interaction.data as Record<string, unknown> | undefined)?.custom_id as string | undefined;
      // Guild interactions carry the user under `member.user`; DM interactions
      // carry it as a top-level `user`.
      const member = interaction.member as Record<string, unknown> | undefined;
      const user = (member?.user ?? interaction.user) as Record<string, string> | undefined;
      const interactionId = interaction.id as string;
      const interactionToken = interaction.token as string;

      // Parse the selected option from custom_id
      let questionId: string | undefined;
      let selectedOption: string | undefined;
      if (customId?.startsWith('ncq:')) {
        const colonIdx = customId.indexOf(':', 4); // after "ncq:"
        if (colonIdx !== -1) {
          questionId = customId.slice(4, colonIdx);
          selectedOption = customId.slice(colonIdx + 1);
        }
      }

      // NOTE(review): component interactions with other custom_ids (e.g. the
      // `nccr:` credential buttons) also reach this point, get rewritten as an
      // answered "Question" card and are never dispatched — confirm intended.

      // Update the card to show the selected answer and remove buttons
      const originalEmbeds =
        ((interaction.message as Record<string, unknown> | undefined)?.embeds as Array<Record<string, unknown>>) || [];
      const originalDescription = (originalEmbeds[0]?.description as string) || '';

      try {
        const response = await fetch(
          `https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`,
          {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
              type: 7, // UPDATE_MESSAGE — acknowledge + update in one call
              data: {
                embeds: [
                  {
                    title: '❓ Question',
                    description: `${originalDescription}\n\n✅ **${selectedOption || customId}**`,
                  },
                ],
                components: [], // remove buttons
              },
            }),
          },
        );
        // fetch only rejects on network failure; surface HTTP-level rejections too.
        if (!response.ok) {
          log.error('Failed to update interaction', { status: response.status });
        }
      } catch (err) {
        log.error('Failed to update interaction', { err });
      }

      // Dispatch to host (even when the card update failed)
      if (questionId && selectedOption) {
        setupConfig.onAction(questionId, selectedOption, user?.id || '');
      }
      return;
    }
  }

  // Forward other events to the adapter's webhook handler for normal processing
  const fakeRequest = new Request('http://localhost/webhook', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-discord-gateway-token': botToken || '',
    },
    body,
  });
  await adapter.handleWebhook(fakeRequest, {});
}
/**
 * Shared public webhook server for all webhook-based adapters.
 * Each adapter registers a path (e.g. /api/webhooks/slack, /api/webhooks/teams).
 * The server listens on a single port (default 3000, configurable via WEBHOOK_PORT env var).
 */
const webhookAdapters = new Map<string, Adapter>();
let sharedWebhookServer: http.Server | null = null;

/**
 * Register an adapter under a webhook path, starting the shared server on the
 * first registration.
 *
 * Only POST requests whose path (query string ignored) matches a registered
 * path are handled; everything else gets a 404. The request is converted to a
 * Fetch `Request`, handed to the adapter's `handleWebhook`, and the resulting
 * `Response` is written back (status, headers and body).
 *
 * @param path - Exact URL path to route to this adapter.
 * @param adapter - Adapter whose `handleWebhook` receives matching requests.
 */
function registerWebhookAdapter(path: string, adapter: Adapter): void {
  webhookAdapters.set(path, adapter);
  if (sharedWebhookServer) return;

  const port = parseInt(process.env.WEBHOOK_PORT || '3000', 10);
  sharedWebhookServer = http.createServer((req, res) => {
    // Route on the path only, so a query string does not defeat the lookup.
    const pathname = req.url?.split('?', 1)[0];
    const matchedAdapter = pathname ? webhookAdapters.get(pathname) : undefined;

    if (req.method !== 'POST' || !matchedAdapter) {
      res.writeHead(404);
      res.end('Not found');
      return;
    }

    const chunks: Buffer[] = [];
    req.on('data', (chunk: Buffer) => chunks.push(chunk));
    req.on('end', async () => {
      try {
        const body = Buffer.concat(chunks).toString();
        // Only single-valued headers are forwarded.
        const headers: Record<string, string> = {};
        for (const [key, val] of Object.entries(req.headers)) {
          if (typeof val === 'string') headers[key] = val;
        }

        // The adapter sees the full original URL, query string included.
        const request = new Request(`http://localhost${req.url}`, {
          method: 'POST',
          headers,
          body,
        });

        const response = await matchedAdapter.handleWebhook!(request, {
          // Background work is detached; its failures must not become unhandled rejections.
          waitUntil: (p: Promise<unknown>) => {
            p.catch(() => {});
          },
        });

        const responseBody = await response.text();
        // Header names from `Headers` are lowercase. Keeping the default key
        // lowercase too lets the adapter's content-type replace it instead of
        // being emitted as a second Content-Type header.
        const responseHeaders: Record<string, string> = { 'content-type': 'application/json' };
        response.headers.forEach((v, k) => {
          responseHeaders[k] = v;
        });
        res.writeHead(response.status, responseHeaders);
        res.end(responseBody);
      } catch (err) {
        log.error('Webhook handler error', { url: req.url, err });
        // Headers may already be out if the failure happened while writing the response.
        if (!res.headersSent) res.writeHead(500);
        res.end('{"error":"internal"}');
      }
    });
  });

  sharedWebhookServer.listen(port, '0.0.0.0', () => {
    log.info('Shared webhook server started', { port, paths: [...webhookAdapters.keys()] });
  });
}