v2: make v2 the main entry point, move v1 to src/v1/
- Move all v1 files (index, router, container-runner, db, ipc, types, logger, channels/registry, and all utilities) to src/v1/ as a fully self-contained archive with no shared dependencies - Rename v2 files to remove -v2 suffix (index-v2.ts → index.ts, etc.) - Update all imports across v2 source, tests, and setup files - Migrate shared utilities (config, env, container-runtime, mount-security, timezone, group-folder) from pino logger to v2 log module - Migrate setup/ files from logger to log with argument order swap - Container agent-runner: move v1 entry to v1/, rename v2 to index.ts - Update setup skill to offer all 13 v2 channels - Install all Chat SDK adapter packages - dist/index.js now runs v2; dist/v1/index.js runs v1 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,150 +1,165 @@
|
||||
/**
|
||||
* Container Runner for NanoClaw
|
||||
* Spawns agent execution in containers and handles IPC
|
||||
* Container Runner v2
|
||||
* Spawns agent containers with session folder + agent group folder mounts.
|
||||
* The container runs the v2 agent-runner which polls the session DB.
|
||||
*/
|
||||
import { ChildProcess, spawn } from 'child_process';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import {
|
||||
CONTAINER_IMAGE,
|
||||
CONTAINER_MAX_OUTPUT_SIZE,
|
||||
CONTAINER_TIMEOUT,
|
||||
DATA_DIR,
|
||||
GROUPS_DIR,
|
||||
IDLE_TIMEOUT,
|
||||
ONECLI_URL,
|
||||
TIMEZONE,
|
||||
} from './config.js';
|
||||
import { resolveGroupFolderPath, resolveGroupIpcPath } from './group-folder.js';
|
||||
import { logger } from './logger.js';
|
||||
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
|
||||
import { OneCLI } from '@onecli-sh/sdk';
|
||||
|
||||
import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js';
|
||||
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { getMessagingGroup } from './db/messaging-groups.js';
|
||||
import { log } from './log.js';
|
||||
import { validateAdditionalMounts } from './mount-security.js';
|
||||
import { RegisteredGroup } from './types.js';
|
||||
import {
|
||||
markContainerIdle,
|
||||
markContainerRunning,
|
||||
markContainerStopped,
|
||||
sessionDbPath,
|
||||
sessionDir,
|
||||
} from './session-manager.js';
|
||||
import type { AgentGroup, Session } from './types.js';
|
||||
|
||||
const onecli = new OneCLI({ url: ONECLI_URL });
|
||||
|
||||
// Sentinel markers for robust output parsing (must match agent-runner)
|
||||
const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---';
|
||||
const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---';
|
||||
|
||||
export interface ContainerInput {
|
||||
prompt: string;
|
||||
sessionId?: string;
|
||||
groupFolder: string;
|
||||
chatJid: string;
|
||||
isMain: boolean;
|
||||
isScheduledTask?: boolean;
|
||||
assistantName?: string;
|
||||
script?: string;
|
||||
}
|
||||
|
||||
export interface ContainerOutput {
|
||||
status: 'success' | 'error';
|
||||
result: string | null;
|
||||
newSessionId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
interface VolumeMount {
|
||||
hostPath: string;
|
||||
containerPath: string;
|
||||
readonly: boolean;
|
||||
}
|
||||
|
||||
function buildVolumeMounts(group: RegisteredGroup, isMain: boolean): VolumeMount[] {
|
||||
const mounts: VolumeMount[] = [];
|
||||
const projectRoot = process.cwd();
|
||||
const groupDir = resolveGroupFolderPath(group.folder);
|
||||
/** Active containers tracked by session ID. */
|
||||
const activeContainers = new Map<string, { process: ChildProcess; containerName: string }>();
|
||||
|
||||
if (isMain) {
|
||||
// Main gets the project root read-only. Writable paths the agent needs
|
||||
// (store, group folder, IPC, .claude/) are mounted separately below.
|
||||
// Read-only prevents the agent from modifying host application code
|
||||
// (src/, dist/, package.json, etc.) which would bypass the sandbox
|
||||
// entirely on next restart.
|
||||
mounts.push({
|
||||
hostPath: projectRoot,
|
||||
containerPath: '/workspace/project',
|
||||
readonly: true,
|
||||
});
|
||||
export function getActiveContainerCount(): number {
|
||||
return activeContainers.size;
|
||||
}
|
||||
|
||||
// Shadow .env so the agent cannot read secrets from the mounted project root.
|
||||
// Credentials are injected by the OneCLI gateway, never exposed to containers.
|
||||
const envFile = path.join(projectRoot, '.env');
|
||||
if (fs.existsSync(envFile)) {
|
||||
mounts.push({
|
||||
hostPath: '/dev/null',
|
||||
containerPath: '/workspace/project/.env',
|
||||
readonly: true,
|
||||
});
|
||||
}
|
||||
export function isContainerRunning(sessionId: string): boolean {
|
||||
return activeContainers.has(sessionId);
|
||||
}
|
||||
|
||||
// Main gets writable access to the store (SQLite DB) so it can
|
||||
// query and write to the database directly.
|
||||
const storeDir = path.join(projectRoot, 'store');
|
||||
mounts.push({
|
||||
hostPath: storeDir,
|
||||
containerPath: '/workspace/project/store',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Main also gets its group folder as the working directory
|
||||
mounts.push({
|
||||
hostPath: groupDir,
|
||||
containerPath: '/workspace/group',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Global memory directory — writable for main so it can update shared context
|
||||
const globalDir = path.join(GROUPS_DIR, 'global');
|
||||
if (fs.existsSync(globalDir)) {
|
||||
mounts.push({
|
||||
hostPath: globalDir,
|
||||
containerPath: '/workspace/global',
|
||||
readonly: false,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Other groups only get their own folder
|
||||
mounts.push({
|
||||
hostPath: groupDir,
|
||||
containerPath: '/workspace/group',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Global memory directory (read-only for non-main)
|
||||
// Only directory mounts are supported, not file mounts
|
||||
const globalDir = path.join(GROUPS_DIR, 'global');
|
||||
if (fs.existsSync(globalDir)) {
|
||||
mounts.push({
|
||||
hostPath: globalDir,
|
||||
containerPath: '/workspace/global',
|
||||
readonly: true,
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Wake up a container for a session. If already running, no-op.
|
||||
* The container runs the v2 agent-runner which polls the session DB.
|
||||
*/
|
||||
export async function wakeContainer(session: Session): Promise<void> {
|
||||
if (activeContainers.has(session.id)) {
|
||||
log.debug('Container already running', { sessionId: session.id });
|
||||
return;
|
||||
}
|
||||
|
||||
// Per-group Claude sessions directory (isolated from other groups)
|
||||
// Each group gets their own .claude/ to prevent cross-group session access
|
||||
const groupSessionsDir = path.join(DATA_DIR, 'sessions', group.folder, '.claude');
|
||||
fs.mkdirSync(groupSessionsDir, { recursive: true });
|
||||
const settingsFile = path.join(groupSessionsDir, 'settings.json');
|
||||
const agentGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!agentGroup) {
|
||||
log.error('Agent group not found', { agentGroupId: session.agent_group_id });
|
||||
return;
|
||||
}
|
||||
|
||||
const mounts = buildMounts(agentGroup, session);
|
||||
const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`;
|
||||
const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.folder.toLowerCase().replace(/_/g, '-');
|
||||
const args = await buildContainerArgs(mounts, containerName, session, agentGroup, agentIdentifier);
|
||||
|
||||
log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName });
|
||||
|
||||
const container = spawn(CONTAINER_RUNTIME_BIN, args, { stdio: ['ignore', 'pipe', 'pipe'] });
|
||||
|
||||
activeContainers.set(session.id, { process: container, containerName });
|
||||
markContainerRunning(session.id);
|
||||
|
||||
// Log stderr
|
||||
container.stderr?.on('data', (data) => {
|
||||
for (const line of data.toString().trim().split('\n')) {
|
||||
if (line) log.debug(line, { container: agentGroup.folder });
|
||||
}
|
||||
});
|
||||
|
||||
// stdout is unused in v2 (all IO is via session DB)
|
||||
container.stdout?.on('data', () => {});
|
||||
|
||||
// Idle timeout: kill container after IDLE_TIMEOUT of no activity
|
||||
let idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT);
|
||||
|
||||
const resetIdle = () => {
|
||||
clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT);
|
||||
};
|
||||
|
||||
// Reset idle timer when the host detects new messages_out (called by delivery.ts)
|
||||
const entry = activeContainers.get(session.id);
|
||||
if (entry) {
|
||||
(entry as { resetIdle?: () => void }).resetIdle = resetIdle;
|
||||
}
|
||||
|
||||
container.on('close', (code) => {
|
||||
clearTimeout(idleTimer);
|
||||
activeContainers.delete(session.id);
|
||||
markContainerStopped(session.id);
|
||||
log.info('Container exited', { sessionId: session.id, code, containerName });
|
||||
});
|
||||
|
||||
container.on('error', (err) => {
|
||||
clearTimeout(idleTimer);
|
||||
activeContainers.delete(session.id);
|
||||
markContainerStopped(session.id);
|
||||
log.error('Container spawn error', { sessionId: session.id, err });
|
||||
});
|
||||
}
|
||||
|
||||
/** Reset the idle timer for a session's container (called when messages_out are delivered). */
|
||||
export function resetContainerIdleTimer(sessionId: string): void {
|
||||
const entry = activeContainers.get(sessionId) as { resetIdle?: () => void } | undefined;
|
||||
entry?.resetIdle?.();
|
||||
}
|
||||
|
||||
/** Kill a container for a session. */
|
||||
export function killContainer(sessionId: string, reason: string): void {
|
||||
const entry = activeContainers.get(sessionId);
|
||||
if (!entry) return;
|
||||
|
||||
log.info('Killing container', { sessionId, reason, containerName: entry.containerName });
|
||||
try {
|
||||
stopContainer(entry.containerName);
|
||||
} catch {
|
||||
entry.process.kill('SIGKILL');
|
||||
}
|
||||
}
|
||||
|
||||
function buildMounts(agentGroup: AgentGroup, session: Session): VolumeMount[] {
|
||||
const mounts: VolumeMount[] = [];
|
||||
const projectRoot = process.cwd();
|
||||
const sessDir = sessionDir(agentGroup.id, session.id);
|
||||
const groupDir = path.resolve(GROUPS_DIR, agentGroup.folder);
|
||||
|
||||
// Session folder at /workspace (contains session.db, outbox/, .claude/)
|
||||
mounts.push({ hostPath: sessDir, containerPath: '/workspace', readonly: false });
|
||||
|
||||
// Agent group folder at /workspace/agent
|
||||
fs.mkdirSync(groupDir, { recursive: true });
|
||||
mounts.push({ hostPath: groupDir, containerPath: '/workspace/agent', readonly: false });
|
||||
|
||||
// Global memory directory
|
||||
const globalDir = path.join(GROUPS_DIR, 'global');
|
||||
if (fs.existsSync(globalDir)) {
|
||||
mounts.push({ hostPath: globalDir, containerPath: '/workspace/global', readonly: !agentGroup.is_admin });
|
||||
}
|
||||
|
||||
// Claude sessions directory (per agent group, shared across sessions)
|
||||
const claudeDir = path.join(DATA_DIR, 'v2-sessions', agentGroup.id, '.claude-shared');
|
||||
fs.mkdirSync(claudeDir, { recursive: true });
|
||||
const settingsFile = path.join(claudeDir, 'settings.json');
|
||||
if (!fs.existsSync(settingsFile)) {
|
||||
fs.writeFileSync(
|
||||
settingsFile,
|
||||
JSON.stringify(
|
||||
{
|
||||
env: {
|
||||
// Enable agent swarms (subagent orchestration)
|
||||
// https://code.claude.com/docs/en/agent-teams#orchestrate-teams-of-claude-code-sessions
|
||||
CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1',
|
||||
// Load CLAUDE.md from additional mounted directories
|
||||
// https://code.claude.com/docs/en/memory#load-memory-from-additional-directories
|
||||
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
|
||||
// Enable Claude's memory feature (persists user preferences between sessions)
|
||||
// https://code.claude.com/docs/en/memory#manage-auto-memory
|
||||
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
|
||||
},
|
||||
},
|
||||
@@ -154,61 +169,46 @@ function buildVolumeMounts(group: RegisteredGroup, isMain: boolean): VolumeMount
|
||||
);
|
||||
}
|
||||
|
||||
// Sync skills from container/skills/ into each group's .claude/skills/
|
||||
const skillsSrc = path.join(process.cwd(), 'container', 'skills');
|
||||
const skillsDst = path.join(groupSessionsDir, 'skills');
|
||||
// Sync container skills
|
||||
const skillsSrc = path.join(projectRoot, 'container', 'skills');
|
||||
const skillsDst = path.join(claudeDir, 'skills');
|
||||
if (fs.existsSync(skillsSrc)) {
|
||||
for (const skillDir of fs.readdirSync(skillsSrc)) {
|
||||
const srcDir = path.join(skillsSrc, skillDir);
|
||||
if (!fs.statSync(srcDir).isDirectory()) continue;
|
||||
const dstDir = path.join(skillsDst, skillDir);
|
||||
fs.cpSync(srcDir, dstDir, { recursive: true });
|
||||
if (fs.statSync(srcDir).isDirectory()) {
|
||||
fs.cpSync(srcDir, path.join(skillsDst, skillDir), { recursive: true });
|
||||
}
|
||||
}
|
||||
}
|
||||
mounts.push({
|
||||
hostPath: groupSessionsDir,
|
||||
containerPath: '/home/node/.claude',
|
||||
readonly: false,
|
||||
});
|
||||
mounts.push({ hostPath: claudeDir, containerPath: '/home/node/.claude', readonly: false });
|
||||
|
||||
// Per-group IPC namespace: each group gets its own IPC directory
|
||||
// This prevents cross-group privilege escalation via IPC
|
||||
const groupIpcDir = resolveGroupIpcPath(group.folder);
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'messages'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'tasks'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'input'), { recursive: true });
|
||||
mounts.push({
|
||||
hostPath: groupIpcDir,
|
||||
containerPath: '/workspace/ipc',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Copy agent-runner source into a per-group writable location so agents
|
||||
// can customize it (add tools, change behavior) without affecting other
|
||||
// groups. Recompiled on container startup via entrypoint.sh.
|
||||
// Agent-runner source (per agent group, recompiled on container startup)
|
||||
const agentRunnerSrc = path.join(projectRoot, 'container', 'agent-runner', 'src');
|
||||
const groupAgentRunnerDir = path.join(DATA_DIR, 'sessions', group.folder, 'agent-runner-src');
|
||||
const groupRunnerDir = path.join(DATA_DIR, 'v2-sessions', agentGroup.id, 'agent-runner-src');
|
||||
if (fs.existsSync(agentRunnerSrc)) {
|
||||
const srcIndex = path.join(agentRunnerSrc, 'index.ts');
|
||||
const cachedIndex = path.join(groupAgentRunnerDir, 'index.ts');
|
||||
const needsCopy =
|
||||
!fs.existsSync(groupAgentRunnerDir) ||
|
||||
!fs.existsSync(cachedIndex) ||
|
||||
(fs.existsSync(srcIndex) && fs.statSync(srcIndex).mtimeMs > fs.statSync(cachedIndex).mtimeMs);
|
||||
if (needsCopy) {
|
||||
fs.cpSync(agentRunnerSrc, groupAgentRunnerDir, { recursive: true });
|
||||
// Always copy — source files may have changed beyond just the index
|
||||
fs.cpSync(agentRunnerSrc, groupRunnerDir, { recursive: true });
|
||||
}
|
||||
mounts.push({ hostPath: groupRunnerDir, containerPath: '/app/src', readonly: false });
|
||||
|
||||
// Admin: mount project root read-only
|
||||
if (agentGroup.is_admin) {
|
||||
mounts.push({ hostPath: projectRoot, containerPath: '/workspace/project', readonly: true });
|
||||
const envFile = path.join(projectRoot, '.env');
|
||||
if (fs.existsSync(envFile)) {
|
||||
mounts.push({ hostPath: '/dev/null', containerPath: '/workspace/project/.env', readonly: true });
|
||||
}
|
||||
}
|
||||
mounts.push({
|
||||
hostPath: groupAgentRunnerDir,
|
||||
containerPath: '/app/src',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Additional mounts validated against external allowlist (tamper-proof from containers)
|
||||
if (group.containerConfig?.additionalMounts) {
|
||||
const validatedMounts = validateAdditionalMounts(group.containerConfig.additionalMounts, group.name, isMain);
|
||||
mounts.push(...validatedMounts);
|
||||
// Additional mounts from container config
|
||||
const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {};
|
||||
if (containerConfig.additionalMounts) {
|
||||
const validated = validateAdditionalMounts(
|
||||
containerConfig.additionalMounts,
|
||||
agentGroup.name,
|
||||
!!agentGroup.is_admin,
|
||||
);
|
||||
mounts.push(...validated);
|
||||
}
|
||||
|
||||
return mounts;
|
||||
@@ -217,31 +217,38 @@ function buildVolumeMounts(group: RegisteredGroup, isMain: boolean): VolumeMount
|
||||
async function buildContainerArgs(
|
||||
mounts: VolumeMount[],
|
||||
containerName: string,
|
||||
session: Session,
|
||||
agentGroup: AgentGroup,
|
||||
agentIdentifier?: string,
|
||||
): Promise<string[]> {
|
||||
const args: string[] = ['run', '-i', '--rm', '--name', containerName];
|
||||
const args: string[] = ['run', '--rm', '--name', containerName];
|
||||
|
||||
// Pass host timezone so container's local time matches the user's
|
||||
// Environment
|
||||
args.push('-e', `TZ=${TIMEZONE}`);
|
||||
args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`);
|
||||
args.push('-e', `SESSION_DB_PATH=/workspace/session.db`);
|
||||
|
||||
// OneCLI gateway handles credential injection — containers never see real secrets.
|
||||
// The gateway intercepts HTTPS traffic and injects API keys or OAuth tokens.
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, {
|
||||
addHostMapping: false, // Nanoclaw already handles host gateway
|
||||
agent: agentIdentifier,
|
||||
});
|
||||
if (onecliApplied) {
|
||||
logger.info({ containerName }, 'OneCLI gateway config applied');
|
||||
} else {
|
||||
logger.warn({ containerName }, 'OneCLI gateway not reachable — container will have no credentials');
|
||||
// Pass admin user ID and assistant name from messaging group/agent group
|
||||
if (session.messaging_group_id) {
|
||||
const mg = getMessagingGroup(session.messaging_group_id);
|
||||
if (mg?.admin_user_id) {
|
||||
args.push('-e', `NANOCLAW_ADMIN_USER_ID=${mg.admin_user_id}`);
|
||||
}
|
||||
}
|
||||
if (agentGroup.name) {
|
||||
args.push('-e', `NANOCLAW_ASSISTANT_NAME=${agentGroup.name}`);
|
||||
}
|
||||
|
||||
// Runtime-specific args for host gateway resolution
|
||||
// OneCLI gateway
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
|
||||
if (onecliApplied) {
|
||||
log.debug('OneCLI gateway applied', { containerName });
|
||||
}
|
||||
|
||||
// Host gateway
|
||||
args.push(...hostGatewayArgs());
|
||||
|
||||
// Run as host user so bind-mounted files are accessible.
|
||||
// Skip when running as root (uid 0), as the container's node user (uid 1000),
|
||||
// or when getuid is unavailable (native Windows without WSL).
|
||||
// User mapping
|
||||
const hostUid = process.getuid?.();
|
||||
const hostGid = process.getgid?.();
|
||||
if (hostUid != null && hostUid !== 0 && hostUid !== 1000) {
|
||||
@@ -249,6 +256,7 @@ async function buildContainerArgs(
|
||||
args.push('-e', 'HOME=/home/node');
|
||||
}
|
||||
|
||||
// Volume mounts
|
||||
for (const mount of mounts) {
|
||||
if (mount.readonly) {
|
||||
args.push(...readonlyMountArgs(mount.hostPath, mount.containerPath));
|
||||
@@ -257,421 +265,13 @@ async function buildContainerArgs(
|
||||
}
|
||||
}
|
||||
|
||||
// Override entrypoint: compile agent-runner source, run v2 entry point (no stdin)
|
||||
args.push('--entrypoint', 'bash');
|
||||
args.push(CONTAINER_IMAGE);
|
||||
args.push(
|
||||
'-c',
|
||||
'cd /app && npx tsc --outDir /tmp/dist 2>&1 >&2 && ln -sf /app/node_modules /tmp/dist/node_modules && node /tmp/dist/index.js',
|
||||
);
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
export async function runContainerAgent(
|
||||
group: RegisteredGroup,
|
||||
input: ContainerInput,
|
||||
onProcess: (proc: ChildProcess, containerName: string) => void,
|
||||
onOutput?: (output: ContainerOutput) => Promise<void>,
|
||||
): Promise<ContainerOutput> {
|
||||
const startTime = Date.now();
|
||||
|
||||
const groupDir = resolveGroupFolderPath(group.folder);
|
||||
fs.mkdirSync(groupDir, { recursive: true });
|
||||
|
||||
const mounts = buildVolumeMounts(group, input.isMain);
|
||||
const safeName = group.folder.replace(/[^a-zA-Z0-9-]/g, '-');
|
||||
const containerName = `nanoclaw-${safeName}-${Date.now()}`;
|
||||
// Main group uses the default OneCLI agent; others use their own agent.
|
||||
const agentIdentifier = input.isMain ? undefined : group.folder.toLowerCase().replace(/_/g, '-');
|
||||
const containerArgs = await buildContainerArgs(mounts, containerName, agentIdentifier);
|
||||
|
||||
logger.debug(
|
||||
{
|
||||
group: group.name,
|
||||
containerName,
|
||||
mounts: mounts.map((m) => `${m.hostPath} -> ${m.containerPath}${m.readonly ? ' (ro)' : ''}`),
|
||||
containerArgs: containerArgs.join(' '),
|
||||
},
|
||||
'Container mount configuration',
|
||||
);
|
||||
|
||||
logger.info(
|
||||
{
|
||||
group: group.name,
|
||||
containerName,
|
||||
mountCount: mounts.length,
|
||||
isMain: input.isMain,
|
||||
},
|
||||
'Spawning container agent',
|
||||
);
|
||||
|
||||
const logsDir = path.join(groupDir, 'logs');
|
||||
fs.mkdirSync(logsDir, { recursive: true });
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const container = spawn(CONTAINER_RUNTIME_BIN, containerArgs, {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
onProcess(container, containerName);
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
let stdoutTruncated = false;
|
||||
let stderrTruncated = false;
|
||||
|
||||
container.stdin.write(JSON.stringify(input));
|
||||
container.stdin.end();
|
||||
|
||||
// Streaming output: parse OUTPUT_START/END marker pairs as they arrive
|
||||
let parseBuffer = '';
|
||||
let newSessionId: string | undefined;
|
||||
let outputChain = Promise.resolve();
|
||||
|
||||
container.stdout.on('data', (data) => {
|
||||
const chunk = data.toString();
|
||||
|
||||
// Always accumulate for logging
|
||||
if (!stdoutTruncated) {
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stdout.length;
|
||||
if (chunk.length > remaining) {
|
||||
stdout += chunk.slice(0, remaining);
|
||||
stdoutTruncated = true;
|
||||
logger.warn({ group: group.name, size: stdout.length }, 'Container stdout truncated due to size limit');
|
||||
} else {
|
||||
stdout += chunk;
|
||||
}
|
||||
}
|
||||
|
||||
// Stream-parse for output markers
|
||||
if (onOutput) {
|
||||
parseBuffer += chunk;
|
||||
let startIdx: number;
|
||||
while ((startIdx = parseBuffer.indexOf(OUTPUT_START_MARKER)) !== -1) {
|
||||
const endIdx = parseBuffer.indexOf(OUTPUT_END_MARKER, startIdx);
|
||||
if (endIdx === -1) break; // Incomplete pair, wait for more data
|
||||
|
||||
const jsonStr = parseBuffer.slice(startIdx + OUTPUT_START_MARKER.length, endIdx).trim();
|
||||
parseBuffer = parseBuffer.slice(endIdx + OUTPUT_END_MARKER.length);
|
||||
|
||||
try {
|
||||
const parsed: ContainerOutput = JSON.parse(jsonStr);
|
||||
if (parsed.newSessionId) {
|
||||
newSessionId = parsed.newSessionId;
|
||||
}
|
||||
hadStreamingOutput = true;
|
||||
// Activity detected — reset the hard timeout
|
||||
resetTimeout();
|
||||
// Call onOutput for all markers (including null results)
|
||||
// so idle timers start even for "silent" query completions.
|
||||
outputChain = outputChain.then(() => onOutput(parsed));
|
||||
} catch (err) {
|
||||
logger.warn({ group: group.name, error: err }, 'Failed to parse streamed output chunk');
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
container.stderr.on('data', (data) => {
|
||||
const chunk = data.toString();
|
||||
const lines = chunk.trim().split('\n');
|
||||
for (const line of lines) {
|
||||
if (line) logger.debug({ container: group.folder }, line);
|
||||
}
|
||||
// Don't reset timeout on stderr — SDK writes debug logs continuously.
|
||||
// Timeout only resets on actual output (OUTPUT_MARKER in stdout).
|
||||
if (stderrTruncated) return;
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stderr.length;
|
||||
if (chunk.length > remaining) {
|
||||
stderr += chunk.slice(0, remaining);
|
||||
stderrTruncated = true;
|
||||
logger.warn({ group: group.name, size: stderr.length }, 'Container stderr truncated due to size limit');
|
||||
} else {
|
||||
stderr += chunk;
|
||||
}
|
||||
});
|
||||
|
||||
let timedOut = false;
|
||||
let hadStreamingOutput = false;
|
||||
const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT;
|
||||
// Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the
|
||||
// graceful _close sentinel has time to trigger before the hard kill fires.
|
||||
const timeoutMs = Math.max(configTimeout, IDLE_TIMEOUT + 30_000);
|
||||
|
||||
const killOnTimeout = () => {
|
||||
timedOut = true;
|
||||
logger.error({ group: group.name, containerName }, 'Container timeout, stopping gracefully');
|
||||
try {
|
||||
stopContainer(containerName);
|
||||
} catch (err) {
|
||||
logger.warn({ group: group.name, containerName, err }, 'Graceful stop failed, force killing');
|
||||
container.kill('SIGKILL');
|
||||
}
|
||||
};
|
||||
|
||||
let timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
|
||||
// Reset the timeout whenever there's activity (streaming output)
|
||||
const resetTimeout = () => {
|
||||
clearTimeout(timeout);
|
||||
timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
};
|
||||
|
||||
container.on('close', (code) => {
|
||||
clearTimeout(timeout);
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
if (timedOut) {
|
||||
const ts = new Date().toISOString().replace(/[:.]/g, '-');
|
||||
const timeoutLog = path.join(logsDir, `container-${ts}.log`);
|
||||
fs.writeFileSync(
|
||||
timeoutLog,
|
||||
[
|
||||
`=== Container Run Log (TIMEOUT) ===`,
|
||||
`Timestamp: ${new Date().toISOString()}`,
|
||||
`Group: ${group.name}`,
|
||||
`Container: ${containerName}`,
|
||||
`Duration: ${duration}ms`,
|
||||
`Exit Code: ${code}`,
|
||||
`Had Streaming Output: ${hadStreamingOutput}`,
|
||||
].join('\n'),
|
||||
);
|
||||
|
||||
// Timeout after output = idle cleanup, not failure.
|
||||
// The agent already sent its response; this is just the
|
||||
// container being reaped after the idle period expired.
|
||||
if (hadStreamingOutput) {
|
||||
logger.info(
|
||||
{ group: group.name, containerName, duration, code },
|
||||
'Container timed out after output (idle cleanup)',
|
||||
);
|
||||
outputChain.then(() => {
|
||||
resolve({
|
||||
status: 'success',
|
||||
result: null,
|
||||
newSessionId,
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error({ group: group.name, containerName, duration, code }, 'Container timed out with no output');
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container timed out after ${configTimeout}ms`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
|
||||
const logFile = path.join(logsDir, `container-${timestamp}.log`);
|
||||
const isVerbose = process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
|
||||
|
||||
const logLines = [
|
||||
`=== Container Run Log ===`,
|
||||
`Timestamp: ${new Date().toISOString()}`,
|
||||
`Group: ${group.name}`,
|
||||
`IsMain: ${input.isMain}`,
|
||||
`Duration: ${duration}ms`,
|
||||
`Exit Code: ${code}`,
|
||||
`Stdout Truncated: ${stdoutTruncated}`,
|
||||
`Stderr Truncated: ${stderrTruncated}`,
|
||||
``,
|
||||
];
|
||||
|
||||
const isError = code !== 0;
|
||||
|
||||
if (isVerbose || isError) {
|
||||
// On error, log input metadata only — not the full prompt.
|
||||
// Full input is only included at verbose level to avoid
|
||||
// persisting user conversation content on every non-zero exit.
|
||||
if (isVerbose) {
|
||||
logLines.push(`=== Input ===`, JSON.stringify(input, null, 2), ``);
|
||||
} else {
|
||||
logLines.push(
|
||||
`=== Input Summary ===`,
|
||||
`Prompt length: ${input.prompt.length} chars`,
|
||||
`Session ID: ${input.sessionId || 'new'}`,
|
||||
``,
|
||||
);
|
||||
}
|
||||
logLines.push(
|
||||
`=== Container Args ===`,
|
||||
containerArgs.join(' '),
|
||||
``,
|
||||
`=== Mounts ===`,
|
||||
mounts.map((m) => `${m.hostPath} -> ${m.containerPath}${m.readonly ? ' (ro)' : ''}`).join('\n'),
|
||||
``,
|
||||
`=== Stderr${stderrTruncated ? ' (TRUNCATED)' : ''} ===`,
|
||||
stderr,
|
||||
``,
|
||||
`=== Stdout${stdoutTruncated ? ' (TRUNCATED)' : ''} ===`,
|
||||
stdout,
|
||||
);
|
||||
} else {
|
||||
logLines.push(
|
||||
`=== Input Summary ===`,
|
||||
`Prompt length: ${input.prompt.length} chars`,
|
||||
`Session ID: ${input.sessionId || 'new'}`,
|
||||
``,
|
||||
`=== Mounts ===`,
|
||||
mounts.map((m) => `${m.containerPath}${m.readonly ? ' (ro)' : ''}`).join('\n'),
|
||||
``,
|
||||
);
|
||||
}
|
||||
|
||||
fs.writeFileSync(logFile, logLines.join('\n'));
|
||||
logger.debug({ logFile, verbose: isVerbose }, 'Container log written');
|
||||
|
||||
if (code !== 0) {
|
||||
logger.error(
|
||||
{
|
||||
group: group.name,
|
||||
code,
|
||||
duration,
|
||||
stderr,
|
||||
stdout,
|
||||
logFile,
|
||||
},
|
||||
'Container exited with error',
|
||||
);
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container exited with code ${code}: ${stderr.slice(-200)}`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Streaming mode: wait for output chain to settle, return completion marker
|
||||
if (onOutput) {
|
||||
outputChain.then(() => {
|
||||
logger.info({ group: group.name, duration, newSessionId }, 'Container completed (streaming mode)');
|
||||
resolve({
|
||||
status: 'success',
|
||||
result: null,
|
||||
newSessionId,
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Legacy mode: parse the last output marker pair from accumulated stdout
|
||||
try {
|
||||
// Extract JSON between sentinel markers for robust parsing
|
||||
const startIdx = stdout.indexOf(OUTPUT_START_MARKER);
|
||||
const endIdx = stdout.indexOf(OUTPUT_END_MARKER);
|
||||
|
||||
let jsonLine: string;
|
||||
if (startIdx !== -1 && endIdx !== -1 && endIdx > startIdx) {
|
||||
jsonLine = stdout.slice(startIdx + OUTPUT_START_MARKER.length, endIdx).trim();
|
||||
} else {
|
||||
// Fallback: last non-empty line (backwards compatibility)
|
||||
const lines = stdout.trim().split('\n');
|
||||
jsonLine = lines[lines.length - 1];
|
||||
}
|
||||
|
||||
const output: ContainerOutput = JSON.parse(jsonLine);
|
||||
|
||||
logger.info(
|
||||
{
|
||||
group: group.name,
|
||||
duration,
|
||||
status: output.status,
|
||||
hasResult: !!output.result,
|
||||
},
|
||||
'Container completed',
|
||||
);
|
||||
|
||||
resolve(output);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{
|
||||
group: group.name,
|
||||
stdout,
|
||||
stderr,
|
||||
error: err,
|
||||
},
|
||||
'Failed to parse container output',
|
||||
);
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Failed to parse container output: ${err instanceof Error ? err.message : String(err)}`,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
container.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
logger.error({ group: group.name, containerName, error: err }, 'Container spawn error');
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container spawn error: ${err.message}`,
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function writeTasksSnapshot(
|
||||
groupFolder: string,
|
||||
isMain: boolean,
|
||||
tasks: Array<{
|
||||
id: string;
|
||||
groupFolder: string;
|
||||
prompt: string;
|
||||
script?: string | null;
|
||||
schedule_type: string;
|
||||
schedule_value: string;
|
||||
status: string;
|
||||
next_run: string | null;
|
||||
}>,
|
||||
): void {
|
||||
// Write filtered tasks to the group's IPC directory
|
||||
const groupIpcDir = resolveGroupIpcPath(groupFolder);
|
||||
fs.mkdirSync(groupIpcDir, { recursive: true });
|
||||
|
||||
// Main sees all tasks, others only see their own
|
||||
const filteredTasks = isMain ? tasks : tasks.filter((t) => t.groupFolder === groupFolder);
|
||||
|
||||
const tasksFile = path.join(groupIpcDir, 'current_tasks.json');
|
||||
fs.writeFileSync(tasksFile, JSON.stringify(filteredTasks, null, 2));
|
||||
}
|
||||
|
||||
export interface AvailableGroup {
|
||||
jid: string;
|
||||
name: string;
|
||||
lastActivity: string;
|
||||
isRegistered: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write available groups snapshot for the container to read.
|
||||
* Only main group can see all available groups (for activation).
|
||||
* Non-main groups only see their own registration status.
|
||||
*/
|
||||
export function writeGroupsSnapshot(
|
||||
groupFolder: string,
|
||||
isMain: boolean,
|
||||
groups: AvailableGroup[],
|
||||
_registeredJids: Set<string>,
|
||||
): void {
|
||||
const groupIpcDir = resolveGroupIpcPath(groupFolder);
|
||||
fs.mkdirSync(groupIpcDir, { recursive: true });
|
||||
|
||||
// Main sees all groups; others see nothing (they can't activate groups)
|
||||
const visibleGroups = isMain ? groups : [];
|
||||
|
||||
const groupsFile = path.join(groupIpcDir, 'available_groups.json');
|
||||
fs.writeFileSync(
|
||||
groupsFile,
|
||||
JSON.stringify(
|
||||
{
|
||||
groups: visibleGroups,
|
||||
lastSync: new Date().toISOString(),
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user