diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts index ce48030..7324f1b 100644 --- a/container/agent-runner/src/formatter.ts +++ b/container/agent-runner/src/formatter.ts @@ -1,5 +1,51 @@ import type { MessageInRow } from './db/messages-in.js'; +/** + * Command categories for messages starting with '/'. + * - admin: requires NANOCLAW_ADMIN_USER_ID check + * - filtered: silently drop (mark completed without processing) + * - passthrough: pass raw to the agent (no XML wrapping) + * - none: not a command — format normally + */ +export type CommandCategory = 'admin' | 'filtered' | 'passthrough' | 'none'; + +const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact']); +const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config']); + +export interface CommandInfo { + category: CommandCategory; + command: string; // the command name (e.g., '/clear') + text: string; // full original text + senderId: string | null; +} + +/** + * Categorize a message as a command or not. + * Only applies to chat/chat-sdk messages. + */ +export function categorizeMessage(msg: MessageInRow): CommandInfo { + const content = parseContent(msg.content); + const text = (content.text || '').trim(); + const senderId = content.senderId || content.author?.userId || null; + + if (!text.startsWith('/')) { + return { category: 'none', command: '', text, senderId }; + } + + // Extract the command name (e.g., '/clear' from '/clear some args') + const command = text.split(/\s/)[0].toLowerCase(); + + if (ADMIN_COMMANDS.has(command)) { + return { category: 'admin', command, text, senderId }; + } + + if (FILTERED_COMMANDS.has(command)) { + return { category: 'filtered', command, text, senderId }; + } + + return { category: 'passthrough', command, text, senderId }; +} + /** * Routing context extracted from messages_in rows. * Copied to messages_out by default so responses go back to the sender. @@ -68,7 +114,8 @@ function formatChatMessages(messages: MessageInRow[]): string { const time = formatTime(msg.timestamp); const text = content.text || ''; const idAttr = msg.seq != null ? ` id="${msg.seq}"` : ''; - lines.push(`${escapeXml(text)}`); + const attachmentsSuffix = formatAttachments(content.attachments); + lines.push(`${escapeXml(text)}${attachmentsSuffix}`); } lines.push(''); return lines.join('\n'); @@ -80,7 +127,8 @@ function formatSingleChat(msg: MessageInRow): string { const time = formatTime(msg.timestamp); const text = content.text || ''; const idAttr = msg.seq != null ? ` id="${msg.seq}"` : ''; - return `${escapeXml(text)}`; + const attachmentsSuffix = formatAttachments(content.attachments); + return `${escapeXml(text)}${attachmentsSuffix}`; } function formatTaskMessage(msg: MessageInRow): string { @@ -105,6 +153,18 @@ function formatSystemMessage(msg: MessageInRow): string { return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`; } +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function formatAttachments(attachments: any[] | undefined): string { + if (!Array.isArray(attachments) || attachments.length === 0) return ''; + const parts = attachments.map((a) => { + const name = a.name || a.filename || 'attachment'; + const type = a.type || 'file'; + const url = a.url || ''; + return url ? `[${type}: ${escapeXml(name)} (${escapeXml(url)})]` : `[${type}: ${escapeXml(name)}]`; + }); + return '\n' + parts.join('\n'); +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any function parseContent(json: string): any { try { diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 8ae1238..aca3766 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -1,6 +1,6 @@ -import { getPendingMessages, markProcessing, markCompleted, touchProcessing } from './db/messages-in.js'; +import { getPendingMessages, markProcessing, markCompleted, touchProcessing, type MessageInRow } from './db/messages-in.js'; import { writeMessageOut } from './db/messages-out.js'; -import { formatMessages, extractRouting, type RoutingContext } from './formatter.js'; +import { formatMessages, extractRouting, categorizeMessage, type RoutingContext } from './formatter.js'; import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; @@ -50,9 +50,69 @@ export async function runPollLoop(config: PollLoopConfig): Promise { markProcessing(ids); const routing = extractRouting(messages); - const prompt = formatMessages(messages); - log(`Processing ${messages.length} message(s), kinds: ${[...new Set(messages.map((m) => m.kind))].join(',')}`); + // Handle commands: categorize chat messages + const adminUserId = config.env.NANOCLAW_ADMIN_USER_ID; + const normalMessages = []; + const commandIds: string[] = []; + + for (const msg of messages) { + if (msg.kind !== 'chat' && msg.kind !== 'chat-sdk') { + normalMessages.push(msg); + continue; + } + + const cmdInfo = categorizeMessage(msg); + + if (cmdInfo.category === 'filtered') { + // Silently drop — mark completed, don't process + log(`Filtered command: ${cmdInfo.command} (msg: ${msg.id})`); + commandIds.push(msg.id); + continue; + } + + if (cmdInfo.category === 'admin') { + if (!adminUserId || cmdInfo.senderId !== adminUserId) { + // Not admin — send error, mark completed + log(`Admin command denied: ${cmdInfo.command} from ${cmdInfo.senderId} (msg: ${msg.id})`); + writeMessageOut({ + id: generateId(), + kind: 'chat', + platform_id: routing.platformId, + channel_type: routing.channelType, + thread_id: routing.threadId, + content: JSON.stringify({ text: `Permission denied: ${cmdInfo.command} requires admin access.` }), + }); + commandIds.push(msg.id); + continue; + } + // Admin user — format as system command + normalMessages.push(msg); + continue; + } + + // passthrough or none + normalMessages.push(msg); + } + + // Mark filtered/denied command messages as completed immediately + if (commandIds.length > 0) { + markCompleted(commandIds); + } + + // If all messages were filtered commands, skip processing + if (normalMessages.length === 0) { + // Mark remaining processing IDs as completed + const remainingIds = ids.filter((id) => !commandIds.includes(id)); + if (remainingIds.length > 0) markCompleted(remainingIds); + log(`All ${messages.length} message(s) were commands, skipping query`); + continue; + } + + // Format messages: passthrough commands get raw text, others get XML + const prompt = formatMessagesWithCommands(normalMessages); + + log(`Processing ${normalMessages.length} message(s), kinds: ${[...new Set(normalMessages.map((m) => m.kind))].join(',')}`); // Set routing context as env vars for MCP tools setRoutingEnv(routing, config.env); @@ -69,8 +129,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise { }); // Process the query while concurrently polling for new messages + const processingIds = ids.filter((id) => !commandIds.includes(id)); try { - const result = await processQuery(query, routing, config, ids); + const result = await processQuery(query, routing, config, processingIds); if (result.sessionId) sessionId = result.sessionId; if (result.resumeAt) resumeAt = result.resumeAt; } catch (err) { @@ -86,11 +147,55 @@ export async function runPollLoop(config: PollLoopConfig): Promise { }); } - markCompleted(ids); + markCompleted(processingIds); log(`Completed ${ids.length} message(s)`); } } +/** + * Format messages, handling passthrough commands differently. + * Passthrough commands (e.g., /foo) are sent raw (no XML wrapping). + * Admin commands from authorized users are formatted as system commands. + * Normal messages get standard XML formatting. + */ +function formatMessagesWithCommands(messages: MessageInRow[]): string { + // Check if any message is a passthrough command + const parts: string[] = []; + const normalBatch: MessageInRow[] = []; + + for (const msg of messages) { + if (msg.kind === 'chat' || msg.kind === 'chat-sdk') { + const cmdInfo = categorizeMessage(msg); + if (cmdInfo.category === 'passthrough') { + // Flush normal batch first + if (normalBatch.length > 0) { + parts.push(formatMessages(normalBatch)); + normalBatch.length = 0; + } + // Pass raw command text (no XML wrapping) + parts.push(cmdInfo.text); + continue; + } + if (cmdInfo.category === 'admin') { + // Format admin command as a system command block + if (normalBatch.length > 0) { + parts.push(formatMessages(normalBatch)); + normalBatch.length = 0; + } + parts.push(`[SYSTEM COMMAND: ${cmdInfo.command}]\n${cmdInfo.text}`); + continue; + } + } + normalBatch.push(msg); + } + + if (normalBatch.length > 0) { + parts.push(formatMessages(normalBatch)); + } + + return parts.join('\n\n'); +} + interface QueryResult { sessionId?: string; resumeAt?: string; diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 0bd5edd..56eb8f0 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -34,10 +34,17 @@ export interface InboundMessage { timestamp: string; } +/** A file attachment to deliver alongside a message. */ +export interface OutboundFile { + filename: string; + data: Buffer; +} + /** Outbound message from host to adapter. */ export interface OutboundMessage { kind: string; content: unknown; // parsed JSON from messages_out + files?: OutboundFile[]; // file attachments from the session outbox } /** Discovered conversation info (from syncConversations). */ diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 50b6f27..853e2c4 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -104,31 +104,33 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter if (gatewayAbort?.signal.aborted) return; // Capture the long-running listener promise via waitUntil let listenerPromise: Promise | undefined; - adapter - .startGatewayListener!( - { waitUntil: (p: Promise) => { listenerPromise = p; } }, - 24 * 60 * 60 * 1000, - gatewayAbort!.signal, - ) - .then(() => { - // startGatewayListener resolves immediately with a Response; - // the actual work is in the listenerPromise passed to waitUntil - if (listenerPromise) { - listenerPromise - .then(() => { - if (!gatewayAbort?.signal.aborted) { - log.info('Gateway listener expired, restarting', { adapter: adapter.name }); - startGateway(); - } - }) - .catch((err) => { - if (!gatewayAbort?.signal.aborted) { - log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err }); - setTimeout(startGateway, 5000); - } - }); - } - }); + adapter.startGatewayListener!( + { + waitUntil: (p: Promise) => { + listenerPromise = p; + }, + }, + 24 * 60 * 60 * 1000, + gatewayAbort!.signal, + ).then(() => { + // startGatewayListener resolves immediately with a Response; + // the actual work is in the listenerPromise passed to waitUntil + if (listenerPromise) { + listenerPromise + .then(() => { + if (!gatewayAbort?.signal.aborted) { + log.info('Gateway listener expired, restarting', { adapter: adapter.name }); + startGateway(); + } + }) + .catch((err) => { + if (!gatewayAbort?.signal.aborted) { + log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err }); + setTimeout(startGateway, 5000); + } + }); + } + }); }; startGateway(); log.info('Gateway listener started', { adapter: adapter.name }); @@ -156,7 +158,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // Normal message const text = (content.markdown as string) || (content.text as string); if (text) { - await adapter.postMessage(tid, { markdown: text }); + // Attach files if present (FileUpload format: { data, filename }) + const fileUploads = message.files?.map((f) => ({ data: f.data, filename: f.filename })); + if (fileUploads && fileUploads.length > 0) { + await adapter.postMessage(tid, { markdown: text, files: fileUploads }); + } else { + await adapter.postMessage(tid, { markdown: text }); + } + } else if (message.files && message.files.length > 0) { + // Files only, no text + const fileUploads = message.files.map((f) => ({ data: f.data, filename: f.filename })); + await adapter.postMessage(tid, { markdown: '', files: fileUploads }); } }, diff --git a/src/delivery.ts b/src/delivery.ts index b66c9c2..246e67c 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -3,12 +3,15 @@ * Polls active session DBs for undelivered messages_out, delivers through channel adapters. */ import Database from 'better-sqlite3'; +import fs from 'fs'; +import path from 'path'; import { getRunningSessions, getActiveSessions } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { log } from './log.js'; -import { openSessionDb, sessionDbPath } from './session-manager.js'; +import { openSessionDb, sessionDir } from './session-manager.js'; import { resetContainerIdleTimer } from './container-runner-v2.js'; +import type { OutboundFile } from './channels/adapter.js'; import type { Session } from './types-v2.js'; const ACTIVE_POLL_MS = 1000; @@ -21,6 +24,7 @@ export interface ChannelDeliveryAdapter { threadId: string | null, kind: string, content: string, + files?: OutboundFile[], ): Promise; setTyping?(channelType: string, platformId: string, threadId: string | null): Promise; } @@ -159,8 +163,29 @@ async function deliverMessage( return; } - await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content); - log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id }); + // Read file attachments from outbox if the content declares files + let files: OutboundFile[] | undefined; + const outboxDir = path.join(sessionDir(session.agent_group_id, session.id), 'outbox', msg.id); + if (Array.isArray(content.files) && content.files.length > 0 && fs.existsSync(outboxDir)) { + files = []; + for (const filename of content.files as string[]) { + const filePath = path.join(outboxDir, filename); + if (fs.existsSync(filePath)) { + files.push({ filename, data: fs.readFileSync(filePath) }); + } else { + log.warn('Outbox file not found', { messageId: msg.id, filename }); + } + } + if (files.length === 0) files = undefined; + } + + await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content, files); + log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id, fileCount: files?.length }); + + // Clean up outbox directory after successful delivery + if (fs.existsSync(outboxDir)) { + fs.rmSync(outboxDir, { recursive: true, force: true }); + } } export function stopDeliveryPolls(): void { diff --git a/src/host-sweep.ts b/src/host-sweep.ts index bcc4666..0c8ca41 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -58,7 +58,8 @@ async function sweepSession(session: Session): Promise { let db: Database.Database; try { db = new Database(dbPath); - db.pragma('journal_mode = WAL'); + db.pragma('journal_mode = DELETE'); + db.pragma('busy_timeout = 5000'); } catch { return; } @@ -125,10 +126,23 @@ async function sweepSession(session: Session): Promise { const nextRun = interval.next().toISOString(); const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + // Compute next seq from both tables (same pattern as session-manager.ts) + const nextSeq = ( + db + .prepare( + `SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM ( + SELECT seq FROM messages_in WHERE seq IS NOT NULL + UNION ALL + SELECT seq FROM messages_out WHERE seq IS NOT NULL + )`, + ) + .get() as { next: number } + ).next; + db.prepare( - `INSERT INTO messages_in (id, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) - VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, - ).run(newId, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); + `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) + VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, + ).run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); // Remove recurrence from the completed message so it doesn't spawn again db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id); diff --git a/src/index-v2.ts b/src/index-v2.ts index e4d6ec4..eca93f6 100644 --- a/src/index-v2.ts +++ b/src/index-v2.ts @@ -68,13 +68,13 @@ async function main(): Promise { // 4. Delivery adapter bridge — dispatches to channel adapters setDeliveryAdapter({ - async deliver(channelType, platformId, threadId, kind, content) { + async deliver(channelType, platformId, threadId, kind, content, files) { const adapter = getChannelAdapter(channelType); if (!adapter) { log.warn('No adapter for channel type', { channelType }); return; } - await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) }); + await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files }); }, async setTyping(channelType, platformId, threadId) { const adapter = getChannelAdapter(channelType);