diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 31f2fb2..0877531 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -90,8 +90,10 @@ export function initTestSessionDb(): { inbound: Database.Database; outbound: Dat content TEXT NOT NULL ); CREATE TABLE delivered ( - message_out_id TEXT PRIMARY KEY, - delivered_at TEXT NOT NULL + message_out_id TEXT PRIMARY KEY, + platform_message_id TEXT, + status TEXT NOT NULL DEFAULT 'delivered', + delivered_at TEXT NOT NULL ); `); diff --git a/container/agent-runner/src/db/messages-out.ts b/container/agent-runner/src/db/messages-out.ts index 55e078c..3d2f411 100644 --- a/container/agent-runner/src/db/messages-out.ts +++ b/container/agent-runner/src/db/messages-out.ts @@ -70,16 +70,37 @@ export function writeMessageOut(msg: WriteMessageOut): number { /** * Look up a message's platform ID by seq number. * Searches both inbound and outbound DBs since seq spans both. + * + * For inbound messages, the Chat SDK message ID is already the platform message ID + * (e.g., "6037840640:42" for Telegram). + * + * For outbound messages, the internal ID (msg-xxx) won't work for edits/reactions. + * Instead, look up the platform_message_id from the delivered table (host writes this + * after successful delivery). */ export function getMessageIdBySeq(seq: number): string | null { - const inRow = getInboundDb().prepare('SELECT id FROM messages_in WHERE seq = ?').get(seq) as + const inbound = getInboundDb(); + + // Inbound messages: ID is already the platform message ID + const inRow = inbound.prepare('SELECT id FROM messages_in WHERE seq = ?').get(seq) as | { id: string } | undefined; if (inRow) return inRow.id; + + // Outbound messages: look up platform message ID from delivered table const outRow = getOutboundDb().prepare('SELECT id FROM messages_out WHERE seq = ?').get(seq) as | { id: string } | undefined; - return outRow?.id ?? null; + if (!outRow) return null; + + // Check if host has stored the platform message ID after delivery + const deliveredRow = inbound + .prepare('SELECT platform_message_id FROM delivered WHERE message_out_id = ?') + .get(outRow.id) as { platform_message_id: string | null } | undefined; + if (deliveredRow?.platform_message_id) return deliveredRow.platform_message_id; + + // Fallback to internal ID (edits/reactions on undelivered messages won't work) + return outRow.id; } /** Get undelivered messages (for host polling — reads from outbound.db). */ diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts index 8b0b1e8..87be2d6 100644 --- a/container/agent-runner/src/formatter.ts +++ b/container/agent-runner/src/formatter.ts @@ -109,13 +109,7 @@ function formatChatMessages(messages: MessageInRow[]): string { const lines = ['']; for (const msg of messages) { - const content = parseContent(msg.content); - const sender = content.sender || content.author?.fullName || content.author?.userName || 'Unknown'; - const time = formatTime(msg.timestamp); - const text = content.text || ''; - const idAttr = msg.seq != null ? ` id="${msg.seq}"` : ''; - const attachmentsSuffix = formatAttachments(content.attachments); - lines.push(`${escapeXml(text)}${attachmentsSuffix}`); + lines.push(formatSingleChat(msg)); } lines.push(''); return lines.join('\n'); @@ -127,8 +121,9 @@ function formatSingleChat(msg: MessageInRow): string { const time = formatTime(msg.timestamp); const text = content.text || ''; const idAttr = msg.seq != null ? ` id="${msg.seq}"` : ''; + const replyPrefix = formatReplyContext(content.replyTo); const attachmentsSuffix = formatAttachments(content.attachments); - return `${escapeXml(text)}${attachmentsSuffix}`; + return `${replyPrefix}${escapeXml(text)}${attachmentsSuffix}`; } function formatTaskMessage(msg: MessageInRow): string { @@ -153,13 +148,26 @@ function formatSystemMessage(msg: MessageInRow): string { return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`; } +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function formatReplyContext(replyTo: any): string { + if (!replyTo) return ''; + const sender = replyTo.sender || 'Unknown'; + const text = replyTo.text || ''; + const preview = text.length > 100 ? text.slice(0, 100) + '…' : text; + return `\n${escapeXml(preview)}\n`; +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any function formatAttachments(attachments: any[] | undefined): string { if (!Array.isArray(attachments) || attachments.length === 0) return ''; const parts = attachments.map((a) => { const name = a.name || a.filename || 'attachment'; const type = a.type || 'file'; + const localPath = a.localPath ? `/workspace/${a.localPath}` : ''; const url = a.url || ''; + if (localPath) { + return `[${type}: ${escapeXml(name)} — saved to ${escapeXml(localPath)}]`; + } return url ? `[${type}: ${escapeXml(name)} (${escapeXml(url)})]` : `[${type}: ${escapeXml(name)}]`; }); return '\n' + parts.join('\n'); diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index 8f91e6e..1513f5c 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -76,20 +76,36 @@ async function main(): Promise { CLAUDE_CODE_AUTO_COMPACT_WINDOW: '165000', }; + // Build MCP servers config: nanoclaw built-in + any additional from host + const mcpServers: Record }> = { + nanoclaw: { + command: 'node', + args: [mcpServerPath], + env: { + SESSION_INBOUND_DB_PATH: process.env.SESSION_INBOUND_DB_PATH || '/workspace/inbound.db', + SESSION_OUTBOUND_DB_PATH: process.env.SESSION_OUTBOUND_DB_PATH || '/workspace/outbound.db', + SESSION_HEARTBEAT_PATH: process.env.SESSION_HEARTBEAT_PATH || '/workspace/.heartbeat', + }, + }, + }; + + // Merge additional MCP servers from host configuration + if (process.env.NANOCLAW_MCP_SERVERS) { + try { + const additional = JSON.parse(process.env.NANOCLAW_MCP_SERVERS) as Record }>; + for (const [name, config] of Object.entries(additional)) { + mcpServers[name] = config; + log(`Additional MCP server: ${name} (${config.command})`); + } + } catch (e) { + log(`Failed to parse NANOCLAW_MCP_SERVERS: ${e}`); + } + } + await runPollLoop({ provider, cwd: CWD, - mcpServers: { - nanoclaw: { - command: 'node', - args: [mcpServerPath], - env: { - SESSION_INBOUND_DB_PATH: process.env.SESSION_INBOUND_DB_PATH || '/workspace/inbound.db', - SESSION_OUTBOUND_DB_PATH: process.env.SESSION_OUTBOUND_DB_PATH || '/workspace/outbound.db', - SESSION_HEARTBEAT_PATH: process.env.SESSION_HEARTBEAT_PATH || '/workspace/.heartbeat', - }, - }, - }, + mcpServers, systemPrompt, env, additionalDirectories: additionalDirectories.length > 0 ? additionalDirectories : undefined, diff --git a/container/agent-runner/src/mcp-tools/agents.ts b/container/agent-runner/src/mcp-tools/agents.ts index 54e50b6..a9443de 100644 --- a/container/agent-runner/src/mcp-tools/agents.ts +++ b/container/agent-runner/src/mcp-tools/agents.ts @@ -1,6 +1,7 @@ /** - * Agent-to-agent MCP tools: send_to_agent. + * Agent-to-agent MCP tools: send_to_agent, create_agent. */ +import { findQuestionResponse, markCompleted } from '../db/messages-in.js'; import { writeMessageOut } from '../db/messages-out.js'; import type { McpToolDefinition } from './types.js'; @@ -20,6 +21,10 @@ function err(text: string) { return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true }; } +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + export const sendToAgent: McpToolDefinition = { tool: { name: 'send_to_agent', @@ -55,4 +60,56 @@ export const sendToAgent: McpToolDefinition = { }, }; -export const agentTools: McpToolDefinition[] = [sendToAgent]; +export const createAgent: McpToolDefinition = { + tool: { + name: 'create_agent', + description: 'Create a new agent group dynamically. Returns the new agent group ID.', + inputSchema: { + type: 'object' as const, + properties: { + name: { type: 'string', description: 'Agent display name' }, + instructions: { type: 'string', description: 'CLAUDE.md content (agent instructions/personality)' }, + folder: { type: 'string', description: 'Folder name (default: auto-generated from name)' }, + }, + required: ['name'], + }, + }, + async handler(args) { + const name = args.name as string; + if (!name) return err('name is required'); + + const requestId = generateId(); + + writeMessageOut({ + id: requestId, + kind: 'system', + content: JSON.stringify({ + action: 'create_agent', + requestId, + name, + instructions: (args.instructions as string) || null, + folder: (args.folder as string) || null, + }), + }); + + log(`create_agent: ${requestId} → "${name}"`); + + // Poll for host response + const deadline = Date.now() + 30_000; + while (Date.now() < deadline) { + const response = findQuestionResponse(requestId); + if (response) { + const parsed = JSON.parse(response.content); + markCompleted([response.id]); + if (parsed.status === 'success') { + return ok(`Agent created: ${parsed.result.agentGroupId} (name: ${parsed.result.name}, folder: ${parsed.result.folder})`); + } + return err(parsed.result?.error || 'Failed to create agent'); + } + await sleep(1000); + } + return err('Timed out waiting for agent creation response'); + }, +}; + +export const agentTools: McpToolDefinition[] = [sendToAgent, createAgent]; diff --git a/container/agent-runner/src/mcp-tools/index.ts b/container/agent-runner/src/mcp-tools/index.ts index 254d802..f98143d 100644 --- a/container/agent-runner/src/mcp-tools/index.ts +++ b/container/agent-runner/src/mcp-tools/index.ts @@ -14,12 +14,13 @@ import { coreTools } from './core.js'; import { schedulingTools } from './scheduling.js'; import { interactiveTools } from './interactive.js'; import { agentTools } from './agents.js'; +import { selfModTools } from './self-mod.js'; function log(msg: string): void { console.error(`[mcp-tools] ${msg}`); } -const allTools: McpToolDefinition[] = [...coreTools, ...schedulingTools, ...interactiveTools, ...agentTools]; +const allTools: McpToolDefinition[] = [...coreTools, ...schedulingTools, ...interactiveTools, ...agentTools, ...selfModTools]; const toolMap = new Map(); for (const t of allTools) { diff --git a/container/agent-runner/src/mcp-tools/self-mod.ts b/container/agent-runner/src/mcp-tools/self-mod.ts new file mode 100644 index 0000000..9a0ef18 --- /dev/null +++ b/container/agent-runner/src/mcp-tools/self-mod.ts @@ -0,0 +1,155 @@ +/** + * Self-modification MCP tools: install_packages, add_mcp_server, request_rebuild. + * + * These tools request changes to the agent's container configuration. + * install_packages and request_rebuild require admin approval. + * add_mcp_server takes effect on next container restart without approval. + */ +import { findQuestionResponse, markCompleted } from '../db/messages-in.js'; +import { writeMessageOut } from '../db/messages-out.js'; +import type { McpToolDefinition } from './types.js'; + +function log(msg: string): void { + console.error(`[mcp-tools] ${msg}`); +} + +function generateId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +function ok(text: string) { + return { content: [{ type: 'text' as const, text }] }; +} + +function err(text: string) { + return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true }; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function pollForResponse(requestId: string, timeoutMs: number) { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const response = findQuestionResponse(requestId); + if (response) { + const parsed = JSON.parse(response.content); + markCompleted([response.id]); + if (parsed.status === 'success') { + return ok(JSON.stringify(parsed.result || 'Success')); + } + return err(parsed.result?.error || parsed.selectedOption || 'Request denied'); + } + await sleep(2000); + } + return err(`Request timed out after ${timeoutMs / 1000}s`); +} + +export const installPackages: McpToolDefinition = { + tool: { + name: 'install_packages', + description: + 'Request installation of system (apt) or Node.js (npm) packages in the container. Requires admin approval. Takes effect after container rebuild.', + inputSchema: { + type: 'object' as const, + properties: { + apt: { type: 'array', items: { type: 'string' }, description: 'apt packages to install' }, + npm: { type: 'array', items: { type: 'string' }, description: 'npm packages to install globally' }, + reason: { type: 'string', description: 'Why these packages are needed' }, + }, + }, + }, + async handler(args) { + const apt = (args.apt as string[]) || []; + const npm = (args.npm as string[]) || []; + if (apt.length === 0 && npm.length === 0) return err('At least one apt or npm package is required'); + + const requestId = generateId(); + writeMessageOut({ + id: requestId, + kind: 'system', + content: JSON.stringify({ + action: 'install_packages', + requestId, + apt, + npm, + reason: (args.reason as string) || '', + }), + }); + + log(`install_packages: ${requestId} → apt=[${apt.join(',')}] npm=[${npm.join(',')}]`); + return await pollForResponse(requestId, 300_000); + }, +}; + +export const addMcpServer: McpToolDefinition = { + tool: { + name: 'add_mcp_server', + description: + "Add an MCP server to this agent's configuration. Takes effect on next container restart (no rebuild needed, no approval required).", + inputSchema: { + type: 'object' as const, + properties: { + name: { type: 'string', description: 'MCP server name (unique identifier)' }, + command: { type: 'string', description: 'Command to run the MCP server' }, + args: { type: 'array', items: { type: 'string' }, description: 'Command arguments' }, + env: { type: 'object', description: 'Environment variables for the server' }, + }, + required: ['name', 'command'], + }, + }, + async handler(args) { + const name = args.name as string; + const command = args.command as string; + if (!name || !command) return err('name and command are required'); + + const requestId = generateId(); + writeMessageOut({ + id: requestId, + kind: 'system', + content: JSON.stringify({ + action: 'add_mcp_server', + requestId, + name, + command, + args: (args.args as string[]) || [], + env: (args.env as Record) || {}, + }), + }); + + log(`add_mcp_server: ${requestId} → "${name}" (${command})`); + return await pollForResponse(requestId, 30_000); + }, +}; + +export const requestRebuild: McpToolDefinition = { + tool: { + name: 'request_rebuild', + description: + 'Request a container rebuild to apply pending package installations. Requires admin approval. The current container will be stopped and restarted with the new image.', + inputSchema: { + type: 'object' as const, + properties: { + reason: { type: 'string', description: 'Why the rebuild is needed' }, + }, + }, + }, + async handler(args) { + const requestId = generateId(); + writeMessageOut({ + id: requestId, + kind: 'system', + content: JSON.stringify({ + action: 'request_rebuild', + requestId, + reason: (args.reason as string) || '', + }), + }); + + log(`request_rebuild: ${requestId}`); + return await pollForResponse(requestId, 300_000); + }, +}; + +export const selfModTools: McpToolDefinition[] = [installPackages, addMcpServer, requestRebuild]; diff --git a/groups/global/CLAUDE.md b/groups/global/CLAUDE.md index b3c44c6..d2b2658 100644 --- a/groups/global/CLAUDE.md +++ b/groups/global/CLAUDE.md @@ -77,6 +77,34 @@ Standard Markdown works: `**bold**`, `*italic*`, `[links](url)`, `# headings`. --- +## Installing Packages & Tools + +Your container is ephemeral — anything installed via `apt-get` or `npm install -g` is lost on restart. To install packages that persist, use the self-modification tools: + +1. **`install_packages`** — request system (apt) or global npm packages. Requires admin approval. +2. **`request_rebuild`** — rebuild your container image so approved packages are baked in. Always call this after `install_packages` to apply the changes. + +Example flow: +``` +install_packages({ apt: ["ffmpeg"], npm: ["@xenova/transformers"], reason: "Audio transcription" }) +# → Admin gets an approval card → approves +request_rebuild({ reason: "Apply ffmpeg + transformers" }) +# → Admin approves → image rebuilt with the packages +``` + +**When to use this vs workspace npm install:** +- `npm install` in `/workspace/agent/` persists on disk (it's mounted) but isn't on the global PATH — use it for project-level dependencies +- `install_packages` is for system tools (ffmpeg, imagemagick) and global npm packages that need to be on PATH + +### MCP Servers + +Use **`add_mcp_server`** to add an MCP server to your configuration, then **`request_rebuild`** to apply. Browse available servers at https://mcp.so — it's a curated directory of high-quality MCP servers. Most Node.js servers run via `npx`, e.g.: + +``` +add_mcp_server({ name: "memory", command: "npx", args: ["@modelcontextprotocol/server-memory"] }) +request_rebuild({ reason: "Add memory MCP server" }) +``` + ## Task Scripts For any recurring task, use `schedule_task`. Frequent agent invocations — especially multiple times a day — consume API credits and can risk account restrictions. If a simple check can determine whether action is needed, add a `script` — it runs first, and the agent is only called when the check passes. This keeps invocations to a minimum. diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 23271ed..d02f62c 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -67,8 +67,8 @@ export interface ChannelAdapter { teardown(): Promise; isConnected(): boolean; - // Outbound delivery - deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise; + // Outbound delivery — returns the platform message ID if available + deliver(platformId: string, threadId: string | null, message: OutboundMessage): Promise; // Optional setTyping?(platformId: string, threadId: string | null): Promise; diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index 2fc183b..25ceab3 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -54,8 +54,9 @@ function createMockAdapter( return setupConfig !== null; }, - async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage) { + async deliver(_platformId: string, _threadId: string | null, message: OutboundMessage): Promise { delivered.push(message); + return undefined; }, async setTyping() {}, @@ -213,8 +214,8 @@ describe('channel + router integration', () => { setDeliveryAdapter({ async deliver(channelType, platformId, threadId, kind, content) { const adapter = getChannelAdapter(channelType); - if (!adapter) return; - await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) }); + if (!adapter) return undefined; + return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) }); }, }); diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 1d84b00..9f8f9d2 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -30,11 +30,23 @@ interface GatewayAdapter extends Adapter { ): Promise; } +/** Reply context extracted from a platform's raw message. */ +export interface ReplyContext { + text: string; + sender: string; +} + +/** Extract reply context from a platform-specific raw message. Return null if no reply. */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ReplyContextExtractor = (raw: Record) => ReplyContext | null; + export interface ChatSdkBridgeConfig { adapter: Adapter; concurrency?: ConcurrencyStrategy; /** Bot token for authenticating forwarded Gateway events (required for interaction handling). */ botToken?: string; + /** Platform-specific reply context extraction. */ + extractReplyContext?: ReplyContextExtractor; } export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { @@ -53,11 +65,50 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return map; } - function messageToInbound(message: ChatMessage): InboundMessage { + async function messageToInbound(message: ChatMessage): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const serialized = message.toJSON() as Record; + + // Download attachment data before serialization loses fetchData() + if (message.attachments && message.attachments.length > 0) { + const enriched = []; + for (const att of message.attachments) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const entry: Record = { + type: att.type, + name: att.name, + mimeType: att.mimeType, + size: att.size, + width: (att as unknown as Record).width, + height: (att as unknown as Record).height, + }; + if (att.fetchData) { + try { + const buffer = await att.fetchData(); + entry.data = buffer.toString('base64'); + } catch (err) { + log.warn('Failed to download attachment', { type: att.type, err }); + } + } + enriched.push(entry); + } + serialized.attachments = enriched; + } + + // Extract reply context via platform-specific hook + if (config.extractReplyContext && message.raw) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const replyTo = config.extractReplyContext(message.raw as Record); + if (replyTo) serialized.replyTo = replyTo; + } + + // Drop raw to save DB space (can be very large) + serialized.raw = undefined; + return { id: message.id, kind: 'chat-sdk', - content: message.toJSON(), + content: serialized, timestamp: message.metadata.dateSent.toISOString(), }; } @@ -83,20 +134,20 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // Subscribed threads — forward all messages chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - setupConfig.onInbound(channelId, thread.id, messageToInbound(message)); + setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); }); // @mention in unsubscribed thread — forward + subscribe chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - setupConfig.onInbound(channelId, thread.id, messageToInbound(message)); + setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); await thread.subscribe(); }); // DMs — always forward + subscribe chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - setupConfig.onInbound(channelId, null, messageToInbound(message)); + setupConfig.onInbound(channelId, null, await messageToInbound(message)); await thread.subscribe(); }); @@ -108,6 +159,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter const questionId = parts[1]; const selectedOption = event.value || ''; const userId = event.user?.userId || ''; + + // Update the card to show the selected answer and remove buttons + try { + const tid = event.threadId; + await adapter.editMessage(tid, event.messageId, { + markdown: `❓ **Question**\n\n${selectedOption ? `✅ **${selectedOption}**` : '(clicked)'}`, + }); + } catch (err) { + log.warn('Failed to update card after action', { err }); + } + setupConfig.onAction(questionId, selectedOption, userId); }); @@ -161,7 +223,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter log.info('Chat SDK bridge initialized', { adapter: adapter.name }); }, - async deliver(platformId: string, threadId: string | null, message) { + async deliver(platformId: string, threadId: string | null, message): Promise { // platformId is already in the adapter's encoded format (e.g. "telegram:6037840640", // "discord:guildId:channelId") — use it directly as the thread ID const tid = threadId ?? platformId; @@ -190,24 +252,36 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter Actions(options.map((opt) => Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }))), ], }); - await adapter.postMessage(tid, { card, fallbackText: `${content.question}\nOptions: ${options.join(', ')}` }); - return; + const result = await adapter.postMessage(tid, { + card, + fallbackText: `${content.question}\nOptions: ${options.join(', ')}`, + }); + return result?.id; } // Normal message const text = (content.markdown as string) || (content.text as string); if (text) { // Attach files if present (FileUpload format: { data, filename }) - const fileUploads = message.files?.map((f) => ({ data: f.data, filename: f.filename })); + const fileUploads = message.files?.map((f: { data: Buffer; filename: string }) => ({ + data: f.data, + filename: f.filename, + })); if (fileUploads && fileUploads.length > 0) { - await adapter.postMessage(tid, { markdown: text, files: fileUploads }); + const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads }); + return result?.id; } else { - await adapter.postMessage(tid, { markdown: text }); + const result = await adapter.postMessage(tid, { markdown: text }); + return result?.id; } } else if (message.files && message.files.length > 0) { // Files only, no text - const fileUploads = message.files.map((f) => ({ data: f.data, filename: f.filename })); - await adapter.postMessage(tid, { markdown: '', files: fileUploads }); + const fileUploads = message.files.map((f: { data: Buffer; filename: string }) => ({ + data: f.data, + filename: f.filename, + })); + const result = await adapter.postMessage(tid, { markdown: '', files: fileUploads }); + return result?.id; } }, diff --git a/src/channels/discord.ts b/src/channels/discord.ts index 01ed4c5..d23a1e2 100644 --- a/src/channels/discord.ts +++ b/src/channels/discord.ts @@ -5,9 +5,19 @@ import { createDiscordAdapter } from '@chat-adapter/discord'; import { readEnvFile } from '../env.js'; -import { createChatSdkBridge } from './chat-sdk-bridge.js'; +import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js'; import { registerChannelAdapter } from './channel-registry.js'; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function extractReplyContext(raw: Record): ReplyContext | null { + if (!raw.referenced_message) return null; + const reply = raw.referenced_message; + return { + text: reply.content || '', + sender: reply.author?.global_name || reply.author?.username || 'Unknown', + }; +} + registerChannelAdapter('discord', { factory: () => { const env = readEnvFile(['DISCORD_BOT_TOKEN', 'DISCORD_PUBLIC_KEY', 'DISCORD_APPLICATION_ID']); @@ -17,6 +27,11 @@ registerChannelAdapter('discord', { publicKey: env.DISCORD_PUBLIC_KEY, applicationId: env.DISCORD_APPLICATION_ID, }); - return createChatSdkBridge({ adapter: discordAdapter, concurrency: 'concurrent', botToken: env.DISCORD_BOT_TOKEN }); + return createChatSdkBridge({ + adapter: discordAdapter, + concurrency: 'concurrent', + botToken: env.DISCORD_BOT_TOKEN, + extractReplyContext, + }); }, }); diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index c4ae5fe..345419f 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -5,9 +5,19 @@ import { createTelegramAdapter } from '@chat-adapter/telegram'; import { readEnvFile } from '../env.js'; -import { createChatSdkBridge } from './chat-sdk-bridge.js'; +import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js'; import { registerChannelAdapter } from './channel-registry.js'; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function extractReplyContext(raw: Record): ReplyContext | null { + if (!raw.reply_to_message) return null; + const reply = raw.reply_to_message; + return { + text: reply.text || reply.caption || '', + sender: reply.from?.first_name || reply.from?.username || 'Unknown', + }; +} + registerChannelAdapter('telegram', { factory: () => { const env = readEnvFile(['TELEGRAM_BOT_TOKEN']); @@ -16,6 +26,6 @@ registerChannelAdapter('telegram', { botToken: env.TELEGRAM_BOT_TOKEN, mode: 'polling', }); - return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent', extractReplyContext }); }, }); diff --git a/src/container-runner.ts b/src/container-runner.ts index bc54632..743b7ce 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -3,7 +3,7 @@ * Spawns agent containers with session folder + agent group folder mounts. * The container runs the v2 agent-runner which polls the session DB. */ -import { ChildProcess, spawn } from 'child_process'; +import { ChildProcess, execSync, spawn } from 'child_process'; import fs from 'fs'; import path from 'path'; @@ -274,9 +274,19 @@ async function buildContainerArgs( } } + // Pass additional MCP servers from container config + const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {}; + if (containerConfig.mcpServers && Object.keys(containerConfig.mcpServers).length > 0) { + args.push('-e', `NANOCLAW_MCP_SERVERS=${JSON.stringify(containerConfig.mcpServers)}`); + } + // Override entrypoint: compile agent-runner source, run v2 entry point (no stdin) args.push('--entrypoint', 'bash'); - args.push(CONTAINER_IMAGE); + + // Use per-agent-group image if one has been built, otherwise base image + const imageTag = containerConfig.imageTag || CONTAINER_IMAGE; + args.push(imageTag); + args.push( '-c', 'cd /app && npx tsc --outDir /tmp/dist 2>&1 >&2 && ln -sf /app/node_modules /tmp/dist/node_modules && node /tmp/dist/index.js', @@ -284,3 +294,51 @@ async function buildContainerArgs( return args; } + +/** Build a per-agent-group Docker image with custom packages. */ +export async function buildAgentGroupImage(agentGroupId: string): Promise { + const agentGroup = getAgentGroup(agentGroupId); + if (!agentGroup) throw new Error('Agent group not found'); + + const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {}; + const packages = containerConfig.packages || { apt: [], npm: [] }; + const aptPackages = (packages.apt || []) as string[]; + const npmPackages = (packages.npm || []) as string[]; + + if (aptPackages.length === 0 && npmPackages.length === 0) { + throw new Error('No packages to install. Use install_packages first.'); + } + + let dockerfile = `FROM ${CONTAINER_IMAGE}\nUSER root\n`; + if (aptPackages.length > 0) { + dockerfile += `RUN apt-get update && apt-get install -y ${aptPackages.join(' ')} && rm -rf /var/lib/apt/lists/*\n`; + } + if (npmPackages.length > 0) { + dockerfile += `RUN npm install -g ${npmPackages.join(' ')}\n`; + } + dockerfile += 'USER node\n'; + + const imageTag = `nanoclaw-agent:${agentGroupId}`; + + log.info('Building per-agent-group image', { agentGroupId, imageTag, apt: aptPackages, npm: npmPackages }); + + // Write Dockerfile to temp file and build + const tmpDockerfile = path.join(DATA_DIR, `Dockerfile.${agentGroupId}`); + fs.writeFileSync(tmpDockerfile, dockerfile); + try { + execSync(`${CONTAINER_RUNTIME_BIN} build -t ${imageTag} -f ${tmpDockerfile} .`, { + cwd: DATA_DIR, + stdio: 'pipe', + timeout: 300_000, + }); + } finally { + fs.unlinkSync(tmpDockerfile); + } + + // Store the image tag in container_config + containerConfig.imageTag = imageTag; + const { updateAgentGroup } = await import('./db/agent-groups.js'); + updateAgentGroup(agentGroupId, { container_config: JSON.stringify(containerConfig) }); + + log.info('Per-agent-group image built', { agentGroupId, imageTag }); +} diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts index bea9334..81cd68e 100644 --- a/src/db/db-v2.test.ts +++ b/src/db/db-v2.test.ts @@ -62,7 +62,7 @@ describe('migrations', () => { const db = initTestDb(); runMigrations(db); const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number }; - expect(row.v).toBe(2); + expect(row.v).toBe(3); }); }); diff --git a/src/db/messaging-groups.ts b/src/db/messaging-groups.ts index 6c792d8..1acf16f 100644 --- a/src/db/messaging-groups.ts +++ b/src/db/messaging-groups.ts @@ -109,3 +109,14 @@ export function updateMessagingGroupAgent( export function deleteMessagingGroupAgent(id: string): void { getDb().prepare('DELETE FROM messaging_group_agents WHERE id = ?').run(id); } + +/** Get all messaging groups wired to an agent group (reverse lookup). */ +export function getMessagingGroupsByAgentGroup(agentGroupId: string): MessagingGroup[] { + return getDb() + .prepare( + `SELECT mg.* FROM messaging_groups mg + JOIN messaging_group_agents mga ON mga.messaging_group_id = mg.id + WHERE mga.agent_group_id = ?`, + ) + .all(agentGroupId) as MessagingGroup[]; +} diff --git a/src/db/migrations/003-pending-approvals.ts b/src/db/migrations/003-pending-approvals.ts new file mode 100644 index 0000000..9fc2704 --- /dev/null +++ b/src/db/migrations/003-pending-approvals.ts @@ -0,0 +1,18 @@ +import type { Migration } from './index.js'; + +export const migration003: Migration = { + version: 3, + name: 'pending-approvals', + up(db) { + db.exec(` + CREATE TABLE pending_approvals ( + approval_id TEXT PRIMARY KEY, + session_id TEXT NOT NULL REFERENCES sessions(id), + request_id TEXT NOT NULL, + action TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL + ); + `); + }, +}; diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index 114a521..3a51c5f 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -3,6 +3,7 @@ import type Database from 'better-sqlite3'; import { log } from '../../log.js'; import { migration001 } from './001-initial.js'; import { migration002 } from './002-chat-sdk-state.js'; +import { migration003 } from './003-pending-approvals.js'; export interface Migration { version: number; @@ -10,7 +11,7 @@ export interface Migration { up: (db: Database.Database) => void; } -const migrations: Migration[] = [migration001, migration002]; +const migrations: Migration[] = [migration001, migration002, migration003]; export function runMigrations(db: Database.Database): void { db.exec(` diff --git a/src/db/schema.ts b/src/db/schema.ts index b54210d..d2ed36a 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -93,11 +93,13 @@ CREATE TABLE messages_in ( content TEXT NOT NULL ); --- Host tracks which messages_out IDs have been delivered. +-- Host tracks delivery outcomes for messages_out IDs. -- Avoids writing to outbound.db (container-owned). CREATE TABLE delivered ( - message_out_id TEXT PRIMARY KEY, - delivered_at TEXT NOT NULL + message_out_id TEXT PRIMARY KEY, + platform_message_id TEXT, + status TEXT NOT NULL DEFAULT 'delivered', + delivered_at TEXT NOT NULL ); `; diff --git a/src/db/sessions.ts b/src/db/sessions.ts index c2373f3..45e911f 100644 --- a/src/db/sessions.ts +++ b/src/db/sessions.ts @@ -1,4 +1,4 @@ -import type { PendingQuestion, Session } from '../types.js'; +import type { PendingApproval, PendingQuestion, Session } from '../types.js'; import { getDb } from './connection.js'; // ── Sessions ── @@ -90,3 +90,24 @@ export function getPendingQuestion(questionId: string): PendingQuestion | undefi export function deletePendingQuestion(questionId: string): void { getDb().prepare('DELETE FROM pending_questions WHERE question_id = ?').run(questionId); } + +// ── Pending Approvals ── + +export function createPendingApproval(pa: PendingApproval): void { + getDb() + .prepare( + `INSERT INTO pending_approvals (approval_id, session_id, request_id, action, payload, created_at) + VALUES (@approval_id, @session_id, @request_id, @action, @payload, @created_at)`, + ) + .run(pa); +} + +export function getPendingApproval(approvalId: string): PendingApproval | undefined { + return getDb().prepare('SELECT * FROM pending_approvals WHERE approval_id = ?').get(approvalId) as + | PendingApproval + | undefined; +} + +export function deletePendingApproval(approvalId: string): void { + getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?').run(approvalId); +} diff --git a/src/delivery.ts b/src/delivery.ts index 12676f3..74be38d 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -11,16 +11,22 @@ import Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; -import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js'; -import { getAgentGroup } from './db/agent-groups.js'; +import { GROUPS_DIR } from './config.js'; +import { getRunningSessions, getActiveSessions, createPendingQuestion, getSession, createPendingApproval } from './db/sessions.js'; +import { getAgentGroup, getAdminAgentGroup, createAgentGroup, updateAgentGroup } from './db/agent-groups.js'; +import { getMessagingGroupsByAgentGroup } from './db/messaging-groups.js'; import { log } from './log.js'; -import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath } from './session-manager.js'; -import { resetContainerIdleTimer } from './container-runner.js'; +import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath, resolveSession, writeSessionMessage, writeSystemResponse } from './session-manager.js'; +import { resetContainerIdleTimer, wakeContainer } from './container-runner.js'; import type { OutboundFile } from './channels/adapter.js'; import type { Session } from './types.js'; const ACTIVE_POLL_MS = 1000; const SWEEP_POLL_MS = 60_000; +const MAX_DELIVERY_ATTEMPTS = 3; + +/** Track delivery attempt counts. Resets on process restart (gives failed messages a fresh chance). */ +const deliveryAttempts = new Map(); export interface ChannelDeliveryAdapter { deliver( @@ -30,7 +36,7 @@ export interface ChannelDeliveryAdapter { kind: string, content: string, files?: OutboundFile[], - ): Promise; + ): Promise; setTyping?(channelType: string, platformId: string, threadId: string | null): Promise; } @@ -136,16 +142,44 @@ async function deliverSessionMessages(session: Session): Promise { const undelivered = allDue.filter((m) => !deliveredIds.has(m.id)); if (undelivered.length === 0) return; + // Ensure platform_message_id column exists (migration for existing sessions) + migrateDeliveredTable(inDb); + for (const msg of undelivered) { try { - await deliverMessage(msg, session, inDb); - // Track delivery in inbound.db (host-owned) — not outbound.db + const platformMsgId = await deliverMessage(msg, session, inDb); inDb - .prepare("INSERT OR IGNORE INTO delivered (message_out_id, delivered_at) VALUES (?, datetime('now'))") - .run(msg.id); + .prepare( + "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))", + ) + .run(msg.id, platformMsgId ?? null); + deliveryAttempts.delete(msg.id); resetContainerIdleTimer(session.id); } catch (err) { - log.error('Failed to deliver message', { messageId: msg.id, sessionId: session.id, err }); + const attempts = (deliveryAttempts.get(msg.id) ?? 0) + 1; + deliveryAttempts.set(msg.id, attempts); + if (attempts >= MAX_DELIVERY_ATTEMPTS) { + log.error('Message delivery failed permanently, giving up', { + messageId: msg.id, + sessionId: session.id, + attempts, + err, + }); + inDb + .prepare( + "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))", + ) + .run(msg.id); + deliveryAttempts.delete(msg.id); + } else { + log.warn('Message delivery failed, will retry', { + messageId: msg.id, + sessionId: session.id, + attempt: attempts, + maxAttempts: MAX_DELIVERY_ATTEMPTS, + err, + }); + } } } } finally { @@ -165,7 +199,7 @@ async function deliverMessage( }, session: Session, inDb: Database.Database, -): Promise { +): Promise { if (!deliveryAdapter) { log.warn('No delivery adapter configured, dropping message', { id: msg.id }); return; @@ -181,8 +215,7 @@ async function deliverMessage( // Agent-to-agent — route to target session if (msg.channel_type === 'agent') { - log.info('Agent-to-agent message', { from: session.id, target: msg.platform_id }); - // TODO: route to target agent's session DB + await routeAgentMessage(msg, session); return; } @@ -222,11 +255,19 @@ async function deliverMessage( if (files.length === 0) files = undefined; } - await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content, files); + const platformMsgId = await deliveryAdapter.deliver( + msg.channel_type, + msg.platform_id, + msg.thread_id, + msg.kind, + msg.content, + files, + ); log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id, + platformMsgId, fileCount: files?.length, }); @@ -234,6 +275,71 @@ async function deliverMessage( if (fs.existsSync(outboxDir)) { fs.rmSync(outboxDir, { recursive: true, force: true }); } + + return platformMsgId; +} + +/** Route an agent-to-agent message to the target agent's session. */ +async function routeAgentMessage( + msg: { id: string; platform_id: string | null; content: string }, + sourceSession: Session, +): Promise { + const targetAgentGroupId = msg.platform_id; + if (!targetAgentGroupId) { + log.warn('Agent message missing target agent group ID', { id: msg.id }); + return; + } + + const targetGroup = getAgentGroup(targetAgentGroupId); + if (!targetGroup) { + log.warn('Target agent group not found', { id: msg.id, targetAgentGroupId }); + return; + } + + const sourceGroup = getAgentGroup(sourceSession.agent_group_id); + const sourceAgentName = sourceGroup?.name || sourceSession.agent_group_id; + + // Find or create a session for the target agent + const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared'); + + // Enrich content with sender info + const content = JSON.parse(msg.content); + const enrichedContent = JSON.stringify({ + text: content.text, + sender: sourceAgentName, + senderId: sourceSession.agent_group_id, + }); + + const messageId = `agent-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + writeSessionMessage(targetAgentGroupId, targetSession.id, { + id: messageId, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: sourceSession.agent_group_id, + channelType: 'agent', + threadId: null, + content: enrichedContent, + }); + + log.info('Agent message routed', { from: sourceSession.agent_group_id, to: targetAgentGroupId, targetSession: targetSession.id }); + + const freshSession = getSession(targetSession.id); + if (freshSession) { + await wakeContainer(freshSession); + } +} + +/** Ensure the delivered table has new columns (migration for existing sessions). */ +function migrateDeliveredTable(db: Database.Database): void { + const cols = new Set( + (db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name), + ); + if (!cols.has('platform_message_id')) { + db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run(); + } + if (!cols.has('status')) { + db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run(); + } } /** @@ -309,6 +415,207 @@ async function handleSystemAction( break; } + case 'create_agent': { + const requestId = content.requestId as string; + const name = content.name as string; + let folder = + (content.folder as string) || name.toLowerCase().replace(/[^a-z0-9_-]/g, '_').replace(/_+/g, '_'); + const instructions = content.instructions as string | null; + + try { + // Avoid duplicate folders + const { getAgentGroupByFolder } = await import('./db/agent-groups.js'); + if (getAgentGroupByFolder(folder)) { + folder = `${folder}_${Date.now()}`; + } + + const agentGroupId = `ag-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + createAgentGroup({ + id: agentGroupId, + name, + folder, + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: new Date().toISOString(), + }); + + const groupPath = path.join(GROUPS_DIR, folder); + fs.mkdirSync(groupPath, { recursive: true }); + + if (instructions) { + fs.writeFileSync(path.join(groupPath, 'CLAUDE.md'), instructions); + } + + writeSystemResponse(session.agent_group_id, session.id, requestId, 'success', { + agentGroupId, + name, + folder, + }); + + log.info('Agent group created via system action', { agentGroupId, name, folder }); + } catch (e) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { + error: e instanceof Error ? e.message : String(e), + }); + } + break; + } + + case 'add_mcp_server': { + const requestId = content.requestId as string; + const serverName = content.name as string; + const command = content.command as string; + const serverArgs = content.args as string[]; + const serverEnv = content.env as Record; + + try { + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) throw new Error('Agent group not found'); + + const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {}; + if (!containerConfig.mcpServers) containerConfig.mcpServers = {}; + containerConfig.mcpServers[serverName] = { command, args: serverArgs || [], env: serverEnv || {} }; + + updateAgentGroup(session.agent_group_id, { container_config: JSON.stringify(containerConfig) }); + + writeSystemResponse(session.agent_group_id, session.id, requestId, 'success', { + message: `MCP server "${serverName}" added. Will take effect on next container restart.`, + }); + + log.info('MCP server added', { agentGroupId: session.agent_group_id, name: serverName }); + } catch (e) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { + error: e instanceof Error ? e.message : String(e), + }); + } + break; + } + + case 'install_packages': { + const requestId = content.requestId as string; + const apt = (content.apt as string[]) || []; + const npm = (content.npm as string[]) || []; + const reason = content.reason as string; + + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { error: 'Agent group not found' }); + break; + } + + // Find admin channel for approval card + const adminGroup = getAdminAgentGroup(); + let approvalChannelType: string | null = null; + let approvalPlatformId: string | null = null; + + if (adminGroup) { + const adminMGs = getMessagingGroupsByAgentGroup(adminGroup.id); + if (adminMGs.length > 0) { + approvalChannelType = adminMGs[0].channel_type; + approvalPlatformId = adminMGs[0].platform_id; + } + } + + if (!approvalChannelType || !approvalPlatformId) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { + error: 'No admin channel found for approval', + }); + break; + } + + const approvalId = `appr-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + createPendingApproval({ + approval_id: approvalId, + session_id: session.id, + request_id: requestId, + action: 'install_packages', + payload: JSON.stringify({ apt, npm, reason }), + created_at: new Date().toISOString(), + }); + + const packageList = [...apt.map((p: string) => `apt: ${p}`), ...npm.map((p: string) => `npm: ${p}`)].join(', '); + if (deliveryAdapter) { + await deliveryAdapter.deliver( + approvalChannelType, + approvalPlatformId, + null, + 'chat-sdk', + JSON.stringify({ + type: 'ask_question', + questionId: approvalId, + question: `Agent "${agentGroup.name}" requests package installation:\n${packageList}${reason ? `\nReason: ${reason}` : ''}`, + options: ['Approve', 'Reject'], + }), + ); + } + + log.info('Package install approval requested', { approvalId, agentGroup: agentGroup.name, apt, npm }); + break; + } + + case 'request_rebuild': { + const requestId = content.requestId as string; + const reason = content.reason as string; + + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { error: 'Agent group not found' }); + break; + } + + // Find admin channel for approval card + const adminGroup2 = getAdminAgentGroup(); + let rebuildChannelType: string | null = null; + let rebuildPlatformId: string | null = null; + + if (adminGroup2) { + const adminMGs2 = getMessagingGroupsByAgentGroup(adminGroup2.id); + if (adminMGs2.length > 0) { + rebuildChannelType = adminMGs2[0].channel_type; + rebuildPlatformId = adminMGs2[0].platform_id; + } + } + + if (!rebuildChannelType || !rebuildPlatformId) { + writeSystemResponse(session.agent_group_id, session.id, requestId, 'error', { + error: 'No admin channel found for approval', + }); + break; + } + + const rebuildApprovalId = `appr-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + createPendingApproval({ + approval_id: rebuildApprovalId, + session_id: session.id, + request_id: requestId, + action: 'request_rebuild', + payload: JSON.stringify({ reason }), + created_at: new Date().toISOString(), + }); + + if (deliveryAdapter) { + await deliveryAdapter.deliver( + rebuildChannelType, + rebuildPlatformId, + null, + 'chat-sdk', + JSON.stringify({ + type: 'ask_question', + questionId: rebuildApprovalId, + question: `Agent "${agentGroup.name}" requests a container rebuild.${reason ? `\nReason: ${reason}` : ''}`, + options: ['Approve', 'Reject'], + }), + ); + } + + log.info('Container rebuild approval requested', { approvalId: rebuildApprovalId, agentGroup: agentGroup.name }); + break; + } + default: log.warn('Unknown system action', { action }); } diff --git a/src/index.ts b/src/index.ts index f24a4cb..0b29e6f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,9 +14,10 @@ import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runti import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; import { startHostSweep, stopHostSweep } from './host-sweep.js'; import { routeInbound } from './router.js'; -import { getPendingQuestion, deletePendingQuestion, getSession } from './db/sessions.js'; -import { writeSessionMessage } from './session-manager.js'; -import { wakeContainer } from './container-runner.js'; +import { getPendingQuestion, deletePendingQuestion, getPendingApproval, deletePendingApproval, getSession } from './db/sessions.js'; +import { getAgentGroup, updateAgentGroup } from './db/agent-groups.js'; +import { writeSessionMessage, writeSystemResponse } from './session-manager.js'; +import { wakeContainer, buildAgentGroupImage } from './container-runner.js'; import { log } from './log.js'; // Channel barrel — each enabled channel self-registers on import. @@ -83,7 +84,7 @@ async function main(): Promise { log.warn('No adapter for channel type', { channelType }); return; } - await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files }); + return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files }); }, async setTyping(channelType, platformId, threadId) { const adapter = getChannelAdapter(channelType); @@ -125,8 +126,15 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] { return configs; } -/** Handle a user's response to an ask_user_question card. */ +/** Handle a user's response to an ask_user_question card or an approval card. */ async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise { + // Check if this is a pending approval (install_packages, request_rebuild) + const approval = getPendingApproval(questionId); + if (approval) { + await handleApprovalResponse(approval, selectedOption, userId); + return; + } + const pq = getPendingQuestion(questionId); if (!pq) { log.warn('Pending question not found (may have expired)', { questionId }); @@ -163,6 +171,66 @@ async function handleQuestionResponse(questionId: string, selectedOption: string await wakeContainer(session); } +/** Handle an admin's response to an approval card. */ +async function handleApprovalResponse( + approval: import('./types.js').PendingApproval, + selectedOption: string, + userId: string, +): Promise { + const session = getSession(approval.session_id); + if (!session) { + deletePendingApproval(approval.approval_id); + return; + } + + if (selectedOption === 'Approve') { + const payload = JSON.parse(approval.payload); + + if (approval.action === 'install_packages') { + const agentGroup = getAgentGroup(session.agent_group_id); + const containerConfig = agentGroup?.container_config ? JSON.parse(agentGroup.container_config) : {}; + if (!containerConfig.packages) containerConfig.packages = { apt: [], npm: [] }; + if (payload.apt) containerConfig.packages.apt.push(...payload.apt); + if (payload.npm) containerConfig.packages.npm.push(...payload.npm); + + updateAgentGroup(session.agent_group_id, { container_config: JSON.stringify(containerConfig) }); + + writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'success', { + message: 'Packages approved. Run request_rebuild to apply.', + approved: { apt: payload.apt, npm: payload.npm }, + }); + + log.info('Package install approved', { approvalId: approval.approval_id, userId }); + } else if (approval.action === 'request_rebuild') { + try { + await buildAgentGroupImage(session.agent_group_id); + writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'success', { + message: 'Container image rebuilt. Changes will take effect on next container start.', + }); + log.info('Container rebuild approved and completed', { approvalId: approval.approval_id, userId }); + } catch (e) { + writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'error', { + error: `Rebuild failed: ${e instanceof Error ? e.message : String(e)}`, + }); + log.error('Container rebuild failed', { approvalId: approval.approval_id, err: e }); + } + } + } else { + // Rejected + writeSystemResponse(session.agent_group_id, session.id, approval.request_id, 'error', { + error: `Request rejected by admin (${userId})`, + }); + log.info('Approval rejected', { approvalId: approval.approval_id, action: approval.action, userId }); + } + + deletePendingApproval(approval.approval_id); + + // Wake container so the agent's polling MCP tool picks up the response + if (session) { + await wakeContainer(session); + } +} + /** Graceful shutdown. */ async function shutdown(signal: string): Promise { log.info('Shutdown signal received', { signal }); diff --git a/src/session-manager.ts b/src/session-manager.ts index 94a1d58..804c38d 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -64,7 +64,7 @@ function generateId(): string { */ export function resolveSession( agentGroupId: string, - messagingGroupId: string, + messagingGroupId: string | null, threadId: string | null, sessionMode: 'shared' | 'per-thread' | 'agent-shared', ): { session: Session; created: boolean } { @@ -74,7 +74,7 @@ export function resolveSession( if (existing) { return { session: existing, created: false }; } - } else { + } else if (messagingGroupId) { const lookupThreadId = sessionMode === 'shared' ? null : threadId; const existing = findSession(messagingGroupId, lookupThreadId); if (existing) { @@ -144,6 +144,9 @@ export function writeSessionMessage( recurrence?: string | null; }, ): void { + // Extract base64 attachment data, save to inbox, replace with file paths + const content = extractAttachmentFiles(agentGroupId, sessionId, message.id, message.content); + const dbPath = inboundDbPath(agentGroupId, sessionId); const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); @@ -166,7 +169,7 @@ export function writeSessionMessage( platformId: message.platformId ?? null, channelType: message.channelType ?? null, threadId: message.threadId ?? null, - content: message.content, + content, processAfter: message.processAfter ?? null, recurrence: message.recurrence ?? null, }); @@ -177,6 +180,44 @@ export function writeSessionMessage( updateSession(sessionId, { last_active: new Date().toISOString() }); } +/** + * If message content has attachments with base64 `data`, save them to + * the session's inbox directory and replace with `localPath`. + */ +function extractAttachmentFiles( + agentGroupId: string, + sessionId: string, + messageId: string, + contentStr: string, +): string { + let parsed: Record; + try { + parsed = JSON.parse(contentStr); + } catch { + return contentStr; + } + + const attachments = parsed.attachments as Array> | undefined; + if (!Array.isArray(attachments)) return contentStr; + + let changed = false; + for (const att of attachments) { + if (typeof att.data === 'string') { + const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId); + fs.mkdirSync(inboxDir, { recursive: true }); + const filename = (att.name as string) || `attachment-${Date.now()}`; + const filePath = path.join(inboxDir, filename); + fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64')); + att.localPath = `inbox/${messageId}/${filename}`; + delete att.data; + changed = true; + log.debug('Saved attachment to inbox', { messageId, filename, size: att.size }); + } + } + + return changed ? JSON.stringify(parsed) : contentStr; +} + /** Open the inbound DB for a session (host reads/writes). */ export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database { const dbPath = inboundDbPath(agentGroupId, sessionId); @@ -201,6 +242,27 @@ export function openSessionDb(agentGroupId: string, sessionId: string): Database return openInboundDb(agentGroupId, sessionId); } +/** Write a system response to a session's inbound.db so the container's findQuestionResponse() picks it up. */ +export function writeSystemResponse( + agentGroupId: string, + sessionId: string, + requestId: string, + status: string, + result: Record, +): void { + writeSessionMessage(agentGroupId, sessionId, { + id: `sys-resp-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'system', + timestamp: new Date().toISOString(), + content: JSON.stringify({ + type: 'question_response', + questionId: requestId, + status, + result, + }), + }); +} + /** Mark a container as running for a session. */ export function markContainerRunning(sessionId: string): void { updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() }); diff --git a/src/types.ts b/src/types.ts index 5d473d6..0d6983d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -88,3 +88,14 @@ export interface PendingQuestion { thread_id: string | null; created_at: string; } + +// ── Pending approvals (central DB) ── + +export interface PendingApproval { + approval_id: string; + session_id: string; + request_id: string; + action: string; + payload: string; // JSON + created_at: string; +}