diff --git a/container/agent-runner/src/mcp-tools/agents.ts b/container/agent-runner/src/mcp-tools/agents.ts index eadde41..b341b74 100644 --- a/container/agent-runner/src/mcp-tools/agents.ts +++ b/container/agent-runner/src/mcp-tools/agents.ts @@ -9,6 +9,7 @@ * (see mcp-tools/index.ts). The host re-checks permission on receive. */ import { writeMessageOut } from '../db/messages-out.js'; +import { registerTools } from './server.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -62,4 +63,4 @@ export const createAgent: McpToolDefinition = { }, }; -export const agentTools: McpToolDefinition[] = [createAgent]; +registerTools([createAgent]); diff --git a/container/agent-runner/src/mcp-tools/core.ts b/container/agent-runner/src/mcp-tools/core.ts index b805685..bf89ef8 100644 --- a/container/agent-runner/src/mcp-tools/core.ts +++ b/container/agent-runner/src/mcp-tools/core.ts @@ -12,6 +12,7 @@ import path from 'path'; import { findByName, getAllDestinations } from '../destinations.js'; import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js'; import { getSessionRouting } from '../db/session-routing.js'; +import { registerTools } from './server.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -258,4 +259,4 @@ export const addReaction: McpToolDefinition = { }, }; -export const coreTools: McpToolDefinition[] = [sendMessage, sendFile, editMessage, addReaction]; +registerTools([sendMessage, sendFile, editMessage, addReaction]); diff --git a/container/agent-runner/src/mcp-tools/index.ts b/container/agent-runner/src/mcp-tools/index.ts index 5551280..bdaef5c 100644 --- a/container/agent-runner/src/mcp-tools/index.ts +++ b/container/agent-runner/src/mcp-tools/index.ts @@ -1,59 +1,21 @@ /** - * MCP tools barrel — collects all tool modules and starts the server. + * MCP tools barrel — imports each tool module for its side-effect + * `registerTools([...])` call, then starts the MCP server. * - * Each module exports a McpToolDefinition[] array. This file registers - * them all with the MCP server. Adding a new tool module requires only - * importing it here and spreading its tools array. + * Adding a new tool module: create the file, call `registerTools([...])` + * at module scope, and append the import here. No central list. */ -import { Server } from '@modelcontextprotocol/sdk/server/index.js'; -import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; -import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js'; - -import type { McpToolDefinition } from './types.js'; -import { coreTools } from './core.js'; -import { schedulingTools } from './scheduling.js'; -import { interactiveTools } from './interactive.js'; -import { agentTools } from './agents.js'; -import { selfModTools } from './self-mod.js'; +import './core.js'; +import './scheduling.js'; +import './interactive.js'; +import './agents.js'; +import './self-mod.js'; +import { startMcpServer } from './server.js'; function log(msg: string): void { console.error(`[mcp-tools] ${msg}`); } -const allTools: McpToolDefinition[] = [ - ...coreTools, - ...schedulingTools, - ...interactiveTools, - ...agentTools, - ...selfModTools, -]; - -const toolMap = new Map(); -for (const t of allTools) { - toolMap.set(t.tool.name, t); -} - -async function startMcpServer(): Promise { - const server = new Server({ name: 'nanoclaw', version: '2.0.0' }, { capabilities: { tools: {} } }); - - server.setRequestHandler(ListToolsRequestSchema, async () => ({ - tools: allTools.map((t) => t.tool), - })); - - server.setRequestHandler(CallToolRequestSchema, async (request) => { - const { name, arguments: args } = request.params; - const tool = toolMap.get(name); - if (!tool) { - return { content: [{ type: 'text', text: `Unknown tool: ${name}` }] }; - } - return tool.handler(args ?? {}); - }); - - const transport = new StdioServerTransport(); - await server.connect(transport); - log(`MCP server started with ${allTools.length} tools: ${allTools.map((t) => t.tool.name).join(', ')}`); -} - startMcpServer().catch((err) => { log(`MCP server error: ${err instanceof Error ? err.message : String(err)}`); process.exit(1); diff --git a/container/agent-runner/src/mcp-tools/interactive.ts b/container/agent-runner/src/mcp-tools/interactive.ts index 82833c7..6924a9e 100644 --- a/container/agent-runner/src/mcp-tools/interactive.ts +++ b/container/agent-runner/src/mcp-tools/interactive.ts @@ -7,6 +7,7 @@ import { findQuestionResponse, markCompleted } from '../db/messages-in.js'; import { writeMessageOut } from '../db/messages-out.js'; import { getSessionRouting } from '../db/session-routing.js'; +import { registerTools } from './server.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -165,4 +166,4 @@ export const sendCard: McpToolDefinition = { }, }; -export const interactiveTools: McpToolDefinition[] = [askUserQuestion, sendCard]; +registerTools([askUserQuestion, sendCard]); diff --git a/container/agent-runner/src/mcp-tools/scheduling.ts b/container/agent-runner/src/mcp-tools/scheduling.ts index 1e362e2..168808c 100644 --- a/container/agent-runner/src/mcp-tools/scheduling.ts +++ b/container/agent-runner/src/mcp-tools/scheduling.ts @@ -8,6 +8,7 @@ import { getInboundDb } from '../db/connection.js'; import { writeMessageOut } from '../db/messages-out.js'; import { getSessionRouting } from '../db/session-routing.js'; +import { registerTools } from './server.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -265,4 +266,4 @@ export const updateTask: McpToolDefinition = { }, }; -export const schedulingTools: McpToolDefinition[] = [scheduleTask, listTasks, updateTask, cancelTask, pauseTask, resumeTask]; +registerTools([scheduleTask, listTasks, updateTask, cancelTask, pauseTask, resumeTask]); diff --git a/container/agent-runner/src/mcp-tools/self-mod.ts b/container/agent-runner/src/mcp-tools/self-mod.ts index 242ff78..775ec3b 100644 --- a/container/agent-runner/src/mcp-tools/self-mod.ts +++ b/container/agent-runner/src/mcp-tools/self-mod.ts @@ -9,6 +9,7 @@ * the host side (defense in depth). */ import { writeMessageOut } from '../db/messages-out.js'; +import { registerTools } from './server.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -140,4 +141,4 @@ export const requestRebuild: McpToolDefinition = { }, }; -export const selfModTools: McpToolDefinition[] = [installPackages, addMcpServer, requestRebuild]; +registerTools([installPackages, addMcpServer, requestRebuild]); diff --git a/container/agent-runner/src/mcp-tools/server.ts b/container/agent-runner/src/mcp-tools/server.ts new file mode 100644 index 0000000..3df45ed --- /dev/null +++ b/container/agent-runner/src/mcp-tools/server.ts @@ -0,0 +1,54 @@ +/** + * MCP server bootstrap + tool self-registration. + * + * Each tool module calls `registerTools([...])` at import time. The + * barrel (`index.ts`) imports every tool module for side effects, then + * calls `startMcpServer()` which uses whatever was registered. + * + * Default when only `core.ts` is imported: the core `send_message` / + * `send_file` / `edit_message` / `add_reaction` tools are available. + */ +import { Server } from '@modelcontextprotocol/sdk/server/index.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js'; + +import type { McpToolDefinition } from './types.js'; + +function log(msg: string): void { + console.error(`[mcp-tools] ${msg}`); +} + +const allTools: McpToolDefinition[] = []; +const toolMap = new Map(); + +export function registerTools(tools: McpToolDefinition[]): void { + for (const t of tools) { + if (toolMap.has(t.tool.name)) { + log(`Warning: tool "${t.tool.name}" already registered, skipping duplicate`); + continue; + } + allTools.push(t); + toolMap.set(t.tool.name, t); + } +} + +export async function startMcpServer(): Promise { + const server = new Server({ name: 'nanoclaw', version: '2.0.0' }, { capabilities: { tools: {} } }); + + server.setRequestHandler(ListToolsRequestSchema, async () => ({ + tools: allTools.map((t) => t.tool), + })); + + server.setRequestHandler(CallToolRequestSchema, async (request) => { + const { name, arguments: args } = request.params; + const tool = toolMap.get(name); + if (!tool) { + return { content: [{ type: 'text', text: `Unknown tool: ${name}` }] }; + } + return tool.handler(args ?? {}); + }); + + const transport = new StdioServerTransport(); + await server.connect(transport); + log(`MCP server started with ${allTools.length} tools: ${allTools.map((t) => t.tool.name).join(', ')}`); +} diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index d100218..eb06e89 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -156,11 +156,18 @@ export async function runPollLoop(config: PollLoopConfig): Promise { // Pre-task scripts: for any task rows with a `script`, run it before the // provider call. Scripts returning wakeAgent=false (or erroring) gate // their own task row only — surviving messages still go to the agent. + // + // MODULE-HOOK:scheduling-pre-task:start + // When scheduling is extracted (PR #4), `applyPreTaskScripts` moves + // to the scheduling module and the `/add-scheduling` skill replaces + // this block with a call to the module. Without scheduling installed, + // the block is empty (no script gating) and `keep = normalMessages`. const { keep, skipped } = await applyPreTaskScripts(normalMessages); if (skipped.length > 0) { markCompleted(skipped); log(`Pre-task script skipped ${skipped.length} task(s): ${skipped.join(', ')}`); } + // MODULE-HOOK:scheduling-pre-task:end if (keep.length === 0) { log(`All ${normalMessages.length} non-command message(s) gated by script, skipping query`); diff --git a/src/container-runner.ts b/src/container-runner.ts index 1cce1b3..18d7eb9 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -13,11 +13,12 @@ import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZO import { readContainerConfig, writeContainerConfig } from './container-config.js'; import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js'; import { getAgentGroup } from './db/agent-groups.js'; +import { getDb, hasTable } from './db/connection.js'; import { getAdminsOfAgentGroup, getGlobalAdmins, getOwners } from './db/user-roles.js'; import { initGroupFilesystem } from './group-init.js'; -import { stopTypingRefresh } from './delivery.js'; +import { stopTypingRefresh } from './modules/typing/index.js'; import { log } from './log.js'; -import { validateAdditionalMounts } from './mount-security.js'; +import { validateAdditionalMounts } from './modules/mount-security/index.js'; // Provider host-side config barrel — each provider that needs host-side // container setup self-registers on import. import './providers/index.js'; @@ -286,10 +287,16 @@ async function buildContainerArgs( // Users allowed to run admin commands (e.g. /clear) inside this container. // Computed at wake time: owners + global admins + admins scoped to this // agent group. Role changes take effect on next container spawn. + // + // Guarded: if the permissions module isn't installed, `user_roles` + // doesn't exist and the set stays empty — the formatter treats an + // empty admin set as permissionless (every sender is admin). const adminUserIds = new Set(); - for (const r of getOwners()) adminUserIds.add(r.user_id); - for (const r of getGlobalAdmins()) adminUserIds.add(r.user_id); - for (const r of getAdminsOfAgentGroup(agentGroup.id)) adminUserIds.add(r.user_id); + if (hasTable(getDb(), 'user_roles')) { + for (const r of getOwners()) adminUserIds.add(r.user_id); + for (const r of getGlobalAdmins()) adminUserIds.add(r.user_id); + for (const r of getAdminsOfAgentGroup(agentGroup.id)) adminUserIds.add(r.user_id); + } if (adminUserIds.size > 0) { args.push('-e', `NANOCLAW_ADMIN_USER_IDS=${Array.from(adminUserIds).join(',')}`); } diff --git a/src/db/connection.ts b/src/db/connection.ts index 6d13774..8a7a3c5 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -31,3 +31,18 @@ export function closeDb(): void { _db?.close(); _db = null; } + +/** + * Check whether a table exists. Used by core code that touches + * module-owned tables so that an uninstalled module degrades silently + * instead of raising SQLite errors. Cheap: a single indexed lookup on + * sqlite_master. Results are not cached — a module install adds the + * table at runtime (next service start), and callers may run before + * or after that boundary. + */ +export function hasTable(db: Database.Database, name: string): boolean { + const row = db.prepare(`SELECT 1 FROM sqlite_master WHERE type='table' AND name = ? LIMIT 1`).get(name) as + | { '1': number } + | undefined; + return row !== undefined; +} diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index d8ee9ec..bb4d928 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -32,29 +32,34 @@ export function runMigrations(db: Database.Database): void { name TEXT NOT NULL, applied TEXT NOT NULL ); + CREATE UNIQUE INDEX IF NOT EXISTS idx_schema_version_name ON schema_version(name); `); - const currentVersion = - (db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number | null })?.v ?? 0; - - const pending = migrations.filter((m) => m.version > currentVersion); + // Uniqueness is keyed on `name`, not `version`. This lets module + // migrations (added later by install skills) pick arbitrary version + // numbers without coordinating across modules. `version` stays on + // the Migration object as an ordering hint within the barrel array; + // the stored `version` column is auto-assigned at insert time as an + // applied-order number. + const applied = new Set( + (db.prepare('SELECT name FROM schema_version').all() as { name: string }[]).map((r) => r.name), + ); + const pending = migrations.filter((m) => !applied.has(m.name)); if (pending.length === 0) return; - log.info('Running migrations', { - from: currentVersion, - to: pending[pending.length - 1].version, - count: pending.length, - }); + log.info('Running migrations', { count: pending.length }); for (const m of pending) { db.transaction(() => { m.up(db); + const next = + (db.prepare('SELECT COALESCE(MAX(version), 0) + 1 AS v FROM schema_version').get() as { v: number }).v; db.prepare('INSERT INTO schema_version (version, name, applied) VALUES (?, ?, ?)').run( - m.version, + next, m.name, new Date().toISOString(), ); })(); - log.info('Migration applied', { version: m.version, name: m.name }); + log.info('Migration applied', { name: m.name }); } } diff --git a/src/delivery.ts b/src/delivery.ts index df1f992..233ce90 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -21,6 +21,7 @@ import { } from './db/sessions.js'; import { getAgentGroup, createAgentGroup, updateAgentGroup, getAgentGroupByFolder } from './db/agent-groups.js'; import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js'; +import { getDb, hasTable } from './db/connection.js'; import { getMessagingGroup, getMessagingGroupByPlatform } from './db/messaging-groups.js'; import { pickApprovalDelivery, pickApprover } from './access.js'; import { @@ -38,18 +39,16 @@ import { import { log } from './log.js'; import { normalizeOptions, type RawOption } from './channels/ask-question.js'; import { - heartbeatPath, openInboundDb, openOutboundDb, sessionDir, - inboundDbPath, resolveSession, writeDestinations, writeSessionMessage, - writeSystemResponse, } from './session-manager.js'; import { resetContainerIdleTimer, wakeContainer } from './container-runner.js'; import { initGroupFilesystem } from './group-init.js'; +import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js'; import type { OutboundFile } from './channels/adapter.js'; import type { AgentGroup, Session } from './types.js'; @@ -93,6 +92,9 @@ let sweepPolling = false; export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void { deliveryAdapter = adapter; + // Forward to the typing module so it can fire setTyping on its own + // interval. Direct call, not a registry — typing is a default module. + setTypingAdapter(adapter); } /** @@ -194,148 +196,6 @@ async function requestApproval( log.info('Approval requested', { action, approvalId, agentName, approver: target.userId }); } -/** Show typing indicator on a channel. Called when a message is routed to the agent. */ -export async function triggerTyping(channelType: string, platformId: string, threadId: string | null): Promise { - try { - await deliveryAdapter?.setTyping?.(channelType, platformId, threadId); - } catch { - // Typing is best-effort — don't fail routing if it errors - } -} - -// ── Typing refresh ── -// Most platforms expire a typing indicator after 5–10s, so a one-shot call -// on message arrival goes stale long before the agent finishes thinking. -// We keep it alive by re-firing setTyping on a short interval — but only -// while the agent is actually WORKING, not just while the container is -// alive. The agent-runner touches `heartbeat` on every SDK event, so we -// gate each tick on "is the heartbeat file fresh?". If it goes stale (agent -// finished its turn and is idle-polling), the refresh stops on its own -// without waiting for the container to exit. -// -// After delivering a user-facing message, the refresh is paused for -// POST_DELIVERY_PAUSE_MS — long enough for the client-side typing -// indicator to visually clear (Discord ~10s, Telegram ~5s). If the agent -// keeps touching heartbeat past the pause window, typing resumes -// naturally on the next refresh tick. -// -// `startTypingRefresh` is idempotent per session. `stopTypingRefresh` is -// called from container-runner.ts on container exit as a fast-path cleanup -// (the heartbeat-staleness path would catch it within one tick anyway). -const TYPING_REFRESH_MS = 4000; -// Grace window from startTypingRefresh: fire typing unconditionally for -// this long regardless of heartbeat state. Covers container spawn/wake -// latency, which can be 5–12s on a cold start before the first heartbeat -// touch lands. -const TYPING_GRACE_MS = 15000; -// After the grace window, a heartbeat must be mtimed within this many -// milliseconds of now to count as "agent is working." Heartbeats are -// touched on every SDK event (tool calls, result chunks), so during -// active work they land every few hundred ms. 6s is well above that -// while still being small enough to stop typing quickly when the agent -// goes idle. -const HEARTBEAT_FRESH_MS = 6000; -// After we deliver a user-facing message, pause typing for this long so -// the client-side indicator has time to visually clear. Tuned for the -// longest common client expiry (Discord ~10s). The interval stays -// running; ticks inside the pause just skip the setTyping call. -const POST_DELIVERY_PAUSE_MS = 10000; - -interface TypingTarget { - agentGroupId: string; - channelType: string; - platformId: string; - threadId: string | null; - interval: NodeJS.Timeout; - startedAt: number; - pausedUntil: number; // epoch ms; 0 = not paused -} - -const typingRefreshers = new Map(); - -function isHeartbeatFresh(agentGroupId: string, sessionId: string): boolean { - const hbPath = heartbeatPath(agentGroupId, sessionId); - try { - const stat = fs.statSync(hbPath); - return Date.now() - stat.mtimeMs < HEARTBEAT_FRESH_MS; - } catch { - return false; - } -} - -export function startTypingRefresh( - sessionId: string, - agentGroupId: string, - channelType: string, - platformId: string, - threadId: string | null, -): void { - const existing = typingRefreshers.get(sessionId); - if (existing) { - // Already refreshing. Fire an immediate tick for the new inbound - // event and reset the grace window — the new message restarts the - // container-wake latency budget. Also clear any lingering - // post-delivery pause: a new inbound means the user expects typing - // to show immediately. - triggerTyping(channelType, platformId, threadId).catch(() => {}); - existing.startedAt = Date.now(); - existing.pausedUntil = 0; - return; - } - - // Immediate tick + periodic refresh. - triggerTyping(channelType, platformId, threadId).catch(() => {}); - const startedAt = Date.now(); - const interval = setInterval(() => { - const entry = typingRefreshers.get(sessionId); - if (!entry) return; // stopped externally since this tick was scheduled - - // Inside a post-delivery pause: skip setTyping but keep the interval - // running so we resume automatically once the pause expires. - if (entry.pausedUntil > Date.now()) return; - - const withinGrace = Date.now() - entry.startedAt < TYPING_GRACE_MS; - if (withinGrace || isHeartbeatFresh(entry.agentGroupId, sessionId)) { - triggerTyping(entry.channelType, entry.platformId, entry.threadId).catch(() => {}); - return; - } - - // Out of grace AND heartbeat stale — agent is idle, stop refreshing. - clearInterval(entry.interval); - typingRefreshers.delete(sessionId); - }, TYPING_REFRESH_MS); - // unref so a stale refresher can't hold the event loop alive. - interval.unref(); - typingRefreshers.set(sessionId, { - agentGroupId, - channelType, - platformId, - threadId, - interval, - startedAt, - pausedUntil: 0, - }); -} - -/** - * Pause the typing refresh for POST_DELIVERY_PAUSE_MS. Called after a - * user-facing message is delivered so the client-side indicator has a - * chance to visually clear before the agent's next SDK event pushes it - * back on. No-op if no refresh is active for this session. - */ -export function pauseTypingRefreshAfterDelivery(sessionId: string): void { - const entry = typingRefreshers.get(sessionId); - if (!entry) return; - entry.pausedUntil = Date.now() + POST_DELIVERY_PAUSE_MS; -} - -export function stopTypingRefresh(sessionId: string): void { - const entry = typingRefreshers.get(sessionId); - if (!entry) return; - clearInterval(entry.interval); - typingRefreshers.delete(sessionId); -} - /** Start the active container poll loop (~1s). */ export function startActiveDeliveryPoll(): void { if (activePolling) return; @@ -553,15 +413,22 @@ async function deliverMessage( throw new Error(`unknown messaging group for ${msg.channel_type}/${msg.platform_id} (message ${msg.id})`); } const isOriginChat = session.messaging_group_id === mg.id; - if (!isOriginChat && !hasDestination(session.agent_group_id, 'channel', mg.id)) { + // Guarded: without the agent-to-agent module, `agent_destinations` + // doesn't exist and we permit all non-origin channel sends (the + // origin-chat case is always allowed regardless). + const checkDestinations = hasTable(getDb(), 'agent_destinations'); + if (!isOriginChat && checkDestinations && !hasDestination(session.agent_group_id, 'channel', mg.id)) { throw new Error( `unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`, ); } } - // Track pending questions for ask_user_question flow - if (content.type === 'ask_question' && content.questionId) { + // Track pending questions for ask_user_question flow. + // Guarded: without the interactive module, `pending_questions` doesn't + // exist and we skip persistence — the card still delivers to the user, + // but the response path has nowhere to land and will log unclaimed. + if (content.type === 'ask_question' && content.questionId && hasTable(getDb(), 'pending_questions')) { const title = content.title as string | undefined; const rawOptions = content.options as unknown; if (!title || !Array.isArray(rawOptions)) { @@ -637,6 +504,34 @@ async function deliverMessage( return platformMsgId; } +/** + * Delivery action registry. + * + * Modules register handlers for system-kind outbound message actions via + * `registerDeliveryAction`. Core checks the registry first in + * `handleSystemAction` and falls through to the inline switch when no + * handler is registered. The switch will shrink as modules are extracted + * (scheduling, approvals, agent-to-agent) and eventually only its default + * branch remains. + * + * Default when no handler registered and the switch doesn't match: log + * "Unknown system action" and return. + */ +export type DeliveryActionHandler = ( + content: Record, + session: Session, + inDb: Database.Database, +) => Promise; + +const actionHandlers = new Map(); + +export function registerDeliveryAction(action: string, handler: DeliveryActionHandler): void { + if (actionHandlers.has(action)) { + log.warn('Delivery action handler overwritten', { action }); + } + actionHandlers.set(action, handler); +} + /** * Handle system actions from the container agent. * These are written to messages_out because the container can't write to inbound.db. @@ -650,6 +545,12 @@ async function handleSystemAction( const action = content.action as string; log.info('System action from agent', { sessionId: session.id, action }); + const registered = actionHandlers.get(action); + if (registered) { + await registered(content, session, inDb); + return; + } + switch (action) { case 'schedule_task': { const taskId = content.taskId as string; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 9e8f6b4..3958c8c 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -100,8 +100,14 @@ async function sweepSession(session: Session): Promise { detectStaleContainers(inDb, outDb, session, agentGroup.id); } - // 4. Handle recurrence for completed messages + // 4. Handle recurrence for completed messages. + // MODULE-HOOK:scheduling-recurrence:start + // When scheduling is extracted (PR #4), `handleRecurrence` moves to + // `src/modules/scheduling/` and the `/add-scheduling` skill replaces + // this block with a call to the module. Without scheduling + // installed, the block is empty and recurrence is a no-op. await handleRecurrence(inDb, session); + // MODULE-HOOK:scheduling-recurrence:end } finally { inDb.close(); outDb?.close(); diff --git a/src/index.ts b/src/index.ts index e2a1b9e..575c075 100644 --- a/src/index.ts +++ b/src/index.ts @@ -33,10 +33,55 @@ import { writeSessionMessage } from './session-manager.js'; import { wakeContainer, buildAgentGroupImage, killContainer } from './container-runner.js'; import { log } from './log.js'; +/** + * Response handler registry. + * + * Button-click / question responses arrive via the channel adapter's + * `onAction` callback. Core iterates registered handlers in registration + * order; the first one that returns `true` claims the response. + * Unclaimed responses fall through to the inline `handleQuestionResponse` + * below (which handles OneCLI credential approvals, pending_approvals, + * and pending_questions). As those modules are extracted, the inline + * function will shrink and the registry will own the full dispatch. + */ +export interface ResponsePayload { + questionId: string; + value: string; + userId: string | null; + channelType: string; + platformId: string; + threadId: string | null; +} + +export type ResponseHandler = (payload: ResponsePayload) => Promise; + +const responseHandlers: ResponseHandler[] = []; + +export function registerResponseHandler(handler: ResponseHandler): void { + responseHandlers.push(handler); +} + +async function dispatchResponse(payload: ResponsePayload): Promise { + for (const handler of responseHandlers) { + try { + const claimed = await handler(payload); + if (claimed) return; + } catch (err) { + log.error('Response handler threw', { questionId: payload.questionId, err }); + } + } + // Unclaimed — fall through to inline handler. + await handleQuestionResponse(payload.questionId, payload.value, payload.userId ?? ''); +} + // Channel barrel — each enabled channel self-registers on import. // Channel skills uncomment lines in channels/index.ts to enable them. import './channels/index.js'; +// Modules barrel — default modules (typing, mount-security) ship here; skills +// append registry-based modules. Imported for side effects (registrations). +import './modules/index.js'; + import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js'; import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js'; @@ -82,7 +127,18 @@ async function main(): Promise { }); }, onAction(questionId, selectedOption, userId) { - handleQuestionResponse(questionId, selectedOption, userId).catch((err) => { + dispatchResponse({ + questionId, + value: selectedOption, + userId, + channelType: adapter.channelType, + // platformId/threadId aren't surfaced by the current onAction + // signature — the inline fallback looks them up from the + // pending_question / pending_approval row. Registered handlers + // typically do the same. + platformId: '', + threadId: null, + }).catch((err) => { log.error('Failed to handle question response', { questionId, err }); }); }, diff --git a/src/modules/index.ts b/src/modules/index.ts new file mode 100644 index 0000000..7d8d298 --- /dev/null +++ b/src/modules/index.ts @@ -0,0 +1,16 @@ +/** + * Modules barrel. + * + * Each module self-registers at import time. This barrel is imported by + * src/index.ts for side effects (registry registrations, typing impl setup, + * etc.). Core runs with an empty barrel — the registries have inline + * fallbacks and `sqlite_master` guards. + * + * Default modules (ship with main, direct core import): + * - src/modules/typing/ → imported directly by router/delivery/container-runner + * - src/modules/mount-security/ → imported directly by container-runner + * + * Registry-based modules (installed via /add- skills, pulled from the + * `modules` branch): append imports below. + */ +export {}; diff --git a/src/mount-security.ts b/src/modules/mount-security/index.ts similarity index 99% rename from src/mount-security.ts rename to src/modules/mount-security/index.ts index aaf593b..3e113df 100644 --- a/src/mount-security.ts +++ b/src/modules/mount-security/index.ts @@ -9,8 +9,8 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; -import { MOUNT_ALLOWLIST_PATH } from './config.js'; -import { log } from './log.js'; +import { MOUNT_ALLOWLIST_PATH } from '../../config.js'; +import { log } from '../../log.js'; export interface AdditionalMount { hostPath: string; diff --git a/src/modules/typing/index.ts b/src/modules/typing/index.ts new file mode 100644 index 0000000..df8ff78 --- /dev/null +++ b/src/modules/typing/index.ts @@ -0,0 +1,165 @@ +/** + * Typing indicator refresh — default module. + * + * Most platforms expire a typing indicator after 5–10s, so a one-shot + * call on message arrival goes stale long before the agent finishes + * thinking. This module keeps it alive by re-firing `setTyping` on a + * short interval — but only while the agent is actually WORKING, gated + * on the heartbeat file's mtime after an initial grace period. + * + * After delivering a user-facing message, the refresh is paused for + * POST_DELIVERY_PAUSE_MS so the client-side indicator can visually + * clear. + * + * Default module status: + * - Lives in src/modules/ for signaling (not really core), but ships + * on main and is imported directly by core. No registry, no hook. + * - Removing requires editing src/router.ts, src/delivery.ts, and + * src/container-runner.ts to drop the calls. + */ +import fs from 'fs'; + +import { heartbeatPath } from '../../session-manager.js'; + +const TYPING_REFRESH_MS = 4000; +/** + * Grace window from startTypingRefresh: fire typing unconditionally + * for this long regardless of heartbeat state. Covers container + * spawn/wake latency (5–12s on cold start before first heartbeat). + */ +const TYPING_GRACE_MS = 15000; +/** + * After the grace window, a heartbeat must be mtimed within this + * many ms of now to count as "agent is working." Heartbeats land + * every few hundred ms during active work, so 6s is well above + * the working floor and small enough to stop typing quickly when + * the agent goes idle. + */ +const HEARTBEAT_FRESH_MS = 6000; +/** + * After we deliver a user-facing message, pause typing for this + * long so the client-side indicator has time to visually clear. + * Tuned for the longest common expiry (Discord ~10s). The interval + * stays running; ticks inside the pause just skip the setTyping call. + */ +const POST_DELIVERY_PAUSE_MS = 10000; + +interface TypingAdapter { + setTyping?(channelType: string, platformId: string, threadId: string | null): Promise; +} + +interface TypingTarget { + agentGroupId: string; + channelType: string; + platformId: string; + threadId: string | null; + interval: NodeJS.Timeout; + startedAt: number; + pausedUntil: number; // epoch ms; 0 = not paused +} + +let adapter: TypingAdapter | null = null; +const typingRefreshers = new Map(); + +/** + * Bind the typing module to the channel delivery adapter so it can + * call `setTyping`. Called once by `src/delivery.ts` inside + * `setDeliveryAdapter`. Passing a fresh adapter replaces the prior + * binding and leaves active refreshers in place (they'll use the + * new adapter on their next tick). + */ +export function setTypingAdapter(a: TypingAdapter): void { + adapter = a; +} + +async function triggerTyping(channelType: string, platformId: string, threadId: string | null): Promise { + try { + await adapter?.setTyping?.(channelType, platformId, threadId); + } catch { + // Typing is best-effort — don't let it fail delivery or routing. + } +} + +function isHeartbeatFresh(agentGroupId: string, sessionId: string): boolean { + const hbPath = heartbeatPath(agentGroupId, sessionId); + try { + const stat = fs.statSync(hbPath); + return Date.now() - stat.mtimeMs < HEARTBEAT_FRESH_MS; + } catch { + return false; + } +} + +export function startTypingRefresh( + sessionId: string, + agentGroupId: string, + channelType: string, + platformId: string, + threadId: string | null, +): void { + const existing = typingRefreshers.get(sessionId); + if (existing) { + // Already refreshing. Fire an immediate tick for the new inbound + // event and reset the grace window — the new message restarts + // the container-wake latency budget. Also clear any lingering + // post-delivery pause: a new inbound means the user expects + // typing to show immediately. + triggerTyping(channelType, platformId, threadId).catch(() => {}); + existing.startedAt = Date.now(); + existing.pausedUntil = 0; + return; + } + + // Immediate tick + periodic refresh. + triggerTyping(channelType, platformId, threadId).catch(() => {}); + const startedAt = Date.now(); + const interval = setInterval(() => { + const entry = typingRefreshers.get(sessionId); + if (!entry) return; // stopped externally since this tick was scheduled + + // Inside a post-delivery pause: skip setTyping but keep the + // interval running so we resume automatically once the pause + // expires. + if (entry.pausedUntil > Date.now()) return; + + const withinGrace = Date.now() - entry.startedAt < TYPING_GRACE_MS; + if (withinGrace || isHeartbeatFresh(entry.agentGroupId, sessionId)) { + triggerTyping(entry.channelType, entry.platformId, entry.threadId).catch(() => {}); + return; + } + + // Out of grace AND heartbeat stale — agent is idle, stop refreshing. + clearInterval(entry.interval); + typingRefreshers.delete(sessionId); + }, TYPING_REFRESH_MS); + // unref so a stale refresher can't hold the event loop alive. + interval.unref(); + typingRefreshers.set(sessionId, { + agentGroupId, + channelType, + platformId, + threadId, + interval, + startedAt, + pausedUntil: 0, + }); +} + +/** + * Pause the typing refresh for POST_DELIVERY_PAUSE_MS. Called after + * a user-facing message is delivered so the client-side indicator + * has a chance to visually clear before the agent's next SDK event + * pushes it back on. No-op if no refresh is active for this session. + */ +export function pauseTypingRefreshAfterDelivery(sessionId: string): void { + const entry = typingRefreshers.get(sessionId); + if (!entry) return; + entry.pausedUntil = Date.now() + POST_DELIVERY_PAUSE_MS; +} + +export function stopTypingRefresh(sessionId: string): void { + const entry = typingRefreshers.get(sessionId); + if (!entry) return; + clearInterval(entry.interval); + typingRefreshers.delete(sessionId); +} diff --git a/src/router.ts b/src/router.ts index 67d0cb4..b1ef684 100644 --- a/src/router.ts +++ b/src/router.ts @@ -21,7 +21,7 @@ import { getChannelAdapter } from './channels/channel-registry.js'; import { isMember } from './db/agent-group-members.js'; import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js'; import { upsertUser, getUser } from './db/users.js'; -import { startTypingRefresh } from './delivery.js'; +import { startTypingRefresh } from './modules/typing/index.js'; import { log } from './log.js'; import { resolveSession, writeSessionMessage } from './session-manager.js'; import { wakeContainer } from './container-runner.js'; @@ -45,6 +45,34 @@ export interface InboundEvent { }; } +/** + * Inbound gate registry. + * + * A module (permissions, today) can register a single gate function that + * owns sender resolution + access decision. Without a registered gate, + * core falls back to the inline `extractAndUpsertUser` + + * `enforceAccess` + `handleUnknownSender` chain. + * + * Takes the raw event so the gate can read sender fields from + * `event.message.content`. Returns either allowed=true with a `userId` + * (null if unresolved) or allowed=false with a reason; core drops the + * message on refusal. + */ +export type InboundGateResult = + | { allowed: true; userId: string | null } + | { allowed: false; userId: string | null; reason: string }; + +export type InboundGateFn = (event: InboundEvent, mg: MessagingGroup, agentGroupId: string) => InboundGateResult; + +let inboundGate: InboundGateFn | null = null; + +export function setInboundGate(fn: InboundGateFn): void { + if (inboundGate) { + log.warn('Inbound gate overwritten'); + } + inboundGate = fn; +} + /** * Route an inbound message from a channel adapter to the correct session. * Creates messaging group + session if they don't exist yet. @@ -81,13 +109,8 @@ export async function routeInbound(event: InboundEvent): Promise { }); } - // 2. Resolve sender → user id. Upsert into users table on first sight so - // subsequent messages find an existing row. `userId` is null if the - // adapter didn't give us enough to identify a sender (the gate will - // then apply unknown_sender_policy). - const userId = extractAndUpsertUser(event); - - // 3. Resolve agent groups wired to this messaging group + // 2. Resolve agent groups wired to this messaging group. (The gate runs + // after this so it can decide based on the target agent group.) const agents = getMessagingGroupAgents(mg.id); if (agents.length === 0) { log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', { @@ -128,13 +151,31 @@ export async function routeInbound(event: InboundEvent): Promise { return; } - // 4. Access gate. Public channels skip the gate entirely. - if (mg.unknown_sender_policy !== 'public') { - const gate = enforceAccess(userId, match.agent_group_id); - if (!gate.allowed) { - handleUnknownSender(mg, userId, match.agent_group_id, gate.reason, event); + // 3. Inbound gate: sender resolution + access decision. If a module + // registered a gate, it owns the whole thing (it can upsert users, + // check roles, etc.). Otherwise fall back to the inline chain. + let userId: string | null; + if (inboundGate) { + const result = inboundGate(event, mg, match.agent_group_id); + userId = result.userId; + if (!result.allowed) { + log.info('MESSAGE DROPPED — inbound gate refused', { + messagingGroupId: mg.id, + agentGroupId: match.agent_group_id, + userId, + reason: result.reason, + }); return; } + } else { + userId = extractAndUpsertUser(event); + if (mg.unknown_sender_policy !== 'public') { + const gate = enforceAccess(userId, match.agent_group_id); + if (!gate.allowed) { + handleUnknownSender(mg, userId, match.agent_group_id, gate.reason, event); + return; + } + } } // 5. Resolve or create session. diff --git a/src/session-manager.ts b/src/session-manager.ts index 0f8c8a3..1c83d80 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -17,6 +17,7 @@ import path from 'path'; import { DATA_DIR } from './config.js'; import { getAgentGroup } from './db/agent-groups.js'; import { getDestinations } from './db/agent-destinations.js'; +import { getDb, hasTable } from './db/connection.js'; import { getMessagingGroup } from './db/messaging-groups.js'; import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js'; import { @@ -183,6 +184,11 @@ export function writeDestinations(agentGroupId: string, sessionId: string): void const dbPath = inboundDbPath(agentGroupId, sessionId); if (!fs.existsSync(dbPath)) return; + // Guarded: when the agent-to-agent module isn't installed, the + // `agent_destinations` table doesn't exist. Skip silently — core + // container spawn continues without projecting destinations. + if (!hasTable(getDb(), 'agent_destinations')) return; + const rows = getDestinations(agentGroupId); const resolved: DestinationRow[] = [];