From 7201fe503223ed3f1411c95e41a740d3e7eb36f6 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Thu, 9 Apr 2026 00:10:46 +0300 Subject: [PATCH] v2 phase 4: channel adapter interface, registry, and host wiring ChannelAdapter interface with setup/deliver/teardown/setTyping lifecycle. Self-registration pattern via channel-registry. Host wiring in index-v2 bridges inbound messages to routeInbound and outbound delivery to adapters. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/channels/adapter.ts | 79 +++++++++ src/channels/channel-registry.test.ts | 227 ++++++++++++++++++++++++++ src/channels/channel-registry.ts | 72 ++++++++ src/db/index.ts | 1 + src/db/messaging-groups.ts | 6 + src/index-v2.ts | 107 +++++++++++- 6 files changed, 483 insertions(+), 9 deletions(-) create mode 100644 src/channels/adapter.ts create mode 100644 src/channels/channel-registry.test.ts create mode 100644 src/channels/channel-registry.ts diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts new file mode 100644 index 0000000..0bd5edd --- /dev/null +++ b/src/channels/adapter.ts @@ -0,0 +1,79 @@ +/** + * v2 Channel Adapter interface. + * + * Channel adapters bridge NanoClaw with messaging platforms (Discord, Slack, etc.). + * Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter). + */ + +/** Configuration for a registered conversation (messaging group + agent wiring). */ +export interface ConversationConfig { + platformId: string; + agentGroupId: string; + triggerPattern?: string; // regex string (for native channels) + requiresTrigger: boolean; + sessionMode: 'shared' | 'per-thread'; +} + +/** Passed to the adapter at setup time. */ +export interface ChannelSetup { + /** Known conversations from central DB. */ + conversations: ConversationConfig[]; + + /** Called when an inbound message arrives from the platform. */ + onInbound(platformId: string, threadId: string | null, message: InboundMessage): void; + + /** Called when the adapter discovers metadata about a conversation. */ + onMetadata(platformId: string, name?: string, isGroup?: boolean): void; +} + +/** Inbound message from adapter to host. */ +export interface InboundMessage { + id: string; + kind: 'chat' | 'chat-sdk'; + content: unknown; // JS object — host will JSON.stringify before writing to session DB + timestamp: string; +} + +/** Outbound message from host to adapter. */ +export interface OutboundMessage { + kind: string; + content: unknown; // parsed JSON from messages_out +} + +/** Discovered conversation info (from syncConversations). */ +export interface ConversationInfo { + platformId: string; + name: string; + isGroup: boolean; +} + +/** The v2 channel adapter contract. */ +export interface ChannelAdapter { + name: string; + channelType: string; + + // Lifecycle + setup(config: ChannelSetup): Promise; + teardown(): Promise; + isConnected(): boolean; + + // Outbound delivery + deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise; + + // Optional + setTyping?(platformId: string, threadId: string | null): Promise; + syncConversations?(): Promise; + updateConversations?(conversations: ConversationConfig[]): void; +} + +/** Factory function that creates a channel adapter (returns null if credentials missing). */ +export type ChannelAdapterFactory = () => ChannelAdapter | null; + +/** Registration entry for a channel adapter. */ +export interface ChannelRegistration { + factory: ChannelAdapterFactory; + containerConfig?: { + mounts?: Array<{ hostPath: string; containerPath: string; readonly: boolean }>; + env?: Record; + }; +} diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts new file mode 100644 index 0000000..4032b7a --- /dev/null +++ b/src/channels/channel-registry.test.ts @@ -0,0 +1,227 @@ +/** + * Tests for the v2 channel adapter registry and integration with host. + */ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import Database from 'better-sqlite3'; +import fs from 'fs'; + +import type { ChannelAdapter, ChannelSetup, InboundMessage, OutboundMessage } from './adapter.js'; + +// Mock container runner +vi.mock('../container-runner-v2.js', () => ({ + wakeContainer: vi.fn().mockResolvedValue(undefined), + resetContainerIdleTimer: vi.fn(), + isContainerRunning: vi.fn().mockReturnValue(false), + getActiveContainerCount: vi.fn().mockReturnValue(0), + killContainer: vi.fn(), +})); + +// Override DATA_DIR for tests +vi.mock('../config.js', async () => { + const actual = await vi.importActual('../config.js'); + return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-channels' }; +}); + +const TEST_DIR = '/tmp/nanoclaw-test-channels'; + +function now() { + return new Date().toISOString(); +} + +/** Create a mock ChannelAdapter for testing. */ +function createMockAdapter(channelType: string): ChannelAdapter & { delivered: OutboundMessage[]; inbound: InboundMessage[] } { + const delivered: OutboundMessage[] = []; + const inbound: InboundMessage[] = []; + let setupConfig: ChannelSetup | null = null; + + return { + name: channelType, + channelType, + delivered, + inbound, + + async setup(config: ChannelSetup) { + setupConfig = config; + }, + + async teardown() { + setupConfig = null; + }, + + isConnected() { + return setupConfig !== null; + }, + + async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage) { + delivered.push(message); + }, + + async setTyping() {}, + + updateConversations() {}, + }; +} + +describe('channel registry', () => { + // Import fresh modules for each test to avoid registry pollution + beforeEach(async () => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + }); + + afterEach(() => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + }); + + it('should register and retrieve channel adapters', async () => { + const { registerChannelAdapter, getRegisteredChannelNames, getChannelContainerConfig } = await import( + './channel-registry.js' + ); + + registerChannelAdapter('test-channel', { + factory: () => createMockAdapter('test'), + containerConfig: { + env: { TEST_KEY: 'value' }, + }, + }); + + expect(getRegisteredChannelNames()).toContain('test-channel'); + expect(getChannelContainerConfig('test-channel')).toEqual({ + env: { TEST_KEY: 'value' }, + }); + }); + + it('should skip adapters that return null (missing credentials)', async () => { + const { registerChannelAdapter, initChannelAdapters, getActiveAdapters } = await import('./channel-registry.js'); + + registerChannelAdapter('no-creds', { + factory: () => null, + }); + + await initChannelAdapters(() => ({ + conversations: [], + onInbound: () => {}, + onMetadata: () => {}, + })); + + // Should not have any active adapters for channels with null factory returns + const active = getActiveAdapters(); + const noCreds = active.find((a) => a.name === 'no-creds'); + expect(noCreds).toBeUndefined(); + }); +}); + +describe('channel + router integration', () => { + beforeEach(async () => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + + const { initTestDb, runMigrations, createAgentGroup, createMessagingGroup, createMessagingGroupAgent } = + await import('../db/index.js'); + const db = initTestDb(); + runMigrations(db); + + createAgentGroup({ + id: 'ag-1', + name: 'Test Agent', + folder: 'test-agent', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + createMessagingGroup({ + id: 'mg-1', + channel_type: 'mock', + platform_id: 'chan-100', + name: 'Test Channel', + is_group: 1, + admin_user_id: null, + created_at: now(), + }); + createMessagingGroupAgent({ + id: 'mga-1', + messaging_group_id: 'mg-1', + agent_group_id: 'ag-1', + trigger_rules: null, + response_scope: 'all', + session_mode: 'shared', + priority: 0, + created_at: now(), + }); + }); + + afterEach(async () => { + const { closeDb } = await import('../db/index.js'); + closeDb(); + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + }); + + it('should route inbound message from adapter to session DB', async () => { + const { routeInbound } = await import('../router-v2.js'); + const { findSession } = await import('../db/sessions.js'); + const { sessionDbPath } = await import('../session-manager.js'); + + // Simulate what the adapter bridge does: stringify content, call routeInbound + const inboundContent = { sender: 'TestUser', senderId: 'u1', text: 'Hello from adapter', isFromMe: false }; + + await routeInbound({ + channelType: 'mock', + platformId: 'chan-100', + threadId: null, + message: { + id: 'msg-adapter-1', + kind: 'chat', + content: JSON.stringify(inboundContent), + timestamp: now(), + }, + }); + + // Verify session was created and message written + const session = findSession('mg-1', null); + expect(session).toBeDefined(); + + const dbPath = sessionDbPath('ag-1', session!.id); + const db = new Database(dbPath); + const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>; + db.close(); + + expect(rows).toHaveLength(1); + expect(JSON.parse(rows[0].content).text).toBe('Hello from adapter'); + }); + + it('should deliver outbound message through delivery adapter bridge', async () => { + const { setDeliveryAdapter } = await import('../delivery.js'); + const { getChannelAdapter, registerChannelAdapter, initChannelAdapters } = await import('./channel-registry.js'); + + // Register and init a mock adapter + const mockAdapter = createMockAdapter('mock'); + registerChannelAdapter('mock-delivery', { + factory: () => mockAdapter, + }); + + await initChannelAdapters((adapter) => ({ + conversations: [], + onInbound: () => {}, + onMetadata: () => {}, + })); + + // Set up delivery adapter bridge (same pattern as index-v2.ts) + setDeliveryAdapter({ + async deliver(channelType, platformId, threadId, kind, content) { + const adapter = getChannelAdapter(channelType); + if (!adapter) return; + await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) }); + }, + }); + + // Simulate delivery + const adapter = getChannelAdapter('mock'); + if (adapter) { + await adapter.deliver('chan-100', null, { kind: 'chat', content: { text: 'Agent response' } }); + } + + expect(mockAdapter.delivered).toHaveLength(1); + expect((mockAdapter.delivered[0].content as { text: string }).text).toBe('Agent response'); + }); +}); diff --git a/src/channels/channel-registry.ts b/src/channels/channel-registry.ts new file mode 100644 index 0000000..d327d33 --- /dev/null +++ b/src/channels/channel-registry.ts @@ -0,0 +1,72 @@ +/** + * v2 Channel adapter registry. + * + * Channels self-register on import. The host calls initChannelAdapters() at startup + * to instantiate and set up all registered adapters. + */ +import type { ChannelAdapter, ChannelRegistration, ChannelSetup } from './adapter.js'; +import { log } from '../log.js'; + +const registry = new Map(); +const activeAdapters = new Map(); + +/** Register a channel adapter factory. Called by channel modules on import. */ +export function registerChannelAdapter(name: string, registration: ChannelRegistration): void { + registry.set(name, registration); +} + +/** Get a live adapter by channel type. */ +export function getChannelAdapter(channelType: string): ChannelAdapter | undefined { + return activeAdapters.get(channelType); +} + +/** Get all active adapters. */ +export function getActiveAdapters(): ChannelAdapter[] { + return [...activeAdapters.values()]; +} + +/** Get all registered channel names. */ +export function getRegisteredChannelNames(): string[] { + return [...registry.keys()]; +} + +/** Get container config for a channel (used by container-runner for additional mounts/env). */ +export function getChannelContainerConfig(name: string): ChannelRegistration['containerConfig'] { + return registry.get(name)?.containerConfig; +} + +/** + * Instantiate and set up all registered channel adapters. + * Skips adapters that return null (missing credentials). + */ +export async function initChannelAdapters(setupFn: (adapter: ChannelAdapter) => ChannelSetup): Promise { + for (const [name, registration] of registry) { + try { + const adapter = registration.factory(); + if (!adapter) { + log.warn('Channel credentials missing, skipping', { channel: name }); + continue; + } + + const setup = setupFn(adapter); + await adapter.setup(setup); + activeAdapters.set(adapter.channelType, adapter); + log.info('Channel adapter started', { channel: name, type: adapter.channelType }); + } catch (err) { + log.error('Failed to start channel adapter', { channel: name, err }); + } + } +} + +/** Tear down all active adapters. */ +export async function teardownChannelAdapters(): Promise { + for (const [name, adapter] of activeAdapters) { + try { + await adapter.teardown(); + log.info('Channel adapter stopped', { channel: name }); + } catch (err) { + log.error('Failed to stop channel adapter', { channel: name, err }); + } + } + activeAdapters.clear(); +} diff --git a/src/db/index.ts b/src/db/index.ts index 35645cb..33b3a94 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -14,6 +14,7 @@ export { getMessagingGroup, getMessagingGroupByPlatform, getAllMessagingGroups, + getMessagingGroupsByChannel, updateMessagingGroup, deleteMessagingGroup, createMessagingGroupAgent, diff --git a/src/db/messaging-groups.ts b/src/db/messaging-groups.ts index 40a9702..5d431f9 100644 --- a/src/db/messaging-groups.ts +++ b/src/db/messaging-groups.ts @@ -26,6 +26,12 @@ export function getAllMessagingGroups(): MessagingGroup[] { return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[]; } +export function getMessagingGroupsByChannel(channelType: string): MessagingGroup[] { + return getDb() + .prepare('SELECT * FROM messaging_groups WHERE channel_type = ?') + .all(channelType) as MessagingGroup[]; +} + export function updateMessagingGroup( id: string, updates: Partial>, diff --git a/src/index-v2.ts b/src/index-v2.ts index 07da575..eb2428b 100644 --- a/src/index-v2.ts +++ b/src/index-v2.ts @@ -1,19 +1,31 @@ /** * NanoClaw v2 — main entry point. * - * Thin orchestrator: init DB, run migrations, start delivery polls, start sweep. - * Channel adapters are started separately (Phase 4). + * Thin orchestrator: init DB, run migrations, start channel adapters, + * start delivery polls, start sweep, handle shutdown. */ import path from 'path'; import { DATA_DIR } from './config.js'; import { initDb } from './db/connection.js'; import { runMigrations } from './db/migrations/index.js'; +import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js'; import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js'; -import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter } from './delivery.js'; -import { startHostSweep } from './host-sweep.js'; +import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; +import { startHostSweep, stopHostSweep } from './host-sweep.js'; +import { routeInbound } from './router-v2.js'; import { log } from './log.js'; +// Channel imports — each triggers self-registration +// import './channels/discord-v2.js'; + +import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js'; +import { + initChannelAdapters, + teardownChannelAdapters, + getChannelAdapter, +} from './channels/channel-registry.js'; + async function main(): Promise { log.info('NanoClaw v2 starting'); @@ -27,22 +39,99 @@ async function main(): Promise { ensureContainerRuntimeRunning(); cleanupOrphans(); - // 3. Channel adapters (Phase 4 — placeholder) - // TODO: init channel adapters, set up delivery adapter - // setDeliveryAdapter({ deliver: async (...) => { ... } }); + // 3. Channel adapters + await initChannelAdapters((adapter: ChannelAdapter): ChannelSetup => { + const conversations = buildConversationConfigs(adapter.channelType); + return { + conversations, + onInbound(platformId, threadId, message) { + routeInbound({ + channelType: adapter.channelType, + platformId, + threadId, + message: { + id: message.id, + kind: message.kind, + content: JSON.stringify(message.content), + timestamp: message.timestamp, + }, + }).catch((err) => { + log.error('Failed to route inbound message', { channelType: adapter.channelType, err }); + }); + }, + onMetadata(platformId, name, isGroup) { + log.info('Channel metadata discovered', { + channelType: adapter.channelType, + platformId, + name, + isGroup, + }); + }, + }; + }); - // 4. Start delivery polls + // 4. Delivery adapter bridge — dispatches to channel adapters + setDeliveryAdapter({ + async deliver(channelType, platformId, threadId, kind, content) { + const adapter = getChannelAdapter(channelType); + if (!adapter) { + log.warn('No adapter for channel type', { channelType }); + return; + } + await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) }); + }, + async setTyping(channelType, platformId, threadId) { + const adapter = getChannelAdapter(channelType); + await adapter?.setTyping?.(platformId, threadId); + }, + }); + + // 5. Start delivery polls startActiveDeliveryPoll(); startSweepDeliveryPoll(); log.info('Delivery polls started'); - // 5. Start host sweep + // 6. Start host sweep startHostSweep(); log.info('Host sweep started'); log.info('NanoClaw v2 running'); } +/** Build ConversationConfig[] for a channel type from the central DB. */ +function buildConversationConfigs(channelType: string): ConversationConfig[] { + const groups = getMessagingGroupsByChannel(channelType); + const configs: ConversationConfig[] = []; + + for (const mg of groups) { + const agents = getMessagingGroupAgents(mg.id); + for (const agent of agents) { + const triggerRules = agent.trigger_rules ? JSON.parse(agent.trigger_rules) : null; + configs.push({ + platformId: mg.platform_id, + agentGroupId: agent.agent_group_id, + triggerPattern: triggerRules?.pattern, + requiresTrigger: triggerRules?.requiresTrigger ?? false, + sessionMode: agent.session_mode, + }); + } + } + + return configs; +} + +/** Graceful shutdown. */ +async function shutdown(signal: string): Promise { + log.info('Shutdown signal received', { signal }); + stopDeliveryPolls(); + stopHostSweep(); + await teardownChannelAdapters(); + process.exit(0); +} + +process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on('SIGINT', () => shutdown('SIGINT')); + main().catch((err) => { log.fatal('Startup failed', { err }); process.exit(1);