diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 871e43a..51a82d7 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -196,7 +196,8 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } { platform_id TEXT, channel_type TEXT, thread_id TEXT, - content TEXT NOT NULL + content TEXT NOT NULL, + on_wake INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE delivered ( message_out_id TEXT PRIMARY KEY, diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts index 88906ed..d3a1a33 100644 --- a/container/agent-runner/src/db/messages-in.ts +++ b/container/agent-runner/src/db/messages-in.ts @@ -49,7 +49,7 @@ function getMaxMessagesPerPrompt(): number { * sees the prior context it missed. Host's countDueMessages gates waking on * trigger=1 separately (see src/db/session-db.ts). */ -export function getPendingMessages(): MessageInRow[] { +export function getPendingMessages(isFirstPoll = false): MessageInRow[] { const inbound = openInboundDb(); const outbound = getOutboundDb(); @@ -59,10 +59,11 @@ export function getPendingMessages(): MessageInRow[] { `SELECT * FROM messages_in WHERE status = 'pending' AND (process_after IS NULL OR datetime(process_after) <= datetime('now')) + AND (on_wake = 0 OR ?1 = 1) ORDER BY seq DESC - LIMIT ?`, + LIMIT ?2`, ) - .all(getMaxMessagesPerPrompt()) as MessageInRow[]; + .all(isFirstPoll ? 1 : 0, getMaxMessagesPerPrompt()) as MessageInRow[]; if (pending.length === 0) return []; diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index 82f9f75..29b769b 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -14,13 +14,18 @@ afterEach(() => { closeSessionDb(); }); -function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string; trigger?: 0 | 1 }) { +function insertMessage( + id: string, + kind: string, + content: object, + opts?: { processAfter?: string; trigger?: 0 | 1; onWake?: 0 | 1 }, +) { getInboundDb() .prepare( - `INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, content) - VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`, + `INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, on_wake, content) + VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?, ?)`, ) - .run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, JSON.stringify(content)); + .run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, opts?.onWake ?? 0, JSON.stringify(content)); } describe('formatter', () => { @@ -131,6 +136,58 @@ describe('accumulate gate (trigger column)', () => { }); }); +describe('on_wake filtering', () => { + it('first poll returns on_wake=1 messages', () => { + insertMessage('m1', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 }); + const messages = getPendingMessages(true); + expect(messages).toHaveLength(1); + expect(messages[0].id).toBe('m1'); + }); + + it('subsequent polls skip on_wake=1 messages', () => { + insertMessage('m1', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 }); + const messages = getPendingMessages(false); + expect(messages).toHaveLength(0); + }); + + it('normal messages returned regardless of isFirstPoll', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'hello' }); + expect(getPendingMessages(true)).toHaveLength(1); + + // Reset: mark completed so we can re-test with a fresh message + markCompleted(['m1']); + insertMessage('m2', 'chat', { sender: 'A', text: 'hello again' }); + expect(getPendingMessages(false)).toHaveLength(1); + }); + + it('mixed batch: first poll returns both normal and on_wake messages', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'user msg' }); + insertMessage('m2', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 }); + const messages = getPendingMessages(true); + expect(messages).toHaveLength(2); + expect(messages.map((m) => m.id).sort()).toEqual(['m1', 'm2']); + }); + + it('mixed batch: subsequent poll returns only normal messages', () => { + insertMessage('m1', 'chat', { sender: 'A', text: 'user msg' }); + insertMessage('m2', 'chat', { sender: 'system', text: 'Resuming.' }, { onWake: 1 }); + const messages = getPendingMessages(false); + expect(messages).toHaveLength(1); + expect(messages[0].id).toBe('m1'); + }); + + it('on_wake defaults to 0 for inserts without explicit value', () => { + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, content) + VALUES ('m1', 'chat', datetime('now'), 'pending', '{"text":"hi"}')`, + ) + .run(); + // Should be returned even on non-first poll (on_wake=0) + expect(getPendingMessages(false)).toHaveLength(1); + }); +}); + describe('routing', () => { it('should extract routing from messages', () => { getInboundDb() diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index e0ac722..bbf45be 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -67,9 +67,11 @@ export async function runPollLoop(config: PollLoopConfig): Promise { clearStaleProcessingAcks(); let pollCount = 0; + let isFirstPoll = true; while (true) { // Skip system messages — they're responses for MCP tools (e.g., ask_user_question) - const messages = getPendingMessages().filter((m) => m.kind !== 'system'); + const messages = getPendingMessages(isFirstPoll).filter((m) => m.kind !== 'system'); + isFirstPoll = false; pollCount++; // Periodic heartbeat so we know the loop is alive diff --git a/src/cli/resources/groups.ts b/src/cli/resources/groups.ts index 68d1b53..8ea42b0 100644 --- a/src/cli/resources/groups.ts +++ b/src/cli/resources/groups.ts @@ -1,5 +1,8 @@ import type { McpServerConfig } from '../../container-config.js'; +import { buildAgentGroupImage, killContainer, wakeContainer } from '../../container-runner.js'; import { restartAgentGroupContainers } from '../../container-restart.js'; +import { getSession } from '../../db/sessions.js'; +import { writeSessionMessage } from '../../session-manager.js'; import { getContainerConfig, updateContainerConfigScalars, @@ -54,6 +57,47 @@ registerResource({ ], operations: { list: 'open', get: 'open', create: 'approval', update: 'approval', delete: 'approval' }, customOperations: { + 'restart': { + access: 'approval', + description: + 'Restart containers for a group. Use --id [--rebuild] [--message ]. ' + + 'From inside a container, --id is auto-filled and only the calling session is restarted. ' + + '--rebuild rebuilds the container image first. --message sets an on-wake message for the fresh container; ' + + 'if omitted, containers come back on the next user message.', + handler: async (args, ctx) => { + const id = (args.id as string) || (ctx.caller === 'agent' ? ctx.agentGroupId : undefined); + if (!id) throw new Error('--id is required'); + if (args.rebuild) { + await buildAgentGroupImage(id); + } + const message = args.message as string | undefined; + + // From an agent: scope to the calling session only + if (ctx.caller === 'agent') { + if (message) { + writeSessionMessage(id, ctx.sessionId, { + id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: id, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ text: message, sender: 'system', senderId: 'system' }), + onWake: 1, + }); + } + killContainer(ctx.sessionId, 'restarted via ncl', message ? () => { + const s = getSession(ctx.sessionId); + if (s) wakeContainer(s); + } : undefined); + return { restarted: 1, rebuilt: !!args.rebuild }; + } + + // From the host: restart all running containers in the group + const count = restartAgentGroupContainers(id, 'restarted via ncl', message); + return { restarted: count, rebuilt: !!args.rebuild }; + }, + }, 'config get': { access: 'open', description: 'Show the container config for a group. Use --id .', @@ -96,7 +140,6 @@ registerResource({ } updateContainerConfigScalars(id, updates); - restartAgentGroupContainers(id, 'config updated via ncl'); const updated = getContainerConfig(id)!; return presentConfig(updated); @@ -124,7 +167,6 @@ registerResource({ env: args.env ? (JSON.parse(args.env as string) as Record) : {}, }; updateContainerConfigJson(id, 'mcp_servers', servers); - restartAgentGroupContainers(id, `mcp server "${name}" added via ncl`); return { added: name, servers }; }, @@ -145,7 +187,6 @@ registerResource({ if (!servers[name]) throw new Error(`MCP server "${name}" not found`); delete servers[name]; updateContainerConfigJson(id, 'mcp_servers', servers); - restartAgentGroupContainers(id, `mcp server "${name}" removed via ncl`); return { removed: name }; }, @@ -179,9 +220,10 @@ registerResource({ } } - restartAgentGroupContainers(id, 'package added via ncl'); - - return { added: { apt: apt || null, npm: npm || null }, note: 'Image rebuild required for packages to take effect. Use install_packages from the agent or rebuild manually.' }; + return { + added: { apt: apt || null, npm: npm || null }, + note: 'Image rebuild required for packages to take effect. Use install_packages from the agent or rebuild manually.', + }; }, }, 'config remove-package': { @@ -209,9 +251,10 @@ registerResource({ updateContainerConfigJson(id, 'packages_npm', filtered); } - restartAgentGroupContainers(id, 'package removed via ncl'); - - return { removed: { apt: apt || null, npm: npm || null }, note: 'Image rebuild required for package changes to take effect.' }; + return { + removed: { apt: apt || null, npm: npm || null }, + note: 'Image rebuild required for package changes to take effect.', + }; }, }, }, diff --git a/src/container-restart.test.ts b/src/container-restart.test.ts new file mode 100644 index 0000000..956df63 --- /dev/null +++ b/src/container-restart.test.ts @@ -0,0 +1,161 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +// --- Mocks --- + +vi.mock('./log.js', () => ({ + log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, +})); + +const mockIsContainerRunning = vi.fn<(id: string) => boolean>(); +const mockKillContainer = vi.fn<(id: string, reason: string, onExit?: () => void) => void>(); +const mockWakeContainer = vi.fn(); +vi.mock('./container-runner.js', () => ({ + isContainerRunning: (...args: unknown[]) => mockIsContainerRunning(args[0] as string), + killContainer: (...args: unknown[]) => mockKillContainer(args[0] as string, args[1] as string, args[2] as (() => void) | undefined), + wakeContainer: (...args: unknown[]) => mockWakeContainer(...args), +})); + +const mockGetSessionsByAgentGroup = vi.fn(); +const mockGetSession = vi.fn(); +vi.mock('./db/sessions.js', () => ({ + getSessionsByAgentGroup: (...args: unknown[]) => mockGetSessionsByAgentGroup(...args), + getSession: (...args: unknown[]) => mockGetSession(...args), +})); + +const mockWriteSessionMessage = vi.fn(); +vi.mock('./session-manager.js', () => ({ + writeSessionMessage: (...args: unknown[]) => mockWriteSessionMessage(...args), +})); + +import { restartAgentGroupContainers } from './container-restart.js'; + +beforeEach(() => { + vi.clearAllMocks(); +}); + +// --- Helpers --- + +function makeSession(id: string, agentGroupId: string, status = 'active') { + return { id, agent_group_id: agentGroupId, status }; +} + +// --- Tests --- + +describe('restartAgentGroupContainers', () => { + it('skips sessions without a running container', () => { + mockGetSessionsByAgentGroup.mockReturnValue([ + makeSession('s1', 'g1'), + makeSession('s2', 'g1'), + ]); + mockIsContainerRunning.mockReturnValue(false); + + const count = restartAgentGroupContainers('g1', 'test'); + + expect(count).toBe(0); + expect(mockKillContainer).not.toHaveBeenCalled(); + expect(mockWriteSessionMessage).not.toHaveBeenCalled(); + }); + + it('skips non-active sessions', () => { + mockGetSessionsByAgentGroup.mockReturnValue([ + makeSession('s1', 'g1', 'closed'), + ]); + mockIsContainerRunning.mockReturnValue(true); + + const count = restartAgentGroupContainers('g1', 'test'); + + expect(count).toBe(0); + expect(mockKillContainer).not.toHaveBeenCalled(); + }); + + it('kills running containers and returns count', () => { + mockGetSessionsByAgentGroup.mockReturnValue([ + makeSession('s1', 'g1'), + makeSession('s2', 'g1'), + ]); + mockIsContainerRunning.mockImplementation((id) => id === 's1'); + + const count = restartAgentGroupContainers('g1', 'test'); + + expect(count).toBe(1); + expect(mockKillContainer).toHaveBeenCalledTimes(1); + expect(mockKillContainer).toHaveBeenCalledWith('s1', 'test', undefined); + }); + + it('does not write wake message when wakeMessage is omitted', () => { + mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]); + mockIsContainerRunning.mockReturnValue(true); + + restartAgentGroupContainers('g1', 'test'); + + expect(mockWriteSessionMessage).not.toHaveBeenCalled(); + expect(mockKillContainer).toHaveBeenCalledWith('s1', 'test', undefined); + }); + + it('writes on_wake message and passes onExit callback when wakeMessage is provided', () => { + mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]); + mockIsContainerRunning.mockReturnValue(true); + + restartAgentGroupContainers('g1', 'test', 'Resuming.'); + + // Should write an on-wake message + expect(mockWriteSessionMessage).toHaveBeenCalledTimes(1); + const [agentGroupId, sessionId, msg] = mockWriteSessionMessage.mock.calls[0]; + expect(agentGroupId).toBe('g1'); + expect(sessionId).toBe('s1'); + expect(msg.onWake).toBe(1); + expect(JSON.parse(msg.content).text).toBe('Resuming.'); + + // Should pass an onExit callback to killContainer + expect(mockKillContainer).toHaveBeenCalledTimes(1); + const onExit = mockKillContainer.mock.calls[0][2]; + expect(typeof onExit).toBe('function'); + }); + + it('onExit callback calls wakeContainer with refreshed session', () => { + mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]); + mockIsContainerRunning.mockReturnValue(true); + const freshSession = makeSession('s1', 'g1'); + mockGetSession.mockReturnValue(freshSession); + + restartAgentGroupContainers('g1', 'test', 'Resuming.'); + + // Simulate container exit by calling the onExit callback + const onExit = mockKillContainer.mock.calls[0][2] as () => void; + onExit(); + + expect(mockGetSession).toHaveBeenCalledWith('s1'); + expect(mockWakeContainer).toHaveBeenCalledWith(freshSession); + }); + + it('onExit callback does not wake if session no longer exists', () => { + mockGetSessionsByAgentGroup.mockReturnValue([makeSession('s1', 'g1')]); + mockIsContainerRunning.mockReturnValue(true); + mockGetSession.mockReturnValue(undefined); + + restartAgentGroupContainers('g1', 'test', 'Resuming.'); + + const onExit = mockKillContainer.mock.calls[0][2] as () => void; + onExit(); + + expect(mockWakeContainer).not.toHaveBeenCalled(); + }); + + it('handles multiple running sessions with wake message', () => { + mockGetSessionsByAgentGroup.mockReturnValue([ + makeSession('s1', 'g1'), + makeSession('s2', 'g1'), + ]); + mockIsContainerRunning.mockReturnValue(true); + + const count = restartAgentGroupContainers('g1', 'test', 'Config updated.'); + + expect(count).toBe(2); + expect(mockKillContainer).toHaveBeenCalledTimes(2); + expect(mockWriteSessionMessage).toHaveBeenCalledTimes(2); + + // Each session gets its own on-wake message + expect(mockWriteSessionMessage.mock.calls[0][1]).toBe('s1'); + expect(mockWriteSessionMessage.mock.calls[1][1]).toBe('s2'); + }); +}); diff --git a/src/container-restart.ts b/src/container-restart.ts index 74d7d4f..6f83531 100644 --- a/src/container-restart.ts +++ b/src/container-restart.ts @@ -1,44 +1,57 @@ /** * Helper to restart all running containers for an agent group. * - * Used by: - * - self-mod approval handlers (after config change) - * - ncl groups config update (after CLI config change) + * Writes an on_wake message to each session, kills the container, then + * wakes a fresh container via the onExit callback — race-free. */ -import { killContainer } from './container-runner.js'; -import { getSessionsByAgentGroup } from './db/sessions.js'; +import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js'; +import { getSession, getSessionsByAgentGroup } from './db/sessions.js'; import { log } from './log.js'; import { writeSessionMessage } from './session-manager.js'; /** - * Kill all running containers for an agent group and schedule wake messages - * so the host sweep respawns them with fresh config. + * Kill all running containers for an agent group and respawn them. + * + * Only targets sessions that actually have a running container. + * If `wakeMessage` is provided, each session gets an on_wake message + * (picked up only by the fresh container's first poll) and a + * wakeContainer call on exit. Without it, containers are killed and + * only come back on the next real user message. */ -export function restartAgentGroupContainers(agentGroupId: string, reason: string): void { - const sessions = getSessionsByAgentGroup(agentGroupId).filter((s) => s.status === 'active'); +export function restartAgentGroupContainers( + agentGroupId: string, + reason: string, + wakeMessage?: string, +): number { + const sessions = getSessionsByAgentGroup(agentGroupId).filter( + (s) => s.status === 'active' && isContainerRunning(s.id), + ); for (const session of sessions) { - killContainer(session.id, reason); - writeSessionMessage(agentGroupId, session.id, { - id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, - kind: 'chat', - timestamp: new Date().toISOString(), - platformId: agentGroupId, - channelType: 'agent', - threadId: null, - content: JSON.stringify({ - text: `Container restarted: ${reason}. Resuming.`, - sender: 'system', - senderId: 'system', - }), - processAfter: new Date(Date.now() + 5000) - .toISOString() - .replace('T', ' ') - .replace(/\.\d+Z$/, ''), - }); + if (wakeMessage) { + writeSessionMessage(agentGroupId, session.id, { + id: `restart-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: agentGroupId, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ + text: wakeMessage, + sender: 'system', + senderId: 'system', + }), + onWake: 1, + }); + } + killContainer(session.id, reason, wakeMessage ? () => { + const s = getSession(session.id); + if (s) wakeContainer(s); + } : undefined); } if (sessions.length > 0) { - log.info('Restarted agent group containers', { agentGroupId, reason, count: sessions.length }); + log.info('Restarting agent group containers', { agentGroupId, reason, count: sessions.length }); } + return sessions.length; } diff --git a/src/container-runner.ts b/src/container-runner.ts index cdf93f2..903bd8b 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -190,10 +190,14 @@ async function spawnContainer(session: Session): Promise { } /** Kill a container for a session. */ -export function killContainer(sessionId: string, reason: string): void { +export function killContainer(sessionId: string, reason: string, onExit?: () => void): void { const entry = activeContainers.get(sessionId); if (!entry) return; + if (onExit) { + entry.process.once('close', onExit); + } + log.info('Killing container', { sessionId, reason, containerName: entry.containerName }); try { stopContainer(entry.containerName); diff --git a/src/db/schema.ts b/src/db/schema.ts index 48d9ce3..533ec51 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -177,7 +177,10 @@ CREATE TABLE IF NOT EXISTS messages_in ( -- the reply routes back to this exact session, not to the source agent -- group's "newest" session. NULL on channel-side inbound and on a2a rows -- written before this column existed. - source_session_id TEXT + source_session_id TEXT, + on_wake INTEGER NOT NULL DEFAULT 0 + -- 1 = only deliver on the container's first poll (fresh start). + -- Dying containers (past first poll) skip these rows. ); CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id); diff --git a/src/db/session-db.ts b/src/db/session-db.ts index 6713702..15ba0e4 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -114,14 +114,20 @@ export function insertMessage( * path for the target's reply. NULL on channel-side inbound. */ sourceSessionId?: string | null; + /** + * 1 = only deliver on the container's first poll (fresh start). + * Dying containers (past first poll) skip these rows. + */ + onWake?: 0 | 1; }, ): void { db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id) - VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`, + `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id, on_wake) + VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId, @onWake)`, ).run({ ...message, trigger: message.trigger ?? 1, + onWake: message.onWake ?? 0, sourceSessionId: message.sourceSessionId ?? null, seq: nextEvenSeq(db), }); @@ -318,6 +324,11 @@ export function migrateMessagesInTable(db: Database.Database): void { // their replies fall back to the legacy "newest active session" lookup. db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run(); } + if (!cols.has('on_wake')) { + // 1 = only deliver on the container's first poll (fresh start). + // All existing rows are normal messages, so default 0. + db.prepare('ALTER TABLE messages_in ADD COLUMN on_wake INTEGER NOT NULL DEFAULT 0').run(); + } } /** diff --git a/src/modules/self-mod/apply.ts b/src/modules/self-mod/apply.ts index b9753ab..c5318ff 100644 --- a/src/modules/self-mod/apply.ts +++ b/src/modules/self-mod/apply.ts @@ -4,14 +4,16 @@ * The approvals module calls these when an admin clicks Approve on a * pending_approvals row whose action matches. Each handler mutates the * container config in the DB, rebuilds/kills the container as needed, - * and lets the host sweep respawn it on the next message. + * and writes an on_wake message so the fresh container picks up where + * the old one left off. * - * install_packages: update DB + rebuild image + kill container. - * add_mcp_server: update DB + kill container only. + * install_packages: update DB + rebuild image + kill container + on_wake. + * add_mcp_server: update DB + kill container + on_wake. */ -import { buildAgentGroupImage, killContainer } from '../../container-runner.js'; +import { buildAgentGroupImage, killContainer, wakeContainer } from '../../container-runner.js'; import { getAgentGroup } from '../../db/agent-groups.js'; import { getContainerConfig, updateContainerConfigJson } from '../../db/container-configs.js'; +import { getSession } from '../../db/sessions.js'; import type { McpServerConfig } from '../../container-config.js'; import { log } from '../../log.js'; import { writeSessionMessage } from '../../session-manager.js'; @@ -53,7 +55,6 @@ export const applyInstallPackages: ApprovalHandler = async ({ session, payload, log.info('Package install approved', { agentGroupId: session.agent_group_id, userId }); try { await buildAgentGroupImage(session.agent_group_id); - killContainer(session.id, 'rebuild applied'); writeSessionMessage(session.agent_group_id, session.id, { id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, kind: 'chat', @@ -66,10 +67,11 @@ export const applyInstallPackages: ApprovalHandler = async ({ session, payload, sender: 'system', senderId: 'system', }), - processAfter: new Date(Date.now() + 5000) - .toISOString() - .replace('T', ' ') - .replace(/\.\d+Z$/, ''), + onWake: 1, + }); + killContainer(session.id, 'rebuild applied', () => { + const s = getSession(session.id); + if (s) wakeContainer(s); }); log.info('Container rebuild completed (bundled with install)', { agentGroupId: session.agent_group_id }); } catch (e) { @@ -102,7 +104,23 @@ export const applyAddMcpServer: ApprovalHandler = async ({ session, payload, use }; updateContainerConfigJson(agentGroup.id, 'mcp_servers', servers); - killContainer(session.id, 'mcp server added'); - notify(`MCP server "${payload.name}" added. Your container will restart with it on the next message.`); + writeSessionMessage(session.agent_group_id, session.id, { + id: `appr-note-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: session.agent_group_id, + channelType: 'agent', + threadId: null, + content: JSON.stringify({ + text: `MCP server "${payload.name}" added. Verify it's available (e.g. list your tools) and report the result to the user.`, + sender: 'system', + senderId: 'system', + }), + onWake: 1, + }); + killContainer(session.id, 'mcp server added', () => { + const s = getSession(session.id); + if (s) wakeContainer(s); + }); log.info('MCP server add approved', { agentGroupId: session.agent_group_id, userId }); }; diff --git a/src/session-manager.ts b/src/session-manager.ts index 5c423ea..38c77f2 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -216,6 +216,11 @@ export function writeSessionMessage( * path so the target's reply routes back to that exact session. */ sourceSessionId?: string | null; + /** + * 1 = only deliver on the container's first poll (fresh start). + * Dying containers (past first poll) skip these rows. + */ + onWake?: 0 | 1; }, ): void { // Extract base64 attachment data, save to inbox, replace with file paths @@ -235,6 +240,7 @@ export function writeSessionMessage( recurrence: message.recurrence ?? null, trigger: message.trigger ?? 1, sourceSessionId: message.sourceSessionId ?? null, + onWake: message.onWake ?? 0, }); } finally { db.close();