fix(v2): cancel/pause/resume recurring tasks via series_id
Recurring tasks spawn a new messages_in row per occurrence. Cancel only matched the completed row the agent remembered, leaving the live next occurrence running. Tag every row in a recurrence chain with the originating task's id (series_id) so cancel/pause/resume can reach any live row in the series. Cancel also clears recurrence to prevent the sweep from cloning a cancelled task. Kind-aware id prefix on recurrences (task- instead of msg-) keeps list_tasks output consistent across occurrences. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,12 +136,14 @@ CREATE TABLE IF NOT EXISTS messages_in (
|
|||||||
status TEXT DEFAULT 'pending',
|
status TEXT DEFAULT 'pending',
|
||||||
process_after TEXT,
|
process_after TEXT,
|
||||||
recurrence TEXT,
|
recurrence TEXT,
|
||||||
|
series_id TEXT,
|
||||||
tries INTEGER DEFAULT 0,
|
tries INTEGER DEFAULT 0,
|
||||||
platform_id TEXT,
|
platform_id TEXT,
|
||||||
channel_type TEXT,
|
channel_type TEXT,
|
||||||
thread_id TEXT,
|
thread_id TEXT,
|
||||||
content TEXT NOT NULL
|
content TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id);
|
||||||
|
|
||||||
-- Host tracks delivery outcomes for messages_out IDs.
|
-- Host tracks delivery outcomes for messages_out IDs.
|
||||||
-- Avoids writing to outbound.db (container-owned).
|
-- Avoids writing to outbound.db (container-owned).
|
||||||
|
|||||||
201
src/db/session-db.test.ts
Normal file
201
src/db/session-db.test.ts
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
/**
|
||||||
|
* Tests for per-session messages_in operations — focused on the series_id
|
||||||
|
* invariant that lets cancel/pause/resume reach the live next occurrence of
|
||||||
|
* a recurring task, even after the row the agent remembers has completed
|
||||||
|
* and been replaced by a follow-up.
|
||||||
|
*/
|
||||||
|
import Database from 'better-sqlite3';
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import { describe, it, expect, afterEach } from 'vitest';
|
||||||
|
|
||||||
|
import {
|
||||||
|
ensureSchema,
|
||||||
|
openInboundDb,
|
||||||
|
insertTask,
|
||||||
|
insertRecurrence,
|
||||||
|
cancelTask,
|
||||||
|
pauseTask,
|
||||||
|
resumeTask,
|
||||||
|
getCompletedRecurring,
|
||||||
|
migrateMessagesInTable,
|
||||||
|
type RecurringMessage,
|
||||||
|
} from './session-db.js';
|
||||||
|
|
||||||
|
const TEST_DIR = '/tmp/nanoclaw-session-db-test';
|
||||||
|
const DB_PATH = path.join(TEST_DIR, 'inbound.db');
|
||||||
|
|
||||||
|
function freshDb() {
|
||||||
|
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||||
|
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||||
|
ensureSchema(DB_PATH, 'inbound');
|
||||||
|
return openInboundDb(DB_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
|
function insertBasicTask(db: ReturnType<typeof openInboundDb>, id: string, recurrence: string | null) {
|
||||||
|
insertTask(db, {
|
||||||
|
id,
|
||||||
|
processAfter: new Date().toISOString(),
|
||||||
|
recurrence,
|
||||||
|
platformId: null,
|
||||||
|
channelType: null,
|
||||||
|
threadId: null,
|
||||||
|
content: JSON.stringify({ prompt: 'noop' }),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('insertTask', () => {
|
||||||
|
it('stamps series_id = id on insert', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
insertBasicTask(db, 'task-1', null);
|
||||||
|
const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-1') as { series_id: string };
|
||||||
|
expect(row.series_id).toBe('task-1');
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('cancelTask / pauseTask / resumeTask series matching', () => {
|
||||||
|
// Simulates the recurrence chain that used to survive cancellation:
|
||||||
|
// the original task completes → handleRecurrence spawns a follow-up
|
||||||
|
// row → agent calls cancel_task(originalId) → historically only hit
|
||||||
|
// the completed row, leaving the live one running.
|
||||||
|
function seedRecurringChain(db: ReturnType<typeof openInboundDb>) {
|
||||||
|
insertBasicTask(db, 'task-orig', '0 9 * * *');
|
||||||
|
// Mark the original as completed (as syncProcessingAcks would do).
|
||||||
|
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run();
|
||||||
|
|
||||||
|
const msg: RecurringMessage = {
|
||||||
|
id: 'task-orig',
|
||||||
|
kind: 'task',
|
||||||
|
content: JSON.stringify({ prompt: 'noop' }),
|
||||||
|
recurrence: '0 9 * * *',
|
||||||
|
process_after: null,
|
||||||
|
platform_id: null,
|
||||||
|
channel_type: null,
|
||||||
|
thread_id: null,
|
||||||
|
series_id: 'task-orig',
|
||||||
|
};
|
||||||
|
insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString());
|
||||||
|
}
|
||||||
|
|
||||||
|
it('cancel by original id reaches the live follow-up via series_id', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
seedRecurringChain(db);
|
||||||
|
|
||||||
|
cancelTask(db, 'task-orig');
|
||||||
|
|
||||||
|
const live = db.prepare("SELECT id, status, recurrence FROM messages_in WHERE status = 'pending'").all();
|
||||||
|
expect(live).toHaveLength(0);
|
||||||
|
|
||||||
|
const followUp = db.prepare("SELECT status, recurrence FROM messages_in WHERE id = 'task-next'").get() as {
|
||||||
|
status: string;
|
||||||
|
recurrence: string | null;
|
||||||
|
};
|
||||||
|
expect(followUp.status).toBe('completed');
|
||||||
|
// Recurrence cleared so the sweep doesn't spawn another clone.
|
||||||
|
expect(followUp.recurrence).toBeNull();
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('cancelled task is not picked up by getCompletedRecurring', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
insertBasicTask(db, 'task-1', '0 9 * * *');
|
||||||
|
cancelTask(db, 'task-1');
|
||||||
|
|
||||||
|
const recurring = getCompletedRecurring(db);
|
||||||
|
expect(recurring).toHaveLength(0);
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('pause by original id pauses the live follow-up', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
seedRecurringChain(db);
|
||||||
|
|
||||||
|
pauseTask(db, 'task-orig');
|
||||||
|
|
||||||
|
const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string };
|
||||||
|
expect(followUp.status).toBe('paused');
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('resume by original id resumes the live follow-up', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
seedRecurringChain(db);
|
||||||
|
|
||||||
|
db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = 'task-next'").run();
|
||||||
|
resumeTask(db, 'task-orig');
|
||||||
|
|
||||||
|
const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string };
|
||||||
|
expect(followUp.status).toBe('pending');
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('insertRecurrence', () => {
|
||||||
|
it('copies series_id forward', () => {
|
||||||
|
const db = freshDb();
|
||||||
|
insertBasicTask(db, 'task-orig', '0 9 * * *');
|
||||||
|
db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run();
|
||||||
|
|
||||||
|
const msg: RecurringMessage = {
|
||||||
|
id: 'task-orig',
|
||||||
|
kind: 'task',
|
||||||
|
content: '{}',
|
||||||
|
recurrence: '0 9 * * *',
|
||||||
|
process_after: null,
|
||||||
|
platform_id: null,
|
||||||
|
channel_type: null,
|
||||||
|
thread_id: null,
|
||||||
|
series_id: 'task-orig',
|
||||||
|
};
|
||||||
|
insertRecurrence(db, msg, 'task-next', new Date().toISOString());
|
||||||
|
|
||||||
|
const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-next') as {
|
||||||
|
series_id: string;
|
||||||
|
};
|
||||||
|
expect(row.series_id).toBe('task-orig');
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('migrateMessagesInTable', () => {
|
||||||
|
it('backfills series_id = id on legacy rows and is idempotent', () => {
|
||||||
|
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||||
|
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||||
|
|
||||||
|
// Build a legacy inbound.db WITHOUT series_id to simulate a pre-fix install.
|
||||||
|
const db = new Database(DB_PATH);
|
||||||
|
db.exec(`
|
||||||
|
CREATE TABLE messages_in (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
seq INTEGER UNIQUE,
|
||||||
|
kind TEXT NOT NULL,
|
||||||
|
timestamp TEXT NOT NULL,
|
||||||
|
status TEXT DEFAULT 'pending',
|
||||||
|
process_after TEXT,
|
||||||
|
recurrence TEXT,
|
||||||
|
tries INTEGER DEFAULT 0,
|
||||||
|
platform_id TEXT,
|
||||||
|
channel_type TEXT,
|
||||||
|
thread_id TEXT,
|
||||||
|
content TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`);
|
||||||
|
db.prepare(
|
||||||
|
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'task', datetime('now'), 'pending', '{}')",
|
||||||
|
).run('legacy-1', 2);
|
||||||
|
|
||||||
|
migrateMessagesInTable(db);
|
||||||
|
migrateMessagesInTable(db); // idempotent
|
||||||
|
|
||||||
|
const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('legacy-1') as {
|
||||||
|
series_id: string;
|
||||||
|
};
|
||||||
|
expect(row.series_id).toBe('legacy-1');
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -92,8 +92,8 @@ export function insertMessage(
|
|||||||
},
|
},
|
||||||
): void {
|
): void {
|
||||||
db.prepare(
|
db.prepare(
|
||||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence)
|
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id)
|
||||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`,
|
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id)`,
|
||||||
).run({
|
).run({
|
||||||
...message,
|
...message,
|
||||||
seq: nextEvenSeq(db),
|
seq: nextEvenSeq(db),
|
||||||
@@ -113,30 +113,34 @@ export function insertTask(
|
|||||||
},
|
},
|
||||||
): void {
|
): void {
|
||||||
db.prepare(
|
db.prepare(
|
||||||
`INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content)
|
`INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content, series_id)
|
||||||
VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content)`,
|
VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content, @id)`,
|
||||||
).run({
|
).run({
|
||||||
...task,
|
...task,
|
||||||
seq: nextEvenSeq(db),
|
seq: nextEvenSeq(db),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel/pause/resume match any live row in the series, not just the exact id.
|
||||||
|
// Recurring tasks get a new row per occurrence (see handleRecurrence), all
|
||||||
|
// sharing series_id. Matching by id alone would only hit the completed row
|
||||||
|
// the agent remembers, missing the live next occurrence.
|
||||||
export function cancelTask(db: Database.Database, taskId: string): void {
|
export function cancelTask(db: Database.Database, taskId: string): void {
|
||||||
db.prepare(
|
db.prepare(
|
||||||
"UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')",
|
"UPDATE messages_in SET status = 'completed', recurrence = NULL WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')",
|
||||||
).run(taskId);
|
).run(taskId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function pauseTask(db: Database.Database, taskId: string): void {
|
export function pauseTask(db: Database.Database, taskId: string): void {
|
||||||
db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? AND kind = 'task' AND status = 'pending'").run(
|
db.prepare(
|
||||||
taskId,
|
"UPDATE messages_in SET status = 'paused' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'pending'",
|
||||||
);
|
).run(taskId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function resumeTask(db: Database.Database, taskId: string): void {
|
export function resumeTask(db: Database.Database, taskId: string): void {
|
||||||
db.prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'").run(
|
db.prepare(
|
||||||
taskId,
|
"UPDATE messages_in SET status = 'pending' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'paused'",
|
||||||
);
|
).run(taskId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function countDueMessages(db: Database.Database): number {
|
export function countDueMessages(db: Database.Database): number {
|
||||||
@@ -180,6 +184,7 @@ export interface RecurringMessage {
|
|||||||
platform_id: string | null;
|
platform_id: string | null;
|
||||||
channel_type: string | null;
|
channel_type: string | null;
|
||||||
thread_id: string | null;
|
thread_id: string | null;
|
||||||
|
series_id: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getCompletedRecurring(db: Database.Database): RecurringMessage[] {
|
export function getCompletedRecurring(db: Database.Database): RecurringMessage[] {
|
||||||
@@ -195,8 +200,8 @@ export function insertRecurrence(
|
|||||||
nextRun: string | null,
|
nextRun: string | null,
|
||||||
): void {
|
): void {
|
||||||
db.prepare(
|
db.prepare(
|
||||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
|
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content, series_id)
|
||||||
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
|
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
).run(
|
).run(
|
||||||
newId,
|
newId,
|
||||||
nextEvenSeq(db),
|
nextEvenSeq(db),
|
||||||
@@ -207,6 +212,7 @@ export function insertRecurrence(
|
|||||||
msg.channel_type,
|
msg.channel_type,
|
||||||
msg.thread_id,
|
msg.thread_id,
|
||||||
msg.content,
|
msg.content,
|
||||||
|
msg.series_id,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -296,3 +302,18 @@ export function migrateDeliveredTable(db: Database.Database): void {
|
|||||||
db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run();
|
db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Adds series_id (groups all occurrences of a recurring task) to pre-existing
|
||||||
|
// messages_in tables. No-op on fresh installs where the column is in the schema.
|
||||||
|
// Backfills existing rows so cancel/pause/resume queries can rely on
|
||||||
|
// series_id IS NOT NULL.
|
||||||
|
export function migrateMessagesInTable(db: Database.Database): void {
|
||||||
|
const cols = new Set(
|
||||||
|
(db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name),
|
||||||
|
);
|
||||||
|
if (!cols.has('series_id')) {
|
||||||
|
db.prepare('ALTER TABLE messages_in ADD COLUMN series_id TEXT').run();
|
||||||
|
db.prepare('UPDATE messages_in SET series_id = id WHERE series_id IS NULL').run();
|
||||||
|
db.prepare('CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id)').run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -159,12 +159,13 @@ async function handleRecurrence(inDb: Database.Database, session: Session): Prom
|
|||||||
const { CronExpressionParser } = await import('cron-parser');
|
const { CronExpressionParser } = await import('cron-parser');
|
||||||
const interval = CronExpressionParser.parse(msg.recurrence);
|
const interval = CronExpressionParser.parse(msg.recurrence);
|
||||||
const nextRun = interval.next().toISOString();
|
const nextRun = interval.next().toISOString();
|
||||||
const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
const prefix = msg.kind === 'task' ? 'task' : 'msg';
|
||||||
|
const newId = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||||
|
|
||||||
insertRecurrence(inDb, msg, newId, nextRun);
|
insertRecurrence(inDb, msg, newId, nextRun);
|
||||||
clearRecurrence(inDb, msg.id);
|
clearRecurrence(inDb, msg.id);
|
||||||
|
|
||||||
log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun });
|
log.info('Inserted next recurrence', { originalId: msg.id, newId, seriesId: msg.series_id, nextRun });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err });
|
log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import {
|
|||||||
upsertSessionRouting,
|
upsertSessionRouting,
|
||||||
replaceDestinations,
|
replaceDestinations,
|
||||||
insertMessage,
|
insertMessage,
|
||||||
|
migrateMessagesInTable,
|
||||||
type DestinationRow,
|
type DestinationRow,
|
||||||
} from './db/session-db.js';
|
} from './db/session-db.js';
|
||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
@@ -305,7 +306,9 @@ function extractAttachmentFiles(
|
|||||||
|
|
||||||
/** Open the inbound DB for a session (host reads/writes). */
|
/** Open the inbound DB for a session (host reads/writes). */
|
||||||
export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database {
|
export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database {
|
||||||
return openInboundDbRaw(inboundDbPath(agentGroupId, sessionId));
|
const db = openInboundDbRaw(inboundDbPath(agentGroupId, sessionId));
|
||||||
|
migrateMessagesInTable(db);
|
||||||
|
return db;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Open the outbound DB for a session (host reads only). */
|
/** Open the outbound DB for a session (host reads only). */
|
||||||
|
|||||||
Reference in New Issue
Block a user