From 473f7665855469ed8d2c461108e0923495080a32 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Sat, 18 Apr 2026 16:17:47 +0300 Subject: [PATCH] refactor(modules): extract scheduling as registry-based module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves the scheduling surface — 5 delivery actions (schedule_task, cancel_task, pause_task, resume_task, update_task), handleRecurrence, applyPreTaskScripts, and task DB helpers — out of core and into src/modules/scheduling/ (host) and container/agent-runner/src/scheduling/ (container). First PR to fill the MODULE-HOOK markers introduced in PR #2: - src/host-sweep.ts MODULE-HOOK:scheduling-recurrence now dynamically imports handleRecurrence from the module each sweep tick. - container/agent-runner/src/poll-loop.ts MODULE-HOOK:scheduling-pre-task dynamically imports applyPreTaskScripts before the provider call. When the marker block is empty (scheduling uninstalled), `keep` falls back to `normalMessages` so non-task messages still flow. The 5 task cases are removed from delivery.ts's handleSystemAction switch — the registry now routes them. Task DB helpers moved out of src/db/session-db.ts (which kept `nextEvenSeq` as a named export so the module can uphold the host-writes-even-seq invariant). Test suite split to match: scheduling-specific tests live in the module. No migration — tasks are messages_in rows with kind='task'. Co-Authored-By: Claude Opus 4.7 (1M context) --- container/agent-runner/src/poll-loop.ts | 15 +- .../src/{ => scheduling}/task-script.ts | 4 +- src/db/session-db.test.ts | 277 +---------------- src/db/session-db.ts | 152 +--------- src/delivery.ts | 65 ---- src/host-sweep.ts | 29 +- src/modules/index.ts | 1 + src/modules/scheduling/actions.ts | 113 +++++++ src/modules/scheduling/db.test.ts | 282 ++++++++++++++++++ src/modules/scheduling/db.ts | 153 ++++++++++ src/modules/scheduling/index.ts | 34 +++ src/modules/scheduling/recurrence.ts | 49 +++ 12 files changed, 657 insertions(+), 517 deletions(-) rename container/agent-runner/src/{ => scheduling}/task-script.ts (96%) create mode 100644 src/modules/scheduling/actions.ts create mode 100644 src/modules/scheduling/db.test.ts create mode 100644 src/modules/scheduling/db.ts create mode 100644 src/modules/scheduling/index.ts create mode 100644 src/modules/scheduling/recurrence.ts diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index eb06e89..e5ad1a5 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -4,7 +4,6 @@ import { writeMessageOut } from './db/messages-out.js'; import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js'; import { formatMessages, extractRouting, categorizeMessage, type RoutingContext } from './formatter.js'; -import { applyPreTaskScripts } from './task-script.js'; import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; @@ -156,13 +155,15 @@ export async function runPollLoop(config: PollLoopConfig): Promise { // Pre-task scripts: for any task rows with a `script`, run it before the // provider call. Scripts returning wakeAgent=false (or erroring) gate // their own task row only — surviving messages still go to the agent. - // + // Without the scheduling module, the marker block is empty, `keep` + // falls back to `normalMessages`, and no gating happens. + let keep: MessageInRow[] = normalMessages; + let skipped: string[] = []; // MODULE-HOOK:scheduling-pre-task:start - // When scheduling is extracted (PR #4), `applyPreTaskScripts` moves - // to the scheduling module and the `/add-scheduling` skill replaces - // this block with a call to the module. Without scheduling installed, - // the block is empty (no script gating) and `keep = normalMessages`. - const { keep, skipped } = await applyPreTaskScripts(normalMessages); + const { applyPreTaskScripts } = await import('./scheduling/task-script.js'); + const preTask = await applyPreTaskScripts(normalMessages); + keep = preTask.keep; + skipped = preTask.skipped; if (skipped.length > 0) { markCompleted(skipped); log(`Pre-task script skipped ${skipped.length} task(s): ${skipped.join(', ')}`); diff --git a/container/agent-runner/src/task-script.ts b/container/agent-runner/src/scheduling/task-script.ts similarity index 96% rename from container/agent-runner/src/task-script.ts rename to container/agent-runner/src/scheduling/task-script.ts index 5e4b9ef..112d175 100644 --- a/container/agent-runner/src/task-script.ts +++ b/container/agent-runner/src/scheduling/task-script.ts @@ -1,8 +1,8 @@ import { execFile } from 'node:child_process'; import fs from 'node:fs'; import path from 'node:path'; -import type { MessageInRow } from './db/messages-in.js'; -import { touchHeartbeat } from './db/connection.js'; +import type { MessageInRow } from '../db/messages-in.js'; +import { touchHeartbeat } from '../db/connection.js'; const SCRIPT_TIMEOUT_MS = 30_000; const SCRIPT_MAX_BUFFER = 1024 * 1024; diff --git a/src/db/session-db.test.ts b/src/db/session-db.test.ts index 3b18bc8..5307900 100644 --- a/src/db/session-db.test.ts +++ b/src/db/session-db.test.ts @@ -1,289 +1,24 @@ /** - * Tests for per-session messages_in operations — focused on the series_id - * invariant that lets cancel/pause/resume reach the live next occurrence of - * a recurring task, even after the row the agent remembers has completed - * and been replaced by a follow-up. + * Tests for core per-session messages_in schema maintenance. + * + * Task-specific DB tests (insertTask, cancel/pause/resume, updateTask, + * insertRecurrence) live in `src/modules/scheduling/db.test.ts` with the + * rest of the scheduling module. */ import Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; import { describe, it, expect, afterEach } from 'vitest'; -import { - ensureSchema, - openInboundDb, - insertTask, - insertRecurrence, - cancelTask, - pauseTask, - resumeTask, - updateTask, - getCompletedRecurring, - migrateMessagesInTable, - type RecurringMessage, -} from './session-db.js'; +import { migrateMessagesInTable } from './session-db.js'; const TEST_DIR = '/tmp/nanoclaw-session-db-test'; const DB_PATH = path.join(TEST_DIR, 'inbound.db'); -function freshDb() { - if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); - fs.mkdirSync(TEST_DIR, { recursive: true }); - ensureSchema(DB_PATH, 'inbound'); - return openInboundDb(DB_PATH); -} - -function insertBasicTask(db: ReturnType, id: string, recurrence: string | null) { - insertTask(db, { - id, - processAfter: new Date().toISOString(), - recurrence, - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'noop' }), - }); -} - afterEach(() => { if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); }); -describe('insertTask', () => { - it('stamps series_id = id on insert', () => { - const db = freshDb(); - insertBasicTask(db, 'task-1', null); - const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-1') as { series_id: string }; - expect(row.series_id).toBe('task-1'); - db.close(); - }); -}); - -describe('cancelTask / pauseTask / resumeTask series matching', () => { - // Simulates the recurrence chain that used to survive cancellation: - // the original task completes → handleRecurrence spawns a follow-up - // row → agent calls cancel_task(originalId) → historically only hit - // the completed row, leaving the live one running. - function seedRecurringChain(db: ReturnType) { - insertBasicTask(db, 'task-orig', '0 9 * * *'); - // Mark the original as completed (as syncProcessingAcks would do). - db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); - - const msg: RecurringMessage = { - id: 'task-orig', - kind: 'task', - content: JSON.stringify({ prompt: 'noop' }), - recurrence: '0 9 * * *', - process_after: null, - platform_id: null, - channel_type: null, - thread_id: null, - series_id: 'task-orig', - }; - insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString()); - } - - it('cancel by original id reaches the live follow-up via series_id', () => { - const db = freshDb(); - seedRecurringChain(db); - - cancelTask(db, 'task-orig'); - - const live = db.prepare("SELECT id, status, recurrence FROM messages_in WHERE status = 'pending'").all(); - expect(live).toHaveLength(0); - - const followUp = db.prepare("SELECT status, recurrence FROM messages_in WHERE id = 'task-next'").get() as { - status: string; - recurrence: string | null; - }; - expect(followUp.status).toBe('completed'); - // Recurrence cleared so the sweep doesn't spawn another clone. - expect(followUp.recurrence).toBeNull(); - db.close(); - }); - - it('cancelled task is not picked up by getCompletedRecurring', () => { - const db = freshDb(); - insertBasicTask(db, 'task-1', '0 9 * * *'); - cancelTask(db, 'task-1'); - - const recurring = getCompletedRecurring(db); - expect(recurring).toHaveLength(0); - db.close(); - }); - - it('pause by original id pauses the live follow-up', () => { - const db = freshDb(); - seedRecurringChain(db); - - pauseTask(db, 'task-orig'); - - const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; - expect(followUp.status).toBe('paused'); - db.close(); - }); - - it('resume by original id resumes the live follow-up', () => { - const db = freshDb(); - seedRecurringChain(db); - - db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = 'task-next'").run(); - resumeTask(db, 'task-orig'); - - const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; - expect(followUp.status).toBe('pending'); - db.close(); - }); -}); - -describe('updateTask', () => { - it('merges supplied fields into content JSON without clobbering others', () => { - const db = freshDb(); - insertTask(db, { - id: 'task-1', - processAfter: new Date().toISOString(), - recurrence: null, - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'old', script: 'echo old', extra: 'keep me' }), - }); - - const touched = updateTask(db, 'task-1', { prompt: 'new' }); - expect(touched).toBe(1); - - const row = db.prepare('SELECT content FROM messages_in WHERE id = ?').get('task-1') as { content: string }; - const parsed = JSON.parse(row.content); - expect(parsed.prompt).toBe('new'); - expect(parsed.script).toBe('echo old'); - expect(parsed.extra).toBe('keep me'); - }); - - it('updates recurrence and process_after when supplied', () => { - const db = freshDb(); - insertTask(db, { - id: 'task-1', - processAfter: '2026-01-01T00:00:00Z', - recurrence: '0 9 * * *', - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'p' }), - }); - - updateTask(db, 'task-1', { recurrence: '0 18 * * *', processAfter: '2026-02-01T00:00:00Z' }); - - const row = db.prepare('SELECT recurrence, process_after FROM messages_in WHERE id = ?').get('task-1') as { - recurrence: string; - process_after: string; - }; - expect(row.recurrence).toBe('0 18 * * *'); - expect(row.process_after).toBe('2026-02-01T00:00:00Z'); - }); - - it('clears recurrence when null is passed', () => { - const db = freshDb(); - insertTask(db, { - id: 'task-1', - processAfter: '2026-01-01T00:00:00Z', - recurrence: '0 9 * * *', - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'p' }), - }); - - updateTask(db, 'task-1', { recurrence: null }); - - const row = db.prepare('SELECT recurrence FROM messages_in WHERE id = ?').get('task-1') as { - recurrence: string | null; - }; - expect(row.recurrence).toBeNull(); - }); - - it('reaches the live follow-up via series_id when called with the original id', () => { - const db = freshDb(); - insertTask(db, { - id: 'task-orig', - processAfter: new Date().toISOString(), - recurrence: '0 9 * * *', - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'old' }), - }); - db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); - - const msg: RecurringMessage = { - id: 'task-orig', - kind: 'task', - content: JSON.stringify({ prompt: 'old' }), - recurrence: '0 9 * * *', - process_after: null, - platform_id: null, - channel_type: null, - thread_id: null, - series_id: 'task-orig', - }; - insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString()); - - const touched = updateTask(db, 'task-orig', { prompt: 'new' }); - // Only the live follow-up should be touched — completed rows are excluded. - expect(touched).toBe(1); - - const live = db.prepare("SELECT content FROM messages_in WHERE id = 'task-next'").get() as { content: string }; - expect(JSON.parse(live.content).prompt).toBe('new'); - - // Original (completed) row left alone. - const orig = db.prepare("SELECT content FROM messages_in WHERE id = 'task-orig'").get() as { content: string }; - expect(JSON.parse(orig.content).prompt).toBe('old'); - }); - - it('returns 0 when no live task matches', () => { - const db = freshDb(); - insertTask(db, { - id: 'task-1', - processAfter: new Date().toISOString(), - recurrence: null, - platformId: null, - channelType: null, - threadId: null, - content: JSON.stringify({ prompt: 'p' }), - }); - db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-1'").run(); - - const touched = updateTask(db, 'task-1', { prompt: 'new' }); - expect(touched).toBe(0); - }); -}); - -describe('insertRecurrence', () => { - it('copies series_id forward', () => { - const db = freshDb(); - insertBasicTask(db, 'task-orig', '0 9 * * *'); - db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); - - const msg: RecurringMessage = { - id: 'task-orig', - kind: 'task', - content: '{}', - recurrence: '0 9 * * *', - process_after: null, - platform_id: null, - channel_type: null, - thread_id: null, - series_id: 'task-orig', - }; - insertRecurrence(db, msg, 'task-next', new Date().toISOString()); - - const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-next') as { - series_id: string; - }; - expect(row.series_id).toBe('task-orig'); - db.close(); - }); -}); - describe('migrateMessagesInTable', () => { it('backfills series_id = id on legacy rows and is idempotent', () => { if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); diff --git a/src/db/session-db.ts b/src/db/session-db.ts index aafb39e..05104cf 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -71,8 +71,14 @@ export function replaceDestinations(db: Database.Database, entries: DestinationR // messages_in // --------------------------------------------------------------------------- -/** Next even seq number for host-owned inbound.db. */ -function nextEvenSeq(db: Database.Database): number { +/** + * Next even seq number for host-owned inbound.db. + * + * Exported so the scheduling module's task helpers can maintain the + * host-writes-even-seq invariant without duplicating the logic. Not part of + * the general public API — imported by `src/modules/scheduling/db.ts` only. + */ +export function nextEvenSeq(db: Database.Database): number { const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; return maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); } @@ -100,103 +106,6 @@ export function insertMessage( }); } -export function insertTask( - db: Database.Database, - task: { - id: string; - processAfter: string; - recurrence: string | null; - platformId: string | null; - channelType: string | null; - threadId: string | null; - content: string; - }, -): void { - db.prepare( - `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content, series_id) - VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content, @id)`, - ).run({ - ...task, - seq: nextEvenSeq(db), - }); -} - -// cancel/pause/resume match any live row in the series, not just the exact id. -// Recurring tasks get a new row per occurrence (see handleRecurrence), all -// sharing series_id. Matching by id alone would only hit the completed row -// the agent remembers, missing the live next occurrence. -export function cancelTask(db: Database.Database, taskId: string): void { - db.prepare( - "UPDATE messages_in SET status = 'completed', recurrence = NULL WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')", - ).run(taskId, taskId); -} - -export function pauseTask(db: Database.Database, taskId: string): void { - db.prepare( - "UPDATE messages_in SET status = 'paused' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'pending'", - ).run(taskId, taskId); -} - -export function resumeTask(db: Database.Database, taskId: string): void { - db.prepare( - "UPDATE messages_in SET status = 'pending' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'paused'", - ).run(taskId, taskId); -} - -export interface TaskUpdate { - prompt?: string; - script?: string | null; - recurrence?: string | null; - processAfter?: string; -} - -// Merges content JSON in-place so callers can update prompt/script without -// clobbering other fields. Matches by id OR series_id so the live next -// occurrence of a recurring task is updated, not just the completed row the -// agent last saw. Returns the number of rows touched. -export function updateTask(db: Database.Database, taskId: string, update: TaskUpdate): number { - const rows = db - .prepare( - "SELECT id, content FROM messages_in WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')", - ) - .all(taskId, taskId) as Array<{ id: string; content: string }>; - - if (rows.length === 0) return 0; - - const setProcessAfter = update.processAfter !== undefined; - const setRecurrence = update.recurrence !== undefined; - const mergeContent = update.prompt !== undefined || update.script !== undefined; - - const tx = db.transaction(() => { - for (const row of rows) { - let content = row.content; - if (mergeContent) { - const parsed = JSON.parse(row.content) as Record; - if (update.prompt !== undefined) parsed.prompt = update.prompt; - if (update.script !== undefined) parsed.script = update.script; - content = JSON.stringify(parsed); - } - - // Build SET clause dynamically so callers can update fields independently. - const sets: string[] = ['content = ?']; - const params: unknown[] = [content]; - if (setProcessAfter) { - sets.push('process_after = ?'); - params.push(update.processAfter); - } - if (setRecurrence) { - sets.push('recurrence = ?'); - params.push(update.recurrence); - } - params.push(row.id); - - db.prepare(`UPDATE messages_in SET ${sets.join(', ')} WHERE id = ?`).run(...params); - } - }); - tx(); - return rows.length; -} - export function countDueMessages(db: Database.Database): number { return ( db @@ -229,51 +138,6 @@ export function getMessageForRetry( | undefined; } -export interface RecurringMessage { - id: string; - kind: string; - content: string; - recurrence: string; - process_after: string | null; - platform_id: string | null; - channel_type: string | null; - thread_id: string | null; - series_id: string; -} - -export function getCompletedRecurring(db: Database.Database): RecurringMessage[] { - return db - .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") - .all() as RecurringMessage[]; -} - -export function insertRecurrence( - db: Database.Database, - msg: RecurringMessage, - newId: string, - nextRun: string | null, -): void { - db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content, series_id) - VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?, ?)`, - ).run( - newId, - nextEvenSeq(db), - msg.kind, - nextRun, - msg.recurrence, - msg.platform_id, - msg.channel_type, - msg.thread_id, - msg.content, - msg.series_id, - ); -} - -export function clearRecurrence(db: Database.Database, messageId: string): void { - db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(messageId); -} - export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { const completed = outDb .prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')") diff --git a/src/delivery.ts b/src/delivery.ts index c7b244d..6ea3d04 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -23,11 +23,6 @@ import { markDelivered, markDeliveryFailed, migrateDeliveredTable, - insertTask, - cancelTask, - pauseTask, - resumeTask, - updateTask, } from './db/session-db.js'; import { log } from './log.js'; import { normalizeOptions } from './channels/ask-question.js'; @@ -501,66 +496,6 @@ async function handleSystemAction( } switch (action) { - case 'schedule_task': { - const taskId = content.taskId as string; - const prompt = content.prompt as string; - const script = content.script as string | null; - const processAfter = content.processAfter as string; - const recurrence = (content.recurrence as string) || null; - - insertTask(inDb, { - id: taskId, - processAfter, - recurrence, - platformId: (content.platformId as string) ?? null, - channelType: (content.channelType as string) ?? null, - threadId: (content.threadId as string) ?? null, - content: JSON.stringify({ prompt, script }), - }); - log.info('Scheduled task created', { taskId, processAfter, recurrence }); - break; - } - - case 'cancel_task': { - const taskId = content.taskId as string; - cancelTask(inDb, taskId); - log.info('Task cancelled', { taskId }); - break; - } - - case 'pause_task': { - const taskId = content.taskId as string; - pauseTask(inDb, taskId); - log.info('Task paused', { taskId }); - break; - } - - case 'resume_task': { - const taskId = content.taskId as string; - resumeTask(inDb, taskId); - log.info('Task resumed', { taskId }); - break; - } - - case 'update_task': { - const taskId = content.taskId as string; - const update: Parameters[2] = {}; - if (typeof content.prompt === 'string') update.prompt = content.prompt; - if (typeof content.processAfter === 'string') update.processAfter = content.processAfter; - if (content.recurrence === null || typeof content.recurrence === 'string') { - update.recurrence = content.recurrence as string | null; - } - if (content.script === null || typeof content.script === 'string') { - update.script = content.script as string | null; - } - const touched = updateTask(inDb, taskId, update); - log.info('Task updated', { taskId, touched, fields: Object.keys(update) }); - if (touched === 0) { - notifyAgent(session, `update_task: no live task matched id "${taskId}".`); - } - break; - } - case 'create_agent': { const requestId = content.requestId as string; const name = content.name as string; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 3958c8c..9bbd4bc 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -19,9 +19,6 @@ import { getMessageForRetry, markMessageFailed, retryWithBackoff, - getCompletedRecurring, - insertRecurrence, - clearRecurrence, } from './db/session-db.js'; import { log } from './log.js'; import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js'; @@ -102,10 +99,7 @@ async function sweepSession(session: Session): Promise { // 4. Handle recurrence for completed messages. // MODULE-HOOK:scheduling-recurrence:start - // When scheduling is extracted (PR #4), `handleRecurrence` moves to - // `src/modules/scheduling/` and the `/add-scheduling` skill replaces - // this block with a call to the module. Without scheduling - // installed, the block is empty and recurrence is a no-op. + const { handleRecurrence } = await import('./modules/scheduling/recurrence.js'); await handleRecurrence(inDb, session); // MODULE-HOOK:scheduling-recurrence:end } finally { @@ -156,24 +150,3 @@ function detectStaleContainers( } } -/** Insert next occurrence for completed recurring messages. */ -async function handleRecurrence(inDb: Database.Database, session: Session): Promise { - const recurring = getCompletedRecurring(inDb); - - for (const msg of recurring) { - try { - const { CronExpressionParser } = await import('cron-parser'); - const interval = CronExpressionParser.parse(msg.recurrence); - const nextRun = interval.next().toISOString(); - const prefix = msg.kind === 'task' ? 'task' : 'msg'; - const newId = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - - insertRecurrence(inDb, msg, newId, nextRun); - clearRecurrence(inDb, msg.id); - - log.info('Inserted next recurrence', { originalId: msg.id, newId, seriesId: msg.series_id, nextRun }); - } catch (err) { - log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err }); - } - } -} diff --git a/src/modules/index.ts b/src/modules/index.ts index 4d1cc9f..ed6b832 100644 --- a/src/modules/index.ts +++ b/src/modules/index.ts @@ -15,4 +15,5 @@ */ import './interactive/index.js'; import './approvals/index.js'; +import './scheduling/index.js'; diff --git a/src/modules/scheduling/actions.ts b/src/modules/scheduling/actions.ts new file mode 100644 index 0000000..3b0fd8f --- /dev/null +++ b/src/modules/scheduling/actions.ts @@ -0,0 +1,113 @@ +/** + * Delivery action handlers for scheduling. + * + * The container can't write to inbound.db (host-owned). When the agent calls + * schedule_task / cancel_task / etc. via MCP, the container writes a + * `kind='system'` outbound message with an `action` field. The delivery path + * reaches into this module via the delivery-action registry and we apply the + * change to inbound.db here. + */ +import type Database from 'better-sqlite3'; + +import { wakeContainer } from '../../container-runner.js'; +import { getSession } from '../../db/sessions.js'; +import { log } from '../../log.js'; +import { writeSessionMessage } from '../../session-manager.js'; +import type { Session } from '../../types.js'; +import { cancelTask, insertTask, pauseTask, resumeTask, updateTask, type TaskUpdate } from './db.js'; + +export async function handleScheduleTask( + content: Record, + _session: Session, + inDb: Database.Database, +): Promise { + const taskId = content.taskId as string; + const prompt = content.prompt as string; + const script = content.script as string | null; + const processAfter = content.processAfter as string; + const recurrence = (content.recurrence as string) || null; + + insertTask(inDb, { + id: taskId, + processAfter, + recurrence, + platformId: (content.platformId as string) ?? null, + channelType: (content.channelType as string) ?? null, + threadId: (content.threadId as string) ?? null, + content: JSON.stringify({ prompt, script }), + }); + log.info('Scheduled task created', { taskId, processAfter, recurrence }); +} + +export async function handleCancelTask( + content: Record, + _session: Session, + inDb: Database.Database, +): Promise { + const taskId = content.taskId as string; + cancelTask(inDb, taskId); + log.info('Task cancelled', { taskId }); +} + +export async function handlePauseTask( + content: Record, + _session: Session, + inDb: Database.Database, +): Promise { + const taskId = content.taskId as string; + pauseTask(inDb, taskId); + log.info('Task paused', { taskId }); +} + +export async function handleResumeTask( + content: Record, + _session: Session, + inDb: Database.Database, +): Promise { + const taskId = content.taskId as string; + resumeTask(inDb, taskId); + log.info('Task resumed', { taskId }); +} + +export async function handleUpdateTask( + content: Record, + session: Session, + inDb: Database.Database, +): Promise { + const taskId = content.taskId as string; + const update: TaskUpdate = {}; + if (typeof content.prompt === 'string') update.prompt = content.prompt; + if (typeof content.processAfter === 'string') update.processAfter = content.processAfter; + if (content.recurrence === null || typeof content.recurrence === 'string') { + update.recurrence = content.recurrence as string | null; + } + if (content.script === null || typeof content.script === 'string') { + update.script = content.script as string | null; + } + const touched = updateTask(inDb, taskId, update); + log.info('Task updated', { taskId, touched, fields: Object.keys(update) }); + if (touched === 0) { + // Notify the agent that update_task matched nothing. Replicates the + // old notifyAgent helper that used to live in delivery.ts — inlined + // here so scheduling doesn't depend on delivery's private helpers. + writeSessionMessage(session.agent_group_id, session.id, { + id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: session.agent_group_id, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ + text: `update_task: no live task matched id "${taskId}".`, + sender: 'system', + senderId: 'system', + }), + }); + const fresh = getSession(session.id); + if (fresh) { + wakeContainer(fresh).catch((err) => + log.error('Failed to wake container after update_task notification', { err }), + ); + } + } +} diff --git a/src/modules/scheduling/db.test.ts b/src/modules/scheduling/db.test.ts new file mode 100644 index 0000000..61f7c3f --- /dev/null +++ b/src/modules/scheduling/db.test.ts @@ -0,0 +1,282 @@ +/** + * Tests for the scheduling module's task DB helpers — focused on the + * series_id invariant that lets cancel/pause/resume/update reach the live + * next occurrence of a recurring task, even after the row the agent + * remembers has completed and been replaced by a follow-up. + */ +import fs from 'fs'; +import path from 'path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { ensureSchema, openInboundDb } from '../../db/session-db.js'; +import { + insertTask, + insertRecurrence, + cancelTask, + pauseTask, + resumeTask, + updateTask, + getCompletedRecurring, + type RecurringMessage, +} from './db.js'; + +const TEST_DIR = '/tmp/nanoclaw-scheduling-db-test'; +const DB_PATH = path.join(TEST_DIR, 'inbound.db'); + +function freshDb() { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + ensureSchema(DB_PATH, 'inbound'); + return openInboundDb(DB_PATH); +} + +function insertBasicTask(db: ReturnType, id: string, recurrence: string | null) { + insertTask(db, { + id, + processAfter: new Date().toISOString(), + recurrence, + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'noop' }), + }); +} + +afterEach(() => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); +}); + +describe('insertTask', () => { + it('stamps series_id = id on insert', () => { + const db = freshDb(); + insertBasicTask(db, 'task-1', null); + const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-1') as { series_id: string }; + expect(row.series_id).toBe('task-1'); + db.close(); + }); +}); + +describe('cancelTask / pauseTask / resumeTask series matching', () => { + // Simulates the recurrence chain that used to survive cancellation: + // the original task completes → handleRecurrence spawns a follow-up + // row → agent calls cancel_task(originalId) → historically only hit + // the completed row, leaving the live one running. + function seedRecurringChain(db: ReturnType) { + insertBasicTask(db, 'task-orig', '0 9 * * *'); + // Mark the original as completed (as syncProcessingAcks would do). + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); + + const msg: RecurringMessage = { + id: 'task-orig', + kind: 'task', + content: JSON.stringify({ prompt: 'noop' }), + recurrence: '0 9 * * *', + process_after: null, + platform_id: null, + channel_type: null, + thread_id: null, + series_id: 'task-orig', + }; + insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString()); + } + + it('cancel by original id reaches the live follow-up via series_id', () => { + const db = freshDb(); + seedRecurringChain(db); + + cancelTask(db, 'task-orig'); + + const live = db.prepare("SELECT id, status, recurrence FROM messages_in WHERE status = 'pending'").all(); + expect(live).toHaveLength(0); + + const followUp = db.prepare("SELECT status, recurrence FROM messages_in WHERE id = 'task-next'").get() as { + status: string; + recurrence: string | null; + }; + expect(followUp.status).toBe('completed'); + // Recurrence cleared so the sweep doesn't spawn another clone. + expect(followUp.recurrence).toBeNull(); + db.close(); + }); + + it('cancelled task is not picked up by getCompletedRecurring', () => { + const db = freshDb(); + insertBasicTask(db, 'task-1', '0 9 * * *'); + cancelTask(db, 'task-1'); + + const recurring = getCompletedRecurring(db); + expect(recurring).toHaveLength(0); + db.close(); + }); + + it('pause by original id pauses the live follow-up', () => { + const db = freshDb(); + seedRecurringChain(db); + + pauseTask(db, 'task-orig'); + + const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; + expect(followUp.status).toBe('paused'); + db.close(); + }); + + it('resume by original id resumes the live follow-up', () => { + const db = freshDb(); + seedRecurringChain(db); + + db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = 'task-next'").run(); + resumeTask(db, 'task-orig'); + + const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; + expect(followUp.status).toBe('pending'); + db.close(); + }); +}); + +describe('updateTask', () => { + it('merges supplied fields into content JSON without clobbering others', () => { + const db = freshDb(); + insertTask(db, { + id: 'task-1', + processAfter: new Date().toISOString(), + recurrence: null, + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'old', script: 'echo old', extra: 'keep me' }), + }); + + const touched = updateTask(db, 'task-1', { prompt: 'new' }); + expect(touched).toBe(1); + + const row = db.prepare('SELECT content FROM messages_in WHERE id = ?').get('task-1') as { content: string }; + const parsed = JSON.parse(row.content); + expect(parsed.prompt).toBe('new'); + expect(parsed.script).toBe('echo old'); + expect(parsed.extra).toBe('keep me'); + }); + + it('updates recurrence and process_after when supplied', () => { + const db = freshDb(); + insertTask(db, { + id: 'task-1', + processAfter: '2026-01-01T00:00:00Z', + recurrence: '0 9 * * *', + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'p' }), + }); + + updateTask(db, 'task-1', { recurrence: '0 18 * * *', processAfter: '2026-02-01T00:00:00Z' }); + + const row = db.prepare('SELECT recurrence, process_after FROM messages_in WHERE id = ?').get('task-1') as { + recurrence: string; + process_after: string; + }; + expect(row.recurrence).toBe('0 18 * * *'); + expect(row.process_after).toBe('2026-02-01T00:00:00Z'); + }); + + it('clears recurrence when null is passed', () => { + const db = freshDb(); + insertTask(db, { + id: 'task-1', + processAfter: '2026-01-01T00:00:00Z', + recurrence: '0 9 * * *', + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'p' }), + }); + + updateTask(db, 'task-1', { recurrence: null }); + + const row = db.prepare('SELECT recurrence FROM messages_in WHERE id = ?').get('task-1') as { + recurrence: string | null; + }; + expect(row.recurrence).toBeNull(); + }); + + it('reaches the live follow-up via series_id when called with the original id', () => { + const db = freshDb(); + insertTask(db, { + id: 'task-orig', + processAfter: new Date().toISOString(), + recurrence: '0 9 * * *', + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'old' }), + }); + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); + + const msg: RecurringMessage = { + id: 'task-orig', + kind: 'task', + content: JSON.stringify({ prompt: 'old' }), + recurrence: '0 9 * * *', + process_after: null, + platform_id: null, + channel_type: null, + thread_id: null, + series_id: 'task-orig', + }; + insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString()); + + const touched = updateTask(db, 'task-orig', { prompt: 'new' }); + // Only the live follow-up should be touched — completed rows are excluded. + expect(touched).toBe(1); + + const live = db.prepare("SELECT content FROM messages_in WHERE id = 'task-next'").get() as { content: string }; + expect(JSON.parse(live.content).prompt).toBe('new'); + + // Original (completed) row left alone. + const orig = db.prepare("SELECT content FROM messages_in WHERE id = 'task-orig'").get() as { content: string }; + expect(JSON.parse(orig.content).prompt).toBe('old'); + }); + + it('returns 0 when no live task matches', () => { + const db = freshDb(); + insertTask(db, { + id: 'task-1', + processAfter: new Date().toISOString(), + recurrence: null, + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'p' }), + }); + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-1'").run(); + + const touched = updateTask(db, 'task-1', { prompt: 'new' }); + expect(touched).toBe(0); + }); +}); + +describe('insertRecurrence', () => { + it('copies series_id forward', () => { + const db = freshDb(); + insertBasicTask(db, 'task-orig', '0 9 * * *'); + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); + + const msg: RecurringMessage = { + id: 'task-orig', + kind: 'task', + content: '{}', + recurrence: '0 9 * * *', + process_after: null, + platform_id: null, + channel_type: null, + thread_id: null, + series_id: 'task-orig', + }; + insertRecurrence(db, msg, 'task-next', new Date().toISOString()); + + const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-next') as { + series_id: string; + }; + expect(row.series_id).toBe('task-orig'); + db.close(); + }); +}); diff --git a/src/modules/scheduling/db.ts b/src/modules/scheduling/db.ts new file mode 100644 index 0000000..87b1410 --- /dev/null +++ b/src/modules/scheduling/db.ts @@ -0,0 +1,153 @@ +/** + * Task DB helpers used by the scheduling module. + * + * Tasks are `messages_in` rows with `kind='task'`. This module doesn't own + * its own table — it piggybacks on the core schema. That's why there's no + * `module-scheduling-*.ts` migration file. + * + * cancel/pause/resume match any live row in the series, not just the exact id. + * Recurring tasks get a new row per occurrence (see handleRecurrence), all + * sharing series_id. Matching by id alone would only hit the completed row + * the agent remembers, missing the live next occurrence. + */ +import type Database from 'better-sqlite3'; + +import { nextEvenSeq } from '../../db/session-db.js'; + +export function insertTask( + db: Database.Database, + task: { + id: string; + processAfter: string; + recurrence: string | null; + platformId: string | null; + channelType: string | null; + threadId: string | null; + content: string; + }, +): void { + db.prepare( + `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content, series_id) + VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content, @id)`, + ).run({ + ...task, + seq: nextEvenSeq(db), + }); +} + +export function cancelTask(db: Database.Database, taskId: string): void { + db.prepare( + "UPDATE messages_in SET status = 'completed', recurrence = NULL WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')", + ).run(taskId, taskId); +} + +export function pauseTask(db: Database.Database, taskId: string): void { + db.prepare( + "UPDATE messages_in SET status = 'paused' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'pending'", + ).run(taskId, taskId); +} + +export function resumeTask(db: Database.Database, taskId: string): void { + db.prepare( + "UPDATE messages_in SET status = 'pending' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'paused'", + ).run(taskId, taskId); +} + +export interface TaskUpdate { + prompt?: string; + script?: string | null; + recurrence?: string | null; + processAfter?: string; +} + +// Merges content JSON in-place so callers can update prompt/script without +// clobbering other fields. Matches by id OR series_id so the live next +// occurrence of a recurring task is updated, not just the completed row the +// agent last saw. Returns the number of rows touched. +export function updateTask(db: Database.Database, taskId: string, update: TaskUpdate): number { + const rows = db + .prepare( + "SELECT id, content FROM messages_in WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')", + ) + .all(taskId, taskId) as Array<{ id: string; content: string }>; + + if (rows.length === 0) return 0; + + const setProcessAfter = update.processAfter !== undefined; + const setRecurrence = update.recurrence !== undefined; + const mergeContent = update.prompt !== undefined || update.script !== undefined; + + const tx = db.transaction(() => { + for (const row of rows) { + let content = row.content; + if (mergeContent) { + const parsed = JSON.parse(row.content) as Record; + if (update.prompt !== undefined) parsed.prompt = update.prompt; + if (update.script !== undefined) parsed.script = update.script; + content = JSON.stringify(parsed); + } + + // Build SET clause dynamically so callers can update fields independently. + const sets: string[] = ['content = ?']; + const params: unknown[] = [content]; + if (setProcessAfter) { + sets.push('process_after = ?'); + params.push(update.processAfter); + } + if (setRecurrence) { + sets.push('recurrence = ?'); + params.push(update.recurrence); + } + params.push(row.id); + + db.prepare(`UPDATE messages_in SET ${sets.join(', ')} WHERE id = ?`).run(...params); + } + }); + tx(); + return rows.length; +} + +export interface RecurringMessage { + id: string; + kind: string; + content: string; + recurrence: string; + process_after: string | null; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + series_id: string; +} + +export function getCompletedRecurring(db: Database.Database): RecurringMessage[] { + return db + .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") + .all() as RecurringMessage[]; +} + +export function insertRecurrence( + db: Database.Database, + msg: RecurringMessage, + newId: string, + nextRun: string | null, +): void { + db.prepare( + `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content, series_id) + VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?, ?)`, + ).run( + newId, + nextEvenSeq(db), + msg.kind, + nextRun, + msg.recurrence, + msg.platform_id, + msg.channel_type, + msg.thread_id, + msg.content, + msg.series_id, + ); +} + +export function clearRecurrence(db: Database.Database, messageId: string): void { + db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(messageId); +} diff --git a/src/modules/scheduling/index.ts b/src/modules/scheduling/index.ts new file mode 100644 index 0000000..14ef7d5 --- /dev/null +++ b/src/modules/scheduling/index.ts @@ -0,0 +1,34 @@ +/** + * Scheduling module — one-shot and recurring tasks. + * + * Registers: + * - Five delivery action handlers: schedule_task, cancel_task, pause_task, + * resume_task, update_task. The container's scheduling MCP tools + * (container/agent-runner/src/mcp-tools/scheduling.ts) write system + * messages with these actions; the host applies them to inbound.db. + * + * Host integration points (filled by MODULE-HOOK markers, validated here + * with the scheduling module shipping inline): + * - `src/host-sweep.ts` → MODULE-HOOK:scheduling-recurrence calls + * `handleRecurrence` each sweep tick. + * - `container/agent-runner/src/poll-loop.ts` → MODULE-HOOK:scheduling-pre-task + * runs `applyPreTaskScripts` before the provider call so tasks carrying + * a pre-agent script can gate their own execution. + * + * No DB migration — tasks are `messages_in` rows with `kind='task'`, so the + * module piggybacks on the core schema. + */ +import { registerDeliveryAction } from '../../delivery.js'; +import { + handleCancelTask, + handlePauseTask, + handleResumeTask, + handleScheduleTask, + handleUpdateTask, +} from './actions.js'; + +registerDeliveryAction('schedule_task', handleScheduleTask); +registerDeliveryAction('cancel_task', handleCancelTask); +registerDeliveryAction('pause_task', handlePauseTask); +registerDeliveryAction('resume_task', handleResumeTask); +registerDeliveryAction('update_task', handleUpdateTask); diff --git a/src/modules/scheduling/recurrence.ts b/src/modules/scheduling/recurrence.ts new file mode 100644 index 0000000..a8a2e5c --- /dev/null +++ b/src/modules/scheduling/recurrence.ts @@ -0,0 +1,49 @@ +/** + * Sweep hook for recurring tasks. + * + * Every sweep tick, find `messages_in` rows that are `completed` AND still + * have a `recurrence` cron expression. For each, compute the next run via + * cron-parser, insert a fresh pending row (copying series_id forward), then + * clear the recurrence on the original so it isn't re-cloned next tick. + * + * Called from `src/host-sweep.ts` inside `MODULE-HOOK:scheduling-recurrence`. + * When scheduling ships inline (current state through PR #7), the hook is a + * direct dynamic import. When scheduling moves to the modules branch in + * PR #8, the install skill re-fills the marker on install. + */ +import type Database from 'better-sqlite3'; + +import { log } from '../../log.js'; +import type { Session } from '../../types.js'; +import { clearRecurrence, getCompletedRecurring, insertRecurrence } from './db.js'; + +export async function handleRecurrence(inDb: Database.Database, session: Session): Promise { + const recurring = getCompletedRecurring(inDb); + + for (const msg of recurring) { + try { + const { CronExpressionParser } = await import('cron-parser'); + const interval = CronExpressionParser.parse(msg.recurrence); + const nextRun = interval.next().toISOString(); + const prefix = msg.kind === 'task' ? 'task' : 'msg'; + const newId = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + insertRecurrence(inDb, msg, newId, nextRun); + clearRecurrence(inDb, msg.id); + + log.info('Inserted next recurrence', { + originalId: msg.id, + newId, + seriesId: msg.series_id, + nextRun, + sessionId: session.id, + }); + } catch (err) { + log.error('Failed to compute next recurrence', { + messageId: msg.id, + recurrence: msg.recurrence, + err, + }); + } + } +}