The schedule_task MCP tool wrote routing fields (platform_id, channel_type, thread_id) onto the outbound system message's row columns, but handleSystemAction (src/delivery.ts) parses content JSON and forwards only that to handlers. handleScheduleTask (src/modules/scheduling/actions.ts) reads content.platformId/channelType/threadId — which the writer never populated — so every kind='task' row landed in messages_in with all-null routing. When host-sweep wakes a scheduled task, dispatchResultText's fast path requires routing on the message and bails when it's null, falling through to the "Routing recovery" retry prompt. End-user delivery still works because the agent can pick a destination from its destinations table on retry — so the bug went undetected, silently costing one extra LLM turn per scheduled-task wake. Sessions whose destinations table has no channel row (e.g. agent-only destinations) fail outright with a recovery loop. Fix: add the routing fields to the content JSON so the writer matches the contract handleScheduleTask already expects. cancel/pause/resume/update_task operate by id alone and don't need routing.
303 lines
11 KiB
TypeScript
303 lines
11 KiB
TypeScript
/**
|
|
* Scheduling MCP tools: schedule_task, list_tasks, cancel_task, pause_task, resume_task.
|
|
*
|
|
* With the two-DB split, the container cannot write to inbound.db (host-owned).
|
|
* Scheduling operations are sent as system actions via messages_out — the host
|
|
* reads them during delivery and applies the changes to inbound.db.
|
|
*/
|
|
import { getInboundDb } from '../db/connection.js';
|
|
import { writeMessageOut } from '../db/messages-out.js';
|
|
import { getSessionRouting } from '../db/session-routing.js';
|
|
import { TIMEZONE, parseZonedToUtc } from '../timezone.js';
|
|
import { registerTools } from './server.js';
|
|
import type { McpToolDefinition } from './types.js';
|
|
|
|
function log(msg: string): void {
|
|
console.error(`[mcp-tools] ${msg}`);
|
|
}
|
|
|
|
function generateId(): string {
|
|
return `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
|
}
|
|
|
|
function routing() {
|
|
return getSessionRouting();
|
|
}
|
|
|
|
function ok(text: string) {
|
|
return { content: [{ type: 'text' as const, text }] };
|
|
}
|
|
|
|
function err(text: string) {
|
|
return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true };
|
|
}
|
|
|
|
export const scheduleTask: McpToolDefinition = {
|
|
tool: {
|
|
name: 'schedule_task',
|
|
description:
|
|
`Schedule a one-shot or recurring task. The user's timezone is declared in the <context timezone="..."/> header of your prompt — interpret the user's "9pm" etc. in that zone. Cron expressions are interpreted in the user's timezone too.`,
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
prompt: { type: 'string', description: 'Task instructions/prompt' },
|
|
processAfter: {
|
|
type: 'string',
|
|
description:
|
|
`ISO 8601 timestamp for the first run. Accepts either UTC (ending in "Z" or "+00:00") or a naive local timestamp (no offset) which is interpreted in the user's timezone (e.g. "2026-01-15T21:00:00" = 9pm user-local). Prefer naive local.`,
|
|
},
|
|
recurrence: {
|
|
type: 'string',
|
|
description:
|
|
'Cron expression for recurring tasks (e.g., "0 9 * * 1-5" = weekdays at 9am user-local). Evaluated in the user\'s timezone.',
|
|
},
|
|
script: { type: 'string', description: 'Optional pre-agent script to run before processing' },
|
|
},
|
|
required: ['prompt', 'processAfter'],
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const prompt = args.prompt as string;
|
|
const processAfterIn = args.processAfter as string;
|
|
if (!prompt || !processAfterIn) return err('prompt and processAfter are required');
|
|
|
|
let processAfter: string;
|
|
try {
|
|
const d = parseZonedToUtc(processAfterIn, TIMEZONE);
|
|
if (Number.isNaN(d.getTime())) return err(`invalid processAfter: ${processAfterIn}`);
|
|
processAfter = d.toISOString();
|
|
} catch {
|
|
return err(`invalid processAfter: ${processAfterIn}`);
|
|
}
|
|
|
|
const id = generateId();
|
|
const r = routing();
|
|
const recurrence = (args.recurrence as string) || null;
|
|
const script = (args.script as string) || null;
|
|
|
|
// Write as a system action — host will insert into inbound.db
|
|
writeMessageOut({
|
|
id,
|
|
kind: 'system',
|
|
platform_id: r.platform_id,
|
|
channel_type: r.channel_type,
|
|
thread_id: r.thread_id,
|
|
content: JSON.stringify({
|
|
action: 'schedule_task',
|
|
taskId: id,
|
|
prompt,
|
|
script,
|
|
processAfter,
|
|
recurrence,
|
|
platformId: r.platform_id,
|
|
channelType: r.channel_type,
|
|
threadId: r.thread_id,
|
|
}),
|
|
});
|
|
|
|
log(`schedule_task: ${id} at ${processAfter}${recurrence ? ` (recurring: ${recurrence})` : ''}`);
|
|
return ok(`Task scheduled (id: ${id}, runs at: ${processAfter}${recurrence ? `, recurrence: ${recurrence}` : ''})`);
|
|
},
|
|
};
|
|
|
|
export const listTasks: McpToolDefinition = {
|
|
tool: {
|
|
name: 'list_tasks',
|
|
description:
|
|
'List scheduled tasks. Returns one row per series — the live (pending or paused) occurrence. The id shown is the series id, which is what update_task / cancel_task / pause_task / resume_task expect.',
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
status: { type: 'string', description: 'Filter by status: pending or paused (default: both)' },
|
|
},
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const status = args.status as string | undefined;
|
|
const db = getInboundDb();
|
|
// One row per series — the live (pending or paused) occurrence. Recurring
|
|
// tasks accumulate one completed row per firing plus one live follow-up;
|
|
// exposing the whole pile to the agent is noisy and confuses task identity
|
|
// ("which id do I cancel?"). The series_id is the stable handle.
|
|
//
|
|
// SQLite quirk: when MAX(seq) appears in the SELECT list of a GROUP BY
|
|
// query, the bare columns take values from the row that contains that max
|
|
// — that's how we pick "the latest live row per series" in one pass.
|
|
let rows;
|
|
if (status) {
|
|
rows = db
|
|
.prepare(
|
|
`SELECT series_id AS id, status, process_after, recurrence, content, MAX(seq) AS _seq
|
|
FROM messages_in
|
|
WHERE kind = 'task' AND status = ?
|
|
GROUP BY series_id
|
|
ORDER BY process_after ASC`,
|
|
)
|
|
.all(status);
|
|
} else {
|
|
rows = db
|
|
.prepare(
|
|
`SELECT series_id AS id, status, process_after, recurrence, content, MAX(seq) AS _seq
|
|
FROM messages_in
|
|
WHERE kind = 'task' AND status IN ('pending', 'paused')
|
|
GROUP BY series_id
|
|
ORDER BY process_after ASC`,
|
|
)
|
|
.all();
|
|
}
|
|
|
|
if ((rows as unknown[]).length === 0) return ok('No tasks found.');
|
|
|
|
const lines = (rows as Array<{ id: string; status: string; process_after: string | null; recurrence: string | null; content: string }>).map((r) => {
|
|
const content = JSON.parse(r.content);
|
|
const prompt = (content.prompt as string || '').slice(0, 80);
|
|
return `- ${r.id} [${r.status}] at=${r.process_after || 'now'} ${r.recurrence ? `recur=${r.recurrence} ` : ''}→ ${prompt}`;
|
|
});
|
|
|
|
return ok(lines.join('\n'));
|
|
},
|
|
};
|
|
|
|
export const cancelTask: McpToolDefinition = {
|
|
tool: {
|
|
name: 'cancel_task',
|
|
description: 'Cancel a scheduled task.',
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
taskId: { type: 'string', description: 'Task ID to cancel' },
|
|
},
|
|
required: ['taskId'],
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const taskId = args.taskId as string;
|
|
if (!taskId) return err('taskId is required');
|
|
|
|
// Write as a system action — host will update inbound.db
|
|
writeMessageOut({
|
|
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
|
kind: 'system',
|
|
content: JSON.stringify({ action: 'cancel_task', taskId }),
|
|
});
|
|
|
|
log(`cancel_task: ${taskId}`);
|
|
return ok(`Task cancellation requested: ${taskId}`);
|
|
},
|
|
};
|
|
|
|
export const pauseTask: McpToolDefinition = {
|
|
tool: {
|
|
name: 'pause_task',
|
|
description: 'Pause a scheduled task. It will not run until resumed.',
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
taskId: { type: 'string', description: 'Task ID to pause' },
|
|
},
|
|
required: ['taskId'],
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const taskId = args.taskId as string;
|
|
if (!taskId) return err('taskId is required');
|
|
|
|
writeMessageOut({
|
|
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
|
kind: 'system',
|
|
content: JSON.stringify({ action: 'pause_task', taskId }),
|
|
});
|
|
|
|
log(`pause_task: ${taskId}`);
|
|
return ok(`Task pause requested: ${taskId}`);
|
|
},
|
|
};
|
|
|
|
export const resumeTask: McpToolDefinition = {
|
|
tool: {
|
|
name: 'resume_task',
|
|
description: 'Resume a paused task.',
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
taskId: { type: 'string', description: 'Task ID to resume' },
|
|
},
|
|
required: ['taskId'],
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const taskId = args.taskId as string;
|
|
if (!taskId) return err('taskId is required');
|
|
|
|
writeMessageOut({
|
|
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
|
kind: 'system',
|
|
content: JSON.stringify({ action: 'resume_task', taskId }),
|
|
});
|
|
|
|
log(`resume_task: ${taskId}`);
|
|
return ok(`Task resume requested: ${taskId}`);
|
|
},
|
|
};
|
|
|
|
export const updateTask: McpToolDefinition = {
|
|
tool: {
|
|
name: 'update_task',
|
|
description:
|
|
'Update a scheduled task. Pass the series id from list_tasks. Any field omitted is left unchanged. Use this instead of cancel + reschedule when adjusting an existing task.',
|
|
inputSchema: {
|
|
type: 'object' as const,
|
|
properties: {
|
|
taskId: { type: 'string', description: 'Series id of the task to update (as shown by list_tasks)' },
|
|
prompt: { type: 'string', description: 'New task prompt (optional)' },
|
|
recurrence: {
|
|
type: 'string',
|
|
description: 'New cron expression (optional). Pass empty string to clear and make the task one-shot.',
|
|
},
|
|
processAfter: {
|
|
type: 'string',
|
|
description:
|
|
`New ISO 8601 timestamp for the next run (optional). Accepts either UTC (ending in "Z" / "+00:00") or a naive local timestamp interpreted in the user's timezone.`,
|
|
},
|
|
script: {
|
|
type: 'string',
|
|
description: 'New pre-agent script (optional). Pass empty string to clear.',
|
|
},
|
|
},
|
|
required: ['taskId'],
|
|
},
|
|
},
|
|
async handler(args) {
|
|
const taskId = args.taskId as string;
|
|
if (!taskId) return err('taskId is required');
|
|
|
|
const update: Record<string, unknown> = { taskId };
|
|
if (typeof args.prompt === 'string') update.prompt = args.prompt;
|
|
if (typeof args.processAfter === 'string') {
|
|
try {
|
|
const d = parseZonedToUtc(args.processAfter, TIMEZONE);
|
|
if (Number.isNaN(d.getTime())) return err(`invalid processAfter: ${args.processAfter}`);
|
|
update.processAfter = d.toISOString();
|
|
} catch {
|
|
return err(`invalid processAfter: ${args.processAfter}`);
|
|
}
|
|
}
|
|
// Empty string clears recurrence/script; undefined leaves them as-is.
|
|
if (typeof args.recurrence === 'string') update.recurrence = args.recurrence === '' ? null : args.recurrence;
|
|
if (typeof args.script === 'string') update.script = args.script === '' ? null : args.script;
|
|
|
|
if (Object.keys(update).length === 1) return err('at least one field to update is required');
|
|
|
|
writeMessageOut({
|
|
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
|
kind: 'system',
|
|
content: JSON.stringify({ action: 'update_task', ...update }),
|
|
});
|
|
|
|
log(`update_task: ${taskId}`);
|
|
return ok(`Task update requested: ${taskId}`);
|
|
},
|
|
};
|
|
|
|
registerTools([scheduleTask, listTasks, updateTask, cancelTask, pauseTask, resumeTask]);
|