refactor: move destinations from JSON file into inbound.db
The per-session destination map was being written as a sidecar JSON file (/workspace/.nanoclaw-destinations.json) — inconsistent with the rest of v2, where all host↔container IO goes through inbound.db / outbound.db. Move it into a `destinations` table in INBOUND_SCHEMA. The host writes it before every container wake AND on demand (e.g. after create_agent) so the creator sees the new child destination mid-session without a restart. The container queries the table live on every lookup — no cache, no staleness window. - src/db/schema.ts: add `destinations` table to INBOUND_SCHEMA. - src/session-manager.ts: writeDestinationsFile → writeDestinations, writes via DELETE + INSERT inside a transaction. - src/delivery.ts: create_agent handler calls writeDestinations on the creator's session after inserting the new destination rows. - container/agent-runner/src/destinations.ts: queries inbound.db directly in every findByName/getAllDestinations/findByRouting call. No more cache. No setDestinationsForTest (obsolete). No fs import. - container/agent-runner/src/index.ts and mcp-tools/index.ts: remove loadDestinations() calls — no longer needed. - Test helper initTestSessionDb creates the destinations table. Integration test inserts a row directly instead of mocking the cache. No backwards compatibility: sessions predating the schema update must be recreated. This is fine on the v2 branch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -95,6 +95,14 @@ export function initTestSessionDb(): { inbound: Database.Database; outbound: Dat
|
|||||||
status TEXT NOT NULL DEFAULT 'delivered',
|
status TEXT NOT NULL DEFAULT 'delivered',
|
||||||
delivered_at TEXT NOT NULL
|
delivered_at TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
CREATE TABLE destinations (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
display_name TEXT,
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
channel_type TEXT,
|
||||||
|
platform_id TEXT,
|
||||||
|
agent_group_id TEXT
|
||||||
|
);
|
||||||
`);
|
`);
|
||||||
|
|
||||||
_outbound = new Database(':memory:');
|
_outbound = new Database(':memory:');
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
/**
|
/**
|
||||||
* Destination map loaded at container startup from
|
* Destination map — lives in inbound.db's `destinations` table.
|
||||||
* /workspace/.nanoclaw-destinations.json (written by the host on wake).
|
|
||||||
*
|
*
|
||||||
* The map is BOTH the routing table and the ACL — if a name/target
|
* The host writes this table before every container wake AND on demand
|
||||||
* isn't in here, the agent can't reach it.
|
* (e.g. when a new child agent is created mid-session). The container
|
||||||
|
* queries the table live on every lookup, so admin changes take effect
|
||||||
|
* immediately — no restart required.
|
||||||
|
*
|
||||||
|
* This table is BOTH the routing map and the container-visible ACL.
|
||||||
|
* The host re-validates on the delivery side against the central DB,
|
||||||
|
* so even if this table is stale the host's enforcement is authoritative.
|
||||||
*/
|
*/
|
||||||
import fs from 'fs';
|
import { getInboundDb } from './db/connection.js';
|
||||||
|
|
||||||
export interface DestinationEntry {
|
export interface DestinationEntry {
|
||||||
name: string;
|
name: string;
|
||||||
@@ -16,36 +21,34 @@ export interface DestinationEntry {
|
|||||||
agentGroupId?: string;
|
agentGroupId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const DEST_FILE = '/workspace/.nanoclaw-destinations.json';
|
interface DestRow {
|
||||||
|
name: string;
|
||||||
|
display_name: string | null;
|
||||||
|
type: 'channel' | 'agent';
|
||||||
|
channel_type: string | null;
|
||||||
|
platform_id: string | null;
|
||||||
|
agent_group_id: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
let cache: DestinationEntry[] = [];
|
function rowToEntry(row: DestRow): DestinationEntry {
|
||||||
|
return {
|
||||||
export function loadDestinations(): void {
|
name: row.name,
|
||||||
try {
|
displayName: row.display_name ?? row.name,
|
||||||
if (!fs.existsSync(DEST_FILE)) {
|
type: row.type,
|
||||||
cache = [];
|
channelType: row.channel_type ?? undefined,
|
||||||
return;
|
platformId: row.platform_id ?? undefined,
|
||||||
}
|
agentGroupId: row.agent_group_id ?? undefined,
|
||||||
const raw = fs.readFileSync(DEST_FILE, 'utf-8');
|
};
|
||||||
const parsed = JSON.parse(raw) as { destinations?: DestinationEntry[] };
|
|
||||||
cache = Array.isArray(parsed.destinations) ? parsed.destinations : [];
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`[destinations] Failed to load: ${err instanceof Error ? err.message : String(err)}`);
|
|
||||||
cache = [];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getAllDestinations(): DestinationEntry[] {
|
export function getAllDestinations(): DestinationEntry[] {
|
||||||
return cache;
|
const rows = getInboundDb().prepare('SELECT * FROM destinations ORDER BY name').all() as DestRow[];
|
||||||
}
|
return rows.map(rowToEntry);
|
||||||
|
|
||||||
/** Test-only: inject destinations without touching the filesystem. */
|
|
||||||
export function setDestinationsForTest(destinations: DestinationEntry[]): void {
|
|
||||||
cache = destinations;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function findByName(name: string): DestinationEntry | undefined {
|
export function findByName(name: string): DestinationEntry | undefined {
|
||||||
return cache.find((d) => d.name === name);
|
const row = getInboundDb().prepare('SELECT * FROM destinations WHERE name = ?').get(name) as DestRow | undefined;
|
||||||
|
return row ? rowToEntry(row) : undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -57,15 +60,23 @@ export function findByRouting(
|
|||||||
platformId: string | null | undefined,
|
platformId: string | null | undefined,
|
||||||
): DestinationEntry | undefined {
|
): DestinationEntry | undefined {
|
||||||
if (!channelType || !platformId) return undefined;
|
if (!channelType || !platformId) return undefined;
|
||||||
if (channelType === 'agent') {
|
const db = getInboundDb();
|
||||||
return cache.find((d) => d.type === 'agent' && d.agentGroupId === platformId);
|
const row =
|
||||||
}
|
channelType === 'agent'
|
||||||
return cache.find((d) => d.type === 'channel' && d.channelType === channelType && d.platformId === platformId);
|
? (db
|
||||||
|
.prepare("SELECT * FROM destinations WHERE type = 'agent' AND agent_group_id = ?")
|
||||||
|
.get(platformId) as DestRow | undefined)
|
||||||
|
: (db
|
||||||
|
.prepare("SELECT * FROM destinations WHERE type = 'channel' AND channel_type = ? AND platform_id = ?")
|
||||||
|
.get(channelType, platformId) as DestRow | undefined);
|
||||||
|
return row ? rowToEntry(row) : undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Generate the system-prompt addendum describing destinations and syntax. */
|
/** Generate the system-prompt addendum describing destinations and syntax. */
|
||||||
export function buildSystemPromptAddendum(): string {
|
export function buildSystemPromptAddendum(): string {
|
||||||
if (cache.length === 0) {
|
const all = getAllDestinations();
|
||||||
|
|
||||||
|
if (all.length === 0) {
|
||||||
return [
|
return [
|
||||||
'## Sending messages',
|
'## Sending messages',
|
||||||
'',
|
'',
|
||||||
@@ -74,9 +85,8 @@ export function buildSystemPromptAddendum(): string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Single-destination shortcut: the agent just writes its response normally.
|
// Single-destination shortcut: the agent just writes its response normally.
|
||||||
// No wrapping needed. This preserves the simple case (one user, one channel).
|
if (all.length === 1) {
|
||||||
if (cache.length === 1) {
|
const d = all[0];
|
||||||
const d = cache[0];
|
|
||||||
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
||||||
return [
|
return [
|
||||||
'## Sending messages',
|
'## Sending messages',
|
||||||
@@ -90,7 +100,7 @@ export function buildSystemPromptAddendum(): string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', ''];
|
const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', ''];
|
||||||
for (const d of cache) {
|
for (const d of all) {
|
||||||
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
||||||
lines.push(`- \`${d.name}\`${label}`);
|
lines.push(`- \`${d.name}\`${label}`);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ import fs from 'fs';
|
|||||||
import path from 'path';
|
import path from 'path';
|
||||||
import { fileURLToPath } from 'url';
|
import { fileURLToPath } from 'url';
|
||||||
|
|
||||||
import { buildSystemPromptAddendum, loadDestinations } from './destinations.js';
|
import { buildSystemPromptAddendum } from './destinations.js';
|
||||||
import { createProvider, type ProviderName } from './providers/factory.js';
|
import { createProvider, type ProviderName } from './providers/factory.js';
|
||||||
import { runPollLoop } from './poll-loop.js';
|
import { runPollLoop } from './poll-loop.js';
|
||||||
|
|
||||||
@@ -45,9 +45,6 @@ async function main(): Promise<void> {
|
|||||||
|
|
||||||
const provider = createProvider(providerName, { assistantName });
|
const provider = createProvider(providerName, { assistantName });
|
||||||
|
|
||||||
// Load destination map (written by host on every wake)
|
|
||||||
loadDestinations();
|
|
||||||
|
|
||||||
// Load global CLAUDE.md as additional system context, then append destinations addendum
|
// Load global CLAUDE.md as additional system context, then append destinations addendum
|
||||||
let systemPrompt: string | undefined;
|
let systemPrompt: string | undefined;
|
||||||
if (fs.existsSync(GLOBAL_CLAUDE_MD)) {
|
if (fs.existsSync(GLOBAL_CLAUDE_MD)) {
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||||
|
|
||||||
import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js';
|
import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js';
|
||||||
import { setDestinationsForTest } from './destinations.js';
|
|
||||||
import { getUndeliveredMessages } from './db/messages-out.js';
|
import { getUndeliveredMessages } from './db/messages-out.js';
|
||||||
import { getPendingMessages } from './db/messages-in.js';
|
import { getPendingMessages } from './db/messages-in.js';
|
||||||
import { MockProvider } from './providers/mock.js';
|
import { MockProvider } from './providers/mock.js';
|
||||||
@@ -9,21 +8,17 @@ import { runPollLoop } from './poll-loop.js';
|
|||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
initTestSessionDb();
|
initTestSessionDb();
|
||||||
// Provide a test destination map so output parsing can resolve "discord-test" → routing
|
// Seed a destination so output parsing can resolve "discord-test" → routing
|
||||||
setDestinationsForTest([
|
getInboundDb()
|
||||||
{
|
.prepare(
|
||||||
name: 'discord-test',
|
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||||
displayName: 'Discord Test',
|
VALUES ('discord-test', 'Discord Test', 'channel', 'discord', 'chan-1', NULL)`,
|
||||||
type: 'channel',
|
)
|
||||||
channelType: 'discord',
|
.run();
|
||||||
platformId: 'chan-1',
|
|
||||||
},
|
|
||||||
]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
closeSessionDb();
|
closeSessionDb();
|
||||||
setDestinationsForTest([]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
function insertMessage(id: string, content: object, opts?: { platformId?: string; channelType?: string; threadId?: string }) {
|
function insertMessage(id: string, content: object, opts?: { platformId?: string; channelType?: string; threadId?: string }) {
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import { Server } from '@modelcontextprotocol/sdk/server/index.js';
|
|||||||
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
|
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
|
||||||
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
|
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
|
||||||
|
|
||||||
import { loadDestinations } from '../destinations.js';
|
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
import { coreTools } from './core.js';
|
import { coreTools } from './core.js';
|
||||||
import { schedulingTools } from './scheduling.js';
|
import { schedulingTools } from './scheduling.js';
|
||||||
@@ -21,10 +20,6 @@ function log(msg: string): void {
|
|||||||
console.error(`[mcp-tools] ${msg}`);
|
console.error(`[mcp-tools] ${msg}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the destination map — this process is spawned fresh for each container
|
|
||||||
// wake, so the map file is always fresh (written by the host before spawn).
|
|
||||||
loadDestinations();
|
|
||||||
|
|
||||||
// Only admin agents get the create_agent tool. Non-admins never see it in the
|
// Only admin agents get the create_agent tool. Non-admins never see it in the
|
||||||
// listTools response; the host also re-checks permission on receive as defense
|
// listTools response; the host also re-checks permission on receive as defense
|
||||||
// in depth (see delivery.ts create_agent handler).
|
// in depth (see delivery.ts create_agent handler).
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ import {
|
|||||||
markContainerRunning,
|
markContainerRunning,
|
||||||
markContainerStopped,
|
markContainerStopped,
|
||||||
sessionDir,
|
sessionDir,
|
||||||
writeDestinationsFile,
|
writeDestinations,
|
||||||
} from './session-manager.js';
|
} from './session-manager.js';
|
||||||
import type { AgentGroup, Session } from './types.js';
|
import type { AgentGroup, Session } from './types.js';
|
||||||
|
|
||||||
@@ -59,8 +59,8 @@ export async function wakeContainer(session: Session): Promise<void> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Refresh the destination map file so any admin changes take effect on wake
|
// Refresh the destination map so any admin changes take effect on wake
|
||||||
writeDestinationsFile(agentGroup.id, session.id);
|
writeDestinations(agentGroup.id, session.id);
|
||||||
|
|
||||||
const mounts = buildMounts(agentGroup, session);
|
const mounts = buildMounts(agentGroup, session);
|
||||||
const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`;
|
const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`;
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ CREATE TABLE pending_questions (
|
|||||||
* outbound.db — container writes, host reads (read-only open)
|
* outbound.db — container writes, host reads (read-only open)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/** Host-owned: inbound messages + delivery tracking. */
|
/** Host-owned: inbound messages + delivery tracking + destination map. */
|
||||||
export const INBOUND_SCHEMA = `
|
export const INBOUND_SCHEMA = `
|
||||||
CREATE TABLE messages_in (
|
CREATE TABLE messages_in (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
@@ -101,6 +101,19 @@ CREATE TABLE delivered (
|
|||||||
status TEXT NOT NULL DEFAULT 'delivered',
|
status TEXT NOT NULL DEFAULT 'delivered',
|
||||||
delivered_at TEXT NOT NULL
|
delivered_at TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- Destination map for this session's agent.
|
||||||
|
-- Host overwrites on every container wake AND on demand (admin rewires, new child agents, etc.).
|
||||||
|
-- Container queries this live on every lookup, so admin changes take effect
|
||||||
|
-- mid-session without requiring a container restart.
|
||||||
|
CREATE TABLE destinations (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
display_name TEXT,
|
||||||
|
type TEXT NOT NULL, -- 'channel' | 'agent'
|
||||||
|
channel_type TEXT, -- for type='channel'
|
||||||
|
platform_id TEXT, -- for type='channel'
|
||||||
|
agent_group_id TEXT -- for type='agent'
|
||||||
|
);
|
||||||
`;
|
`;
|
||||||
|
|
||||||
/** Container-owned: outbound messages + processing acknowledgments. */
|
/** Container-owned: outbound messages + processing acknowledgments. */
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ import {
|
|||||||
sessionDir,
|
sessionDir,
|
||||||
inboundDbPath,
|
inboundDbPath,
|
||||||
resolveSession,
|
resolveSession,
|
||||||
|
writeDestinations,
|
||||||
writeSessionMessage,
|
writeSessionMessage,
|
||||||
writeSystemResponse,
|
writeSystemResponse,
|
||||||
} from './session-manager.js';
|
} from './session-manager.js';
|
||||||
@@ -611,6 +612,10 @@ async function handleSystemAction(
|
|||||||
created_at: now,
|
created_at: now,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Refresh the creator's destination map so the new child appears
|
||||||
|
// immediately on the next query — no restart needed.
|
||||||
|
writeDestinations(session.agent_group_id, session.id);
|
||||||
|
|
||||||
// Fire-and-forget notification back to the creator
|
// Fire-and-forget notification back to the creator
|
||||||
notifyAgent(
|
notifyAgent(
|
||||||
session,
|
session,
|
||||||
|
|||||||
@@ -132,43 +132,73 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write the destination map file into the session folder.
|
* Write the session's destination map into its inbound.db `destinations` table.
|
||||||
* Called before every container wake so admin changes take effect on next start.
|
*
|
||||||
* The container loads this at startup to know what destinations exist.
|
* Called before every container wake so admin changes take effect on next start —
|
||||||
|
* but the container also re-queries on demand, so mid-session admin changes
|
||||||
|
* (e.g. spawning a new child agent) can also call this to push the new map
|
||||||
|
* without restarting the container.
|
||||||
|
*
|
||||||
|
* Uses DELETE + INSERT in a transaction for a clean overwrite.
|
||||||
*/
|
*/
|
||||||
export function writeDestinationsFile(agentGroupId: string, sessionId: string): void {
|
export function writeDestinations(agentGroupId: string, sessionId: string): void {
|
||||||
const dir = sessionDir(agentGroupId, sessionId);
|
const dbPath = inboundDbPath(agentGroupId, sessionId);
|
||||||
if (!fs.existsSync(dir)) return;
|
if (!fs.existsSync(dbPath)) return;
|
||||||
|
|
||||||
const rows = getDestinations(agentGroupId);
|
const rows = getDestinations(agentGroupId);
|
||||||
const destinations: Array<Record<string, unknown>> = [];
|
type DestRow = {
|
||||||
|
name: string;
|
||||||
|
display_name: string | null;
|
||||||
|
type: 'channel' | 'agent';
|
||||||
|
channel_type: string | null;
|
||||||
|
platform_id: string | null;
|
||||||
|
agent_group_id: string | null;
|
||||||
|
};
|
||||||
|
const resolved: DestRow[] = [];
|
||||||
|
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
if (row.target_type === 'channel') {
|
if (row.target_type === 'channel') {
|
||||||
const mg = getMessagingGroup(row.target_id);
|
const mg = getMessagingGroup(row.target_id);
|
||||||
if (!mg) continue;
|
if (!mg) continue;
|
||||||
destinations.push({
|
resolved.push({
|
||||||
name: row.local_name,
|
name: row.local_name,
|
||||||
displayName: mg.name ?? row.local_name,
|
display_name: mg.name ?? row.local_name,
|
||||||
type: 'channel',
|
type: 'channel',
|
||||||
channelType: mg.channel_type,
|
channel_type: mg.channel_type,
|
||||||
platformId: mg.platform_id,
|
platform_id: mg.platform_id,
|
||||||
|
agent_group_id: null,
|
||||||
});
|
});
|
||||||
} else if (row.target_type === 'agent') {
|
} else if (row.target_type === 'agent') {
|
||||||
const ag = getAgentGroup(row.target_id);
|
const ag = getAgentGroup(row.target_id);
|
||||||
if (!ag) continue;
|
if (!ag) continue;
|
||||||
destinations.push({
|
resolved.push({
|
||||||
name: row.local_name,
|
name: row.local_name,
|
||||||
displayName: ag.name,
|
display_name: ag.name,
|
||||||
type: 'agent',
|
type: 'agent',
|
||||||
agentGroupId: ag.id,
|
channel_type: null,
|
||||||
|
platform_id: null,
|
||||||
|
agent_group_id: ag.id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const filePath = path.join(dir, '.nanoclaw-destinations.json');
|
const db = new Database(dbPath);
|
||||||
fs.writeFileSync(filePath, JSON.stringify({ destinations }, null, 2));
|
db.pragma('journal_mode = DELETE');
|
||||||
log.debug('Destination map written', { sessionId, count: destinations.length });
|
db.pragma('busy_timeout = 5000');
|
||||||
|
try {
|
||||||
|
const tx = db.transaction((entries: DestRow[]) => {
|
||||||
|
db.prepare('DELETE FROM destinations').run();
|
||||||
|
const stmt = db.prepare(
|
||||||
|
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||||
|
VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`,
|
||||||
|
);
|
||||||
|
for (const e of entries) stmt.run(e);
|
||||||
|
});
|
||||||
|
tx(resolved);
|
||||||
|
} finally {
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
|
log.debug('Destination map written', { sessionId, count: resolved.length });
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Write a message to a session's inbound DB (messages_in). Host-only. */
|
/** Write a message to a session's inbound DB (messages_in). Host-only. */
|
||||||
|
|||||||
Reference in New Issue
Block a user