Files
nanoclaw/container/agent-runner/src/mcp-tools/scheduling.ts
gavrielc e92b245399 feat(v2): OneCLI 0.3.1 — approvals, credential collection, threaded routing
Three features built on top of @onecli-sh/sdk 0.3.1, landed together because
they share wiring surfaces (session DB schema, delivery dispatcher, Chat SDK
bridge, channel adapter contract).

## OneCLI manual-approval handler

* `src/onecli-approvals.ts` — long-polls OneCLI via the SDK's
  `configureManualApproval`; on each request, delivers an `ask_question` card
  to the admin agent group's first messaging group, persists a
  `pending_approvals` row, and waits on an in-memory Promise resolved by the
  admin's button click or an expiry timer. Expired cards are edited to
  "Expired (...)" and a startup sweep flushes any rows left over from a
  previous process.
* Short 11-byte approval id (`oa-<8 base36>`) instead of the SDK's UUID so the
  Telegram 64-byte `callback_data` limit is respected; the OneCLI UUID stays
  in the persisted payload for audit.
* Migration 003 consolidated: `pending_approvals` now has the OneCLI-aware
  columns from the start (`agent_group_id`, `channel_type`, `platform_id`,
  `platform_message_id`, `expires_at`, `status`), `session_id` relaxed to
  nullable so cross-session approvals fit.
* `handleQuestionResponse` in `src/index.ts` now routes OneCLI approvals
  through `resolveOneCLIApproval` before falling back to the
  session-bound approval path.

## Credential collection from chat

New `trigger_credential_collection` MCP tool — the agent researches a
third-party API, calls the tool with `{name, hostPattern, headerName,
valueFormat, description}`, and blocks until the host reports saved, rejected,
or failed. The credential value never enters the agent's context: the user
submits it into a Chat SDK Modal on the host side, the host writes it to
OneCLI via a thin facade (`src/onecli-secrets.ts` — shells out to
`onecli secrets create`, shape mirrors the SDK we expect upstream), and only
the status string flows back to the container via a system message.

* `src/credentials.ts` — host-side handler: delivers the card to the
  conversation's own channel (not the admin channel — credential collection
  is a user-facing flow, distinct from admin approval), persists a
  `pending_credentials` row, drives the submit → `createSecret` → notify
  pipeline. Falls back gracefully when the channel doesn't support modals.
* `src/db/credentials.ts` + migration 005: `pending_credentials` table.
* `src/channels/chat-sdk-bridge.ts`: renders a `credential_request` card,
  handles the `nccr:` action prefix by opening a Modal with a TextInput,
  registers an `onModalSubmit` handler for the `nccm:` callback prefix.
* `container/agent-runner/src/mcp-tools/credentials.ts`: the blocking MCP
  tool, mirroring the `ask_user_question` polling pattern.
* `container/agent-runner/src/db/messages-in.ts`: `findCredentialResponse`
  helper to pick up the system message the host writes back.

## Threaded adapter routing

The destination layer previously didn't carry thread context, so agent replies
to Discord always landed in the root channel regardless of which thread the
inbound came from.

* `ChannelAdapter.supportsThreads: boolean` — declared by every channel skill
  at `createChatSdkBridge`. Threaded: Discord, Slack, Teams, Google Chat,
  Linear, GitHub, Webex. Non-threaded: Telegram, WhatsApp Cloud, Matrix,
  Resend, iMessage.
* `src/router.ts`: non-threaded adapters strip `threadId` at ingest (threads
  collapse to channel-level sessions). Threaded adapters override the
  wiring's `session_mode` to `'per-thread'` so each thread = a session
  (except `agent-shared`, which is preserved as a cross-channel intent the
  adapter can't know about).
* `session_routing` table in `inbound.db` — single-row default reply routing
  written by the host on every container wake from
  `session.messaging_group_id` + `session.thread_id`. Forward-compat
  `CREATE TABLE IF NOT EXISTS` handles older session DBs lazily.
* `container/agent-runner/src/db/session-routing.ts` — container-side reader.
* `send_message` / `send_file` / `ask_user_question` / `send_card` /
  scheduling tools all default their routing (channel, platform, **and**
  thread) from the session when no explicit `to` is given. Explicit `to`
  uses the destination's channel with `thread_id = null` (cross-destination
  sends start a new conversation elsewhere).
* `poll-loop.ts::sendToDestination` (the final-text single-destination
  shortcut) now inherits `thread_id` from `RoutingContext` too — this was
  the root cause of Discord replies landing in the root channel even after
  `send_message` was wired correctly.

## Related cleanups

* `src/container-runner.ts`: OneCLI agent identifier switched from the lossy
  folder-derived string to `agent_group.id`, making `getAgentGroup(externalId)`
  a trivial reverse lookup for per-agent scoping.
* `wakeContainer` race fix via an in-flight promise map — concurrent wakes
  during the async buildContainerArgs / OneCLI `applyContainerConfig` window
  no longer double-spawn containers against the same session directory.
* `src/db/db-v2.test.ts`: dropped the brittle `expect(row.v).toBe(N)` schema
  version assertion — it had to be bumped on every migration addition.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 17:18:21 +03:00

201 lines
6.5 KiB
TypeScript

/**
* Scheduling MCP tools: schedule_task, list_tasks, cancel_task, pause_task, resume_task.
*
* With the two-DB split, the container cannot write to inbound.db (host-owned).
* Scheduling operations are sent as system actions via messages_out — the host
* reads them during delivery and applies the changes to inbound.db.
*/
import { getInboundDb } from '../db/connection.js';
import { writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
import type { McpToolDefinition } from './types.js';
function log(msg: string): void {
console.error(`[mcp-tools] ${msg}`);
}
function generateId(): string {
return `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
function routing() {
return getSessionRouting();
}
function ok(text: string) {
return { content: [{ type: 'text' as const, text }] };
}
function err(text: string) {
return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true };
}
export const scheduleTask: McpToolDefinition = {
tool: {
name: 'schedule_task',
description:
'Schedule a one-shot or recurring task. The task will be processed at the specified time. Use cron expressions for recurring tasks.',
inputSchema: {
type: 'object' as const,
properties: {
prompt: { type: 'string', description: 'Task instructions/prompt' },
processAfter: { type: 'string', description: 'ISO timestamp for first run (e.g., 2024-01-15T09:00:00Z)' },
recurrence: { type: 'string', description: 'Cron expression for recurring tasks (e.g., "0 9 * * 1-5" for weekdays at 9am)' },
script: { type: 'string', description: 'Optional pre-agent script to run before processing' },
},
required: ['prompt', 'processAfter'],
},
},
async handler(args) {
const prompt = args.prompt as string;
const processAfter = args.processAfter as string;
if (!prompt || !processAfter) return err('prompt and processAfter are required');
const id = generateId();
const r = routing();
const recurrence = (args.recurrence as string) || null;
const script = (args.script as string) || null;
// Write as a system action — host will insert into inbound.db
writeMessageOut({
id,
kind: 'system',
platform_id: r.platform_id,
channel_type: r.channel_type,
thread_id: r.thread_id,
content: JSON.stringify({
action: 'schedule_task',
taskId: id,
prompt,
script,
processAfter,
recurrence,
}),
});
log(`schedule_task: ${id} at ${processAfter}${recurrence ? ` (recurring: ${recurrence})` : ''}`);
return ok(`Task scheduled (id: ${id}, runs at: ${processAfter}${recurrence ? `, recurrence: ${recurrence}` : ''})`);
},
};
export const listTasks: McpToolDefinition = {
tool: {
name: 'list_tasks',
description: 'List scheduled and pending tasks.',
inputSchema: {
type: 'object' as const,
properties: {
status: { type: 'string', description: 'Filter by status: pending, processing, completed, paused (default: all non-completed)' },
},
},
},
async handler(args) {
const status = args.status as string | undefined;
const db = getInboundDb();
let rows;
if (status) {
rows = db
.prepare("SELECT id, status, process_after, recurrence, content FROM messages_in WHERE kind = 'task' AND status = ? ORDER BY process_after ASC")
.all(status);
} else {
rows = db
.prepare("SELECT id, status, process_after, recurrence, content FROM messages_in WHERE kind = 'task' AND status NOT IN ('completed') ORDER BY process_after ASC")
.all();
}
if ((rows as unknown[]).length === 0) return ok('No tasks found.');
const lines = (rows as Array<{ id: string; status: string; process_after: string | null; recurrence: string | null; content: string }>).map((r) => {
const content = JSON.parse(r.content);
const prompt = (content.prompt as string || '').slice(0, 80);
return `- ${r.id} [${r.status}] at=${r.process_after || 'now'} ${r.recurrence ? `recur=${r.recurrence} ` : ''}${prompt}`;
});
return ok(lines.join('\n'));
},
};
export const cancelTask: McpToolDefinition = {
tool: {
name: 'cancel_task',
description: 'Cancel a scheduled task.',
inputSchema: {
type: 'object' as const,
properties: {
taskId: { type: 'string', description: 'Task ID to cancel' },
},
required: ['taskId'],
},
},
async handler(args) {
const taskId = args.taskId as string;
if (!taskId) return err('taskId is required');
// Write as a system action — host will update inbound.db
writeMessageOut({
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'system',
content: JSON.stringify({ action: 'cancel_task', taskId }),
});
log(`cancel_task: ${taskId}`);
return ok(`Task cancellation requested: ${taskId}`);
},
};
export const pauseTask: McpToolDefinition = {
tool: {
name: 'pause_task',
description: 'Pause a scheduled task. It will not run until resumed.',
inputSchema: {
type: 'object' as const,
properties: {
taskId: { type: 'string', description: 'Task ID to pause' },
},
required: ['taskId'],
},
},
async handler(args) {
const taskId = args.taskId as string;
if (!taskId) return err('taskId is required');
writeMessageOut({
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'system',
content: JSON.stringify({ action: 'pause_task', taskId }),
});
log(`pause_task: ${taskId}`);
return ok(`Task pause requested: ${taskId}`);
},
};
export const resumeTask: McpToolDefinition = {
tool: {
name: 'resume_task',
description: 'Resume a paused task.',
inputSchema: {
type: 'object' as const,
properties: {
taskId: { type: 'string', description: 'Task ID to resume' },
},
required: ['taskId'],
},
},
async handler(args) {
const taskId = args.taskId as string;
if (!taskId) return err('taskId is required');
writeMessageOut({
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
kind: 'system',
content: JSON.stringify({ action: 'resume_task', taskId }),
});
log(`resume_task: ${taskId}`);
return ok(`Task resume requested: ${taskId}`);
},
};
export const schedulingTools: McpToolDefinition[] = [scheduleTask, listTasks, cancelTask, pauseTask, resumeTask];