v2 phase 4+5: Discord via Chat SDK, expanded MCP tools, message seq IDs

- Chat SDK bridge + Discord adapter (gateway listener, message routing)
- MCP tools refactored into modular structure: core (send_message, send_file,
  edit_message, add_reaction), scheduling (schedule/list/cancel/pause/resume
  tasks), interactive (ask_user_question, send_card), agents (send_to_agent)
- Message seq IDs: shared integer sequence across messages_in/out so agents
  see small numeric IDs instead of platform snowflakes
- busy_timeout=5000 for session DB (poll loop + MCP server concurrent access)
- Always copy agent-runner source to fix stale cache when non-index files change
- Seed script for Discord testing, e2e test script

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-09 02:53:39 +03:00
parent b36f127acc
commit afbc20a6c4
21 changed files with 2702 additions and 37 deletions

View File

@@ -0,0 +1,189 @@
/**
* Chat SDK bridge — wraps a Chat SDK adapter + Chat instance
* to conform to the NanoClaw ChannelAdapter interface.
*
* Used by Discord, Slack, and other Chat SDK-supported platforms.
*/
import { Chat, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage } from 'chat';
import { createMemoryState } from '@chat-adapter/state-memory';
import { log } from '../log.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
/** Adapter with optional gateway support (e.g., Discord). */
interface GatewayAdapter extends Adapter {
startGatewayListener?(
options: { waitUntil?: (task: Promise<unknown>) => void },
durationMs?: number,
abortSignal?: AbortSignal,
): Promise<Response>;
}
export interface ChatSdkBridgeConfig {
adapter: GatewayAdapter;
concurrency?: ConcurrencyStrategy;
}
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
const { adapter } = config;
let chat: Chat;
let state: ReturnType<typeof createMemoryState>;
let setupConfig: ChannelSetup;
let conversations: Map<string, ConversationConfig>;
let gatewayAbort: AbortController | null = null;
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig> {
const map = new Map<string, ConversationConfig>();
for (const conv of configs) {
map.set(conv.platformId, conv);
}
return map;
}
function messageToInbound(message: ChatMessage): InboundMessage {
return {
id: message.id,
kind: 'chat-sdk',
content: message.toJSON(),
timestamp: message.metadata.dateSent.toISOString(),
};
}
return {
name: adapter.name,
channelType: adapter.name,
async setup(hostConfig: ChannelSetup) {
setupConfig = hostConfig;
conversations = buildConversationMap(hostConfig.conversations);
state = createMemoryState();
chat = new Chat({
adapters: { [adapter.name]: adapter },
userName: adapter.userName || 'NanoClaw',
concurrency: config.concurrency ?? 'concurrent',
state,
logger: 'silent',
});
// Subscribed threads — forward all messages
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, thread.id, messageToInbound(message));
});
// @mention in unsubscribed thread — forward + subscribe
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, thread.id, messageToInbound(message));
await thread.subscribe();
});
// DMs — always forward + subscribe
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, null, messageToInbound(message));
await thread.subscribe();
});
await chat.initialize();
// Subscribe registered conversations (after initialize connects state)
for (const conv of hostConfig.conversations) {
if (conv.agentGroupId) {
const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never);
await state.subscribe(threadId);
}
}
// Start Gateway listener for adapters that support it (e.g., Discord)
if (adapter.startGatewayListener) {
gatewayAbort = new AbortController();
const startGateway = () => {
if (gatewayAbort?.signal.aborted) return;
// Capture the long-running listener promise via waitUntil
let listenerPromise: Promise<unknown> | undefined;
adapter
.startGatewayListener!(
{ waitUntil: (p: Promise<unknown>) => { listenerPromise = p; } },
24 * 60 * 60 * 1000,
gatewayAbort!.signal,
)
.then(() => {
// startGatewayListener resolves immediately with a Response;
// the actual work is in the listenerPromise passed to waitUntil
if (listenerPromise) {
listenerPromise
.then(() => {
if (!gatewayAbort?.signal.aborted) {
log.info('Gateway listener expired, restarting', { adapter: adapter.name });
startGateway();
}
})
.catch((err) => {
if (!gatewayAbort?.signal.aborted) {
log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
setTimeout(startGateway, 5000);
}
});
}
});
};
startGateway();
log.info('Gateway listener started', { adapter: adapter.name });
}
log.info('Chat SDK bridge initialized', { adapter: adapter.name });
},
async deliver(platformId: string, threadId: string | null, message) {
const tid = threadId ?? adapter.encodeThreadId({ guildId: '', channelId: platformId } as never);
const content = message.content as Record<string, unknown>;
if (content.operation === 'edit' && content.messageId) {
await adapter.editMessage(tid, content.messageId as string, {
markdown: (content.text as string) || (content.markdown as string) || '',
});
return;
}
if (content.operation === 'reaction' && content.messageId && content.emoji) {
await adapter.addReaction(tid, content.messageId as string, content.emoji as string);
return;
}
// Normal message
const text = (content.markdown as string) || (content.text as string);
if (text) {
await adapter.postMessage(tid, { markdown: text });
}
},
async setTyping(platformId: string, threadId: string | null) {
const tid = threadId ?? adapter.encodeThreadId({ guildId: '', channelId: platformId } as never);
await adapter.startTyping(tid);
},
async teardown() {
gatewayAbort?.abort();
await chat.shutdown();
log.info('Chat SDK bridge shut down', { adapter: adapter.name });
},
isConnected() {
return true;
},
updateConversations(configs: ConversationConfig[]) {
conversations = buildConversationMap(configs);
// Subscribe new conversations
for (const conv of configs) {
if (conv.agentGroupId) {
const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never);
state.subscribe(threadId).catch(() => {});
}
}
},
};
}

View File

@@ -0,0 +1,22 @@
/**
* Discord channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createDiscordAdapter } from '@chat-adapter/discord';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('discord', {
factory: () => {
const env = readEnvFile(['DISCORD_BOT_TOKEN', 'DISCORD_PUBLIC_KEY', 'DISCORD_APPLICATION_ID']);
if (!env.DISCORD_BOT_TOKEN) return null;
const discordAdapter = createDiscordAdapter({
botToken: env.DISCORD_BOT_TOKEN,
publicKey: env.DISCORD_PUBLIC_KEY,
applicationId: env.DISCORD_APPLICATION_ID,
});
return createChatSdkBridge({ adapter: discordAdapter, concurrency: 'concurrent' });
},
});

View File

@@ -185,15 +185,8 @@ function buildMounts(agentGroup: AgentGroup, session: Session): VolumeMount[] {
const agentRunnerSrc = path.join(projectRoot, 'container', 'agent-runner', 'src');
const groupRunnerDir = path.join(DATA_DIR, 'v2-sessions', agentGroup.id, 'agent-runner-src');
if (fs.existsSync(agentRunnerSrc)) {
const srcIndex = path.join(agentRunnerSrc, 'index-v2.ts');
const cachedIndex = path.join(groupRunnerDir, 'index-v2.ts');
const needsCopy =
!fs.existsSync(groupRunnerDir) ||
!fs.existsSync(cachedIndex) ||
fs.statSync(srcIndex).mtimeMs > fs.statSync(cachedIndex).mtimeMs;
if (needsCopy) {
fs.cpSync(agentRunnerSrc, groupRunnerDir, { recursive: true });
}
// Always copy — source files may have changed beyond just the index
fs.cpSync(agentRunnerSrc, groupRunnerDir, { recursive: true });
}
mounts.push({ hostPath: groupRunnerDir, containerPath: '/app/src', readonly: false });

View File

@@ -74,6 +74,7 @@ CREATE TABLE pending_questions (
export const SESSION_SCHEMA = `
CREATE TABLE messages_in (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
kind TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT DEFAULT 'pending',
@@ -89,6 +90,7 @@ CREATE TABLE messages_in (
CREATE TABLE messages_out (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
in_reply_to TEXT,
timestamp TEXT NOT NULL,
delivered INTEGER DEFAULT 0,

View File

@@ -17,7 +17,7 @@ import { routeInbound } from './router-v2.js';
import { log } from './log.js';
// Channel imports — each triggers self-registration
// import './channels/discord-v2.js';
import './channels/discord-v2.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js';

View File

@@ -108,11 +108,23 @@ export function writeSessionMessage(
db.pragma('journal_mode = DELETE');
try {
const nextSeq = (
db
.prepare(
`SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM (
SELECT seq FROM messages_in WHERE seq IS NOT NULL
UNION ALL
SELECT seq FROM messages_out WHERE seq IS NOT NULL
)`,
)
.get() as { next: number }
).next;
db.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence)
VALUES (@id, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`,
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`,
).run({
id: message.id,
seq: nextSeq,
kind: message.kind,
timestamp: message.timestamp,
platformId: message.platformId ?? null,