refactor(modules): extract scheduling as registry-based module
Moves the scheduling surface — 5 delivery actions (schedule_task, cancel_task, pause_task, resume_task, update_task), handleRecurrence, applyPreTaskScripts, and task DB helpers — out of core and into src/modules/scheduling/ (host) and container/agent-runner/src/scheduling/ (container). First PR to fill the MODULE-HOOK markers introduced in PR #2: - src/host-sweep.ts MODULE-HOOK:scheduling-recurrence now dynamically imports handleRecurrence from the module each sweep tick. - container/agent-runner/src/poll-loop.ts MODULE-HOOK:scheduling-pre-task dynamically imports applyPreTaskScripts before the provider call. When the marker block is empty (scheduling uninstalled), `keep` falls back to `normalMessages` so non-task messages still flow. The 5 task cases are removed from delivery.ts's handleSystemAction switch — the registry now routes them. Task DB helpers moved out of src/db/session-db.ts (which kept `nextEvenSeq` as a named export so the module can uphold the host-writes-even-seq invariant). Test suite split to match: scheduling-specific tests live in the module. No migration — tasks are messages_in rows with kind='task'. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,289 +1,24 @@
|
||||
/**
|
||||
* Tests for per-session messages_in operations — focused on the series_id
|
||||
* invariant that lets cancel/pause/resume reach the live next occurrence of
|
||||
* a recurring task, even after the row the agent remembers has completed
|
||||
* and been replaced by a follow-up.
|
||||
* Tests for core per-session messages_in schema maintenance.
|
||||
*
|
||||
* Task-specific DB tests (insertTask, cancel/pause/resume, updateTask,
|
||||
* insertRecurrence) live in `src/modules/scheduling/db.test.ts` with the
|
||||
* rest of the scheduling module.
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { describe, it, expect, afterEach } from 'vitest';
|
||||
|
||||
import {
|
||||
ensureSchema,
|
||||
openInboundDb,
|
||||
insertTask,
|
||||
insertRecurrence,
|
||||
cancelTask,
|
||||
pauseTask,
|
||||
resumeTask,
|
||||
updateTask,
|
||||
getCompletedRecurring,
|
||||
migrateMessagesInTable,
|
||||
type RecurringMessage,
|
||||
} from './session-db.js';
|
||||
import { migrateMessagesInTable } from './session-db.js';
|
||||
|
||||
const TEST_DIR = '/tmp/nanoclaw-session-db-test';
|
||||
const DB_PATH = path.join(TEST_DIR, 'inbound.db');
|
||||
|
||||
function freshDb() {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||
ensureSchema(DB_PATH, 'inbound');
|
||||
return openInboundDb(DB_PATH);
|
||||
}
|
||||
|
||||
function insertBasicTask(db: ReturnType<typeof openInboundDb>, id: string, recurrence: string | null) {
|
||||
insertTask(db, {
|
||||
id,
|
||||
processAfter: new Date().toISOString(),
|
||||
recurrence,
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'noop' }),
|
||||
});
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
});
|
||||
|
||||
describe('insertTask', () => {
|
||||
it('stamps series_id = id on insert', () => {
|
||||
const db = freshDb();
|
||||
insertBasicTask(db, 'task-1', null);
|
||||
const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-1') as { series_id: string };
|
||||
expect(row.series_id).toBe('task-1');
|
||||
db.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe('cancelTask / pauseTask / resumeTask series matching', () => {
|
||||
// Simulates the recurrence chain that used to survive cancellation:
|
||||
// the original task completes → handleRecurrence spawns a follow-up
|
||||
// row → agent calls cancel_task(originalId) → historically only hit
|
||||
// the completed row, leaving the live one running.
|
||||
function seedRecurringChain(db: ReturnType<typeof openInboundDb>) {
|
||||
insertBasicTask(db, 'task-orig', '0 9 * * *');
|
||||
// Mark the original as completed (as syncProcessingAcks would do).
|
||||
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run();
|
||||
|
||||
const msg: RecurringMessage = {
|
||||
id: 'task-orig',
|
||||
kind: 'task',
|
||||
content: JSON.stringify({ prompt: 'noop' }),
|
||||
recurrence: '0 9 * * *',
|
||||
process_after: null,
|
||||
platform_id: null,
|
||||
channel_type: null,
|
||||
thread_id: null,
|
||||
series_id: 'task-orig',
|
||||
};
|
||||
insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString());
|
||||
}
|
||||
|
||||
it('cancel by original id reaches the live follow-up via series_id', () => {
|
||||
const db = freshDb();
|
||||
seedRecurringChain(db);
|
||||
|
||||
cancelTask(db, 'task-orig');
|
||||
|
||||
const live = db.prepare("SELECT id, status, recurrence FROM messages_in WHERE status = 'pending'").all();
|
||||
expect(live).toHaveLength(0);
|
||||
|
||||
const followUp = db.prepare("SELECT status, recurrence FROM messages_in WHERE id = 'task-next'").get() as {
|
||||
status: string;
|
||||
recurrence: string | null;
|
||||
};
|
||||
expect(followUp.status).toBe('completed');
|
||||
// Recurrence cleared so the sweep doesn't spawn another clone.
|
||||
expect(followUp.recurrence).toBeNull();
|
||||
db.close();
|
||||
});
|
||||
|
||||
it('cancelled task is not picked up by getCompletedRecurring', () => {
|
||||
const db = freshDb();
|
||||
insertBasicTask(db, 'task-1', '0 9 * * *');
|
||||
cancelTask(db, 'task-1');
|
||||
|
||||
const recurring = getCompletedRecurring(db);
|
||||
expect(recurring).toHaveLength(0);
|
||||
db.close();
|
||||
});
|
||||
|
||||
it('pause by original id pauses the live follow-up', () => {
|
||||
const db = freshDb();
|
||||
seedRecurringChain(db);
|
||||
|
||||
pauseTask(db, 'task-orig');
|
||||
|
||||
const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string };
|
||||
expect(followUp.status).toBe('paused');
|
||||
db.close();
|
||||
});
|
||||
|
||||
it('resume by original id resumes the live follow-up', () => {
|
||||
const db = freshDb();
|
||||
seedRecurringChain(db);
|
||||
|
||||
db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = 'task-next'").run();
|
||||
resumeTask(db, 'task-orig');
|
||||
|
||||
const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string };
|
||||
expect(followUp.status).toBe('pending');
|
||||
db.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateTask', () => {
|
||||
it('merges supplied fields into content JSON without clobbering others', () => {
|
||||
const db = freshDb();
|
||||
insertTask(db, {
|
||||
id: 'task-1',
|
||||
processAfter: new Date().toISOString(),
|
||||
recurrence: null,
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'old', script: 'echo old', extra: 'keep me' }),
|
||||
});
|
||||
|
||||
const touched = updateTask(db, 'task-1', { prompt: 'new' });
|
||||
expect(touched).toBe(1);
|
||||
|
||||
const row = db.prepare('SELECT content FROM messages_in WHERE id = ?').get('task-1') as { content: string };
|
||||
const parsed = JSON.parse(row.content);
|
||||
expect(parsed.prompt).toBe('new');
|
||||
expect(parsed.script).toBe('echo old');
|
||||
expect(parsed.extra).toBe('keep me');
|
||||
});
|
||||
|
||||
it('updates recurrence and process_after when supplied', () => {
|
||||
const db = freshDb();
|
||||
insertTask(db, {
|
||||
id: 'task-1',
|
||||
processAfter: '2026-01-01T00:00:00Z',
|
||||
recurrence: '0 9 * * *',
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'p' }),
|
||||
});
|
||||
|
||||
updateTask(db, 'task-1', { recurrence: '0 18 * * *', processAfter: '2026-02-01T00:00:00Z' });
|
||||
|
||||
const row = db.prepare('SELECT recurrence, process_after FROM messages_in WHERE id = ?').get('task-1') as {
|
||||
recurrence: string;
|
||||
process_after: string;
|
||||
};
|
||||
expect(row.recurrence).toBe('0 18 * * *');
|
||||
expect(row.process_after).toBe('2026-02-01T00:00:00Z');
|
||||
});
|
||||
|
||||
it('clears recurrence when null is passed', () => {
|
||||
const db = freshDb();
|
||||
insertTask(db, {
|
||||
id: 'task-1',
|
||||
processAfter: '2026-01-01T00:00:00Z',
|
||||
recurrence: '0 9 * * *',
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'p' }),
|
||||
});
|
||||
|
||||
updateTask(db, 'task-1', { recurrence: null });
|
||||
|
||||
const row = db.prepare('SELECT recurrence FROM messages_in WHERE id = ?').get('task-1') as {
|
||||
recurrence: string | null;
|
||||
};
|
||||
expect(row.recurrence).toBeNull();
|
||||
});
|
||||
|
||||
it('reaches the live follow-up via series_id when called with the original id', () => {
|
||||
const db = freshDb();
|
||||
insertTask(db, {
|
||||
id: 'task-orig',
|
||||
processAfter: new Date().toISOString(),
|
||||
recurrence: '0 9 * * *',
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'old' }),
|
||||
});
|
||||
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run();
|
||||
|
||||
const msg: RecurringMessage = {
|
||||
id: 'task-orig',
|
||||
kind: 'task',
|
||||
content: JSON.stringify({ prompt: 'old' }),
|
||||
recurrence: '0 9 * * *',
|
||||
process_after: null,
|
||||
platform_id: null,
|
||||
channel_type: null,
|
||||
thread_id: null,
|
||||
series_id: 'task-orig',
|
||||
};
|
||||
insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString());
|
||||
|
||||
const touched = updateTask(db, 'task-orig', { prompt: 'new' });
|
||||
// Only the live follow-up should be touched — completed rows are excluded.
|
||||
expect(touched).toBe(1);
|
||||
|
||||
const live = db.prepare("SELECT content FROM messages_in WHERE id = 'task-next'").get() as { content: string };
|
||||
expect(JSON.parse(live.content).prompt).toBe('new');
|
||||
|
||||
// Original (completed) row left alone.
|
||||
const orig = db.prepare("SELECT content FROM messages_in WHERE id = 'task-orig'").get() as { content: string };
|
||||
expect(JSON.parse(orig.content).prompt).toBe('old');
|
||||
});
|
||||
|
||||
it('returns 0 when no live task matches', () => {
|
||||
const db = freshDb();
|
||||
insertTask(db, {
|
||||
id: 'task-1',
|
||||
processAfter: new Date().toISOString(),
|
||||
recurrence: null,
|
||||
platformId: null,
|
||||
channelType: null,
|
||||
threadId: null,
|
||||
content: JSON.stringify({ prompt: 'p' }),
|
||||
});
|
||||
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-1'").run();
|
||||
|
||||
const touched = updateTask(db, 'task-1', { prompt: 'new' });
|
||||
expect(touched).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('insertRecurrence', () => {
|
||||
it('copies series_id forward', () => {
|
||||
const db = freshDb();
|
||||
insertBasicTask(db, 'task-orig', '0 9 * * *');
|
||||
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run();
|
||||
|
||||
const msg: RecurringMessage = {
|
||||
id: 'task-orig',
|
||||
kind: 'task',
|
||||
content: '{}',
|
||||
recurrence: '0 9 * * *',
|
||||
process_after: null,
|
||||
platform_id: null,
|
||||
channel_type: null,
|
||||
thread_id: null,
|
||||
series_id: 'task-orig',
|
||||
};
|
||||
insertRecurrence(db, msg, 'task-next', new Date().toISOString());
|
||||
|
||||
const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-next') as {
|
||||
series_id: string;
|
||||
};
|
||||
expect(row.series_id).toBe('task-orig');
|
||||
db.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe('migrateMessagesInTable', () => {
|
||||
it('backfills series_id = id on legacy rows and is idempotent', () => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
|
||||
@@ -71,8 +71,14 @@ export function replaceDestinations(db: Database.Database, entries: DestinationR
|
||||
// messages_in
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Next even seq number for host-owned inbound.db. */
|
||||
function nextEvenSeq(db: Database.Database): number {
|
||||
/**
|
||||
* Next even seq number for host-owned inbound.db.
|
||||
*
|
||||
* Exported so the scheduling module's task helpers can maintain the
|
||||
* host-writes-even-seq invariant without duplicating the logic. Not part of
|
||||
* the general public API — imported by `src/modules/scheduling/db.ts` only.
|
||||
*/
|
||||
export function nextEvenSeq(db: Database.Database): number {
|
||||
const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m;
|
||||
return maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2);
|
||||
}
|
||||
@@ -100,103 +106,6 @@ export function insertMessage(
|
||||
});
|
||||
}
|
||||
|
||||
export function insertTask(
|
||||
db: Database.Database,
|
||||
task: {
|
||||
id: string;
|
||||
processAfter: string;
|
||||
recurrence: string | null;
|
||||
platformId: string | null;
|
||||
channelType: string | null;
|
||||
threadId: string | null;
|
||||
content: string;
|
||||
},
|
||||
): void {
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content, series_id)
|
||||
VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content, @id)`,
|
||||
).run({
|
||||
...task,
|
||||
seq: nextEvenSeq(db),
|
||||
});
|
||||
}
|
||||
|
||||
// cancel/pause/resume match any live row in the series, not just the exact id.
|
||||
// Recurring tasks get a new row per occurrence (see handleRecurrence), all
|
||||
// sharing series_id. Matching by id alone would only hit the completed row
|
||||
// the agent remembers, missing the live next occurrence.
|
||||
export function cancelTask(db: Database.Database, taskId: string): void {
|
||||
db.prepare(
|
||||
"UPDATE messages_in SET status = 'completed', recurrence = NULL WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')",
|
||||
).run(taskId, taskId);
|
||||
}
|
||||
|
||||
export function pauseTask(db: Database.Database, taskId: string): void {
|
||||
db.prepare(
|
||||
"UPDATE messages_in SET status = 'paused' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'pending'",
|
||||
).run(taskId, taskId);
|
||||
}
|
||||
|
||||
export function resumeTask(db: Database.Database, taskId: string): void {
|
||||
db.prepare(
|
||||
"UPDATE messages_in SET status = 'pending' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'paused'",
|
||||
).run(taskId, taskId);
|
||||
}
|
||||
|
||||
export interface TaskUpdate {
|
||||
prompt?: string;
|
||||
script?: string | null;
|
||||
recurrence?: string | null;
|
||||
processAfter?: string;
|
||||
}
|
||||
|
||||
// Merges content JSON in-place so callers can update prompt/script without
|
||||
// clobbering other fields. Matches by id OR series_id so the live next
|
||||
// occurrence of a recurring task is updated, not just the completed row the
|
||||
// agent last saw. Returns the number of rows touched.
|
||||
export function updateTask(db: Database.Database, taskId: string, update: TaskUpdate): number {
|
||||
const rows = db
|
||||
.prepare(
|
||||
"SELECT id, content FROM messages_in WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')",
|
||||
)
|
||||
.all(taskId, taskId) as Array<{ id: string; content: string }>;
|
||||
|
||||
if (rows.length === 0) return 0;
|
||||
|
||||
const setProcessAfter = update.processAfter !== undefined;
|
||||
const setRecurrence = update.recurrence !== undefined;
|
||||
const mergeContent = update.prompt !== undefined || update.script !== undefined;
|
||||
|
||||
const tx = db.transaction(() => {
|
||||
for (const row of rows) {
|
||||
let content = row.content;
|
||||
if (mergeContent) {
|
||||
const parsed = JSON.parse(row.content) as Record<string, unknown>;
|
||||
if (update.prompt !== undefined) parsed.prompt = update.prompt;
|
||||
if (update.script !== undefined) parsed.script = update.script;
|
||||
content = JSON.stringify(parsed);
|
||||
}
|
||||
|
||||
// Build SET clause dynamically so callers can update fields independently.
|
||||
const sets: string[] = ['content = ?'];
|
||||
const params: unknown[] = [content];
|
||||
if (setProcessAfter) {
|
||||
sets.push('process_after = ?');
|
||||
params.push(update.processAfter);
|
||||
}
|
||||
if (setRecurrence) {
|
||||
sets.push('recurrence = ?');
|
||||
params.push(update.recurrence);
|
||||
}
|
||||
params.push(row.id);
|
||||
|
||||
db.prepare(`UPDATE messages_in SET ${sets.join(', ')} WHERE id = ?`).run(...params);
|
||||
}
|
||||
});
|
||||
tx();
|
||||
return rows.length;
|
||||
}
|
||||
|
||||
export function countDueMessages(db: Database.Database): number {
|
||||
return (
|
||||
db
|
||||
@@ -229,51 +138,6 @@ export function getMessageForRetry(
|
||||
| undefined;
|
||||
}
|
||||
|
||||
export interface RecurringMessage {
|
||||
id: string;
|
||||
kind: string;
|
||||
content: string;
|
||||
recurrence: string;
|
||||
process_after: string | null;
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
series_id: string;
|
||||
}
|
||||
|
||||
export function getCompletedRecurring(db: Database.Database): RecurringMessage[] {
|
||||
return db
|
||||
.prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL")
|
||||
.all() as RecurringMessage[];
|
||||
}
|
||||
|
||||
export function insertRecurrence(
|
||||
db: Database.Database,
|
||||
msg: RecurringMessage,
|
||||
newId: string,
|
||||
nextRun: string | null,
|
||||
): void {
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content, series_id)
|
||||
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?, ?)`,
|
||||
).run(
|
||||
newId,
|
||||
nextEvenSeq(db),
|
||||
msg.kind,
|
||||
nextRun,
|
||||
msg.recurrence,
|
||||
msg.platform_id,
|
||||
msg.channel_type,
|
||||
msg.thread_id,
|
||||
msg.content,
|
||||
msg.series_id,
|
||||
);
|
||||
}
|
||||
|
||||
export function clearRecurrence(db: Database.Database, messageId: string): void {
|
||||
db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(messageId);
|
||||
}
|
||||
|
||||
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
|
||||
const completed = outDb
|
||||
.prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')")
|
||||
|
||||
Reference in New Issue
Block a user