feat: agent-to-agent communication, dynamic agent creation, self-modification tools

Agent-to-agent: host routes messages with channel_type='agent' to target
agent's inbound.db, enriches with sender info, wakes target container.
Bidirectional routing works via inherited routing context.

Dynamic agents: create_agent MCP tool + system action handler creates
agent groups, folders, and optional CLAUDE.md on the fly.

Self-modification: install_packages (apt/npm, requires admin approval),
add_mcp_server (no approval), request_rebuild (builds per-agent-group
Docker image with approved packages). Approval flow reuses interactive
card infrastructure with pending_approvals table.

Also includes fixes from prior session: attachment download, reply context
extraction, message editing (platform message ID tracking), delivery retry
limits, and card update on button click.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-10 01:10:34 +03:00
parent 9af9bc947a
commit d8fbd3b239
24 changed files with 1025 additions and 78 deletions

View File

@@ -67,8 +67,8 @@ export interface ChannelAdapter {
teardown(): Promise<void>;
isConnected(): boolean;
// Outbound delivery
deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise<void>;
// Outbound delivery — returns the platform message ID if available
deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise<string | undefined>;
// Optional
setTyping?(platformId: string, threadId: string | null): Promise<void>;

View File

@@ -54,8 +54,9 @@ function createMockAdapter(
return setupConfig !== null;
},
async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage) {
async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage): Promise<string | undefined> {
delivered.push(message);
return undefined;
},
async setTyping() {},
@@ -213,8 +214,8 @@ describe('channel + router integration', () => {
setDeliveryAdapter({
async deliver(channelType, platformId, threadId, kind, content) {
const adapter = getChannelAdapter(channelType);
if (!adapter) return;
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) });
if (!adapter) return undefined;
return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) });
},
});

View File

@@ -30,11 +30,23 @@ interface GatewayAdapter extends Adapter {
): Promise<Response>;
}
/** Reply context extracted from a platform's raw message. */
export interface ReplyContext {
text: string;
sender: string;
}
/** Extract reply context from a platform-specific raw message. Return null if no reply. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ReplyContextExtractor = (raw: Record<string, any>) => ReplyContext | null;
export interface ChatSdkBridgeConfig {
adapter: Adapter;
concurrency?: ConcurrencyStrategy;
/** Bot token for authenticating forwarded Gateway events (required for interaction handling). */
botToken?: string;
/** Platform-specific reply context extraction. */
extractReplyContext?: ReplyContextExtractor;
}
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
@@ -53,11 +65,50 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
return map;
}
function messageToInbound(message: ChatMessage): InboundMessage {
async function messageToInbound(message: ChatMessage): Promise<InboundMessage> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const serialized = message.toJSON() as Record<string, any>;
// Download attachment data before serialization loses fetchData()
if (message.attachments && message.attachments.length > 0) {
const enriched = [];
for (const att of message.attachments) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const entry: Record<string, any> = {
type: att.type,
name: att.name,
mimeType: att.mimeType,
size: att.size,
width: (att as unknown as Record<string, unknown>).width,
height: (att as unknown as Record<string, unknown>).height,
};
if (att.fetchData) {
try {
const buffer = await att.fetchData();
entry.data = buffer.toString('base64');
} catch (err) {
log.warn('Failed to download attachment', { type: att.type, err });
}
}
enriched.push(entry);
}
serialized.attachments = enriched;
}
// Extract reply context via platform-specific hook
if (config.extractReplyContext && message.raw) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const replyTo = config.extractReplyContext(message.raw as Record<string, any>);
if (replyTo) serialized.replyTo = replyTo;
}
// Drop raw to save DB space (can be very large)
serialized.raw = undefined;
return {
id: message.id,
kind: 'chat-sdk',
content: message.toJSON(),
content: serialized,
timestamp: message.metadata.dateSent.toISOString(),
};
}
@@ -83,20 +134,20 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
// Subscribed threads — forward all messages
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, thread.id, messageToInbound(message));
setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
});
// @mention in unsubscribed thread — forward + subscribe
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, thread.id, messageToInbound(message));
setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
await thread.subscribe();
});
// DMs — always forward + subscribe
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
setupConfig.onInbound(channelId, null, messageToInbound(message));
setupConfig.onInbound(channelId, null, await messageToInbound(message));
await thread.subscribe();
});
@@ -108,6 +159,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
const questionId = parts[1];
const selectedOption = event.value || '';
const userId = event.user?.userId || '';
// Update the card to show the selected answer and remove buttons
try {
const tid = event.threadId;
await adapter.editMessage(tid, event.messageId, {
markdown: `❓ **Question**\n\n${selectedOption ? `✅ **${selectedOption}**` : '(clicked)'}`,
});
} catch (err) {
log.warn('Failed to update card after action', { err });
}
setupConfig.onAction(questionId, selectedOption, userId);
});
@@ -161,7 +223,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
log.info('Chat SDK bridge initialized', { adapter: adapter.name });
},
async deliver(platformId: string, threadId: string | null, message) {
async deliver(platformId: string, threadId: string | null, message): Promise<string | undefined> {
// platformId is already in the adapter's encoded format (e.g. "telegram:6037840640",
// "discord:guildId:channelId") — use it directly as the thread ID
const tid = threadId ?? platformId;
@@ -190,24 +252,36 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
Actions(options.map((opt) => Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }))),
],
});
await adapter.postMessage(tid, { card, fallbackText: `${content.question}\nOptions: ${options.join(', ')}` });
return;
const result = await adapter.postMessage(tid, {
card,
fallbackText: `${content.question}\nOptions: ${options.join(', ')}`,
});
return result?.id;
}
// Normal message
const text = (content.markdown as string) || (content.text as string);
if (text) {
// Attach files if present (FileUpload format: { data, filename })
const fileUploads = message.files?.map((f) => ({ data: f.data, filename: f.filename }));
const fileUploads = message.files?.map((f: { data: Buffer; filename: string }) => ({
data: f.data,
filename: f.filename,
}));
if (fileUploads && fileUploads.length > 0) {
await adapter.postMessage(tid, { markdown: text, files: fileUploads });
const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads });
return result?.id;
} else {
await adapter.postMessage(tid, { markdown: text });
const result = await adapter.postMessage(tid, { markdown: text });
return result?.id;
}
} else if (message.files && message.files.length > 0) {
// Files only, no text
const fileUploads = message.files.map((f) => ({ data: f.data, filename: f.filename }));
await adapter.postMessage(tid, { markdown: '', files: fileUploads });
const fileUploads = message.files.map((f: { data: Buffer; filename: string }) => ({
data: f.data,
filename: f.filename,
}));
const result = await adapter.postMessage(tid, { markdown: '', files: fileUploads });
return result?.id;
}
},

View File

@@ -5,9 +5,19 @@
import { createDiscordAdapter } from '@chat-adapter/discord';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
if (!raw.referenced_message) return null;
const reply = raw.referenced_message;
return {
text: reply.content || '',
sender: reply.author?.global_name || reply.author?.username || 'Unknown',
};
}
registerChannelAdapter('discord', {
factory: () => {
const env = readEnvFile(['DISCORD_BOT_TOKEN', 'DISCORD_PUBLIC_KEY', 'DISCORD_APPLICATION_ID']);
@@ -17,6 +27,11 @@ registerChannelAdapter('discord', {
publicKey: env.DISCORD_PUBLIC_KEY,
applicationId: env.DISCORD_APPLICATION_ID,
});
return createChatSdkBridge({ adapter: discordAdapter, concurrency: 'concurrent', botToken: env.DISCORD_BOT_TOKEN });
return createChatSdkBridge({
adapter: discordAdapter,
concurrency: 'concurrent',
botToken: env.DISCORD_BOT_TOKEN,
extractReplyContext,
});
},
});

View File

@@ -5,9 +5,19 @@
import { createTelegramAdapter } from '@chat-adapter/telegram';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
if (!raw.reply_to_message) return null;
const reply = raw.reply_to_message;
return {
text: reply.text || reply.caption || '',
sender: reply.from?.first_name || reply.from?.username || 'Unknown',
};
}
registerChannelAdapter('telegram', {
factory: () => {
const env = readEnvFile(['TELEGRAM_BOT_TOKEN']);
@@ -16,6 +26,6 @@ registerChannelAdapter('telegram', {
botToken: env.TELEGRAM_BOT_TOKEN,
mode: 'polling',
});
return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent', extractReplyContext });
},
});

View File

@@ -3,7 +3,7 @@
* Spawns agent containers with session folder + agent group folder mounts.
* The container runs the v2 agent-runner which polls the session DB.
*/
import { ChildProcess, spawn } from 'child_process';
import { ChildProcess, execSync, spawn } from 'child_process';
import fs from 'fs';
import path from 'path';
@@ -274,9 +274,19 @@ async function buildContainerArgs(
}
}
// Pass additional MCP servers from container config
const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {};
if (containerConfig.mcpServers && Object.keys(containerConfig.mcpServers).length > 0) {
args.push('-e', `NANOCLAW_MCP_SERVERS=${JSON.stringify(containerConfig.mcpServers)}`);
}
// Override entrypoint: compile agent-runner source, run v2 entry point (no stdin)
args.push('--entrypoint', 'bash');
args.push(CONTAINER_IMAGE);
// Use per-agent-group image if one has been built, otherwise base image
const imageTag = containerConfig.imageTag || CONTAINER_IMAGE;
args.push(imageTag);
args.push(
'-c',
'cd /app && npx tsc --outDir /tmp/dist 2>&1 >&2 && ln -sf /app/node_modules /tmp/dist/node_modules && node /tmp/dist/index.js',
@@ -284,3 +294,51 @@ async function buildContainerArgs(
return args;
}
/** Build a per-agent-group Docker image with custom packages. */
export async function buildAgentGroupImage(agentGroupId: string): Promise<void> {
const agentGroup = getAgentGroup(agentGroupId);
if (!agentGroup) throw new Error('Agent group not found');
const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {};
const packages = containerConfig.packages || { apt: [], npm: [] };
const aptPackages = (packages.apt || []) as string[];
const npmPackages = (packages.npm || []) as string[];
if (aptPackages.length === 0 && npmPackages.length === 0) {
throw new Error('No packages to install. Use install_packages first.');
}
let dockerfile = `FROM ${CONTAINER_IMAGE}\nUSER root\n`;
if (aptPackages.length > 0) {
dockerfile += `RUN apt-get update && apt-get install -y ${aptPackages.join(' ')} && rm -rf /var/lib/apt/lists/*\n`;
}
if (npmPackages.length > 0) {
dockerfile += `RUN npm install -g ${npmPackages.join(' ')}\n`;
}
dockerfile += 'USER node\n';
const imageTag = `nanoclaw-agent:${agentGroupId}`;
log.info('Building per-agent-group image', { agentGroupId, imageTag, apt: aptPackages, npm: npmPackages });
// Write Dockerfile to temp file and build
const tmpDockerfile = path.join(DATA_DIR, `Dockerfile.${agentGroupId}`);
fs.writeFileSync(tmpDockerfile, dockerfile);
try {
execSync(`${CONTAINER_RUNTIME_BIN} build -t ${imageTag} -f ${tmpDockerfile} .`, {
cwd: DATA_DIR,
stdio: 'pipe',
timeout: 300_000,
});
} finally {
fs.unlinkSync(tmpDockerfile);
}
// Store the image tag in container_config
containerConfig.imageTag = imageTag;
const { updateAgentGroup } = await import('./db/agent-groups.js');
updateAgentGroup(agentGroupId, { container_config: JSON.stringify(containerConfig) });
log.info('Per-agent-group image built', { agentGroupId, imageTag });
}

View File

@@ -62,7 +62,7 @@ describe('migrations', () => {
const db = initTestDb();
runMigrations(db);
const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number };
expect(row.v).toBe(2);
expect(row.v).toBe(3);
});
});

View File

@@ -109,3 +109,14 @@ export function updateMessagingGroupAgent(
export function deleteMessagingGroupAgent(id: string): void {
getDb().prepare('DELETE FROM messaging_group_agents WHERE id = ?').run(id);
}
/** Get all messaging groups wired to an agent group (reverse lookup). */
export function getMessagingGroupsByAgentGroup(agentGroupId: string): MessagingGroup[] {
return getDb()
.prepare(
`SELECT mg.* FROM messaging_groups mg
JOIN messaging_group_agents mga ON mga.messaging_group_id = mg.id
WHERE mga.agent_group_id = ?`,
)
.all(agentGroupId) as MessagingGroup[];
}

View File

@@ -0,0 +1,18 @@
import type { Migration } from './index.js';
export const migration003: Migration = {
version: 3,
name: 'pending-approvals',
up(db) {
db.exec(`
CREATE TABLE pending_approvals (
approval_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(id),
request_id TEXT NOT NULL,
action TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL
);
`);
},
};

View File

@@ -3,6 +3,7 @@ import type Database from 'better-sqlite3';
import { log } from '../../log.js';
import { migration001 } from './001-initial.js';
import { migration002 } from './002-chat-sdk-state.js';
import { migration003 } from './003-pending-approvals.js';
export interface Migration {
version: number;
@@ -10,7 +11,7 @@ export interface Migration {
up: (db: Database.Database) => void;
}
const migrations: Migration[] = [migration001, migration002];
const migrations: Migration[] = [migration001, migration002, migration003];
export function runMigrations(db: Database.Database): void {
db.exec(`

View File

@@ -93,11 +93,13 @@ CREATE TABLE messages_in (
content TEXT NOT NULL
);
-- Host tracks which messages_out IDs have been delivered.
-- Host tracks delivery outcomes for messages_out IDs.
-- Avoids writing to outbound.db (container-owned).
CREATE TABLE delivered (
message_out_id TEXT PRIMARY KEY,
delivered_at TEXT NOT NULL
message_out_id TEXT PRIMARY KEY,
platform_message_id TEXT,
status TEXT NOT NULL DEFAULT 'delivered',
delivered_at TEXT NOT NULL
);
`;

View File

@@ -1,4 +1,4 @@
import type { PendingQuestion, Session } from '../types.js';
import type { PendingApproval, PendingQuestion, Session } from '../types.js';
import { getDb } from './connection.js';
// ── Sessions ──
@@ -90,3 +90,24 @@ export function getPendingQuestion(questionId: string): PendingQuestion | undefi
export function deletePendingQuestion(questionId: string): void {
getDb().prepare('DELETE FROM pending_questions WHERE question_id = ?').run(questionId);
}
// ── Pending Approvals ──
export function createPendingApproval(pa: PendingApproval): void {
getDb()
.prepare(
`INSERT INTO pending_approvals (approval_id, session_id, request_id, action, payload, created_at)
VALUES (@approval_id, @session_id, @request_id, @action, @payload, @created_at)`,
)
.run(pa);
}
export function getPendingApproval(approvalId: string): PendingApproval | undefined {
return getDb().prepare('SELECT * FROM pending_approvals WHERE approval_id = ?').get(approvalId) as
| PendingApproval
| undefined;
}
export function deletePendingApproval(approvalId: string): void {
getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?').run(approvalId);
}

View File

@@ -11,16 +11,22 @@ import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js';
import { getAgentGroup } from './db/agent-groups.js';
import { GROUPS_DIR } from './config.js';
import { getRunningSessions, getActiveSessions, createPendingQuestion, getSession, createPendingApproval } from './db/sessions.js';
import { getAgentGroup, getAdminAgentGroup, createAgentGroup, updateAgentGroup } from './db/agent-groups.js';
import { getMessagingGroupsByAgentGroup } from './db/messaging-groups.js';
import { log } from './log.js';
import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath } from './session-manager.js';
import { resetContainerIdleTimer } from './container-runner.js';
import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath, resolveSession, writeSessionMessage, writeSystemResponse } from './session-manager.js';
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
import type { OutboundFile } from './channels/adapter.js';
import type { Session } from './types.js';
const ACTIVE_POLL_MS = 1000;
const SWEEP_POLL_MS = 60_000;
const MAX_DELIVERY_ATTEMPTS = 3;
/** Track delivery attempt counts. Resets on process restart (gives failed messages a fresh chance). */
const deliveryAttempts = new Map<string, number>();
export interface ChannelDeliveryAdapter {
deliver(
@@ -30,7 +36,7 @@ export interface ChannelDeliveryAdapter {
kind: string,
content: string,
files?: OutboundFile[],
): Promise<void>;
): Promise<string | undefined>;
setTyping?(channelType: string, platformId: string, threadId: string | null): Promise<void>;
}
@@ -136,16 +142,44 @@ async function deliverSessionMessages(session: Session): Promise<void> {
const undelivered = allDue.filter((m) => !deliveredIds.has(m.id));
if (undelivered.length === 0) return;
// Ensure platform_message_id column exists (migration for existing sessions)
migrateDeliveredTable(inDb);
for (const msg of undelivered) {
try {
await deliverMessage(msg, session, inDb);
// Track delivery in inbound.db (host-owned) — not outbound.db
const platformMsgId = await deliverMessage(msg, session, inDb);
inDb
.prepare("INSERT OR IGNORE INTO delivered (message_out_id, delivered_at) VALUES (?, datetime('now'))")
.run(msg.id);
.prepare(
"INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))",
)
.run(msg.id, platformMsgId ?? null);
deliveryAttempts.delete(msg.id);
resetContainerIdleTimer(session.id);
} catch (err) {
log.error('Failed to deliver message', { messageId: msg.id, sessionId: session.id, err });
const attempts = (deliveryAttempts.get(msg.id) ?? 0) + 1;
deliveryAttempts.set(msg.id, attempts);
if (attempts >= MAX_DELIVERY_ATTEMPTS) {
log.error('Message delivery failed permanently, giving up', {
messageId: msg.id,
sessionId: session.id,
attempts,
err,
});
inDb
.prepare(
"INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))",
)
.run(msg.id);
deliveryAttempts.delete(msg.id);
} else {
log.warn('Message delivery failed, will retry', {
messageId: msg.id,
sessionId: session.id,
attempt: attempts,
maxAttempts: MAX_DELIVERY_ATTEMPTS,
err,
});
}
}
}
} finally {
@@ -165,7 +199,7 @@ async function deliverMessage(
},
session: Session,
inDb: Database.Database,
): Promise<void> {
): Promise<string | undefined> {
if (!deliveryAdapter) {
log.warn('No delivery adapter configured, dropping message', { id: msg.id });
return;
@@ -181,8 +215,7 @@ async function deliverMessage(
// Agent-to-agent — route to target session
if (msg.channel_type === 'agent') {
log.info('Agent-to-agent message', { from: session.id, target: msg.platform_id });
// TODO: route to target agent's session DB
await routeAgentMessage(msg, session);
return;
}
@@ -222,11 +255,19 @@ async function deliverMessage(
if (files.length === 0) files = undefined;
}
await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content, files);
const platformMsgId = await deliveryAdapter.deliver(
msg.channel_type,
msg.platform_id,
msg.thread_id,
msg.kind,
msg.content,
files,
);
log.info('Message delivered', {
id: msg.id,
channelType: msg.channel_type,
platformId: msg.platform_id,
platformMsgId,
fileCount: files?.length,
});
@@ -234,6 +275,71 @@ async function deliverMessage(
if (fs.existsSync(outboxDir)) {
fs.rmSync(outboxDir, { recursive: true, force: true });
}
return platformMsgId;
}
/** Route an agent-to-agent message to the target agent's session. */
async function routeAgentMessage(
msg: { id: string; platform_id: string | null; content: string },
sourceSession: Session,
): Promise<void> {
const targetAgentGroupId = msg.platform_id;
if (!targetAgentGroupId) {
log.warn('Agent message missing target agent group ID', { id: msg.id });
return;
}
const targetGroup = getAgentGroup(targetAgentGroupId);
if (!targetGroup) {
log.warn('Target agent group not found', { id: msg.id, targetAgentGroupId });
return;
}
const sourceGroup = getAgentGroup(sourceSession.agent_group_id);
const sourceAgentName = sourceGroup?.name || sourceSession.agent_group_id;
// Find or create a session for the target agent
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
// Enrich content with sender info
const content = JSON.parse(msg.content);
const enrichedContent = JSON.stringify({
text: content.text,
sender: sourceAgentName,
senderId: sourceSession.agent_group_id,
});
const messageId = `agent-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
writeSessionMessage(targetAgentGroupId, targetSession.id, {
id: messageId,
kind: 'chat',
timestamp: new Date().toISOString(),
platformId: sourceSession.agent_group_id,
channelType: 'agent',
threadId: null,
content: enrichedContent,
});
log.info('Agent message routed', { from: sourceSession.agent_group_id, to: targetAgentGroupId, targetSession: targetSession.id });
const freshSession = getSession(targetSession.id);
if (freshSession) {
await wakeContainer(freshSession);
}
}
/** Ensure the delivered table has new columns (migration for existing sessions). */
function migrateDeliveredTable(db: Database.Database): void {
const cols = new Set(
(db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name),
);
if (!cols.has('platform_message_id')) {
db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run();
}
if (!cols.has('status')) {
db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run();
}
}
/**
@@ -309,6 +415,207 @@ async function handleSystemAction(
break;
}
case 'create_agent': {
const requestId = content.requestId as string;
const name = content.name as string;
let folder =
(content.folder as string) || name.toLowerCase().replace(/[^a-z0-9_-]/g, '_').replace(/_+/g, '_');
const instructions = content.instructions as string | null;
try {
// Avoid duplicate folders
const { getAgentGroupByFolder } = await import('./db/agent-groups.js');
if (getAgentGroupByFolder(folder)) {
folder = `${folder}_${Date.now()}`;
}
const agentGroupId = `ag-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
createAgentGroup({
id: agentGroupId,
name,
folder,
is_admin: 0,
agent_provider: null,
container_config: null,
created_at: new Date().toISOString(),
});
const groupPath = path.join(GROUPS_DIR, folder);
fs.mkdirSync(groupPath, { recursive: true });
if (instructions) {
fs.writeFileSync(path.join(groupPath, 'CLAUDE.md'), instructions);
}
writeSystemResponse(session.agent_group_id, session.id, requestId, 'success', {
agentGroupId,
name,
folder,
});
log.info('Agent group created via system action', { agentGroupId, name, folder });
} catch (e) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', {
error: e instanceof Error ? e.message : String(e),
});
}
break;
}
case 'add_mcp_server': {
const requestId = content.requestId as string;
const serverName = content.name as string;
const command = content.command as string;
const serverArgs = content.args as string[];
const serverEnv = content.env as Record<string, string>;
try {
const agentGroup = getAgentGroup(session.agent_group_id);
if (!agentGroup) throw new Error('Agent group not found');
const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {};
if (!containerConfig.mcpServers) containerConfig.mcpServers = {};
containerConfig.mcpServers[serverName] = { command, args: serverArgs || [], env: serverEnv || {} };
updateAgentGroup(session.agent_group_id, { container_config: JSON.stringify(containerConfig) });
writeSystemResponse(session.agent_group_id, session.id, requestId, 'success', {
message: `MCP server "${serverName}" added. Will take effect on next container restart.`,
});
log.info('MCP server added', { agentGroupId: session.agent_group_id, name: serverName });
} catch (e) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', {
error: e instanceof Error ? e.message : String(e),
});
}
break;
}
case 'install_packages': {
const requestId = content.requestId as string;
const apt = (content.apt as string[]) || [];
const npm = (content.npm as string[]) || [];
const reason = content.reason as string;
const agentGroup = getAgentGroup(session.agent_group_id);
if (!agentGroup) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { error: 'Agent group not found' });
break;
}
// Find admin channel for approval card
const adminGroup = getAdminAgentGroup();
let approvalChannelType: string | null = null;
let approvalPlatformId: string | null = null;
if (adminGroup) {
const adminMGs = getMessagingGroupsByAgentGroup(adminGroup.id);
if (adminMGs.length > 0) {
approvalChannelType = adminMGs[0].channel_type;
approvalPlatformId = adminMGs[0].platform_id;
}
}
if (!approvalChannelType || !approvalPlatformId) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', {
error: 'No admin channel found for approval',
});
break;
}
const approvalId = `appr-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
createPendingApproval({
approval_id: approvalId,
session_id: session.id,
request_id: requestId,
action: 'install_packages',
payload: JSON.stringify({ apt, npm, reason }),
created_at: new Date().toISOString(),
});
const packageList = [...apt.map((p: string) => `apt: ${p}`), ...npm.map((p: string) => `npm: ${p}`)].join(', ');
if (deliveryAdapter) {
await deliveryAdapter.deliver(
approvalChannelType,
approvalPlatformId,
null,
'chat-sdk',
JSON.stringify({
type: 'ask_question',
questionId: approvalId,
question: `Agent "${agentGroup.name}" requests package installation:\n${packageList}${reason ? `\nReason: ${reason}` : ''}`,
options: ['Approve', 'Reject'],
}),
);
}
log.info('Package install approval requested', { approvalId, agentGroup: agentGroup.name, apt, npm });
break;
}
case 'request_rebuild': {
const requestId = content.requestId as string;
const reason = content.reason as string;
const agentGroup = getAgentGroup(session.agent_group_id);
if (!agentGroup) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { error: 'Agent group not found' });
break;
}
// Find admin channel for approval card
const adminGroup2 = getAdminAgentGroup();
let rebuildChannelType: string | null = null;
let rebuildPlatformId: string | null = null;
if (adminGroup2) {
const adminMGs2 = getMessagingGroupsByAgentGroup(adminGroup2.id);
if (adminMGs2.length > 0) {
rebuildChannelType = adminMGs2[0].channel_type;
rebuildPlatformId = adminMGs2[0].platform_id;
}
}
if (!rebuildChannelType || !rebuildPlatformId) {
writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', {
error: 'No admin channel found for approval',
});
break;
}
const rebuildApprovalId = `appr-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
createPendingApproval({
approval_id: rebuildApprovalId,
session_id: session.id,
request_id: requestId,
action: 'request_rebuild',
payload: JSON.stringify({ reason }),
created_at: new Date().toISOString(),
});
if (deliveryAdapter) {
await deliveryAdapter.deliver(
rebuildChannelType,
rebuildPlatformId,
null,
'chat-sdk',
JSON.stringify({
type: 'ask_question',
questionId: rebuildApprovalId,
question: `Agent "${agentGroup.name}" requests a container rebuild.${reason ? `\nReason: ${reason}` : ''}`,
options: ['Approve', 'Reject'],
}),
);
}
log.info('Container rebuild approval requested', { approvalId: rebuildApprovalId, agentGroup: agentGroup.name });
break;
}
default:
log.warn('Unknown system action', { action });
}

View File

@@ -14,9 +14,10 @@ import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runti
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
import { startHostSweep, stopHostSweep } from './host-sweep.js';
import { routeInbound } from './router.js';
import { getPendingQuestion, deletePendingQuestion, getSession } from './db/sessions.js';
import { writeSessionMessage } from './session-manager.js';
import { wakeContainer } from './container-runner.js';
import { getPendingQuestion, deletePendingQuestion, getPendingApproval, deletePendingApproval, getSession } from './db/sessions.js';
import { getAgentGroup, updateAgentGroup } from './db/agent-groups.js';
import { writeSessionMessage, writeSystemResponse } from './session-manager.js';
import { wakeContainer, buildAgentGroupImage } from './container-runner.js';
import { log } from './log.js';
// Channel barrel — each enabled channel self-registers on import.
@@ -83,7 +84,7 @@ async function main(): Promise<void> {
log.warn('No adapter for channel type', { channelType });
return;
}
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files });
return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files });
},
async setTyping(channelType, platformId, threadId) {
const adapter = getChannelAdapter(channelType);
@@ -125,8 +126,15 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] {
return configs;
}
/** Handle a user's response to an ask_user_question card. */
/** Handle a user's response to an ask_user_question card or an approval card. */
async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise<void> {
// Check if this is a pending approval (install_packages, request_rebuild)
const approval = getPendingApproval(questionId);
if (approval) {
await handleApprovalResponse(approval, selectedOption, userId);
return;
}
const pq = getPendingQuestion(questionId);
if (!pq) {
log.warn('Pending question not found (may have expired)', { questionId });
@@ -163,6 +171,66 @@ async function handleQuestionResponse(questionId: string, selectedOption: string
await wakeContainer(session);
}
/** Handle an admin's response to an approval card. */
async function handleApprovalResponse(
approval: import('./types.js').PendingApproval,
selectedOption: string,
userId: string,
): Promise<void> {
const session = getSession(approval.session_id);
if (!session) {
deletePendingApproval(approval.approval_id);
return;
}
if (selectedOption === 'Approve') {
const payload = JSON.parse(approval.payload);
if (approval.action === 'install_packages') {
const agentGroup = getAgentGroup(session.agent_group_id);
const containerConfig = agentGroup?.container_config ? JSON.parse(agentGroup.container_config) : {};
if (!containerConfig.packages) containerConfig.packages = { apt: [], npm: [] };
if (payload.apt) containerConfig.packages.apt.push(...payload.apt);
if (payload.npm) containerConfig.packages.npm.push(...payload.npm);
updateAgentGroup(session.agent_group_id, { container_config: JSON.stringify(containerConfig) });
writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'success', {
message: 'Packages approved. Run request_rebuild to apply.',
approved: { apt: payload.apt, npm: payload.npm },
});
log.info('Package install approved', { approvalId: approval.approval_id, userId });
} else if (approval.action === 'request_rebuild') {
try {
await buildAgentGroupImage(session.agent_group_id);
writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'success', {
message: 'Container image rebuilt. Changes will take effect on next container start.',
});
log.info('Container rebuild approved and completed', { approvalId: approval.approval_id, userId });
} catch (e) {
writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'error', {
error: `Rebuild failed: ${e instanceof Error ? e.message : String(e)}`,
});
log.error('Container rebuild failed', { approvalId: approval.approval_id, err: e });
}
}
} else {
// Rejected
writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'error', {
error: `Request rejected by admin (${userId})`,
});
log.info('Approval rejected', { approvalId: approval.approval_id, action: approval.action, userId });
}
deletePendingApproval(approval.approval_id);
// Wake container so the agent's polling MCP tool picks up the response
if (session) {
await wakeContainer(session);
}
}
/** Graceful shutdown. */
async function shutdown(signal: string): Promise<void> {
log.info('Shutdown signal received', { signal });

View File

@@ -64,7 +64,7 @@ function generateId(): string {
*/
export function resolveSession(
agentGroupId: string,
messagingGroupId: string,
messagingGroupId: string | null,
threadId: string | null,
sessionMode: 'shared' | 'per-thread' | 'agent-shared',
): { session: Session; created: boolean } {
@@ -74,7 +74,7 @@ export function resolveSession(
if (existing) {
return { session: existing, created: false };
}
} else {
} else if (messagingGroupId) {
const lookupThreadId = sessionMode === 'shared' ? null : threadId;
const existing = findSession(messagingGroupId, lookupThreadId);
if (existing) {
@@ -144,6 +144,9 @@ export function writeSessionMessage(
recurrence?: string | null;
},
): void {
// Extract base64 attachment data, save to inbox, replace with file paths
const content = extractAttachmentFiles(agentGroupId, sessionId, message.id, message.content);
const dbPath = inboundDbPath(agentGroupId, sessionId);
const db = new Database(dbPath);
db.pragma('journal_mode = DELETE');
@@ -166,7 +169,7 @@ export function writeSessionMessage(
platformId: message.platformId ?? null,
channelType: message.channelType ?? null,
threadId: message.threadId ?? null,
content: message.content,
content,
processAfter: message.processAfter ?? null,
recurrence: message.recurrence ?? null,
});
@@ -177,6 +180,44 @@ export function writeSessionMessage(
updateSession(sessionId, { last_active: new Date().toISOString() });
}
/**
* If message content has attachments with base64 `data`, save them to
* the session's inbox directory and replace with `localPath`.
*/
function extractAttachmentFiles(
agentGroupId: string,
sessionId: string,
messageId: string,
contentStr: string,
): string {
let parsed: Record<string, unknown>;
try {
parsed = JSON.parse(contentStr);
} catch {
return contentStr;
}
const attachments = parsed.attachments as Array<Record<string, unknown>> | undefined;
if (!Array.isArray(attachments)) return contentStr;
let changed = false;
for (const att of attachments) {
if (typeof att.data === 'string') {
const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId);
fs.mkdirSync(inboxDir, { recursive: true });
const filename = (att.name as string) || `attachment-${Date.now()}`;
const filePath = path.join(inboxDir, filename);
fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64'));
att.localPath = `inbox/${messageId}/${filename}`;
delete att.data;
changed = true;
log.debug('Saved attachment to inbox', { messageId, filename, size: att.size });
}
}
return changed ? JSON.stringify(parsed) : contentStr;
}
/** Open the inbound DB for a session (host reads/writes). */
export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database {
const dbPath = inboundDbPath(agentGroupId, sessionId);
@@ -201,6 +242,27 @@ export function openSessionDb(agentGroupId: string, sessionId: string): Database
return openInboundDb(agentGroupId, sessionId);
}
/** Write a system response to a session's inbound.db so the container's findQuestionResponse() picks it up. */
export function writeSystemResponse(
agentGroupId: string,
sessionId: string,
requestId: string,
status: string,
result: Record<string, unknown>,
): void {
writeSessionMessage(agentGroupId, sessionId, {
id: `sys-resp-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'system',
timestamp: new Date().toISOString(),
content: JSON.stringify({
type: 'question_response',
questionId: requestId,
status,
result,
}),
});
}
/** Mark a container as running for a session. */
export function markContainerRunning(sessionId: string): void {
updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() });

View File

@@ -88,3 +88,14 @@ export interface PendingQuestion {
thread_id: string | null;
created_at: string;
}
// ── Pending approvals (central DB) ──
export interface PendingApproval {
approval_id: string;
session_id: string;
request_id: string;
action: string;
payload: string; // JSON
created_at: string;
}