From c31bb02c06c8d767fe2c99fc95d0d17806483408 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Thu, 9 Apr 2026 03:26:16 +0300 Subject: [PATCH] v2 phase 5: pending questions with interactive cards End-to-end ask_user_question flow: - Agent MCP tool writes question card to messages_out - Host delivery creates pending_questions row, delivers as Discord Card with buttons - Local webhook server receives Gateway INTERACTION_CREATE events - Acknowledges interaction + updates card to show selected answer - Routes response back to session DB as system message - MCP tool poll picks up response and returns to agent Key fixes: - Poll loop now skips system messages (reserved for MCP tool responses) - Gateway listener uses webhookUrl forwarding mode for interaction support - Button custom_id encodes questionId + option text for self-contained routing Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/poll-loop.ts | 13 +- src/channels/adapter.ts | 3 + src/channels/channel-registry.test.ts | 2 + src/channels/chat-sdk-bridge.ts | 150 +++++++++++++++++++++++- src/channels/discord-v2.ts | 2 +- src/delivery.ts | 23 +++- src/index-v2.ts | 46 ++++++++ 7 files changed, 233 insertions(+), 6 deletions(-) diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index aca3766..474be8b 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -38,8 +38,16 @@ export async function runPollLoop(config: PollLoopConfig): Promise { let sessionId: string | undefined; let resumeAt: string | undefined; + let pollCount = 0; while (true) { - const messages = getPendingMessages(); + // Skip system messages — they're responses for MCP tools (e.g., ask_user_question) + const messages = getPendingMessages().filter((m) => m.kind !== 'system'); + pollCount++; + + // Periodic heartbeat so we know the loop is alive + if (pollCount % 30 === 0) { + log(`Poll heartbeat (${pollCount} iterations, ${messages.length} pending)`); + } if (messages.length === 0) { await sleep(POLL_INTERVAL_MS); @@ -210,7 +218,8 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config: const pollHandle = setInterval(() => { if (done) return; - const newMessages = getPendingMessages(); + // Skip system messages — they're responses for MCP tools (e.g., ask_user_question) + const newMessages = getPendingMessages().filter((m) => m.kind !== 'system'); if (newMessages.length > 0) { const newIds = newMessages.map((m) => m.id); markProcessing(newIds); diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 56eb8f0..615c28e 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -24,6 +24,9 @@ export interface ChannelSetup { /** Called when the adapter discovers metadata about a conversation. */ onMetadata(platformId: string, name?: string, isGroup?: boolean): void; + + /** Called when a user clicks a button/action in a card (e.g., ask_user_question response). */ + onAction(questionId: string, selectedOption: string, userId: string): void; } /** Inbound message from adapter to host. */ diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index d78761b..1903791 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -103,6 +103,7 @@ describe('channel registry', () => { conversations: [], onInbound: () => {}, onMetadata: () => {}, + onAction: () => {}, })); // Should not have any active adapters for channels with null factory returns @@ -205,6 +206,7 @@ describe('channel + router integration', () => { conversations: [], onInbound: () => {}, onMetadata: () => {}, + onAction: () => {}, })); // Set up delivery adapter bridge (same pattern as index-v2.ts) diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 853e2c4..e3f486b 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -4,7 +4,9 @@ * * Used by Discord, Slack, and other Chat SDK-supported platforms. */ -import { Chat, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage } from 'chat'; +import http from 'http'; + +import { Chat, Card, CardText, Actions, Button, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage } from 'chat'; import { createMemoryState } from '@chat-adapter/state-memory'; import { log } from '../log.js'; @@ -16,12 +18,15 @@ interface GatewayAdapter extends Adapter { options: { waitUntil?: (task: Promise) => void }, durationMs?: number, abortSignal?: AbortSignal, + webhookUrl?: string, ): Promise; } export interface ChatSdkBridgeConfig { adapter: GatewayAdapter; concurrency?: ConcurrencyStrategy; + /** Bot token for authenticating forwarded Gateway events (required for interaction handling). */ + botToken?: string; } export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { @@ -87,6 +92,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await thread.subscribe(); }); + // Handle button clicks (ask_user_question responses) + chat.onAction(async (event) => { + if (!event.actionId.startsWith('ncq:')) return; + const parts = event.actionId.split(':'); + if (parts.length < 3) return; + const questionId = parts[1]; + const selectedOption = event.value || ''; + const userId = event.user?.userId || ''; + setupConfig.onAction(questionId, selectedOption, userId); + }); + await chat.initialize(); // Subscribe registered conversations (after initialize connects state) @@ -100,6 +116,10 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // Start Gateway listener for adapters that support it (e.g., Discord) if (adapter.startGatewayListener) { gatewayAbort = new AbortController(); + + // Start local HTTP server to receive forwarded Gateway events (including interactions) + const webhookUrl = await startLocalWebhookServer(adapter, setupConfig, config.botToken); + const startGateway = () => { if (gatewayAbort?.signal.aborted) return; // Capture the long-running listener promise via waitUntil @@ -112,6 +132,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter }, 24 * 60 * 60 * 1000, gatewayAbort!.signal, + webhookUrl, ).then(() => { // startGatewayListener resolves immediately with a Response; // the actual work is in the listenerPromise passed to waitUntil @@ -155,6 +176,25 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return; } + // Ask question card — render as Card with buttons + if (content.type === 'ask_question' && content.questionId && content.options) { + const questionId = content.questionId as string; + const options = content.options as string[]; + const card = Card({ + title: '❓ Question', + children: [ + CardText(content.question as string), + Actions( + options.map((opt) => + Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }), + ), + ), + ], + }); + await adapter.postMessage(tid, { card, fallbackText: `${content.question}\nOptions: ${options.join(', ')}` }); + return; + } + // Normal message const text = (content.markdown as string) || (content.text as string); if (text) { @@ -199,3 +239,111 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter }, }; } + +/** + * Start a local HTTP server to receive forwarded Gateway events. + * This is needed because the Gateway listener in webhook-forwarding mode + * sends ALL raw events (including INTERACTION_CREATE for button clicks) + * to the webhookUrl, which we handle here. + */ +function startLocalWebhookServer(adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise { + return new Promise((resolve) => { + const server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (chunk: Buffer) => chunks.push(chunk)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString(); + handleForwardedEvent(body, adapter, setupConfig, botToken) + .then(() => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end('{"ok":true}'); + }) + .catch((err) => { + log.error('Webhook server error', { err }); + res.writeHead(500); + res.end('{"error":"internal"}'); + }); + }); + }); + + server.listen(0, '127.0.0.1', () => { + const addr = server.address() as { port: number }; + const url = `http://127.0.0.1:${addr.port}/webhook`; + log.info('Local webhook server started', { port: addr.port }); + resolve(url); + }); + }); +} + +async function handleForwardedEvent(body: string, adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise { + let event: { type: string; data: Record }; + try { + event = JSON.parse(body); + } catch { + return; + } + + // Handle interaction events (button clicks) — not handled by adapter's handleForwardedGatewayEvent + if (event.type === 'GATEWAY_INTERACTION_CREATE' && event.data) { + const interaction = event.data; + // type 3 = MessageComponent (button/select) + if (interaction.type === 3) { + const customId = (interaction.data as Record)?.custom_id as string; + const user = (interaction.member as Record)?.user as Record | undefined; + const interactionId = interaction.id as string; + const interactionToken = interaction.token as string; + + // Parse the selected option from custom_id + let questionId: string | undefined; + let selectedOption: string | undefined; + if (customId?.startsWith('ncq:')) { + const colonIdx = customId.indexOf(':', 4); // after "ncq:" + if (colonIdx !== -1) { + questionId = customId.slice(4, colonIdx); + selectedOption = customId.slice(colonIdx + 1); + } + } + + // Update the card to show the selected answer and remove buttons + const originalEmbeds = ((interaction.message as Record)?.embeds as Array>) || []; + const originalDescription = (originalEmbeds[0]?.description as string) || ''; + try { + await fetch(`https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + type: 7, // UPDATE_MESSAGE — acknowledge + update in one call + data: { + embeds: [ + { + title: '❓ Question', + description: `${originalDescription}\n\n✅ **${selectedOption || customId}**`, + }, + ], + components: [], // remove buttons + }, + }), + }); + } catch (err) { + log.error('Failed to update interaction', { err }); + } + + // Dispatch to host + if (questionId && selectedOption) { + setupConfig.onAction(questionId, selectedOption, user?.id || ''); + } + return; + } + } + + // Forward other events to the adapter's webhook handler for normal processing + const fakeRequest = new Request('http://localhost/webhook', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-discord-gateway-token': botToken || '', + }, + body, + }); + await adapter.handleWebhook(fakeRequest, {}); +} diff --git a/src/channels/discord-v2.ts b/src/channels/discord-v2.ts index 5eb32ed..01ed4c5 100644 --- a/src/channels/discord-v2.ts +++ b/src/channels/discord-v2.ts @@ -17,6 +17,6 @@ registerChannelAdapter('discord', { publicKey: env.DISCORD_PUBLIC_KEY, applicationId: env.DISCORD_APPLICATION_ID, }); - return createChatSdkBridge({ adapter: discordAdapter, concurrency: 'concurrent' }); + return createChatSdkBridge({ adapter: discordAdapter, concurrency: 'concurrent', botToken: env.DISCORD_BOT_TOKEN }); }, }); diff --git a/src/delivery.ts b/src/delivery.ts index 246e67c..8d1c268 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -6,7 +6,7 @@ import Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; -import { getRunningSessions, getActiveSessions } from './db/sessions.js'; +import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { log } from './log.js'; import { openSessionDb, sessionDir } from './session-manager.js'; @@ -157,6 +157,20 @@ async function deliverMessage( return; } + // Track pending questions for ask_user_question flow + if (content.type === 'ask_question' && content.questionId) { + createPendingQuestion({ + question_id: content.questionId, + session_id: session.id, + message_out_id: msg.id, + platform_id: msg.platform_id, + channel_type: msg.channel_type, + thread_id: msg.thread_id, + created_at: new Date().toISOString(), + }); + log.info('Pending question created', { questionId: content.questionId, sessionId: session.id }); + } + // Channel delivery if (!msg.channel_type || !msg.platform_id) { log.warn('Message missing routing fields', { id: msg.id }); @@ -180,7 +194,12 @@ async function deliverMessage( } await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content, files); - log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id, fileCount: files?.length }); + log.info('Message delivered', { + id: msg.id, + channelType: msg.channel_type, + platformId: msg.platform_id, + fileCount: files?.length, + }); // Clean up outbox directory after successful delivery if (fs.existsSync(outboxDir)) { diff --git a/src/index-v2.ts b/src/index-v2.ts index eca93f6..a72540b 100644 --- a/src/index-v2.ts +++ b/src/index-v2.ts @@ -14,6 +14,9 @@ import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runti import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; import { startHostSweep, stopHostSweep } from './host-sweep.js'; import { routeInbound } from './router-v2.js'; +import { getPendingQuestion, deletePendingQuestion, getSession } from './db/sessions.js'; +import { writeSessionMessage } from './session-manager.js'; +import { wakeContainer } from './container-runner-v2.js'; import { log } from './log.js'; // Channel imports — each triggers self-registration @@ -63,6 +66,11 @@ async function main(): Promise { isGroup, }); }, + onAction(questionId, selectedOption, userId) { + handleQuestionResponse(questionId, selectedOption, userId).catch((err) => { + log.error('Failed to handle question response', { questionId, err }); + }); + }, }; }); @@ -116,6 +124,44 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] { return configs; } +/** Handle a user's response to an ask_user_question card. */ +async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise { + const pq = getPendingQuestion(questionId); + if (!pq) { + log.warn('Pending question not found (may have expired)', { questionId }); + return; + } + + const session = getSession(pq.session_id); + if (!session) { + log.warn('Session not found for pending question', { questionId, sessionId: pq.session_id }); + deletePendingQuestion(questionId); + return; + } + + // Write the response to the session DB as a system message + writeSessionMessage(session.agent_group_id, session.id, { + id: `qr-${questionId}-${Date.now()}`, + kind: 'system', + timestamp: new Date().toISOString(), + platformId: pq.platform_id, + channelType: pq.channel_type, + threadId: pq.thread_id, + content: JSON.stringify({ + type: 'question_response', + questionId, + selectedOption, + userId, + }), + }); + + deletePendingQuestion(questionId); + log.info('Question response routed', { questionId, selectedOption, sessionId: session.id }); + + // Wake the container so the MCP tool's poll picks up the response + await wakeContainer(session); +} + /** Graceful shutdown. */ async function shutdown(signal: string): Promise { log.info('Shutdown signal received', { signal });