feat: race-free on-wake messages and explicit restart CLI
Decouple container restart from config updates — config CLI ops now only write to the DB; restart is a separate `ncl groups restart` command with --rebuild and --message flags. Add on_wake column to messages_in so wake messages are only picked up by a fresh container's first poll, preventing dying containers from stealing them during the SIGTERM grace window. killContainer accepts an onExit callback for race-free respawn. Agent- called restart auto-scopes to the calling session. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -196,7 +196,8 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } {
|
||||
platform_id TEXT,
|
||||
channel_type TEXT,
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
content TEXT NOT NULL,
|
||||
on_wake INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
CREATE TABLE delivered (
|
||||
message_out_id TEXT PRIMARY KEY,
|
||||
|
||||
@@ -49,7 +49,7 @@ function getMaxMessagesPerPrompt(): number {
|
||||
* sees the prior context it missed. Host's countDueMessages gates waking on
|
||||
* trigger=1 separately (see src/db/session-db.ts).
|
||||
*/
|
||||
export function getPendingMessages(): MessageInRow[] {
|
||||
export function getPendingMessages(isFirstPoll = false): MessageInRow[] {
|
||||
const inbound = openInboundDb();
|
||||
const outbound = getOutboundDb();
|
||||
|
||||
@@ -59,10 +59,11 @@ export function getPendingMessages(): MessageInRow[] {
|
||||
`SELECT * FROM messages_in
|
||||
WHERE status = 'pending'
|
||||
AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))
|
||||
AND (on_wake = 0 OR ?1 = 1)
|
||||
ORDER BY seq DESC
|
||||
LIMIT ?`,
|
||||
LIMIT ?2`,
|
||||
)
|
||||
.all(getMaxMessagesPerPrompt()) as MessageInRow[];
|
||||
.all(isFirstPoll ? 1 : 0, getMaxMessagesPerPrompt()) as MessageInRow[];
|
||||
|
||||
if (pending.length === 0) return [];
|
||||
|
||||
|
||||
@@ -14,13 +14,18 @@ afterEach(() => {
|
||||
closeSessionDb();
|
||||
});
|
||||
|
||||
function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string; trigger?: 0 | 1 }) {
|
||||
function insertMessage(
|
||||
id: string,
|
||||
kind: string,
|
||||
content: object,
|
||||
opts?: { processAfter?: string; trigger?: 0 | 1; onWake?: 0 | 1 },
|
||||
) {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, content)
|
||||
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`,
|
||||
`INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, on_wake, content)
|
||||
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, JSON.stringify(content));
|
||||
.run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, opts?.onWake ?? 0, JSON.stringify(content));
|
||||
}
|
||||
|
||||
describe('formatter', () => {
|
||||
@@ -131,6 +136,58 @@ describe('accumulate gate (trigger column)', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('on_wake filtering', () => {
|
||||
it('first poll returns on_wake=1 messages', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 });
|
||||
const messages = getPendingMessages(true);
|
||||
expect(messages).toHaveLength(1);
|
||||
expect(messages[0].id).toBe('m1');
|
||||
});
|
||||
|
||||
it('subsequent polls skip on_wake=1 messages', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 });
|
||||
const messages = getPendingMessages(false);
|
||||
expect(messages).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('normal messages returned regardless of isFirstPoll', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'A', text: 'hello' });
|
||||
expect(getPendingMessages(true)).toHaveLength(1);
|
||||
|
||||
// Reset: mark completed so we can re-test with a fresh message
|
||||
markCompleted(['m1']);
|
||||
insertMessage('m2', 'chat', { sender: 'A', text: 'hello again' });
|
||||
expect(getPendingMessages(false)).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('mixed batch: first poll returns both normal and on_wake messages', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'A', text: 'user msg' });
|
||||
insertMessage('m2', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 });
|
||||
const messages = getPendingMessages(true);
|
||||
expect(messages).toHaveLength(2);
|
||||
expect(messages.map((m) => m.id).sort()).toEqual(['m1', 'm2']);
|
||||
});
|
||||
|
||||
it('mixed batch: subsequent poll returns only normal messages', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'A', text: 'user msg' });
|
||||
insertMessage('m2', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 });
|
||||
const messages = getPendingMessages(false);
|
||||
expect(messages).toHaveLength(1);
|
||||
expect(messages[0].id).toBe('m1');
|
||||
});
|
||||
|
||||
it('on_wake defaults to 0 for inserts without explicit value', () => {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, kind, timestamp, status, content)
|
||||
VALUES ('m1', 'chat', datetime('now'), 'pending', '{"text":"hi"}')`,
|
||||
)
|
||||
.run();
|
||||
// Should be returned even on non-first poll (on_wake=0)
|
||||
expect(getPendingMessages(false)).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('routing', () => {
|
||||
it('should extract routing from messages', () => {
|
||||
getInboundDb()
|
||||
|
||||
@@ -67,9 +67,11 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
clearStaleProcessingAcks();
|
||||
|
||||
let pollCount = 0;
|
||||
let isFirstPoll = true;
|
||||
while (true) {
|
||||
// Skip system messages — they're responses for MCP tools (e.g., ask_user_question)
|
||||
const messages = getPendingMessages().filter((m) => m.kind !== 'system');
|
||||
const messages = getPendingMessages(isFirstPoll).filter((m) => m.kind !== 'system');
|
||||
isFirstPoll = false;
|
||||
pollCount++;
|
||||
|
||||
// Periodic heartbeat so we know the loop is alive
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import type { McpServerConfig } from '../../container-config.js';
|
||||
import { buildAgentGroupImage, killContainer, wakeContainer } from '../../container-runner.js';
|
||||
import { restartAgentGroupContainers } from '../../container-restart.js';
|
||||
import { getSession } from '../../db/sessions.js';
|
||||
import { writeSessionMessage } from '../../session-manager.js';
|
||||
import {
|
||||
getContainerConfig,
|
||||
updateContainerConfigScalars,
|
||||
@@ -54,6 +57,47 @@ registerResource({
|
||||
],
|
||||
operations: { list: 'open', get: 'open', create: 'approval', update: 'approval', delete: 'approval' },
|
||||
customOperations: {
|
||||
'restart': {
|
||||
access: 'approval',
|
||||
description:
|
||||
'Restart containers for a group. Use --id <group-id> [--rebuild] [--message <text>]. ' +
|
||||
'From inside a container, --id is auto-filled and only the calling session is restarted. ' +
|
||||
'--rebuild rebuilds the container image first. --message sets an on-wake message for the fresh container; ' +
|
||||
'if omitted, containers come back on the next user message.',
|
||||
handler: async (args, ctx) => {
|
||||
const id = (args.id as string) || (ctx.caller === 'agent' ? ctx.agentGroupId : undefined);
|
||||
if (!id) throw new Error('--id is required');
|
||||
if (args.rebuild) {
|
||||
await buildAgentGroupImage(id);
|
||||
}
|
||||
const message = args.message as string | undefined;
|
||||
|
||||
// From an agent: scope to the calling session only
|
||||
if (ctx.caller === 'agent') {
|
||||
if (message) {
|
||||
writeSessionMessage(id, ctx.sessionId, {
|
||||
id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({ text: message, sender: 'system', senderId: 'system' }),
|
||||
onWake: 1,
|
||||
});
|
||||
}
|
||||
killContainer(ctx.sessionId, 'restarted via ncl', message ? () => {
|
||||
const s = getSession(ctx.sessionId);
|
||||
if (s) wakeContainer(s);
|
||||
} : undefined);
|
||||
return { restarted: 1, rebuilt: !!args.rebuild };
|
||||
}
|
||||
|
||||
// From the host: restart all running containers in the group
|
||||
const count = restartAgentGroupContainers(id, 'restarted via ncl', message);
|
||||
return { restarted: count, rebuilt: !!args.rebuild };
|
||||
},
|
||||
},
|
||||
'config get': {
|
||||
access: 'open',
|
||||
description: 'Show the container config for a group. Use --id <group-id>.',
|
||||
@@ -96,7 +140,6 @@ registerResource({
|
||||
}
|
||||
|
||||
updateContainerConfigScalars(id, updates);
|
||||
restartAgentGroupContainers(id, 'config updated via ncl');
|
||||
|
||||
const updated = getContainerConfig(id)!;
|
||||
return presentConfig(updated);
|
||||
@@ -124,7 +167,6 @@ registerResource({
|
||||
env: args.env ? (JSON.parse(args.env as string) as Record<string, string>) : {},
|
||||
};
|
||||
updateContainerConfigJson(id, 'mcp_servers', servers);
|
||||
restartAgentGroupContainers(id, `mcp server "${name}" added via ncl`);
|
||||
|
||||
return { added: name, servers };
|
||||
},
|
||||
@@ -145,7 +187,6 @@ registerResource({
|
||||
if (!servers[name]) throw new Error(`MCP server "${name}" not found`);
|
||||
delete servers[name];
|
||||
updateContainerConfigJson(id, 'mcp_servers', servers);
|
||||
restartAgentGroupContainers(id, `mcp server "${name}" removed via ncl`);
|
||||
|
||||
return { removed: name };
|
||||
},
|
||||
@@ -179,9 +220,10 @@ registerResource({
|
||||
}
|
||||
}
|
||||
|
||||
restartAgentGroupContainers(id, 'package added via ncl');
|
||||
|
||||
return { added: { apt: apt || null, npm: npm || null }, note: 'Image rebuild required for packages to take effect. Use install_packages from the agent or rebuild manually.' };
|
||||
return {
|
||||
added: { apt: apt || null, npm: npm || null },
|
||||
note: 'Image rebuild required for packages to take effect. Use install_packages from the agent or rebuild manually.',
|
||||
};
|
||||
},
|
||||
},
|
||||
'config remove-package': {
|
||||
@@ -209,9 +251,10 @@ registerResource({
|
||||
updateContainerConfigJson(id, 'packages_npm', filtered);
|
||||
}
|
||||
|
||||
restartAgentGroupContainers(id, 'package removed via ncl');
|
||||
|
||||
return { removed: { apt: apt || null, npm: npm || null }, note: 'Image rebuild required for package changes to take effect.' };
|
||||
return {
|
||||
removed: { apt: apt || null, npm: npm || null },
|
||||
note: 'Image rebuild required for package changes to take effect.',
|
||||
};
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
161
src/container-restart.test.ts
Normal file
161
src/container-restart.test.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
|
||||
// --- Mocks ---
|
||||
|
||||
vi.mock('./log.js', () => ({
|
||||
log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
|
||||
}));
|
||||
|
||||
const mockIsContainerRunning = vi.fn<(id: string) => boolean>();
|
||||
const mockKillContainer = vi.fn<(id: string, reason: string, onExit?: () => void) => void>();
|
||||
const mockWakeContainer = vi.fn();
|
||||
vi.mock('./container-runner.js', () => ({
|
||||
isContainerRunning: (...args: unknown[]) => mockIsContainerRunning(args[0] as string),
|
||||
killContainer: (...args: unknown[]) => mockKillContainer(args[0] as string, args[1] as string, args[2] as (() => void) | undefined),
|
||||
wakeContainer: (...args: unknown[]) => mockWakeContainer(...args),
|
||||
}));
|
||||
|
||||
const mockGetSessionsByAgentGroup = vi.fn();
|
||||
const mockGetSession = vi.fn();
|
||||
vi.mock('./db/sessions.js', () => ({
|
||||
getSessionsByAgentGroup: (...args: unknown[]) => mockGetSessionsByAgentGroup(...args),
|
||||
getSession: (...args: unknown[]) => mockGetSession(...args),
|
||||
}));
|
||||
|
||||
const mockWriteSessionMessage = vi.fn();
|
||||
vi.mock('./session-manager.js', () => ({
|
||||
writeSessionMessage: (...args: unknown[]) => mockWriteSessionMessage(...args),
|
||||
}));
|
||||
|
||||
import { restartAgentGroupContainers } from './container-restart.js';
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
function makeSession(id: string, agentGroupId: string, status = 'active') {
|
||||
return { id, agent_group_id: agentGroupId, status };
|
||||
}
|
||||
|
||||
// --- Tests ---
|
||||
|
||||
describe('restartAgentGroupContainers', () => {
|
||||
it('skips sessions without a running container', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([
|
||||
makeSession('s1', 'g1'),
|
||||
makeSession('s2', 'g1'),
|
||||
]);
|
||||
mockIsContainerRunning.mockReturnValue(false);
|
||||
|
||||
const count = restartAgentGroupContainers('g1', 'test');
|
||||
|
||||
expect(count).toBe(0);
|
||||
expect(mockKillContainer).not.toHaveBeenCalled();
|
||||
expect(mockWriteSessionMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('skips non-active sessions', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([
|
||||
makeSession('s1', 'g1', 'closed'),
|
||||
]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
|
||||
const count = restartAgentGroupContainers('g1', 'test');
|
||||
|
||||
expect(count).toBe(0);
|
||||
expect(mockKillContainer).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('kills running containers and returns count', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([
|
||||
makeSession('s1', 'g1'),
|
||||
makeSession('s2', 'g1'),
|
||||
]);
|
||||
mockIsContainerRunning.mockImplementation((id) => id === 's1');
|
||||
|
||||
const count = restartAgentGroupContainers('g1', 'test');
|
||||
|
||||
expect(count).toBe(1);
|
||||
expect(mockKillContainer).toHaveBeenCalledTimes(1);
|
||||
expect(mockKillContainer).toHaveBeenCalledWith('s1', 'test', undefined);
|
||||
});
|
||||
|
||||
it('does not write wake message when wakeMessage is omitted', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
|
||||
restartAgentGroupContainers('g1', 'test');
|
||||
|
||||
expect(mockWriteSessionMessage).not.toHaveBeenCalled();
|
||||
expect(mockKillContainer).toHaveBeenCalledWith('s1', 'test', undefined);
|
||||
});
|
||||
|
||||
it('writes on_wake message and passes onExit callback when wakeMessage is provided', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
|
||||
restartAgentGroupContainers('g1', 'test', 'Resuming.');
|
||||
|
||||
// Should write an on-wake message
|
||||
expect(mockWriteSessionMessage).toHaveBeenCalledTimes(1);
|
||||
const [agentGroupId, sessionId, msg] = mockWriteSessionMessage.mock.calls[0];
|
||||
expect(agentGroupId).toBe('g1');
|
||||
expect(sessionId).toBe('s1');
|
||||
expect(msg.onWake).toBe(1);
|
||||
expect(JSON.parse(msg.content).text).toBe('Resuming.');
|
||||
|
||||
// Should pass an onExit callback to killContainer
|
||||
expect(mockKillContainer).toHaveBeenCalledTimes(1);
|
||||
const onExit = mockKillContainer.mock.calls[0][2];
|
||||
expect(typeof onExit).toBe('function');
|
||||
});
|
||||
|
||||
it('onExit callback calls wakeContainer with refreshed session', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
const freshSession = makeSession('s1', 'g1');
|
||||
mockGetSession.mockReturnValue(freshSession);
|
||||
|
||||
restartAgentGroupContainers('g1', 'test', 'Resuming.');
|
||||
|
||||
// Simulate container exit by calling the onExit callback
|
||||
const onExit = mockKillContainer.mock.calls[0][2] as () => void;
|
||||
onExit();
|
||||
|
||||
expect(mockGetSession).toHaveBeenCalledWith('s1');
|
||||
expect(mockWakeContainer).toHaveBeenCalledWith(freshSession);
|
||||
});
|
||||
|
||||
it('onExit callback does not wake if session no longer exists', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
mockGetSession.mockReturnValue(undefined);
|
||||
|
||||
restartAgentGroupContainers('g1', 'test', 'Resuming.');
|
||||
|
||||
const onExit = mockKillContainer.mock.calls[0][2] as () => void;
|
||||
onExit();
|
||||
|
||||
expect(mockWakeContainer).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('handles multiple running sessions with wake message', () => {
|
||||
mockGetSessionsByAgentGroup.mockReturnValue([
|
||||
makeSession('s1', 'g1'),
|
||||
makeSession('s2', 'g1'),
|
||||
]);
|
||||
mockIsContainerRunning.mockReturnValue(true);
|
||||
|
||||
const count = restartAgentGroupContainers('g1', 'test', 'Config updated.');
|
||||
|
||||
expect(count).toBe(2);
|
||||
expect(mockKillContainer).toHaveBeenCalledTimes(2);
|
||||
expect(mockWriteSessionMessage).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Each session gets its own on-wake message
|
||||
expect(mockWriteSessionMessage.mock.calls[0][1]).toBe('s1');
|
||||
expect(mockWriteSessionMessage.mock.calls[1][1]).toBe('s2');
|
||||
});
|
||||
});
|
||||
@@ -1,44 +1,57 @@
|
||||
/**
|
||||
* Helper to restart all running containers for an agent group.
|
||||
*
|
||||
* Used by:
|
||||
* - self-mod approval handlers (after config change)
|
||||
* - ncl groups config update (after CLI config change)
|
||||
* Writes an on_wake message to each session, kills the container, then
|
||||
* wakes a fresh container via the onExit callback — race-free.
|
||||
*/
|
||||
import { killContainer } from './container-runner.js';
|
||||
import { getSessionsByAgentGroup } from './db/sessions.js';
|
||||
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
||||
import { getSession, getSessionsByAgentGroup } from './db/sessions.js';
|
||||
import { log } from './log.js';
|
||||
import { writeSessionMessage } from './session-manager.js';
|
||||
|
||||
/**
|
||||
* Kill all running containers for an agent group and schedule wake messages
|
||||
* so the host sweep respawns them with fresh config.
|
||||
* Kill all running containers for an agent group and respawn them.
|
||||
*
|
||||
* Only targets sessions that actually have a running container.
|
||||
* If `wakeMessage` is provided, each session gets an on_wake message
|
||||
* (picked up only by the fresh container's first poll) and a
|
||||
* wakeContainer call on exit. Without it, containers are killed and
|
||||
* only come back on the next real user message.
|
||||
*/
|
||||
export function restartAgentGroupContainers(agentGroupId: string, reason: string): void {
|
||||
const sessions = getSessionsByAgentGroup(agentGroupId).filter((s) => s.status === 'active');
|
||||
export function restartAgentGroupContainers(
|
||||
agentGroupId: string,
|
||||
reason: string,
|
||||
wakeMessage?: string,
|
||||
): number {
|
||||
const sessions = getSessionsByAgentGroup(agentGroupId).filter(
|
||||
(s) => s.status === 'active' && isContainerRunning(s.id),
|
||||
);
|
||||
|
||||
for (const session of sessions) {
|
||||
killContainer(session.id, reason);
|
||||
writeSessionMessage(agentGroupId, session.id, {
|
||||
id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: agentGroupId,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({
|
||||
text: `Container restarted: ${reason}. Resuming.`,
|
||||
sender: 'system',
|
||||
senderId: 'system',
|
||||
}),
|
||||
processAfter: new Date(Date.now() + 5000)
|
||||
.toISOString()
|
||||
.replace('T', ' ')
|
||||
.replace(/\.\d+Z$/, ''),
|
||||
});
|
||||
if (wakeMessage) {
|
||||
writeSessionMessage(agentGroupId, session.id, {
|
||||
id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: agentGroupId,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({
|
||||
text: wakeMessage,
|
||||
sender: 'system',
|
||||
senderId: 'system',
|
||||
}),
|
||||
onWake: 1,
|
||||
});
|
||||
}
|
||||
killContainer(session.id, reason, wakeMessage ? () => {
|
||||
const s = getSession(session.id);
|
||||
if (s) wakeContainer(s);
|
||||
} : undefined);
|
||||
}
|
||||
|
||||
if (sessions.length > 0) {
|
||||
log.info('Restarted agent group containers', { agentGroupId, reason, count: sessions.length });
|
||||
log.info('Restarting agent group containers', { agentGroupId, reason, count: sessions.length });
|
||||
}
|
||||
return sessions.length;
|
||||
}
|
||||
|
||||
@@ -190,10 +190,14 @@ async function spawnContainer(session: Session): Promise<void> {
|
||||
}
|
||||
|
||||
/** Kill a container for a session. */
|
||||
export function killContainer(sessionId: string, reason: string): void {
|
||||
export function killContainer(sessionId: string, reason: string, onExit?: () => void): void {
|
||||
const entry = activeContainers.get(sessionId);
|
||||
if (!entry) return;
|
||||
|
||||
if (onExit) {
|
||||
entry.process.once('close', onExit);
|
||||
}
|
||||
|
||||
log.info('Killing container', { sessionId, reason, containerName: entry.containerName });
|
||||
try {
|
||||
stopContainer(entry.containerName);
|
||||
|
||||
@@ -177,7 +177,10 @@ CREATE TABLE IF NOT EXISTS messages_in (
|
||||
-- the reply routes back to this exact session, not to the source agent
|
||||
-- group's "newest" session. NULL on channel-side inbound and on a2a rows
|
||||
-- written before this column existed.
|
||||
source_session_id TEXT
|
||||
source_session_id TEXT,
|
||||
on_wake INTEGER NOT NULL DEFAULT 0
|
||||
-- 1 = only deliver on the container's first poll (fresh start).
|
||||
-- Dying containers (past first poll) skip these rows.
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id);
|
||||
|
||||
|
||||
@@ -114,14 +114,20 @@ export function insertMessage(
|
||||
* path for the target's reply. NULL on channel-side inbound.
|
||||
*/
|
||||
sourceSessionId?: string | null;
|
||||
/**
|
||||
* 1 = only deliver on the container's first poll (fresh start).
|
||||
* Dying containers (past first poll) skip these rows.
|
||||
*/
|
||||
onWake?: 0 | 1;
|
||||
},
|
||||
): void {
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id)
|
||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`,
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id, on_wake)
|
||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId, @onWake)`,
|
||||
).run({
|
||||
...message,
|
||||
trigger: message.trigger ?? 1,
|
||||
onWake: message.onWake ?? 0,
|
||||
sourceSessionId: message.sourceSessionId ?? null,
|
||||
seq: nextEvenSeq(db),
|
||||
});
|
||||
@@ -318,6 +324,11 @@ export function migrateMessagesInTable(db: Database.Database): void {
|
||||
// their replies fall back to the legacy "newest active session" lookup.
|
||||
db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run();
|
||||
}
|
||||
if (!cols.has('on_wake')) {
|
||||
// 1 = only deliver on the container's first poll (fresh start).
|
||||
// All existing rows are normal messages, so default 0.
|
||||
db.prepare('ALTER TABLE messages_in ADD COLUMN on_wake INTEGER NOT NULL DEFAULT 0').run();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -4,14 +4,16 @@
|
||||
* The approvals module calls these when an admin clicks Approve on a
|
||||
* pending_approvals row whose action matches. Each handler mutates the
|
||||
* container config in the DB, rebuilds/kills the container as needed,
|
||||
* and lets the host sweep respawn it on the next message.
|
||||
* and writes an on_wake message so the fresh container picks up where
|
||||
* the old one left off.
|
||||
*
|
||||
* install_packages: update DB + rebuild image + kill container.
|
||||
* add_mcp_server: update DB + kill container only.
|
||||
* install_packages: update DB + rebuild image + kill container + on_wake.
|
||||
* add_mcp_server: update DB + kill container + on_wake.
|
||||
*/
|
||||
import { buildAgentGroupImage, killContainer } from '../../container-runner.js';
|
||||
import { buildAgentGroupImage, killContainer, wakeContainer } from '../../container-runner.js';
|
||||
import { getAgentGroup } from '../../db/agent-groups.js';
|
||||
import { getContainerConfig, updateContainerConfigJson } from '../../db/container-configs.js';
|
||||
import { getSession } from '../../db/sessions.js';
|
||||
import type { McpServerConfig } from '../../container-config.js';
|
||||
import { log } from '../../log.js';
|
||||
import { writeSessionMessage } from '../../session-manager.js';
|
||||
@@ -53,7 +55,6 @@ export const applyInstallPackages: ApprovalHandler = async ({ session, payload,
|
||||
log.info('Package install approved', { agentGroupId: session.agent_group_id, userId });
|
||||
try {
|
||||
await buildAgentGroupImage(session.agent_group_id);
|
||||
killContainer(session.id, 'rebuild applied');
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
@@ -66,10 +67,11 @@ export const applyInstallPackages: ApprovalHandler = async ({ session, payload,
|
||||
sender: 'system',
|
||||
senderId: 'system',
|
||||
}),
|
||||
processAfter: new Date(Date.now() + 5000)
|
||||
.toISOString()
|
||||
.replace('T', ' ')
|
||||
.replace(/\.\d+Z$/, ''),
|
||||
onWake: 1,
|
||||
});
|
||||
killContainer(session.id, 'rebuild applied', () => {
|
||||
const s = getSession(session.id);
|
||||
if (s) wakeContainer(s);
|
||||
});
|
||||
log.info('Container rebuild completed (bundled with install)', { agentGroupId: session.agent_group_id });
|
||||
} catch (e) {
|
||||
@@ -102,7 +104,23 @@ export const applyAddMcpServer: ApprovalHandler = async ({ session, payload, use
|
||||
};
|
||||
updateContainerConfigJson(agentGroup.id, 'mcp_servers', servers);
|
||||
|
||||
killContainer(session.id, 'mcp server added');
|
||||
notify(`MCP server "${payload.name}" added. Your container will restart with it on the next message.`);
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: session.agent_group_id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({
|
||||
text: `MCP server "${payload.name}" added. Verify it's available (e.g. list your tools) and report the result to the user.`,
|
||||
sender: 'system',
|
||||
senderId: 'system',
|
||||
}),
|
||||
onWake: 1,
|
||||
});
|
||||
killContainer(session.id, 'mcp server added', () => {
|
||||
const s = getSession(session.id);
|
||||
if (s) wakeContainer(s);
|
||||
});
|
||||
log.info('MCP server add approved', { agentGroupId: session.agent_group_id, userId });
|
||||
};
|
||||
|
||||
@@ -216,6 +216,11 @@ export function writeSessionMessage(
|
||||
* path so the target's reply routes back to that exact session.
|
||||
*/
|
||||
sourceSessionId?: string | null;
|
||||
/**
|
||||
* 1 = only deliver on the container's first poll (fresh start).
|
||||
* Dying containers (past first poll) skip these rows.
|
||||
*/
|
||||
onWake?: 0 | 1;
|
||||
},
|
||||
): void {
|
||||
// Extract base64 attachment data, save to inbox, replace with file paths
|
||||
@@ -235,6 +240,7 @@ export function writeSessionMessage(
|
||||
recurrence: message.recurrence ?? null,
|
||||
trigger: message.trigger ?? 1,
|
||||
sourceSessionId: message.sourceSessionId ?? null,
|
||||
onWake: message.onWake ?? 0,
|
||||
});
|
||||
} finally {
|
||||
db.close();
|
||||
|
||||
Reference in New Issue
Block a user