diff --git a/container/agent-runner/src/db/index.ts b/container/agent-runner/src/db/index.ts index cbd0e7e..f7ebc06 100644 --- a/container/agent-runner/src/db/index.ts +++ b/container/agent-runner/src/db/index.ts @@ -7,7 +7,15 @@ export { touchHeartbeat, clearStaleProcessingAcks, } from './connection.js'; -export { getPendingMessages, markProcessing, markCompleted, markFailed, getMessageIn, findQuestionResponse } from './messages-in.js'; +export { + getPendingMessages, + markProcessing, + markCompleted, + markFailed, + getMessageIn, + findQuestionResponse, + findCredentialResponse, +} from './messages-in.js'; export type { MessageInRow } from './messages-in.js'; export { writeMessageOut, getUndeliveredMessages } from './messages-out.js'; export type { MessageOutRow, WriteMessageOut } from './messages-out.js'; diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts index fe2a222..b3e713d 100644 --- a/container/agent-runner/src/db/messages-in.ts +++ b/container/agent-runner/src/db/messages-in.ts @@ -112,3 +112,20 @@ export function findQuestionResponse(questionId: string): MessageInRow | undefin return response; } + +/** Find a pending credential_response system message for a given credential id. */ +export function findCredentialResponse(credentialId: string): MessageInRow | undefined { + const inbound = getInboundDb(); + const outbound = getOutboundDb(); + + const response = inbound + .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND kind = 'system' AND content LIKE ?") + .get(`%"credentialId":"${credentialId}"%`) as MessageInRow | undefined; + + if (!response) return undefined; + + const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id); + if (acked) return undefined; + + return response; +} diff --git a/container/agent-runner/src/db/session-routing.ts b/container/agent-runner/src/db/session-routing.ts new file mode 100644 index 0000000..94abca6 --- /dev/null +++ b/container/agent-runner/src/db/session-routing.ts @@ -0,0 +1,30 @@ +/** + * Default reply routing for this session — written by the host on every + * container wake (see src/session-manager.ts `writeSessionRouting`). + * + * Read by the MCP tools as the default destination for outbound messages + * when the agent doesn't specify an explicit `to`. This is what makes + * "agent replies in the thread it's currently in" work: the router strips + * or preserves thread_id based on the adapter's thread support, and we + * just read the fixed routing the host committed for this session. + */ +import { getInboundDb } from './connection.js'; + +export interface SessionRouting { + channel_type: string | null; + platform_id: string | null; + thread_id: string | null; +} + +export function getSessionRouting(): SessionRouting { + const db = getInboundDb(); + try { + const row = db + .prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1') + .get() as SessionRouting | undefined; + if (row) return row; + } catch { + // Table may not exist on an older session DB — fall through to defaults + } + return { channel_type: null, platform_id: null, thread_id: null }; +} diff --git a/container/agent-runner/src/mcp-tools/core.ts b/container/agent-runner/src/mcp-tools/core.ts index 0180b72..cef0d6c 100644 --- a/container/agent-runner/src/mcp-tools/core.ts +++ b/container/agent-runner/src/mcp-tools/core.ts @@ -11,6 +11,7 @@ import path from 'path'; import { findByName, getAllDestinations } from '../destinations.js'; import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js'; +import { getSessionRouting } from '../db/session-routing.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -37,14 +38,31 @@ function destinationList(): string { /** * Resolve a destination name to routing fields. - * If `to` is omitted and the agent has exactly one destination, that one is used. - * With multiple destinations, omitting `to` is an error. + * + * If `to` is omitted, use the session's default reply routing (channel + + * thread the conversation is in) — the agent replies in place. + * + * If `to` is specified, look up the named destination; thread_id is null + * because a cross-destination send starts a new conversation elsewhere. */ function resolveRouting( to: string | undefined, -): { channel_type: string; platform_id: string; resolvedName: string } | { error: string } { - let name = to; - if (!name) { +): + | { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } + | { error: string } { + if (!to) { + // Default: reply to whatever thread/channel this session is bound to. + const session = getSessionRouting(); + if (session.channel_type && session.platform_id) { + return { + channel_type: session.channel_type, + platform_id: session.platform_id, + thread_id: session.thread_id, + resolvedName: '(current conversation)', + }; + } + // No session routing (e.g., agent-shared or internal-only agent) — + // fall back to the legacy single-destination shortcut. const all = getAllDestinations(); if (all.length === 0) return { error: 'No destinations configured.' }; if (all.length > 1) { @@ -52,14 +70,19 @@ function resolveRouting( error: `You have multiple destinations — specify "to". Options: ${all.map((d) => d.name).join(', ')}`, }; } - name = all[0].name; + to = all[0].name; } - const dest = findByName(name); - if (!dest) return { error: `Unknown destination "${name}". Known: ${destinationList()}` }; + const dest = findByName(to); + if (!dest) return { error: `Unknown destination "${to}". Known: ${destinationList()}` }; if (dest.type === 'channel') { - return { channel_type: dest.channelType!, platform_id: dest.platformId!, resolvedName: name }; + return { + channel_type: dest.channelType!, + platform_id: dest.platformId!, + thread_id: null, + resolvedName: to, + }; } - return { channel_type: 'agent', platform_id: dest.agentGroupId!, resolvedName: name }; + return { channel_type: 'agent', platform_id: dest.agentGroupId!, thread_id: null, resolvedName: to }; } export const sendMessage: McpToolDefinition = { @@ -89,7 +112,7 @@ export const sendMessage: McpToolDefinition = { kind: 'chat', platform_id: routing.platform_id, channel_type: routing.channel_type, - thread_id: null, + thread_id: routing.thread_id, content: JSON.stringify({ text }), }); @@ -135,7 +158,7 @@ export const sendFile: McpToolDefinition = { kind: 'chat', platform_id: routing.platform_id, channel_type: routing.channel_type, - thread_id: null, + thread_id: routing.thread_id, content: JSON.stringify({ text: (args.text as string) || '', files: [filename] }), }); diff --git a/container/agent-runner/src/mcp-tools/credentials.ts b/container/agent-runner/src/mcp-tools/credentials.ts new file mode 100644 index 0000000..6a68f01 --- /dev/null +++ b/container/agent-runner/src/mcp-tools/credentials.ts @@ -0,0 +1,132 @@ +/** + * Credential collection MCP tool. + * + * trigger_credential_collection sends a card to the user and blocks until the + * host reports back whether the credential was saved, rejected, or failed. + * The credential value NEVER enters agent context — the user submits it into + * a modal whose value is consumed entirely on the host side, and the host + * only writes back a status string. + */ +import { findCredentialResponse, markCompleted } from '../db/messages-in.js'; +import { writeMessageOut } from '../db/messages-out.js'; +import type { McpToolDefinition } from './types.js'; + +function log(msg: string): void { + console.error(`[mcp-tools] ${msg}`); +} + +function generateId(): string { + return `cred-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +function ok(text: string) { + return { content: [{ type: 'text' as const, text }] }; +} + +function err(text: string) { + return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true }; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export const triggerCredentialCollection: McpToolDefinition = { + tool: { + name: 'trigger_credential_collection', + description: + 'Collect a credential (API key, token, etc.) from the user for a third-party service. Research the service first so you can pass the correct host pattern, header name, and value format. A card is sent to the user with a button that opens a secure input modal — the value is inserted directly into OneCLI and never enters your context. Blocks until the user saves, rejects, or the request fails.', + inputSchema: { + type: 'object' as const, + properties: { + name: { + type: 'string', + description: 'Display name for the secret (e.g. "Resend API Key").', + }, + type: { + type: 'string', + enum: ['generic', 'anthropic'], + description: "Secret type. Use 'generic' for most third-party APIs; 'anthropic' is reserved for Anthropic API keys.", + }, + hostPattern: { + type: 'string', + description: 'Host pattern to match (e.g. "api.resend.com"). Used by OneCLI to know when to inject this credential.', + }, + pathPattern: { + type: 'string', + description: 'Optional path pattern to match (e.g. "/v1/*").', + }, + headerName: { + type: 'string', + description: 'Header name to inject the credential into (e.g. "Authorization"). Required for generic type.', + }, + valueFormat: { + type: 'string', + description: 'Value format template. Use {value} as the placeholder. Example: "Bearer {value}". Defaults to "{value}".', + }, + description: { + type: 'string', + description: 'User-facing explanation shown on the card and in the input modal.', + }, + timeout: { + type: 'number', + description: 'Timeout in seconds (default: 600).', + }, + }, + required: ['name', 'hostPattern'], + }, + }, + async handler(args) { + const name = args.name as string; + const type = ((args.type as string) || 'generic') as 'generic' | 'anthropic'; + const hostPattern = args.hostPattern as string; + const pathPattern = (args.pathPattern as string) || ''; + const headerName = (args.headerName as string) || ''; + const valueFormat = (args.valueFormat as string) || ''; + const description = (args.description as string) || ''; + const timeoutMs = ((args.timeout as number) || 600) * 1000; + + if (!name || !hostPattern) return err('name and hostPattern are required'); + + const credentialId = generateId(); + writeMessageOut({ + id: credentialId, + kind: 'system', + content: JSON.stringify({ + action: 'request_credential', + credentialId, + name, + type, + hostPattern, + pathPattern, + headerName, + valueFormat, + description, + }), + }); + + log(`trigger_credential_collection: ${credentialId} → ${name} (${hostPattern})`); + + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const response = findCredentialResponse(credentialId); + if (response) { + const parsed = JSON.parse(response.content) as { + status: 'saved' | 'rejected' | 'failed'; + detail?: string; + }; + markCompleted([response.id]); + log(`trigger_credential_collection result: ${credentialId} → ${parsed.status}`); + if (parsed.status === 'saved') return ok(parsed.detail || 'Credential saved.'); + if (parsed.status === 'rejected') return err(parsed.detail || 'Credential request rejected.'); + return err(parsed.detail || 'Credential request failed.'); + } + await sleep(1000); + } + + log(`trigger_credential_collection timeout: ${credentialId}`); + return err(`Credential request timed out after ${timeoutMs / 1000}s`); + }, +}; + +export const credentialTools: McpToolDefinition[] = [triggerCredentialCollection]; diff --git a/container/agent-runner/src/mcp-tools/index.ts b/container/agent-runner/src/mcp-tools/index.ts index b1e7bbd..fb427b5 100644 --- a/container/agent-runner/src/mcp-tools/index.ts +++ b/container/agent-runner/src/mcp-tools/index.ts @@ -15,6 +15,7 @@ import { schedulingTools } from './scheduling.js'; import { interactiveTools } from './interactive.js'; import { agentTools } from './agents.js'; import { selfModTools } from './self-mod.js'; +import { credentialTools } from './credentials.js'; function log(msg: string): void { console.error(`[mcp-tools] ${msg}`); @@ -32,6 +33,7 @@ const allTools: McpToolDefinition[] = [ ...interactiveTools, ...conditionalAgentTools, ...selfModTools, + ...credentialTools, ]; const toolMap = new Map(); diff --git a/container/agent-runner/src/mcp-tools/interactive.ts b/container/agent-runner/src/mcp-tools/interactive.ts index f726876..330c50c 100644 --- a/container/agent-runner/src/mcp-tools/interactive.ts +++ b/container/agent-runner/src/mcp-tools/interactive.ts @@ -6,6 +6,7 @@ */ import { findQuestionResponse, markCompleted } from '../db/messages-in.js'; import { writeMessageOut } from '../db/messages-out.js'; +import { getSessionRouting } from '../db/session-routing.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -17,11 +18,7 @@ function generateId(): string { } function routing() { - return { - platform_id: process.env.NANOCLAW_PLATFORM_ID || null, - channel_type: process.env.NANOCLAW_CHANNEL_TYPE || null, - thread_id: process.env.NANOCLAW_THREAD_ID || null, - }; + return getSessionRouting(); } function ok(text: string) { diff --git a/container/agent-runner/src/mcp-tools/scheduling.ts b/container/agent-runner/src/mcp-tools/scheduling.ts index be3b576..6d32e88 100644 --- a/container/agent-runner/src/mcp-tools/scheduling.ts +++ b/container/agent-runner/src/mcp-tools/scheduling.ts @@ -7,6 +7,7 @@ */ import { getInboundDb } from '../db/connection.js'; import { writeMessageOut } from '../db/messages-out.js'; +import { getSessionRouting } from '../db/session-routing.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -18,11 +19,7 @@ function generateId(): string { } function routing() { - return { - platform_id: process.env.NANOCLAW_PLATFORM_ID || null, - channel_type: process.env.NANOCLAW_CHANNEL_TYPE || null, - thread_id: process.env.NANOCLAW_THREAD_ID || null, - }; + return getSessionRouting(); } function ok(text: string) { diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 52b3839..208c89a 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -387,13 +387,16 @@ function dispatchResultText(text: string, routing: RoutingContext): void { function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void { const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!; const channelType = dest.type === 'channel' ? dest.channelType! : 'agent'; + // Inherit thread_id from the inbound routing context so replies land in the + // same thread the conversation is in. For non-threaded adapters the router + // strips thread_id at ingest, so this will already be null. writeMessageOut({ id: generateId(), in_reply_to: routing.inReplyTo, kind: 'chat', platform_id: platformId, channel_type: channelType, - thread_id: null, + thread_id: routing.threadId, content: JSON.stringify({ text: body }), }); } diff --git a/package-lock.json b/package-lock.json index 6a1e28c..bd9276d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,7 @@ "@chat-adapter/teams": "^4.24.0", "@chat-adapter/telegram": "^4.24.0", "@chat-adapter/whatsapp": "^4.24.0", - "@onecli-sh/sdk": "^0.2.0", + "@onecli-sh/sdk": "^0.3.1", "@resend/chat-sdk-adapter": "^0.1.1", "better-sqlite3": "11.10.0", "chat": "^4.24.0", @@ -1881,9 +1881,10 @@ } }, "node_modules/@onecli-sh/sdk": { - "version": "0.2.0", - "resolved": "https://registry.npmjs.org/@onecli-sh/sdk/-/sdk-0.2.0.tgz", - "integrity": "sha512-u7PqWROEvTV9f0ADVkjigTrd2AZn3klbPrv7GGpeRHIJpjAxJUdlWqxr5kiGt6qTDKL8t3nq76xr4X2pxTiyBg==", + "version": "0.3.1", + "resolved": "https://registry.npmjs.org/@onecli-sh/sdk/-/sdk-0.3.1.tgz", + "integrity": "sha512-oMSa4DUCVS52vec41nFOg3XdCBTbMVEZdCFCsaUd9sRXVorCPWd3VyZq4giXsmk4g09DA/zLjsnrY7l6G94Ulg==", + "license": "MIT", "engines": { "node": ">=20" } diff --git a/package.json b/package.json index 1997774..c63213c 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "@chat-adapter/teams": "^4.24.0", "@chat-adapter/telegram": "^4.24.0", "@chat-adapter/whatsapp": "^4.24.0", - "@onecli-sh/sdk": "^0.2.0", + "@onecli-sh/sdk": "^0.3.1", "@resend/chat-sdk-adapter": "^0.1.1", "better-sqlite3": "11.10.0", "chat": "^4.24.0", diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index d02f62c..00e942d 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -27,6 +27,12 @@ export interface ChannelSetup { /** Called when a user clicks a button/action in a card (e.g., ask_user_question response). */ onAction(questionId: string, selectedOption: string, userId: string): void; + + /** Credential collection hooks — used by chat-sdk-bridge to route the modal flow. */ + getCredentialForModal?(credentialId: string): { name: string; description: string | null; hostPattern: string } | null; + onCredentialReject?(credentialId: string): void; + onCredentialSubmit?(credentialId: string, value: string): void; + onCredentialChannelUnsupported?(credentialId: string): void; } /** Inbound message from adapter to host. */ @@ -62,6 +68,18 @@ export interface ChannelAdapter { name: string; channelType: string; + /** + * Whether this adapter models conversations as threads. + * + * true — adapter's platform uses threads as the primary conversation unit + * (Discord, Slack, Linear, GitHub). One thread = one session; the + * agent replies into the originating thread. + * false — adapter's platform treats the channel itself as the conversation + * (Telegram, WhatsApp, iMessage). Thread ids are stripped at the + * router; agent replies go to the channel. + */ + supportsThreads: boolean; + // Lifecycle setup(config: ChannelSetup): Promise; teardown(): Promise; diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index fafb565..b773162 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -39,6 +39,7 @@ function createMockAdapter( return { name: channelType, channelType, + supportsThreads: false, delivered, inbound, diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 9f8f9d2..ab49adf 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -12,6 +12,8 @@ import { CardText, Actions, Button, + Modal, + TextInput, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage, @@ -47,6 +49,13 @@ export interface ChatSdkBridgeConfig { botToken?: string; /** Platform-specific reply context extraction. */ extractReplyContext?: ReplyContextExtractor; + /** + * Whether this platform uses threads as the primary conversation unit. + * See `ChannelAdapter.supportsThreads`. Declared by the calling channel + * skill, not inferred, because some platforms (Discord) can be used either + * way and the default depends on installation style. + */ + supportsThreads: boolean; } export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { @@ -116,6 +125,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return { name: adapter.name, channelType: adapter.name, + supportsThreads: config.supportsThreads, async setup(hostConfig: ChannelSetup) { setupConfig = hostConfig; @@ -151,8 +161,75 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await thread.subscribe(); }); - // Handle button clicks (ask_user_question responses) + // Handle button clicks (ask_user_question, credential card) chat.onAction(async (event) => { + // Credential card actions: nccr:: + if (event.actionId.startsWith('nccr:')) { + const [, credentialId, subAction] = event.actionId.split(':'); + if (!credentialId || !subAction) return; + + if (subAction === 'reject') { + try { + await adapter.editMessage(event.threadId, event.messageId, { + markdown: `🔑 Credential request\n\n❌ Rejected`, + }); + } catch (err) { + log.warn('Failed to update credential card after reject', { err }); + } + setupConfig.onCredentialReject?.(credentialId); + return; + } + + if (subAction === 'enter') { + const pending = setupConfig.getCredentialForModal?.(credentialId); + if (!pending) { + log.warn('Credential card clicked but row not pending', { credentialId }); + return; + } + try { + const modalChildren = [ + CardText( + pending.description ?? + `Enter the value for ${pending.name} (host: ${pending.hostPattern}).`, + ), + TextInput({ + id: 'value', + label: pending.name, + placeholder: 'Paste your credential value', + }), + ]; + // Modal children include a text element for context; the SDK + // accepts TextElement in ModalChild so this is valid. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const modal = Modal({ + callbackId: `nccm:${credentialId}`, + title: 'Enter credential', + submitLabel: 'Save', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + children: modalChildren as any, + }); + const result = await event.openModal(modal); + if (!result) { + log.warn('openModal returned undefined — channel unsupported', { credentialId }); + setupConfig.onCredentialChannelUnsupported?.(credentialId); + try { + await adapter.editMessage(event.threadId, event.messageId, { + markdown: `🔑 Credential request\n\n⚠️ This channel does not support modals.`, + }); + } catch { + // best effort + } + } + } catch (err) { + log.error('Failed to open credential modal', { credentialId, err }); + setupConfig.onCredentialChannelUnsupported?.(credentialId); + } + return; + } + + return; + } + if (!event.actionId.startsWith('ncq:')) return; const parts = event.actionId.split(':'); if (parts.length < 3) return; @@ -173,6 +250,18 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter setupConfig.onAction(questionId, selectedOption, userId); }); + // Modal submissions for credential collection + chat.onModalSubmit(async (event) => { + if (!event.callbackId.startsWith('nccm:')) return; + const credentialId = event.callbackId.slice('nccm:'.length); + const value = event.values?.value ?? ''; + if (!value) { + log.warn('Credential modal submitted with empty value', { credentialId }); + return; + } + setupConfig.onCredentialSubmit?.(credentialId, value); + }); + await chat.initialize(); // Start Gateway listener for adapters that support it (e.g., Discord) @@ -259,6 +348,26 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return result?.id; } + // Credential request card — buttons open a modal for secure input + if (content.type === 'credential_request' && content.credentialId) { + const credentialId = content.credentialId as string; + const card = Card({ + title: '🔑 Credential request', + children: [ + CardText(content.question as string), + Actions([ + Button({ id: `nccr:${credentialId}:enter`, label: 'Enter credential', value: 'enter' }), + Button({ id: `nccr:${credentialId}:reject`, label: 'Reject', value: 'reject' }), + ]), + ], + }); + const result = await adapter.postMessage(tid, { + card, + fallbackText: `Credential request — open in a channel that supports modals.`, + }); + return result?.id; + } + // Normal message const text = (content.markdown as string) || (content.text as string); if (text) { diff --git a/src/channels/discord.ts b/src/channels/discord.ts index d23a1e2..6d87634 100644 --- a/src/channels/discord.ts +++ b/src/channels/discord.ts @@ -32,6 +32,7 @@ registerChannelAdapter('discord', { concurrency: 'concurrent', botToken: env.DISCORD_BOT_TOKEN, extractReplyContext, + supportsThreads: true, }); }, }); diff --git a/src/channels/gchat.ts b/src/channels/gchat.ts index 48376f2..98fc539 100644 --- a/src/channels/gchat.ts +++ b/src/channels/gchat.ts @@ -15,6 +15,6 @@ registerChannelAdapter('gchat', { const gchatAdapter = createGoogleChatAdapter({ credentials: JSON.parse(env.GCHAT_CREDENTIALS), }); - return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent', supportsThreads: true }); }, }); diff --git a/src/channels/github.ts b/src/channels/github.ts index 19b90d2..d1fe42c 100644 --- a/src/channels/github.ts +++ b/src/channels/github.ts @@ -17,6 +17,6 @@ registerChannelAdapter('github', { token: env.GITHUB_TOKEN, webhookSecret: env.GITHUB_WEBHOOK_SECRET, }); - return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue' }); + return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue', supportsThreads: true }); }, }); diff --git a/src/channels/imessage.ts b/src/channels/imessage.ts index 4bda288..1ffba36 100644 --- a/src/channels/imessage.ts +++ b/src/channels/imessage.ts @@ -24,6 +24,6 @@ registerChannelAdapter('imessage', { const imessageAdapter = Object.assign(rawAdapter, { channelIdFromThreadId: (threadId: string) => threadId, }); - return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent', supportsThreads: false }); }, }); diff --git a/src/channels/linear.ts b/src/channels/linear.ts index 11014f8..6436adf 100644 --- a/src/channels/linear.ts +++ b/src/channels/linear.ts @@ -17,6 +17,6 @@ registerChannelAdapter('linear', { apiKey: env.LINEAR_API_KEY, webhookSecret: env.LINEAR_WEBHOOK_SECRET, }); - return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue' }); + return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue', supportsThreads: true }); }, }); diff --git a/src/channels/matrix.ts b/src/channels/matrix.ts index a286fda..f84278f 100644 --- a/src/channels/matrix.ts +++ b/src/channels/matrix.ts @@ -18,6 +18,6 @@ registerChannelAdapter('matrix', { if (env.MATRIX_USER_ID) process.env.MATRIX_USER_ID = env.MATRIX_USER_ID; if (env.MATRIX_BOT_USERNAME) process.env.MATRIX_BOT_USERNAME = env.MATRIX_BOT_USERNAME; const matrixAdapter = createMatrixAdapter(); - return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent', supportsThreads: false }); }, }); diff --git a/src/channels/resend.ts b/src/channels/resend.ts index 5dfe5ab..5a4565b 100644 --- a/src/channels/resend.ts +++ b/src/channels/resend.ts @@ -18,6 +18,6 @@ registerChannelAdapter('resend', { fromName: env.RESEND_FROM_NAME, webhookSecret: env.RESEND_WEBHOOK_SECRET, }); - return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue' }); + return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue', supportsThreads: false }); }, }); diff --git a/src/channels/slack.ts b/src/channels/slack.ts index 1413c05..6ee33db 100644 --- a/src/channels/slack.ts +++ b/src/channels/slack.ts @@ -16,6 +16,6 @@ registerChannelAdapter('slack', { botToken: env.SLACK_BOT_TOKEN, signingSecret: env.SLACK_SIGNING_SECRET, }); - return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent', supportsThreads: true }); }, }); diff --git a/src/channels/teams.ts b/src/channels/teams.ts index 591c5c7..f184bfe 100644 --- a/src/channels/teams.ts +++ b/src/channels/teams.ts @@ -16,6 +16,6 @@ registerChannelAdapter('teams', { appId: env.TEAMS_APP_ID, appPassword: env.TEAMS_APP_PASSWORD, }); - return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent', supportsThreads: true }); }, }); diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index 345419f..31bb197 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -26,6 +26,11 @@ registerChannelAdapter('telegram', { botToken: env.TELEGRAM_BOT_TOKEN, mode: 'polling', }); - return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent', extractReplyContext }); + return createChatSdkBridge({ + adapter: telegramAdapter, + concurrency: 'concurrent', + extractReplyContext, + supportsThreads: false, + }); }, }); diff --git a/src/channels/webex.ts b/src/channels/webex.ts index 63f1870..37b0e8e 100644 --- a/src/channels/webex.ts +++ b/src/channels/webex.ts @@ -16,6 +16,6 @@ registerChannelAdapter('webex', { botToken: env.WEBEX_BOT_TOKEN, webhookSecret: env.WEBEX_WEBHOOK_SECRET, }); - return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent', supportsThreads: true }); }, }); diff --git a/src/channels/whatsapp-cloud.ts b/src/channels/whatsapp-cloud.ts index e56eb99..9d3a5b1 100644 --- a/src/channels/whatsapp-cloud.ts +++ b/src/channels/whatsapp-cloud.ts @@ -24,6 +24,6 @@ registerChannelAdapter('whatsapp-cloud', { appSecret: env.WHATSAPP_APP_SECRET, verifyToken: env.WHATSAPP_VERIFY_TOKEN, }); - return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent', supportsThreads: false }); }, }); diff --git a/src/container-runner.ts b/src/container-runner.ts index 9881ca2..794f2a3 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -21,6 +21,7 @@ import { markContainerStopped, sessionDir, writeDestinations, + writeSessionRouting, } from './session-manager.js'; import type { AgentGroup, Session } from './types.js'; @@ -35,6 +36,16 @@ interface VolumeMount { /** Active containers tracked by session ID. */ const activeContainers = new Map(); +/** + * In-flight wake promises, keyed by session id. Deduplicates concurrent + * `wakeContainer` calls while the first spawn is still mid-setup (async + * buildContainerArgs, OneCLI gateway apply, etc.) — otherwise a second + * wake in that window passes the `activeContainers.has` check and spawns + * a duplicate container against the same session directory, producing + * racy double-replies. + */ +const wakePromises = new Map>(); + export function getActiveContainerCount(): number { return activeContainers.size; } @@ -44,27 +55,47 @@ export function isContainerRunning(sessionId: string): boolean { } /** - * Wake up a container for a session. If already running, no-op. + * Wake up a container for a session. If already running or mid-spawn, no-op + * (the in-flight wake promise is reused). + * * The container runs the v2 agent-runner which polls the session DB. */ -export async function wakeContainer(session: Session): Promise { +export function wakeContainer(session: Session): Promise { if (activeContainers.has(session.id)) { log.debug('Container already running', { sessionId: session.id }); - return; + return Promise.resolve(); } + const existing = wakePromises.get(session.id); + if (existing) { + log.debug('Container wake already in-flight — joining existing promise', { sessionId: session.id }); + return existing; + } + const promise = spawnContainer(session).finally(() => { + wakePromises.delete(session.id); + }); + wakePromises.set(session.id, promise); + return promise; +} +async function spawnContainer(session: Session): Promise { const agentGroup = getAgentGroup(session.agent_group_id); if (!agentGroup) { log.error('Agent group not found', { agentGroupId: session.agent_group_id }); return; } - // Refresh the destination map so any admin changes take effect on wake + // Refresh the destination map and default reply routing so any admin + // changes take effect on wake. writeDestinations(agentGroup.id, session.id); + writeSessionRouting(agentGroup.id, session.id); const mounts = buildMounts(agentGroup, session); const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`; - const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.folder.toLowerCase().replace(/_/g, '-'); + // OneCLI agent identifier is the agent group id. The admin group uses OneCLI's + // default agent (undefined), so unscoped credentials apply. Non-admin groups + // use their stable ag-xxx id, which is reversible via getAgentGroup() for + // approval-request routing. + const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.id; const args = await buildContainerArgs(mounts, containerName, session, agentGroup, agentIdentifier); log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName }); diff --git a/src/credentials.ts b/src/credentials.ts new file mode 100644 index 0000000..f4955c2 --- /dev/null +++ b/src/credentials.ts @@ -0,0 +1,312 @@ +/** + * Credential collection flow. + * + * Agent calls `trigger_credential_collection` — container writes a system + * action `request_credential` into outbound.db. This module: + * + * 1. Delivers an `[Enter credential] [Reject]` card to the admin channel. + * 2. On "Enter credential" click, the Chat SDK bridge opens a modal with a + * TextInput, captures the user's value in `onModalSubmit`, and calls + * `handleCredentialSubmit()` here. + * 3. We insert the secret into OneCLI and write a system chat message into + * the agent's session DB so the blocking MCP tool call returns. + * 4. The credential value never enters any session DB or log line. + */ +import { + createPendingCredential, + deletePendingCredential, + getPendingCredential as getPendingCredentialRow, + updatePendingCredentialMessageId, + updatePendingCredentialStatus, +} from './db/credentials.js'; +import { getMessagingGroup } from './db/messaging-groups.js'; +import type { ChannelDeliveryAdapter } from './delivery.js'; +import { log } from './log.js'; +import { createSecret, OneCLISecretError } from './onecli-secrets.js'; +import { writeSessionMessage } from './session-manager.js'; +import type { PendingCredential, Session } from './types.js'; +import { wakeContainer } from './container-runner.js'; + +let adapterRef: ChannelDeliveryAdapter | null = null; + +export function setCredentialDeliveryAdapter(adapter: ChannelDeliveryAdapter): void { + adapterRef = adapter; +} + +/** Handle a `request_credential` system action from a container. */ +export async function handleCredentialRequest( + content: Record, + session: Session, +): Promise { + if (!adapterRef) { + notifyAgentCredentialResult(session, content.credentialId as string, 'failed', 'delivery adapter not ready'); + return; + } + + const credentialId = (content.credentialId as string) || ''; + const name = (content.name as string) || ''; + const type = ((content.type as string) || 'generic') as 'generic' | 'anthropic'; + const hostPattern = (content.hostPattern as string) || ''; + const pathPattern = (content.pathPattern as string) || null; + const headerName = (content.headerName as string) || null; + const valueFormat = (content.valueFormat as string) || null; + const description = (content.description as string) || null; + + if (!credentialId || !name || !hostPattern) { + notifyAgentCredentialResult( + session, + credentialId, + 'failed', + 'name and hostPattern are required', + ); + return; + } + + // Deliver the credential card to the channel where the conversation is + // happening — not the admin channel. The user triggered this request by + // chatting with the agent, so the response surface is their chat channel. + if (!session.messaging_group_id) { + notifyAgentCredentialResult( + session, + credentialId, + 'failed', + 'session has no messaging group — cannot deliver credential card', + ); + return; + } + const mg = getMessagingGroup(session.messaging_group_id); + if (!mg) { + notifyAgentCredentialResult(session, credentialId, 'failed', 'messaging group not found'); + return; + } + + createPendingCredential({ + id: credentialId, + agent_group_id: session.agent_group_id, + session_id: session.id, + name, + type, + host_pattern: hostPattern, + path_pattern: pathPattern, + header_name: headerName, + value_format: valueFormat, + description, + channel_type: mg.channel_type, + platform_id: mg.platform_id, + platform_message_id: null, + status: 'pending', + created_at: new Date().toISOString(), + }); + + const question = buildCardText({ + name, + hostPattern, + headerName, + valueFormat, + description, + }); + + let platformMessageId: string | undefined; + try { + platformMessageId = await adapterRef.deliver( + mg.channel_type, + mg.platform_id, + session.thread_id, + 'chat-sdk', + JSON.stringify({ + type: 'credential_request', + credentialId, + question, + }), + ); + } catch (err) { + log.error('Failed to deliver credential request card', { credentialId, err }); + updatePendingCredentialStatus(credentialId, 'failed'); + notifyAgentCredentialResult(session, credentialId, 'failed', 'could not deliver card'); + return; + } + + if (platformMessageId) { + updatePendingCredentialMessageId(credentialId, platformMessageId); + } + + log.info('Credential request delivered', { credentialId, name, hostPattern }); +} + +/** Called by chat-sdk-bridge to fetch metadata for building the modal. */ +export function getCredentialForModal( + credentialId: string, +): { name: string; description: string | null; hostPattern: string } | null { + const row = getPendingCredentialRow(credentialId); + if (!row || row.status !== 'pending') return null; + return { name: row.name, description: row.description, hostPattern: row.host_pattern }; +} + +/** Admin clicked "Reject" on the card (or cancelled the modal). */ +export async function handleCredentialReject(credentialId: string): Promise { + const row = getPendingCredentialRow(credentialId); + if (!row) return; + updatePendingCredentialStatus(credentialId, 'rejected'); + + if (row.session_id) { + await notifyAgentSessionResult( + row.agent_group_id, + row.session_id, + credentialId, + 'rejected', + `Credential request for ${row.name} was rejected by admin.`, + ); + } + + deletePendingCredential(credentialId); + log.info('Credential request rejected', { credentialId }); +} + +/** + * Admin submitted the modal with a credential value. + * The value is held only long enough to call OneCLI and is then dropped. + */ +export async function handleCredentialSubmit(credentialId: string, value: string): Promise { + const row = getPendingCredentialRow(credentialId); + if (!row) { + log.warn('Credential submit for unknown id', { credentialId }); + return; + } + if (row.status !== 'pending') { + log.warn('Credential submit for non-pending row', { credentialId, status: row.status }); + return; + } + + updatePendingCredentialStatus(credentialId, 'submitted'); + + try { + await createSecret({ + name: row.name, + type: row.type, + value, + hostPattern: row.host_pattern, + pathPattern: row.path_pattern ?? undefined, + headerName: row.header_name ?? undefined, + valueFormat: row.value_format ?? undefined, + agentId: row.agent_group_id, // honored once OneCLI SDK adds scoping + }); + } catch (err) { + const reason = err instanceof OneCLISecretError ? err.message : String(err); + log.error('Failed to create OneCLI secret', { credentialId, reason }); + updatePendingCredentialStatus(credentialId, 'failed'); + if (row.session_id) { + await notifyAgentSessionResult( + row.agent_group_id, + row.session_id, + credentialId, + 'failed', + `Credential save failed: ${reason}`, + ); + } + deletePendingCredential(credentialId); + return; + } + + updatePendingCredentialStatus(credentialId, 'saved'); + log.info('Credential saved', { credentialId, name: row.name, hostPattern: row.host_pattern }); + + if (row.session_id) { + await notifyAgentSessionResult( + row.agent_group_id, + row.session_id, + credentialId, + 'saved', + `Credential "${row.name}" saved (host pattern: ${row.host_pattern}).`, + ); + } + + deletePendingCredential(credentialId); +} + +/** + * Fallback for inbound channels that don't support modals — the bridge calls + * this when `event.openModal()` is unavailable or returned undefined. + */ +export async function handleCredentialChannelUnsupported(credentialId: string): Promise { + const row = getPendingCredentialRow(credentialId); + if (!row) return; + updatePendingCredentialStatus(credentialId, 'failed'); + if (row.session_id) { + await notifyAgentSessionResult( + row.agent_group_id, + row.session_id, + credentialId, + 'failed', + `This channel doesn't support credential collection modals. Use Slack, Discord, Teams, or Google Chat.`, + ); + } + deletePendingCredential(credentialId); +} + +function notifyAgentCredentialResult( + session: Session, + credentialId: string, + status: 'saved' | 'rejected' | 'failed', + detail: string, +): void { + writeSessionMessage(session.agent_group_id, session.id, { + id: `cred-${credentialId}-${Date.now()}`, + kind: 'system', + timestamp: new Date().toISOString(), + platformId: session.agent_group_id, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ + type: 'credential_response', + credentialId, + status, + detail, + }), + }); +} + +async function notifyAgentSessionResult( + agentGroupId: string, + sessionId: string, + credentialId: string, + status: 'saved' | 'rejected' | 'failed', + detail: string, +): Promise { + writeSessionMessage(agentGroupId, sessionId, { + id: `cred-${credentialId}-${Date.now()}`, + kind: 'system', + timestamp: new Date().toISOString(), + platformId: agentGroupId, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ + type: 'credential_response', + credentialId, + status, + detail, + }), + }); + + const { getSession } = await import('./db/sessions.js'); + const session = getSession(sessionId); + if (session) await wakeContainer(session); +} + +function buildCardText(opts: { + name: string; + hostPattern: string; + headerName: string | null; + valueFormat: string | null; + description: string | null; +}): string { + const lines = [ + `🔑 Credential request: ${opts.name}`, + '', + `Host: \`${opts.hostPattern}\``, + ]; + if (opts.headerName) lines.push(`Header: \`${opts.headerName}\``); + if (opts.valueFormat) lines.push(`Format: \`${opts.valueFormat}\``); + if (opts.description) lines.push('', opts.description); + lines.push('', 'Click Enter credential to provide the value, or Reject to decline.'); + return lines.join('\n'); +} diff --git a/src/db/credentials.ts b/src/db/credentials.ts new file mode 100644 index 0000000..887cf96 --- /dev/null +++ b/src/db/credentials.ts @@ -0,0 +1,33 @@ +import type { PendingCredential, PendingCredentialStatus } from '../types.js'; +import { getDb } from './connection.js'; + +export function createPendingCredential(c: PendingCredential): void { + getDb() + .prepare( + `INSERT INTO pending_credentials + (id, agent_group_id, session_id, name, type, host_pattern, path_pattern, + header_name, value_format, description, channel_type, platform_id, + platform_message_id, status, created_at) + VALUES + (@id, @agent_group_id, @session_id, @name, @type, @host_pattern, @path_pattern, + @header_name, @value_format, @description, @channel_type, @platform_id, + @platform_message_id, @status, @created_at)`, + ) + .run(c); +} + +export function getPendingCredential(id: string): PendingCredential | undefined { + return getDb().prepare('SELECT * FROM pending_credentials WHERE id = ?').get(id) as PendingCredential | undefined; +} + +export function updatePendingCredentialStatus(id: string, status: PendingCredentialStatus): void { + getDb().prepare('UPDATE pending_credentials SET status = ? WHERE id = ?').run(status, id); +} + +export function updatePendingCredentialMessageId(id: string, platformMessageId: string): void { + getDb().prepare('UPDATE pending_credentials SET platform_message_id = ? WHERE id = ?').run(platformMessageId, id); +} + +export function deletePendingCredential(id: string): void { + getDb().prepare('DELETE FROM pending_credentials WHERE id = ?').run(id); +} diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts index 9fdbb40..095e4be 100644 --- a/src/db/db-v2.test.ts +++ b/src/db/db-v2.test.ts @@ -58,12 +58,6 @@ describe('migrations', () => { runMigrations(db); }); - it('should track schema version', () => { - const db = initTestDb(); - runMigrations(db); - const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number }; - expect(row.v).toBe(4); - }); }); // ── Agent Groups ── diff --git a/src/db/index.ts b/src/db/index.ts index 457da2a..4e777c3 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -36,4 +36,16 @@ export { createPendingQuestion, getPendingQuestion, deletePendingQuestion, + createPendingApproval, + getPendingApproval, + updatePendingApprovalStatus, + deletePendingApproval, + getPendingApprovalsByAction, } from './sessions.js'; +export { + createPendingCredential, + getPendingCredential, + updatePendingCredentialStatus, + updatePendingCredentialMessageId, + deletePendingCredential, +} from './credentials.js'; diff --git a/src/db/migrations/003-pending-approvals.ts b/src/db/migrations/003-pending-approvals.ts index 9fc2704..08b99c7 100644 --- a/src/db/migrations/003-pending-approvals.ts +++ b/src/db/migrations/003-pending-approvals.ts @@ -1,18 +1,39 @@ import type { Migration } from './index.js'; +/** + * `pending_approvals` table — host-side records for any approval-requiring + * request. Used by: + * - install_packages / request_rebuild / add_mcp_server (session-bound, + * `session_id` set, status stays at default 'pending' until handled) + * - OneCLI credential approvals from the SDK `configureManualApproval` + * callback (session_id may be null, action='onecli_credential'). + * + * The OneCLI-specific columns (`agent_group_id`, `channel_type`, `platform_id`, + * `platform_message_id`, `expires_at`, `status`) let the host edit the admin + * card when a request expires and sweep stale rows on startup. + */ export const migration003: Migration = { version: 3, name: 'pending-approvals', up(db) { db.exec(` CREATE TABLE pending_approvals ( - approval_id TEXT PRIMARY KEY, - session_id TEXT NOT NULL REFERENCES sessions(id), - request_id TEXT NOT NULL, - action TEXT NOT NULL, - payload TEXT NOT NULL, - created_at TEXT NOT NULL + approval_id TEXT PRIMARY KEY, + session_id TEXT REFERENCES sessions(id), + request_id TEXT NOT NULL, + action TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL, + agent_group_id TEXT REFERENCES agent_groups(id), + channel_type TEXT, + platform_id TEXT, + platform_message_id TEXT, + expires_at TEXT, + status TEXT NOT NULL DEFAULT 'pending' ); + + CREATE INDEX idx_pending_approvals_action_status + ON pending_approvals(action, status); `); }, }; diff --git a/src/db/migrations/005-pending-credentials.ts b/src/db/migrations/005-pending-credentials.ts new file mode 100644 index 0000000..beeb3d7 --- /dev/null +++ b/src/db/migrations/005-pending-credentials.ts @@ -0,0 +1,34 @@ +import type { Migration } from './index.js'; + +/** + * `pending_credentials` — backs the trigger_credential_collection flow. + * One row per in-flight credential request; status transitions + * pending → submitted → saved | rejected | failed. + */ +export const migration005: Migration = { + version: 5, + name: 'pending-credentials', + up(db) { + db.exec(` + CREATE TABLE pending_credentials ( + id TEXT PRIMARY KEY, + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + session_id TEXT REFERENCES sessions(id), + name TEXT NOT NULL, + type TEXT NOT NULL, + host_pattern TEXT NOT NULL, + path_pattern TEXT, + header_name TEXT, + value_format TEXT, + description TEXT, + channel_type TEXT NOT NULL, + platform_id TEXT NOT NULL, + platform_message_id TEXT, + status TEXT NOT NULL DEFAULT 'pending', + created_at TEXT NOT NULL + ); + + CREATE INDEX idx_pending_credentials_status ON pending_credentials(status); + `); + }, +}; diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index c210359..0f85458 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -5,6 +5,7 @@ import { migration001 } from './001-initial.js'; import { migration002 } from './002-chat-sdk-state.js'; import { migration003 } from './003-pending-approvals.js'; import { migration004 } from './004-agent-destinations.js'; +import { migration005 } from './005-pending-credentials.js'; export interface Migration { version: number; @@ -12,7 +13,7 @@ export interface Migration { up: (db: Database.Database) => void; } -const migrations: Migration[] = [migration001, migration002, migration003, migration004]; +const migrations: Migration[] = [migration001, migration002, migration003, migration004, migration005]; export function runMigrations(db: Database.Database): void { db.exec(` diff --git a/src/db/schema.ts b/src/db/schema.ts index 2c40d6e..acffa22 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -114,6 +114,18 @@ CREATE TABLE destinations ( platform_id TEXT, -- for type='channel' agent_group_id TEXT -- for type='agent' ); + +-- Default reply routing for this session. Single-row table (id=1). +-- Host overwrites on every container wake from the session's messaging_group +-- and thread_id. Container reads it in send_message / ask_user_question / +-- trigger_credential_collection to default the channel/thread of outbound +-- messages when the agent doesn't specify an explicit destination. +CREATE TABLE session_routing ( + id INTEGER PRIMARY KEY CHECK (id = 1), + channel_type TEXT, + platform_id TEXT, + thread_id TEXT +); `; /** Container-owned: outbound messages + processing acknowledgments. */ diff --git a/src/db/sessions.ts b/src/db/sessions.ts index 45e911f..e3338d0 100644 --- a/src/db/sessions.ts +++ b/src/db/sessions.ts @@ -93,13 +93,26 @@ export function deletePendingQuestion(questionId: string): void { // ── Pending Approvals ── -export function createPendingApproval(pa: PendingApproval): void { +export function createPendingApproval(pa: Partial & Pick): void { getDb() .prepare( - `INSERT INTO pending_approvals (approval_id, session_id, request_id, action, payload, created_at) - VALUES (@approval_id, @session_id, @request_id, @action, @payload, @created_at)`, + `INSERT INTO pending_approvals + (approval_id, session_id, request_id, action, payload, created_at, + agent_group_id, channel_type, platform_id, platform_message_id, expires_at, status) + VALUES + (@approval_id, @session_id, @request_id, @action, @payload, @created_at, + @agent_group_id, @channel_type, @platform_id, @platform_message_id, @expires_at, @status)`, ) - .run(pa); + .run({ + session_id: null, + agent_group_id: null, + channel_type: null, + platform_id: null, + platform_message_id: null, + expires_at: null, + status: 'pending', + ...pa, + }); } export function getPendingApproval(approvalId: string): PendingApproval | undefined { @@ -108,6 +121,14 @@ export function getPendingApproval(approvalId: string): PendingApproval | undefi | undefined; } +export function updatePendingApprovalStatus(approvalId: string, status: PendingApproval['status']): void { + getDb().prepare('UPDATE pending_approvals SET status = ? WHERE approval_id = ?').run(status, approvalId); +} + export function deletePendingApproval(approvalId: string): void { getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?').run(approvalId); } + +export function getPendingApprovalsByAction(action: string): PendingApproval[] { + return getDb().prepare('SELECT * FROM pending_approvals WHERE action = ?').all(action) as PendingApproval[]; +} diff --git a/src/delivery.ts b/src/delivery.ts index 4d60715..fdcf054 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -720,6 +720,12 @@ async function handleSystemAction( break; } + case 'request_credential': { + const { handleCredentialRequest } = await import('./credentials.js'); + await handleCredentialRequest(content, session); + break; + } + default: log.warn('Unknown system action', { action }); } diff --git a/src/index.ts b/src/index.ts index e237834..c3a478d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,19 @@ import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messa import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js'; import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; import { startHostSweep, stopHostSweep } from './host-sweep.js'; +import { + ONECLI_ACTION, + resolveOneCLIApproval, + startOneCLIApprovalHandler, + stopOneCLIApprovalHandler, +} from './onecli-approvals.js'; +import { + getCredentialForModal, + handleCredentialChannelUnsupported, + handleCredentialReject, + handleCredentialSubmit, + setCredentialDeliveryAdapter, +} from './credentials.js'; import { routeInbound } from './router.js'; import { getPendingQuestion, @@ -79,12 +92,35 @@ async function main(): Promise { log.error('Failed to handle question response', { questionId, err }); }); }, + getCredentialForModal, + onCredentialReject(credentialId) { + handleCredentialReject(credentialId).catch((err) => + log.error('Failed to handle credential reject', { credentialId, err }), + ); + }, + onCredentialSubmit(credentialId, value) { + handleCredentialSubmit(credentialId, value).catch((err) => + log.error('Failed to handle credential submit', { credentialId, err }), + ); + }, + onCredentialChannelUnsupported(credentialId) { + handleCredentialChannelUnsupported(credentialId).catch((err) => + log.error('Failed to handle credential channel-unsupported', { credentialId, err }), + ); + }, }; }); // 4. Delivery adapter bridge — dispatches to channel adapters - setDeliveryAdapter({ - async deliver(channelType, platformId, threadId, kind, content, files) { + const deliveryAdapter = { + async deliver( + channelType: string, + platformId: string, + threadId: string | null, + kind: string, + content: string, + files?: import('./channels/adapter.js').OutboundFile[], + ): Promise { const adapter = getChannelAdapter(channelType); if (!adapter) { log.warn('No adapter for channel type', { channelType }); @@ -92,11 +128,13 @@ async function main(): Promise { } return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files }); }, - async setTyping(channelType, platformId, threadId) { + async setTyping(channelType: string, platformId: string, threadId: string | null): Promise { const adapter = getChannelAdapter(channelType); await adapter?.setTyping?.(platformId, threadId); }, - }); + }; + setDeliveryAdapter(deliveryAdapter); + setCredentialDeliveryAdapter(deliveryAdapter); // 5. Start delivery polls startActiveDeliveryPoll(); @@ -107,6 +145,9 @@ async function main(): Promise { startHostSweep(); log.info('Host sweep started'); + // 7. Start OneCLI manual-approval handler + startOneCLIApprovalHandler(deliveryAdapter); + log.info('NanoClaw v2 running'); } @@ -134,9 +175,20 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] { /** Handle a user's response to an ask_user_question card or an approval card. */ async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise { + // OneCLI credential approvals — resolved via in-memory Promise, not session DB + if (resolveOneCLIApproval(questionId, selectedOption)) { + return; + } + // Check if this is a pending approval (install_packages, request_rebuild) const approval = getPendingApproval(questionId); if (approval) { + if (approval.action === ONECLI_ACTION) { + // Row exists but the in-memory resolver is gone (timer fired or process + // was in a weird state). Nothing to do — just drop the row. + deletePendingApproval(questionId); + return; + } await handleApprovalResponse(approval, selectedOption, userId); return; } @@ -188,6 +240,10 @@ async function handleApprovalResponse( selectedOption: string, userId: string, ): Promise { + if (!approval.session_id) { + deletePendingApproval(approval.approval_id); + return; + } const session = getSession(approval.session_id); if (!session) { deletePendingApproval(approval.approval_id); @@ -262,6 +318,7 @@ async function handleApprovalResponse( /** Graceful shutdown. */ async function shutdown(signal: string): Promise { log.info('Shutdown signal received', { signal }); + stopOneCLIApprovalHandler(); stopDeliveryPolls(); stopHostSweep(); await teardownChannelAdapters(); diff --git a/src/onecli-approvals.ts b/src/onecli-approvals.ts new file mode 100644 index 0000000..c8d6558 --- /dev/null +++ b/src/onecli-approvals.ts @@ -0,0 +1,252 @@ +/** + * OneCLI manual-approval handler. + * + * When the OneCLI gateway intercepts a credentialed request that needs human + * approval, it holds the HTTP connection open and fires our `configureManualApproval` + * callback. We: + * 1. Deliver an ask_question card to the admin channel (same routing as + * `requestApproval()` — global admin agent group's first messaging group). + * 2. Persist a `pending_approvals` row (action='onecli_credential') so we can + * edit the card on expiry and sweep stale rows at startup. + * 3. Wait on an in-memory Promise: resolved by the admin click + * (`resolveOneCLIApproval`) or by a local expiry timer. + * 4. On expiry, edit the card to "Expired" and return 'deny' — the gateway's + * HTTP side will have already closed, but we need to release the Promise + * so the SDK callback returns cleanly. + * + * Startup sweep edits any leftover cards from a previous process to + * "Expired (host restarted)" and drops the rows. + */ +import { OneCLI, type ApprovalRequest, type ManualApprovalHandle } from '@onecli-sh/sdk'; + +import { ONECLI_URL } from './config.js'; +import { getAdminAgentGroup, getAgentGroup } from './db/agent-groups.js'; +import { getMessagingGroupsByAgentGroup } from './db/messaging-groups.js'; +import { + createPendingApproval, + deletePendingApproval, + getPendingApprovalsByAction, + updatePendingApprovalStatus, +} from './db/sessions.js'; +import type { ChannelDeliveryAdapter } from './delivery.js'; +import { log } from './log.js'; +import type { PendingApproval } from './types.js'; + +export const ONECLI_ACTION = 'onecli_credential'; + +type Decision = 'approve' | 'deny'; + +const onecli = new OneCLI({ url: ONECLI_URL }); + +interface PendingState { + resolve: (decision: Decision) => void; + timer: NodeJS.Timeout; +} + +const pending = new Map(); +let handle: ManualApprovalHandle | null = null; +let adapterRef: ChannelDeliveryAdapter | null = null; + +/** + * Generate a short approval id for card buttons. + * + * OneCLI's native request.id is a UUID (36 bytes). When we put it into a card + * button's action id as `ncq::Approve`, Chat SDK's Telegram adapter then + * serializes both `id` and `value` into the Telegram `callback_data` field, + * which has a hard 64-byte limit. UUIDs push past that limit. + * + * Instead we generate a 10-byte id (`oa-` + 8 base36 chars) for the card, and + * keep the OneCLI request.id in the persisted payload for audit. The pending + * map, DB row, and button callback all use this short id; click handling + * looks up the short id and resolves the Promise that was waiting on it. + */ +function shortApprovalId(): string { + return `oa-${Math.random().toString(36).slice(2, 10)}`; +} + +/** Called from the main `handleQuestionResponse` path when a card button is clicked. */ +export function resolveOneCLIApproval(approvalId: string, selectedOption: string): boolean { + const state = pending.get(approvalId); + if (!state) return false; + pending.delete(approvalId); + clearTimeout(state.timer); + + const decision: Decision = selectedOption === 'Approve' ? 'approve' : 'deny'; + updatePendingApprovalStatus(approvalId, decision === 'approve' ? 'approved' : 'rejected'); + // Card is auto-edited to "✅