refactor(modules): extract approvals + interactive as registry-based modules
Phase 2 / PR #3 of the module refactor. Moves the approval and interactive- question flows out of core and into src/modules/, wired through the response dispatcher and delivery action registries. New modules: - src/modules/interactive/ — registers a response handler that claims pending_questions rows, writes question_response to the session DB, wakes the container. createPendingQuestion call stays inline in delivery.ts (guarded by hasTable) per plan. - src/modules/approvals/ — registers 3 delivery actions (install_packages, request_rebuild, add_mcp_server), a response handler for pending_approvals (including OneCLI action fall-through), an adapter-ready hook that boots the OneCLI manual-approval handler, and a shutdown hook that stops it. OneCLI implementation (src/onecli-approvals.ts) moves into the module. Core lifecycle hooks added (narrow, not registries): - onDeliveryAdapterReady(cb) in delivery.ts — fires when setDeliveryAdapter runs (or immediately if already set). Used by approvals for OneCLI boot. - onShutdown(cb) in index.ts — fires on SIGTERM/SIGINT. Used by approvals for OneCLI teardown. - getDeliveryAdapter() getter in delivery.ts — for live-flow adapter access in registered delivery actions. Core shrinks: delivery.ts 911 → 665 lines, index.ts 405 → 224 lines. dispatchResponse now logs "Unclaimed response" instead of falling through to an inline handler — the inline fallback moved into the two modules. Migration files renamed to the module-<name>-<short>.ts convention: - 003-pending-approvals.ts → module-approvals-pending-approvals.ts - 007-pending-approvals-title-options.ts → module-approvals-title-options.ts Migration.name fields unchanged so existing DBs treat them as already-applied. Degradation verified: emptying the modules barrel builds clean and 137/137 tests pass. Actions would log "Unknown system action"; button clicks would log "Unclaimed response". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
237
src/index.ts
237
src/index.ts
@@ -13,24 +13,7 @@ import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messa
|
||||
import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js';
|
||||
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
|
||||
import { startHostSweep, stopHostSweep } from './host-sweep.js';
|
||||
import {
|
||||
ONECLI_ACTION,
|
||||
resolveOneCLIApproval,
|
||||
startOneCLIApprovalHandler,
|
||||
stopOneCLIApprovalHandler,
|
||||
} from './onecli-approvals.js';
|
||||
import { routeInbound } from './router.js';
|
||||
import {
|
||||
getPendingQuestion,
|
||||
deletePendingQuestion,
|
||||
getPendingApproval,
|
||||
deletePendingApproval,
|
||||
getSession,
|
||||
} from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { updateContainerConfig } from './container-config.js';
|
||||
import { writeSessionMessage } from './session-manager.js';
|
||||
import { wakeContainer, buildAgentGroupImage, killContainer } from './container-runner.js';
|
||||
import { log } from './log.js';
|
||||
|
||||
/**
|
||||
@@ -38,11 +21,12 @@ import { log } from './log.js';
|
||||
*
|
||||
* Button-click / question responses arrive via the channel adapter's
|
||||
* `onAction` callback. Core iterates registered handlers in registration
|
||||
* order; the first one that returns `true` claims the response.
|
||||
* Unclaimed responses fall through to the inline `handleQuestionResponse`
|
||||
* below (which handles OneCLI credential approvals, pending_approvals,
|
||||
* and pending_questions). As those modules are extracted, the inline
|
||||
* function will shrink and the registry will own the full dispatch.
|
||||
* order; the first one that returns `true` claims the response. Unclaimed
|
||||
* responses are logged and dropped.
|
||||
*
|
||||
* Current consumers: interactive module (pending_questions), approvals
|
||||
* module (pending_approvals + OneCLI). If neither is loaded, every click
|
||||
* logs "Unclaimed response".
|
||||
*/
|
||||
export interface ResponsePayload {
|
||||
questionId: string;
|
||||
@@ -61,6 +45,20 @@ export function registerResponseHandler(handler: ResponseHandler): void {
|
||||
responseHandlers.push(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown callbacks. Modules with teardown needs (timers, outbound sockets)
|
||||
* register here. Called in registration order during SIGTERM / SIGINT
|
||||
* before core's delivery/sweep/channel teardown.
|
||||
*
|
||||
* Not a general-purpose registry — narrow lifecycle hook only.
|
||||
*/
|
||||
type ShutdownCallback = () => void | Promise<void>;
|
||||
const shutdownCallbacks: ShutdownCallback[] = [];
|
||||
|
||||
export function onShutdown(cb: ShutdownCallback): void {
|
||||
shutdownCallbacks.push(cb);
|
||||
}
|
||||
|
||||
async function dispatchResponse(payload: ResponsePayload): Promise<void> {
|
||||
for (const handler of responseHandlers) {
|
||||
try {
|
||||
@@ -70,8 +68,7 @@ async function dispatchResponse(payload: ResponsePayload): Promise<void> {
|
||||
log.error('Response handler threw', { questionId: payload.questionId, err });
|
||||
}
|
||||
}
|
||||
// Unclaimed — fall through to inline handler.
|
||||
await handleQuestionResponse(payload.questionId, payload.value, payload.userId ?? '');
|
||||
log.warn('Unclaimed response', { questionId: payload.questionId, value: payload.value });
|
||||
}
|
||||
|
||||
// Channel barrel — each enabled channel self-registers on import.
|
||||
@@ -133,9 +130,8 @@ async function main(): Promise<void> {
|
||||
userId,
|
||||
channelType: adapter.channelType,
|
||||
// platformId/threadId aren't surfaced by the current onAction
|
||||
// signature — the inline fallback looks them up from the
|
||||
// pending_question / pending_approval row. Registered handlers
|
||||
// typically do the same.
|
||||
// signature — registered handlers look them up from the
|
||||
// pending_question / pending_approval row.
|
||||
platformId: '',
|
||||
threadId: null,
|
||||
}).catch((err) => {
|
||||
@@ -178,9 +174,6 @@ async function main(): Promise<void> {
|
||||
startHostSweep();
|
||||
log.info('Host sweep started');
|
||||
|
||||
// 7. Start OneCLI manual-approval handler
|
||||
startOneCLIApprovalHandler(deliveryAdapter);
|
||||
|
||||
log.info('NanoClaw running');
|
||||
}
|
||||
|
||||
@@ -206,186 +199,16 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] {
|
||||
return configs;
|
||||
}
|
||||
|
||||
/** Handle a user's response to an ask_user_question card or an approval card. */
|
||||
async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise<void> {
|
||||
// OneCLI credential approvals — resolved via in-memory Promise, not session DB
|
||||
if (resolveOneCLIApproval(questionId, selectedOption)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this is a pending approval (install_packages, request_rebuild)
|
||||
const approval = getPendingApproval(questionId);
|
||||
if (approval) {
|
||||
if (approval.action === ONECLI_ACTION) {
|
||||
// Row exists but the in-memory resolver is gone (timer fired or process
|
||||
// was in a weird state). Nothing to do — just drop the row.
|
||||
deletePendingApproval(questionId);
|
||||
return;
|
||||
}
|
||||
await handleApprovalResponse(approval, selectedOption, userId);
|
||||
return;
|
||||
}
|
||||
|
||||
const pq = getPendingQuestion(questionId);
|
||||
if (!pq) {
|
||||
log.warn('Pending question not found (may have expired)', { questionId });
|
||||
return;
|
||||
}
|
||||
|
||||
const session = getSession(pq.session_id);
|
||||
if (!session) {
|
||||
log.warn('Session not found for pending question', { questionId, sessionId: pq.session_id });
|
||||
deletePendingQuestion(questionId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Write the response to the session DB as a system message
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `qr-${questionId}-${Date.now()}`,
|
||||
kind: 'system',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: pq.platform_id,
|
||||
channelType: pq.channel_type,
|
||||
threadId: pq.thread_id,
|
||||
content: JSON.stringify({
|
||||
type: 'question_response',
|
||||
questionId,
|
||||
selectedOption,
|
||||
userId,
|
||||
}),
|
||||
});
|
||||
|
||||
deletePendingQuestion(questionId);
|
||||
log.info('Question response routed', { questionId, selectedOption, sessionId: session.id });
|
||||
|
||||
// Wake the container so the MCP tool's poll picks up the response
|
||||
await wakeContainer(session);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an admin's response to an approval card.
|
||||
* Fire-and-forget model: the agent doesn't poll for this — we write a chat
|
||||
* notification to its session DB, and optionally kill the container so the
|
||||
* next wake picks up new config/images.
|
||||
*/
|
||||
async function handleApprovalResponse(
|
||||
approval: import('./types.js').PendingApproval,
|
||||
selectedOption: string,
|
||||
userId: string,
|
||||
): Promise<void> {
|
||||
if (!approval.session_id) {
|
||||
deletePendingApproval(approval.approval_id);
|
||||
return;
|
||||
}
|
||||
const session = getSession(approval.session_id);
|
||||
if (!session) {
|
||||
deletePendingApproval(approval.approval_id);
|
||||
return;
|
||||
}
|
||||
|
||||
const notify = (text: string): void => {
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: session.agent_group_id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({ text, sender: 'system', senderId: 'system' }),
|
||||
});
|
||||
};
|
||||
|
||||
if (selectedOption !== 'approve') {
|
||||
notify(`Your ${approval.action} request was rejected by admin.`);
|
||||
log.info('Approval rejected', { approvalId: approval.approval_id, action: approval.action, userId });
|
||||
deletePendingApproval(approval.approval_id);
|
||||
await wakeContainer(session);
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = JSON.parse(approval.payload);
|
||||
|
||||
if (approval.action === 'install_packages') {
|
||||
const agentGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!agentGroup) {
|
||||
notify('install_packages approved but agent group missing.');
|
||||
return;
|
||||
}
|
||||
updateContainerConfig(agentGroup.folder, (cfg) => {
|
||||
if (payload.apt) cfg.packages.apt.push(...(payload.apt as string[]));
|
||||
if (payload.npm) cfg.packages.npm.push(...(payload.npm as string[]));
|
||||
});
|
||||
|
||||
const pkgs = [...(payload.apt || []), ...(payload.npm || [])].join(', ');
|
||||
log.info('Package install approved', { approvalId: approval.approval_id, userId });
|
||||
try {
|
||||
await buildAgentGroupImage(session.agent_group_id);
|
||||
killContainer(session.id, 'rebuild applied');
|
||||
// Schedule a follow-up prompt a few seconds after kill so the host sweep
|
||||
// respawns the container on the new image and the agent verifies + reports.
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: session.agent_group_id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({
|
||||
text: `Packages installed (${pkgs}) and container rebuilt. Verify the new packages are available (e.g. run them or check versions) and report the result to the user.`,
|
||||
sender: 'system',
|
||||
senderId: 'system',
|
||||
}),
|
||||
processAfter: new Date(Date.now() + 5000)
|
||||
.toISOString()
|
||||
.replace('T', ' ')
|
||||
.replace(/\.\d+Z$/, ''),
|
||||
});
|
||||
log.info('Container rebuild completed (bundled with install)', { approvalId: approval.approval_id });
|
||||
} catch (e) {
|
||||
notify(
|
||||
`Packages added to config (${pkgs}) but rebuild failed: ${e instanceof Error ? e.message : String(e)}. Call request_rebuild to retry.`,
|
||||
);
|
||||
log.error('Bundled rebuild failed after install approval', { approvalId: approval.approval_id, err: e });
|
||||
}
|
||||
} else if (approval.action === 'request_rebuild') {
|
||||
try {
|
||||
await buildAgentGroupImage(session.agent_group_id);
|
||||
// Kill the container so the next wake uses the new image
|
||||
killContainer(session.id, 'rebuild applied');
|
||||
notify('Container image rebuilt. Your container will restart with the new image on the next message.');
|
||||
log.info('Container rebuild approved and completed', { approvalId: approval.approval_id, userId });
|
||||
} catch (e) {
|
||||
notify(`Rebuild failed: ${e instanceof Error ? e.message : String(e)}`);
|
||||
log.error('Container rebuild failed', { approvalId: approval.approval_id, err: e });
|
||||
}
|
||||
} else if (approval.action === 'add_mcp_server') {
|
||||
const agentGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!agentGroup) {
|
||||
notify('add_mcp_server approved but agent group missing.');
|
||||
return;
|
||||
}
|
||||
updateContainerConfig(agentGroup.folder, (cfg) => {
|
||||
cfg.mcpServers[payload.name as string] = {
|
||||
command: payload.command as string,
|
||||
args: (payload.args as string[]) || [],
|
||||
env: (payload.env as Record<string, string>) || {},
|
||||
};
|
||||
});
|
||||
|
||||
// Kill the container so next wake loads the new MCP server config
|
||||
killContainer(session.id, 'mcp server added');
|
||||
notify(`MCP server "${payload.name}" added. Your container will restart with it on the next message.`);
|
||||
log.info('MCP server add approved', { approvalId: approval.approval_id, userId });
|
||||
}
|
||||
|
||||
deletePendingApproval(approval.approval_id);
|
||||
await wakeContainer(session);
|
||||
}
|
||||
|
||||
/** Graceful shutdown. */
|
||||
async function shutdown(signal: string): Promise<void> {
|
||||
log.info('Shutdown signal received', { signal });
|
||||
stopOneCLIApprovalHandler();
|
||||
for (const cb of shutdownCallbacks) {
|
||||
try {
|
||||
await cb();
|
||||
} catch (err) {
|
||||
log.error('Shutdown callback threw', { err });
|
||||
}
|
||||
}
|
||||
stopDeliveryPolls();
|
||||
stopHostSweep();
|
||||
await teardownChannelAdapters();
|
||||
|
||||
Reference in New Issue
Block a user