v2 phase 4: channel adapter interface, registry, and host wiring

ChannelAdapter interface with setup/deliver/teardown/setTyping lifecycle.
Self-registration pattern via channel-registry. Host wiring in index-v2
bridges inbound messages to routeInbound and outbound delivery to adapters.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-09 00:10:46 +03:00
parent d35386a46e
commit 7201fe5032
6 changed files with 483 additions and 9 deletions

79
src/channels/adapter.ts Normal file
View File

@@ -0,0 +1,79 @@
/**
* v2 Channel Adapter interface.
*
* Channel adapters bridge NanoClaw with messaging platforms (Discord, Slack, etc.).
* Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter).
*/
/** Configuration for a registered conversation (messaging group + agent wiring). */
export interface ConversationConfig {
platformId: string;
agentGroupId: string;
triggerPattern?: string; // regex string (for native channels)
requiresTrigger: boolean;
sessionMode: 'shared' | 'per-thread';
}
/** Passed to the adapter at setup time. */
export interface ChannelSetup {
/** Known conversations from central DB. */
conversations: ConversationConfig[];
/** Called when an inbound message arrives from the platform. */
onInbound(platformId: string, threadId: string | null, message: InboundMessage): void;
/** Called when the adapter discovers metadata about a conversation. */
onMetadata(platformId: string, name?: string, isGroup?: boolean): void;
}
/** Inbound message from adapter to host. */
export interface InboundMessage {
id: string;
kind: 'chat' | 'chat-sdk';
content: unknown; // JS object — host will JSON.stringify before writing to session DB
timestamp: string;
}
/** Outbound message from host to adapter. */
export interface OutboundMessage {
kind: string;
content: unknown; // parsed JSON from messages_out
}
/** Discovered conversation info (from syncConversations). */
export interface ConversationInfo {
platformId: string;
name: string;
isGroup: boolean;
}
/** The v2 channel adapter contract. */
export interface ChannelAdapter {
name: string;
channelType: string;
// Lifecycle
setup(config: ChannelSetup): Promise<void>;
teardown(): Promise<void>;
isConnected(): boolean;
// Outbound delivery
deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise<void>;
// Optional
setTyping?(platformId: string, threadId: string | null): Promise<void>;
syncConversations?(): Promise<ConversationInfo[]>;
updateConversations?(conversations: ConversationConfig[]): void;
}
/** Factory function that creates a channel adapter (returns null if credentials missing). */
export type ChannelAdapterFactory = () => ChannelAdapter | null;
/** Registration entry for a channel adapter. */
export interface ChannelRegistration {
factory: ChannelAdapterFactory;
containerConfig?: {
mounts?: Array<{ hostPath: string; containerPath: string; readonly: boolean }>;
env?: Record<string, string>;
};
}

View File

@@ -0,0 +1,227 @@
/**
* Tests for the v2 channel adapter registry and integration with host.
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import Database from 'better-sqlite3';
import fs from 'fs';
import type { ChannelAdapter, ChannelSetup, InboundMessage, OutboundMessage } from './adapter.js';
// Mock container runner
vi.mock('../container-runner-v2.js', () => ({
wakeContainer: vi.fn().mockResolvedValue(undefined),
resetContainerIdleTimer: vi.fn(),
isContainerRunning: vi.fn().mockReturnValue(false),
getActiveContainerCount: vi.fn().mockReturnValue(0),
killContainer: vi.fn(),
}));
// Override DATA_DIR for tests
vi.mock('../config.js', async () => {
const actual = await vi.importActual('../config.js');
return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-channels' };
});
const TEST_DIR = '/tmp/nanoclaw-test-channels';
function now() {
return new Date().toISOString();
}
/** Create a mock ChannelAdapter for testing. */
function createMockAdapter(channelType: string): ChannelAdapter & { delivered: OutboundMessage[]; inbound: InboundMessage[] } {
const delivered: OutboundMessage[] = [];
const inbound: InboundMessage[] = [];
let setupConfig: ChannelSetup | null = null;
return {
name: channelType,
channelType,
delivered,
inbound,
async setup(config: ChannelSetup) {
setupConfig = config;
},
async teardown() {
setupConfig = null;
},
isConnected() {
return setupConfig !== null;
},
async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage) {
delivered.push(message);
},
async setTyping() {},
updateConversations() {},
};
}
describe('channel registry', () => {
// Import fresh modules for each test to avoid registry pollution
beforeEach(async () => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
});
afterEach(() => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
});
it('should register and retrieve channel adapters', async () => {
const { registerChannelAdapter, getRegisteredChannelNames, getChannelContainerConfig } = await import(
'./channel-registry.js'
);
registerChannelAdapter('test-channel', {
factory: () => createMockAdapter('test'),
containerConfig: {
env: { TEST_KEY: 'value' },
},
});
expect(getRegisteredChannelNames()).toContain('test-channel');
expect(getChannelContainerConfig('test-channel')).toEqual({
env: { TEST_KEY: 'value' },
});
});
it('should skip adapters that return null (missing credentials)', async () => {
const { registerChannelAdapter, initChannelAdapters, getActiveAdapters } = await import('./channel-registry.js');
registerChannelAdapter('no-creds', {
factory: () => null,
});
await initChannelAdapters(() => ({
conversations: [],
onInbound: () => {},
onMetadata: () => {},
}));
// Should not have any active adapters for channels with null factory returns
const active = getActiveAdapters();
const noCreds = active.find((a) => a.name === 'no-creds');
expect(noCreds).toBeUndefined();
});
});
describe('channel + router integration', () => {
beforeEach(async () => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
const { initTestDb, runMigrations, createAgentGroup, createMessagingGroup, createMessagingGroupAgent } =
await import('../db/index.js');
const db = initTestDb();
runMigrations(db);
createAgentGroup({
id: 'ag-1',
name: 'Test Agent',
folder: 'test-agent',
is_admin: 0,
agent_provider: null,
container_config: null,
created_at: now(),
});
createMessagingGroup({
id: 'mg-1',
channel_type: 'mock',
platform_id: 'chan-100',
name: 'Test Channel',
is_group: 1,
admin_user_id: null,
created_at: now(),
});
createMessagingGroupAgent({
id: 'mga-1',
messaging_group_id: 'mg-1',
agent_group_id: 'ag-1',
trigger_rules: null,
response_scope: 'all',
session_mode: 'shared',
priority: 0,
created_at: now(),
});
});
afterEach(async () => {
const { closeDb } = await import('../db/index.js');
closeDb();
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
});
it('should route inbound message from adapter to session DB', async () => {
const { routeInbound } = await import('../router-v2.js');
const { findSession } = await import('../db/sessions.js');
const { sessionDbPath } = await import('../session-manager.js');
// Simulate what the adapter bridge does: stringify content, call routeInbound
const inboundContent = { sender: 'TestUser', senderId: 'u1', text: 'Hello from adapter', isFromMe: false };
await routeInbound({
channelType: 'mock',
platformId: 'chan-100',
threadId: null,
message: {
id: 'msg-adapter-1',
kind: 'chat',
content: JSON.stringify(inboundContent),
timestamp: now(),
},
});
// Verify session was created and message written
const session = findSession('mg-1', null);
expect(session).toBeDefined();
const dbPath = sessionDbPath('ag-1', session!.id);
const db = new Database(dbPath);
const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>;
db.close();
expect(rows).toHaveLength(1);
expect(JSON.parse(rows[0].content).text).toBe('Hello from adapter');
});
it('should deliver outbound message through delivery adapter bridge', async () => {
const { setDeliveryAdapter } = await import('../delivery.js');
const { getChannelAdapter, registerChannelAdapter, initChannelAdapters } = await import('./channel-registry.js');
// Register and init a mock adapter
const mockAdapter = createMockAdapter('mock');
registerChannelAdapter('mock-delivery', {
factory: () => mockAdapter,
});
await initChannelAdapters((adapter) => ({
conversations: [],
onInbound: () => {},
onMetadata: () => {},
}));
// Set up delivery adapter bridge (same pattern as index-v2.ts)
setDeliveryAdapter({
async deliver(channelType, platformId, threadId, kind, content) {
const adapter = getChannelAdapter(channelType);
if (!adapter) return;
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) });
},
});
// Simulate delivery
const adapter = getChannelAdapter('mock');
if (adapter) {
await adapter.deliver('chan-100', null, { kind: 'chat', content: { text: 'Agent response' } });
}
expect(mockAdapter.delivered).toHaveLength(1);
expect((mockAdapter.delivered[0].content as { text: string }).text).toBe('Agent response');
});
});

View File

@@ -0,0 +1,72 @@
/**
* v2 Channel adapter registry.
*
* Channels self-register on import. The host calls initChannelAdapters() at startup
* to instantiate and set up all registered adapters.
*/
import type { ChannelAdapter, ChannelRegistration, ChannelSetup } from './adapter.js';
import { log } from '../log.js';
const registry = new Map<string, ChannelRegistration>();
const activeAdapters = new Map<string, ChannelAdapter>();
/** Register a channel adapter factory. Called by channel modules on import. */
export function registerChannelAdapter(name: string, registration: ChannelRegistration): void {
registry.set(name, registration);
}
/** Get a live adapter by channel type. */
export function getChannelAdapter(channelType: string): ChannelAdapter | undefined {
return activeAdapters.get(channelType);
}
/** Get all active adapters. */
export function getActiveAdapters(): ChannelAdapter[] {
return [...activeAdapters.values()];
}
/** Get all registered channel names. */
export function getRegisteredChannelNames(): string[] {
return [...registry.keys()];
}
/** Get container config for a channel (used by container-runner for additional mounts/env). */
export function getChannelContainerConfig(name: string): ChannelRegistration['containerConfig'] {
return registry.get(name)?.containerConfig;
}
/**
* Instantiate and set up all registered channel adapters.
* Skips adapters that return null (missing credentials).
*/
export async function initChannelAdapters(setupFn: (adapter: ChannelAdapter) => ChannelSetup): Promise<void> {
for (const [name, registration] of registry) {
try {
const adapter = registration.factory();
if (!adapter) {
log.warn('Channel credentials missing, skipping', { channel: name });
continue;
}
const setup = setupFn(adapter);
await adapter.setup(setup);
activeAdapters.set(adapter.channelType, adapter);
log.info('Channel adapter started', { channel: name, type: adapter.channelType });
} catch (err) {
log.error('Failed to start channel adapter', { channel: name, err });
}
}
}
/** Tear down all active adapters. */
export async function teardownChannelAdapters(): Promise<void> {
for (const [name, adapter] of activeAdapters) {
try {
await adapter.teardown();
log.info('Channel adapter stopped', { channel: name });
} catch (err) {
log.error('Failed to stop channel adapter', { channel: name, err });
}
}
activeAdapters.clear();
}

View File

@@ -14,6 +14,7 @@ export {
getMessagingGroup,
getMessagingGroupByPlatform,
getAllMessagingGroups,
getMessagingGroupsByChannel,
updateMessagingGroup,
deleteMessagingGroup,
createMessagingGroupAgent,

View File

@@ -26,6 +26,12 @@ export function getAllMessagingGroups(): MessagingGroup[] {
return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[];
}
export function getMessagingGroupsByChannel(channelType: string): MessagingGroup[] {
return getDb()
.prepare('SELECT * FROM messaging_groups WHERE channel_type = ?')
.all(channelType) as MessagingGroup[];
}
export function updateMessagingGroup(
id: string,
updates: Partial<Pick<MessagingGroup, 'name' | 'is_group' | 'admin_user_id'>>,

View File

@@ -1,19 +1,31 @@
/**
* NanoClaw v2 — main entry point.
*
* Thin orchestrator: init DB, run migrations, start delivery polls, start sweep.
* Channel adapters are started separately (Phase 4).
* Thin orchestrator: init DB, run migrations, start channel adapters,
* start delivery polls, start sweep, handle shutdown.
*/
import path from 'path';
import { DATA_DIR } from './config.js';
import { initDb } from './db/connection.js';
import { runMigrations } from './db/migrations/index.js';
import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js';
import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js';
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter } from './delivery.js';
import { startHostSweep } from './host-sweep.js';
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
import { startHostSweep, stopHostSweep } from './host-sweep.js';
import { routeInbound } from './router-v2.js';
import { log } from './log.js';
// Channel imports — each triggers self-registration
// import './channels/discord-v2.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
import {
initChannelAdapters,
teardownChannelAdapters,
getChannelAdapter,
} from './channels/channel-registry.js';
async function main(): Promise<void> {
log.info('NanoClaw v2 starting');
@@ -27,22 +39,99 @@ async function main(): Promise<void> {
ensureContainerRuntimeRunning();
cleanupOrphans();
// 3. Channel adapters (Phase 4 — placeholder)
// TODO: init channel adapters, set up delivery adapter
// setDeliveryAdapter({ deliver: async (...) => { ... } });
// 3. Channel adapters
await initChannelAdapters((adapter: ChannelAdapter): ChannelSetup => {
const conversations = buildConversationConfigs(adapter.channelType);
return {
conversations,
onInbound(platformId, threadId, message) {
routeInbound({
channelType: adapter.channelType,
platformId,
threadId,
message: {
id: message.id,
kind: message.kind,
content: JSON.stringify(message.content),
timestamp: message.timestamp,
},
}).catch((err) => {
log.error('Failed to route inbound message', { channelType: adapter.channelType, err });
});
},
onMetadata(platformId, name, isGroup) {
log.info('Channel metadata discovered', {
channelType: adapter.channelType,
platformId,
name,
isGroup,
});
},
};
});
// 4. Start delivery polls
// 4. Delivery adapter bridge — dispatches to channel adapters
setDeliveryAdapter({
async deliver(channelType, platformId, threadId, kind, content) {
const adapter = getChannelAdapter(channelType);
if (!adapter) {
log.warn('No adapter for channel type', { channelType });
return;
}
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) });
},
async setTyping(channelType, platformId, threadId) {
const adapter = getChannelAdapter(channelType);
await adapter?.setTyping?.(platformId, threadId);
},
});
// 5. Start delivery polls
startActiveDeliveryPoll();
startSweepDeliveryPoll();
log.info('Delivery polls started');
// 5. Start host sweep
// 6. Start host sweep
startHostSweep();
log.info('Host sweep started');
log.info('NanoClaw v2 running');
}
/** Build ConversationConfig[] for a channel type from the central DB. */
function buildConversationConfigs(channelType: string): ConversationConfig[] {
const groups = getMessagingGroupsByChannel(channelType);
const configs: ConversationConfig[] = [];
for (const mg of groups) {
const agents = getMessagingGroupAgents(mg.id);
for (const agent of agents) {
const triggerRules = agent.trigger_rules ? JSON.parse(agent.trigger_rules) : null;
configs.push({
platformId: mg.platform_id,
agentGroupId: agent.agent_group_id,
triggerPattern: triggerRules?.pattern,
requiresTrigger: triggerRules?.requiresTrigger ?? false,
sessionMode: agent.session_mode,
});
}
}
return configs;
}
/** Graceful shutdown. */
async function shutdown(signal: string): Promise<void> {
log.info('Shutdown signal received', { signal });
stopDeliveryPolls();
stopHostSweep();
await teardownChannelAdapters();
process.exit(0);
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
main().catch((err) => {
log.fatal('Startup failed', { err });
process.exit(1);