merge: catch up nc-cli to main
Resolve conflict in src/index.ts shutdown sequence — keep both stopCliServer() from nc-cli and try/finally + resetCircuitBreaker() from main. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
71
src/attachment-naming.test.ts
Normal file
71
src/attachment-naming.test.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
|
||||
import { deriveAttachmentName, extForMime } from './attachment-naming.js';
|
||||
|
||||
describe('extForMime', () => {
|
||||
it('returns empty for undefined / non-string / empty', () => {
|
||||
expect(extForMime(undefined)).toBe('');
|
||||
expect(extForMime('')).toBe('');
|
||||
expect(extForMime({})).toBe('');
|
||||
expect(extForMime(null)).toBe('');
|
||||
expect(extForMime(42)).toBe('');
|
||||
});
|
||||
|
||||
it('maps common MIME types to canonical extensions', () => {
|
||||
expect(extForMime('image/jpeg')).toBe('jpg');
|
||||
expect(extForMime('application/pdf')).toBe('pdf');
|
||||
expect(extForMime('audio/ogg')).toBe('ogg');
|
||||
});
|
||||
|
||||
it('strips parameters and is case-insensitive', () => {
|
||||
expect(extForMime('image/JPEG; foo=bar')).toBe('jpg');
|
||||
expect(extForMime(' Application/PDF ')).toBe('pdf');
|
||||
expect(extForMime('text/plain; charset=utf-8')).toBe('txt');
|
||||
});
|
||||
|
||||
it('returns empty for unknown MIMEs', () => {
|
||||
expect(extForMime('application/octet-stream')).toBe('');
|
||||
expect(extForMime('application/x-totally-made-up')).toBe('');
|
||||
});
|
||||
});
|
||||
|
||||
describe('deriveAttachmentName', () => {
|
||||
it('returns explicit name when set, no derivation', () => {
|
||||
expect(deriveAttachmentName({ name: 'photo.jpg', mimeType: 'application/pdf' })).toBe('photo.jpg');
|
||||
});
|
||||
|
||||
it('ignores empty / non-string explicit name and falls through to derivation', () => {
|
||||
const out = deriveAttachmentName({ name: '', mimeType: 'application/pdf' });
|
||||
expect(out).toMatch(/^attachment-\d+\.pdf$/);
|
||||
|
||||
const out2 = deriveAttachmentName({ name: 42, mimeType: 'application/pdf' });
|
||||
expect(out2).toMatch(/^attachment-\d+\.pdf$/);
|
||||
});
|
||||
|
||||
it('derives extension from mimeType when no name', () => {
|
||||
expect(deriveAttachmentName({ mimeType: 'application/pdf' })).toMatch(/^attachment-\d+\.pdf$/);
|
||||
expect(deriveAttachmentName({ mimeType: 'image/jpeg' })).toMatch(/^attachment-\d+\.jpg$/);
|
||||
});
|
||||
|
||||
it('falls back to att.type when mimeType is missing (Telegram photos/stickers)', () => {
|
||||
expect(deriveAttachmentName({ type: 'photo' })).toMatch(/^attachment-\d+\.jpg$/);
|
||||
expect(deriveAttachmentName({ type: 'sticker' })).toMatch(/^attachment-\d+\.webp$/);
|
||||
expect(deriveAttachmentName({ type: 'voice' })).toMatch(/^attachment-\d+\.ogg$/);
|
||||
expect(deriveAttachmentName({ type: 'animation' })).toMatch(/^attachment-\d+\.mp4$/);
|
||||
});
|
||||
|
||||
it('case-insensitive att.type lookup', () => {
|
||||
expect(deriveAttachmentName({ type: 'PHOTO' })).toMatch(/^attachment-\d+\.jpg$/);
|
||||
});
|
||||
|
||||
it('returns bare timestamp when nothing matches', () => {
|
||||
expect(deriveAttachmentName({})).toMatch(/^attachment-\d+$/);
|
||||
expect(deriveAttachmentName({ mimeType: 'application/octet-stream' })).toMatch(/^attachment-\d+$/);
|
||||
expect(deriveAttachmentName({ type: 'mystery-class' })).toMatch(/^attachment-\d+$/);
|
||||
});
|
||||
|
||||
it('does not crash on non-string mimeType (defensive against buggy bridges)', () => {
|
||||
expect(() => deriveAttachmentName({ mimeType: { foo: 'bar' } })).not.toThrow();
|
||||
expect(deriveAttachmentName({ mimeType: { foo: 'bar' } })).toMatch(/^attachment-\d+$/);
|
||||
});
|
||||
});
|
||||
69
src/attachment-naming.ts
Normal file
69
src/attachment-naming.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Derive a safe, extensioned filename for inbound attachments when the
|
||||
* channel bridge passes data without an explicit `name`.
|
||||
*
|
||||
* Two-step lookup:
|
||||
* 1. `mimeType` → extension (Discord/Slack documents, Telegram document
|
||||
* uploads — channels that set the MIME but not a filename).
|
||||
* 2. `att.type` → extension (Telegram photos/stickers/voice/animations —
|
||||
* coarse media-class set by the chat-sdk bridge with no MIME).
|
||||
*
|
||||
* Output is still passed through `isSafeAttachmentName` at the call site.
|
||||
* The maps emit static values, so no derivation path can construct a
|
||||
* traversal payload — only an attacker-controlled `att.name` can, and that
|
||||
* goes through the safety guard unchanged.
|
||||
*/
|
||||
|
||||
// Map common MIME types to canonical file extensions. Without an extension,
|
||||
// agents (and humans) can't tell what kind of file landed in the inbox, and
|
||||
// tools keyed on extension (image viewers, exiftool, etc.) misbehave.
|
||||
const MIME_TO_EXT: Record<string, string> = {
|
||||
'image/jpeg': 'jpg',
|
||||
'image/png': 'png',
|
||||
'image/webp': 'webp',
|
||||
'image/gif': 'gif',
|
||||
'image/heic': 'heic',
|
||||
'audio/ogg': 'ogg',
|
||||
'audio/mpeg': 'mp3',
|
||||
'audio/wav': 'wav',
|
||||
'audio/mp4': 'm4a',
|
||||
'video/mp4': 'mp4',
|
||||
'video/webm': 'webm',
|
||||
'video/quicktime': 'mov',
|
||||
'application/pdf': 'pdf',
|
||||
'text/plain': 'txt',
|
||||
'application/json': 'json',
|
||||
'application/zip': 'zip',
|
||||
};
|
||||
|
||||
// Fallback when `mimeType` is missing — Telegram photos and stickers arrive
|
||||
// without an explicit MIME on the attachment object. The channel bridge sets
|
||||
// `att.type` to a coarse media-class (`photo` / `sticker` / `voice` / etc.)
|
||||
// which is reliable enough to derive a canonical extension. Telegram's GIFs
|
||||
// are actually MP4, hence `animation: 'mp4'`.
|
||||
const TYPE_TO_EXT: Record<string, string> = {
|
||||
image: 'jpg',
|
||||
photo: 'jpg',
|
||||
sticker: 'webp',
|
||||
voice: 'ogg',
|
||||
audio: 'mp3',
|
||||
video: 'mp4',
|
||||
animation: 'mp4',
|
||||
};
|
||||
|
||||
export function extForMime(mime: unknown): string {
|
||||
if (typeof mime !== 'string' || !mime) return '';
|
||||
const clean = mime.split(';')[0].trim().toLowerCase();
|
||||
return MIME_TO_EXT[clean] ?? '';
|
||||
}
|
||||
|
||||
export function deriveAttachmentName(att: Record<string, unknown>): string {
|
||||
const explicit = att.name;
|
||||
if (typeof explicit === 'string' && explicit) return explicit;
|
||||
let ext = extForMime(att.mimeType);
|
||||
if (!ext && typeof att.type === 'string') {
|
||||
ext = TYPE_TO_EXT[att.type.toLowerCase()] ?? '';
|
||||
}
|
||||
const ts = Date.now();
|
||||
return ext ? `attachment-${ts}.${ext}` : `attachment-${ts}`;
|
||||
}
|
||||
@@ -135,6 +135,7 @@ export interface ChannelAdapter {
|
||||
// Optional
|
||||
setTyping?(platformId: string, threadId: string | null): Promise<void>;
|
||||
syncConversations?(): Promise<ConversationInfo[]>;
|
||||
resolveChannelName?(platformId: string): Promise<string | null>;
|
||||
|
||||
/**
|
||||
* Subscribe the bot to a thread so follow-up messages route via the
|
||||
|
||||
@@ -253,12 +253,12 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
// Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is
|
||||
// exclusive: subscribed → onSubscribedMessage; unsubscribed+mention →
|
||||
// onNewMention; unsubscribed+pattern-match → onNewMessage. Registering
|
||||
// with `/./` lets the router see every plain message on every
|
||||
// unsubscribed thread the bot can see. The router short-circuits via
|
||||
// with `/[\s\S]*/` lets the router see every plain message (including
|
||||
// media-only messages with empty text) on every unsubscribed thread the
|
||||
// getMessagingGroupWithAgentCount (~1 DB read) for unwired channels,
|
||||
// so forwarding every one is cheap enough to not need a bridge-side
|
||||
// flood gate.
|
||||
chat.onNewMessage(/./, async (thread, message) => {
|
||||
chat.onNewMessage(/[\s\S]*/, async (thread, message) => {
|
||||
const channelId = adapter.channelIdFromThreadId(thread.id);
|
||||
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false, true));
|
||||
});
|
||||
|
||||
197
src/circuit-breaker.test.ts
Normal file
197
src/circuit-breaker.test.ts
Normal file
@@ -0,0 +1,197 @@
|
||||
/**
|
||||
* Unit tests for the startup circuit breaker.
|
||||
*
|
||||
* Covers state transitions, the documented backoff schedule, and the
|
||||
* fresh-install case where DATA_DIR doesn't exist yet (the breaker runs
|
||||
* before initDb, so it has to create the dir itself).
|
||||
*/
|
||||
import fs from 'fs';
|
||||
import os from 'os';
|
||||
import path from 'path';
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
|
||||
// vi.mock factories are hoisted above imports, so they can't close over local
|
||||
// consts. vi.hoisted is hoisted alongside the mock and runs before any
|
||||
// `import` — so it can only use globals (no path/os modules). Use require()
|
||||
// inside the callback to compute the test dir.
|
||||
const { TEST_DIR } = vi.hoisted(() => {
|
||||
const nodePath = require('path') as typeof import('path');
|
||||
const nodeOs = require('os') as typeof import('os');
|
||||
return { TEST_DIR: nodePath.join(nodeOs.tmpdir(), 'nanoclaw-cb-test') };
|
||||
});
|
||||
const CB_PATH = path.join(TEST_DIR, 'circuit-breaker.json');
|
||||
|
||||
vi.mock('./config.js', async () => {
|
||||
const actual = await vi.importActual<typeof import('./config.js')>('./config.js');
|
||||
return { ...actual, DATA_DIR: TEST_DIR };
|
||||
});
|
||||
|
||||
vi.mock('./log.js', () => ({
|
||||
log: {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
fatal: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
import { enforceStartupBackoff, resetCircuitBreaker } from './circuit-breaker.js';
|
||||
|
||||
function readState(): { attempt: number; timestamp: string } {
|
||||
return JSON.parse(fs.readFileSync(CB_PATH, 'utf-8'));
|
||||
}
|
||||
|
||||
function seedState(attempt: number, timestamp = new Date().toISOString()): void {
|
||||
fs.writeFileSync(CB_PATH, JSON.stringify({ attempt, timestamp }));
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
});
|
||||
|
||||
describe('resetCircuitBreaker', () => {
|
||||
it('deletes the state file', () => {
|
||||
seedState(3);
|
||||
expect(fs.existsSync(CB_PATH)).toBe(true);
|
||||
resetCircuitBreaker();
|
||||
expect(fs.existsSync(CB_PATH)).toBe(false);
|
||||
});
|
||||
|
||||
it('is a no-op when the file does not exist', () => {
|
||||
expect(fs.existsSync(CB_PATH)).toBe(false);
|
||||
expect(() => resetCircuitBreaker()).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe('enforceStartupBackoff — state transitions', () => {
|
||||
it('first run writes attempt=1 and does not delay', async () => {
|
||||
vi.useFakeTimers();
|
||||
const start = Date.now();
|
||||
await enforceStartupBackoff();
|
||||
// No timers should have been queued — clean first start is 0s.
|
||||
expect(Date.now() - start).toBe(0);
|
||||
expect(readState().attempt).toBe(1);
|
||||
});
|
||||
|
||||
it('within reset window, attempt is incremented', async () => {
|
||||
seedState(1);
|
||||
vi.useFakeTimers();
|
||||
const promise = enforceStartupBackoff();
|
||||
await vi.runAllTimersAsync();
|
||||
await promise;
|
||||
expect(readState().attempt).toBe(2);
|
||||
});
|
||||
|
||||
it('outside reset window (>1h), attempt resets to 1', async () => {
|
||||
const longAgo = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
|
||||
seedState(5, longAgo);
|
||||
await enforceStartupBackoff();
|
||||
expect(readState().attempt).toBe(1);
|
||||
});
|
||||
|
||||
it('exactly at the reset window boundary still counts as "within"', async () => {
|
||||
// RESET_WINDOW_MS = 60min. Use 59min59s to stay inside even if the test
|
||||
// takes a few ms to execute.
|
||||
const justInside = new Date(Date.now() - (60 * 60 * 1000 - 1000)).toISOString();
|
||||
seedState(2, justInside);
|
||||
vi.useFakeTimers();
|
||||
const promise = enforceStartupBackoff();
|
||||
await vi.runAllTimersAsync();
|
||||
await promise;
|
||||
expect(readState().attempt).toBe(3);
|
||||
});
|
||||
|
||||
it('treats a malformed state file as no prior state', async () => {
|
||||
fs.writeFileSync(CB_PATH, '{ this is not json');
|
||||
await enforceStartupBackoff();
|
||||
expect(readState().attempt).toBe(1);
|
||||
});
|
||||
|
||||
it('resetCircuitBreaker after a startup actually clears the counter for the next startup', async () => {
|
||||
// Simulate: crash, restart (attempt=2), graceful shutdown, restart again.
|
||||
seedState(1);
|
||||
vi.useFakeTimers();
|
||||
const p1 = enforceStartupBackoff();
|
||||
await vi.runAllTimersAsync();
|
||||
await p1;
|
||||
expect(readState().attempt).toBe(2);
|
||||
|
||||
resetCircuitBreaker();
|
||||
expect(fs.existsSync(CB_PATH)).toBe(false);
|
||||
|
||||
await enforceStartupBackoff();
|
||||
expect(readState().attempt).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('enforceStartupBackoff — backoff schedule', () => {
|
||||
/**
|
||||
* Documented schedule:
|
||||
*
|
||||
* clean start → 1 crash → 2 crash → 3 crash → 4 crash → 5 crash → 6+ crash
|
||||
* 0s → 0s → 10s → 30s → 2min → 5min → 15min cap
|
||||
*
|
||||
* Each row is [priorAttempt seeded in the file, expected delay this run
|
||||
* produces in seconds]. priorAttempt=null = no file = very first start.
|
||||
*
|
||||
* To assert the *requested* delay (not just observed elapsed real time),
|
||||
* we spy on global.setTimeout and look at the longest call. runAllTimersAsync
|
||||
* lets the function complete so we can move on.
|
||||
*/
|
||||
const cases: Array<{ label: string; priorAttempt: number | null; expectedDelaySec: number }> = [
|
||||
{ label: 'clean first start (no file)', priorAttempt: null, expectedDelaySec: 0 },
|
||||
{ label: 'first crash (attempt=2)', priorAttempt: 1, expectedDelaySec: 0 },
|
||||
{ label: 'second crash (attempt=3)', priorAttempt: 2, expectedDelaySec: 10 },
|
||||
{ label: 'third crash (attempt=4)', priorAttempt: 3, expectedDelaySec: 30 },
|
||||
{ label: 'fourth crash (attempt=5)', priorAttempt: 4, expectedDelaySec: 120 },
|
||||
{ label: 'fifth crash (attempt=6)', priorAttempt: 5, expectedDelaySec: 300 },
|
||||
{ label: 'sixth crash (attempt=7) — cap', priorAttempt: 6, expectedDelaySec: 900 },
|
||||
{ label: 'far past cap (attempt=20)', priorAttempt: 19, expectedDelaySec: 900 },
|
||||
];
|
||||
|
||||
for (const { label, priorAttempt, expectedDelaySec } of cases) {
|
||||
it(`${label}: delays ${expectedDelaySec}s`, async () => {
|
||||
if (priorAttempt !== null) seedState(priorAttempt);
|
||||
|
||||
vi.useFakeTimers();
|
||||
const setTimeoutSpy = vi.spyOn(global, 'setTimeout');
|
||||
|
||||
const promise = enforceStartupBackoff();
|
||||
await vi.runAllTimersAsync();
|
||||
await promise;
|
||||
|
||||
// enforceStartupBackoff only calls setTimeout when delaySec > 0. Pick
|
||||
// the longest delay it requested (vitest may queue small internal
|
||||
// timers we don't care about).
|
||||
const requestedDelays = setTimeoutSpy.mock.calls.map((c) => c[1] ?? 0);
|
||||
const maxDelayMs = requestedDelays.length ? Math.max(...requestedDelays) : 0;
|
||||
|
||||
expect(maxDelayMs).toBe(expectedDelaySec * 1000);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
describe('enforceStartupBackoff — fresh install (DATA_DIR missing)', () => {
|
||||
/**
|
||||
* The breaker runs before initDb (which is what creates DATA_DIR). On a
|
||||
* fresh checkout the dir doesn't exist yet, so write() must create it
|
||||
* before writing the state file — otherwise the host crashes on its very
|
||||
* first start.
|
||||
*/
|
||||
it('creates DATA_DIR on demand and does not throw', async () => {
|
||||
fs.rmSync(TEST_DIR, { recursive: true });
|
||||
expect(fs.existsSync(TEST_DIR)).toBe(false);
|
||||
|
||||
await expect(enforceStartupBackoff()).resolves.toBeUndefined();
|
||||
expect(fs.existsSync(TEST_DIR)).toBe(true);
|
||||
expect(fs.existsSync(CB_PATH)).toBe(true);
|
||||
expect(readState().attempt).toBe(1);
|
||||
});
|
||||
});
|
||||
84
src/circuit-breaker.ts
Normal file
84
src/circuit-breaker.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { log } from './log.js';
|
||||
|
||||
const CB_PATH = path.join(DATA_DIR, 'circuit-breaker.json');
|
||||
const RESET_WINDOW_MS = 60 * 60 * 1000; // 1 hour
|
||||
// Index = number of consecutive crashes (0 = clean start, attempt 1).
|
||||
// 6+ crashes capped at 15min.
|
||||
const BACKOFF_SCHEDULE_S = [0, 0, 10, 30, 120, 300, 900];
|
||||
|
||||
interface CircuitBreakerState {
|
||||
attempt: number;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
function read(): CircuitBreakerState | null {
|
||||
try {
|
||||
const raw = fs.readFileSync(CB_PATH, 'utf-8');
|
||||
return JSON.parse(raw) as CircuitBreakerState;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function write(state: CircuitBreakerState): void {
|
||||
// The breaker runs before initDb (which is what creates DATA_DIR), so on a
|
||||
// fresh checkout the dir may not exist yet.
|
||||
fs.mkdirSync(DATA_DIR, { recursive: true });
|
||||
fs.writeFileSync(CB_PATH, JSON.stringify(state, null, 2) + '\n');
|
||||
}
|
||||
|
||||
function getDelay(attempt: number): number {
|
||||
const idx = Math.min(attempt - 1, BACKOFF_SCHEDULE_S.length - 1);
|
||||
return BACKOFF_SCHEDULE_S[idx];
|
||||
}
|
||||
|
||||
export function resetCircuitBreaker(): void {
|
||||
try {
|
||||
fs.unlinkSync(CB_PATH);
|
||||
log.info('Circuit breaker reset on clean shutdown');
|
||||
} catch {}
|
||||
}
|
||||
|
||||
export async function enforceStartupBackoff(): Promise<void> {
|
||||
const now = new Date();
|
||||
const prev = read();
|
||||
|
||||
let attempt: number;
|
||||
if (!prev) {
|
||||
attempt = 1;
|
||||
} else {
|
||||
const elapsedMs = now.getTime() - new Date(prev.timestamp).getTime();
|
||||
if (elapsedMs < RESET_WINDOW_MS) {
|
||||
attempt = prev.attempt + 1;
|
||||
log.warn('Previous startup was not a clean shutdown', {
|
||||
previousAttempt: prev.attempt,
|
||||
previousTimestamp: prev.timestamp,
|
||||
elapsedSec: Math.round(elapsedMs / 1000),
|
||||
});
|
||||
} else {
|
||||
attempt = 1;
|
||||
log.info('Circuit breaker reset — last startup was over 1h ago', {
|
||||
previousAttempt: prev.attempt,
|
||||
previousTimestamp: prev.timestamp,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
write({ attempt, timestamp: now.toISOString() });
|
||||
|
||||
const delaySec = getDelay(attempt);
|
||||
if (delaySec > 0) {
|
||||
const resumeAt = new Date(now.getTime() + delaySec * 1000).toISOString();
|
||||
log.warn('Circuit breaker: delaying startup due to repeated crashes', {
|
||||
attempt,
|
||||
delaySec,
|
||||
resumeAt,
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, delaySec * 1000));
|
||||
log.info('Circuit breaker: backoff complete, resuming startup', { attempt });
|
||||
}
|
||||
}
|
||||
@@ -58,7 +58,7 @@ const activeContainers = new Map<string, { process: ChildProcess; containerName:
|
||||
* a duplicate container against the same session directory, producing
|
||||
* racy double-replies.
|
||||
*/
|
||||
const wakePromises = new Map<string, Promise<void>>();
|
||||
const wakePromises = new Map<string, Promise<boolean>>();
|
||||
|
||||
export function getActiveContainerCount(): number {
|
||||
return activeContainers.size;
|
||||
@@ -73,20 +73,32 @@ export function isContainerRunning(sessionId: string): boolean {
|
||||
* (the in-flight wake promise is reused).
|
||||
*
|
||||
* The container runs the v2 agent-runner which polls the session DB.
|
||||
*
|
||||
* Contract: never throws. Returns `true` on successful spawn, `false` on
|
||||
* transient spawn failure (e.g. OneCLI gateway unreachable). Callers don't
|
||||
* need to wrap — the inbound row stays pending and host-sweep retries on
|
||||
* its next tick. Callers that care (e.g. the router's typing indicator)
|
||||
* can branch on the boolean.
|
||||
*/
|
||||
export function wakeContainer(session: Session): Promise<void> {
|
||||
export function wakeContainer(session: Session): Promise<boolean> {
|
||||
if (activeContainers.has(session.id)) {
|
||||
log.debug('Container already running', { sessionId: session.id });
|
||||
return Promise.resolve();
|
||||
return Promise.resolve(true);
|
||||
}
|
||||
const existing = wakePromises.get(session.id);
|
||||
if (existing) {
|
||||
log.debug('Container wake already in-flight — joining existing promise', { sessionId: session.id });
|
||||
return existing;
|
||||
}
|
||||
const promise = spawnContainer(session).finally(() => {
|
||||
wakePromises.delete(session.id);
|
||||
});
|
||||
const promise = spawnContainer(session)
|
||||
.then(() => true)
|
||||
.catch((err) => {
|
||||
log.warn('wakeContainer failed — host-sweep will retry', { sessionId: session.id, err });
|
||||
return false;
|
||||
})
|
||||
.finally(() => {
|
||||
wakePromises.delete(session.id);
|
||||
});
|
||||
wakePromises.set(session.id, promise);
|
||||
return promise;
|
||||
}
|
||||
@@ -435,20 +447,18 @@ async function buildContainerArgs(
|
||||
}
|
||||
|
||||
// OneCLI gateway — injects HTTPS_PROXY + certs so container API calls
|
||||
// are routed through the agent vault for credential injection.
|
||||
try {
|
||||
if (agentIdentifier) {
|
||||
await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier });
|
||||
}
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
|
||||
if (onecliApplied) {
|
||||
log.info('OneCLI gateway applied', { containerName });
|
||||
} else {
|
||||
log.warn('OneCLI gateway not applied — container will have no credentials', { containerName });
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn('OneCLI gateway error — container will have no credentials', { containerName, err });
|
||||
// are routed through the agent vault for credential injection. Treated as
|
||||
// a transient hard failure: if we can't wire the gateway, we don't spawn.
|
||||
// The caller (router or host-sweep) catches the throw, leaves the inbound
|
||||
// message pending, and the next sweep tick retries.
|
||||
if (agentIdentifier) {
|
||||
await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier });
|
||||
}
|
||||
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
|
||||
if (!onecliApplied) {
|
||||
throw new Error('OneCLI gateway not applied — refusing to spawn container without credentials');
|
||||
}
|
||||
log.info('OneCLI gateway applied', { containerName });
|
||||
|
||||
// Host gateway
|
||||
args.push(...hostGatewayArgs());
|
||||
|
||||
@@ -32,6 +32,14 @@ export function openOutboundDb(dbPath: string): Database.Database {
|
||||
return db;
|
||||
}
|
||||
|
||||
/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
|
||||
export function openOutboundDbRw(dbPath: string): Database.Database {
|
||||
const db = new Database(dbPath);
|
||||
db.pragma('journal_mode = DELETE');
|
||||
db.pragma('busy_timeout = 5000');
|
||||
return db;
|
||||
}
|
||||
|
||||
export function upsertSessionRouting(
|
||||
db: Database.Database,
|
||||
routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null },
|
||||
@@ -180,6 +188,19 @@ export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[]
|
||||
.all() as ProcessingClaim[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete orphan 'processing' rows. Called by the host after killing a
|
||||
* container so the leftover claim doesn't trip claim-stuck on the next sweep
|
||||
* tick (which would kill the freshly respawned container before its
|
||||
* agent-runner can run its own startup cleanup).
|
||||
*
|
||||
* Safe because the host only writes to outbound.db when no container is
|
||||
* running (we just killed it). Returns the number of rows deleted.
|
||||
*/
|
||||
export function deleteOrphanProcessingClaims(outDb: Database.Database): number {
|
||||
return outDb.prepare("DELETE FROM processing_ack WHERE status = 'processing'").run().changes;
|
||||
}
|
||||
|
||||
export interface ContainerState {
|
||||
current_tool: string | null;
|
||||
tool_declared_timeout_ms: number | null;
|
||||
|
||||
@@ -23,6 +23,8 @@ import {
|
||||
sessionDir,
|
||||
inboundDbPath,
|
||||
outboundDbPath,
|
||||
readOutboxFiles,
|
||||
clearOutbox,
|
||||
} from './session-manager.js';
|
||||
import { getSession, findSession } from './db/sessions.js';
|
||||
import type { InboundEvent } from './channels/adapter.js';
|
||||
@@ -108,6 +110,147 @@ describe('session manager', () => {
|
||||
outDb.close();
|
||||
});
|
||||
|
||||
it('should reject outbound attachment filenames that escape the message outbox', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const dir = sessionDir('ag-1', 'sess-test');
|
||||
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
|
||||
fs.mkdirSync(msgOutbox, { recursive: true });
|
||||
|
||||
const outside = path.join(TEST_DIR, 'outside.txt');
|
||||
fs.writeFileSync(outside, 'outside secret');
|
||||
|
||||
expect(readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['../../../../../outside.txt'])).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should reject outbound attachment symlinks that escape the message outbox', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const dir = sessionDir('ag-1', 'sess-test');
|
||||
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
|
||||
fs.mkdirSync(msgOutbox, { recursive: true });
|
||||
|
||||
const outside = path.join(TEST_DIR, 'outside.txt');
|
||||
fs.writeFileSync(outside, 'outside secret');
|
||||
fs.symlinkSync('../../../../../outside.txt', path.join(msgOutbox, 'safe-name.txt'));
|
||||
|
||||
expect(readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['safe-name.txt'])).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should not recursively delete outside the outbox for unsafe message ids', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const victimDir = path.join(TEST_DIR, 'victim-dir');
|
||||
fs.mkdirSync(victimDir, { recursive: true });
|
||||
fs.writeFileSync(path.join(victimDir, 'keep.txt'), 'do not delete');
|
||||
|
||||
clearOutbox('ag-1', 'sess-test', '../../../../victim-dir');
|
||||
|
||||
expect(fs.existsSync(path.join(victimDir, 'keep.txt'))).toBe(true);
|
||||
});
|
||||
|
||||
it('should still read and clear normal basename outbox files', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const dir = sessionDir('ag-1', 'sess-test');
|
||||
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
|
||||
fs.mkdirSync(msgOutbox, { recursive: true });
|
||||
fs.writeFileSync(path.join(msgOutbox, 'result.txt'), 'ok');
|
||||
|
||||
const files = readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['result.txt']);
|
||||
expect(files).toHaveLength(1);
|
||||
expect(files?.[0]?.filename).toBe('result.txt');
|
||||
expect(files?.[0]?.data.toString()).toBe('ok');
|
||||
|
||||
clearOutbox('ag-1', 'sess-test', 'msg-1');
|
||||
expect(fs.existsSync(msgOutbox)).toBe(false);
|
||||
});
|
||||
|
||||
it('should reject inbound attachment writes through a pre-placed symlinked inbox dir', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
|
||||
// The container has /workspace write access, so it can pre create
|
||||
// inbox/<msgId> as a symlink to escape.
|
||||
const inboxRoot = path.join(sessionDir('ag-1', session.id), 'inbox');
|
||||
fs.mkdirSync(inboxRoot, { recursive: true });
|
||||
const evilTarget = path.join(TEST_DIR, 'evil-target');
|
||||
fs.mkdirSync(evilTarget, { recursive: true });
|
||||
fs.symlinkSync(evilTarget, path.join(inboxRoot, 'msg-evil'));
|
||||
|
||||
writeSessionMessage('ag-1', session.id, {
|
||||
id: 'msg-evil',
|
||||
kind: 'chat',
|
||||
timestamp: now(),
|
||||
content: JSON.stringify({
|
||||
text: 'evil',
|
||||
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
|
||||
}),
|
||||
});
|
||||
|
||||
expect(fs.existsSync(path.join(evilTarget, 'photo.png'))).toBe(false);
|
||||
});
|
||||
|
||||
it('should refuse to follow a pre-existing symlink at the inbound attachment path', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
|
||||
// The container pre creates inbox/<msgId>/photo.png as a symlink to a
|
||||
// host file. Without the wx flag, writeFileSync would follow it.
|
||||
const inboxDir = path.join(sessionDir('ag-1', session.id), 'inbox', 'msg-sym');
|
||||
fs.mkdirSync(inboxDir, { recursive: true });
|
||||
const outside = path.join(TEST_DIR, 'outside.txt');
|
||||
fs.writeFileSync(outside, 'ORIGINAL');
|
||||
fs.symlinkSync(outside, path.join(inboxDir, 'photo.png'));
|
||||
|
||||
writeSessionMessage('ag-1', session.id, {
|
||||
id: 'msg-sym',
|
||||
kind: 'chat',
|
||||
timestamp: now(),
|
||||
content: JSON.stringify({
|
||||
text: 'sym',
|
||||
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
|
||||
}),
|
||||
});
|
||||
|
||||
expect(fs.readFileSync(outside, 'utf-8')).toBe('ORIGINAL');
|
||||
});
|
||||
|
||||
it('should reject inbound attachments when messageId is unsafe', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
|
||||
writeSessionMessage('ag-1', session.id, {
|
||||
id: '../../escape',
|
||||
kind: 'chat',
|
||||
timestamp: now(),
|
||||
content: JSON.stringify({
|
||||
text: 'msgid',
|
||||
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
|
||||
}),
|
||||
});
|
||||
|
||||
const inboxRoot = path.join(sessionDir('ag-1', session.id), 'inbox');
|
||||
if (fs.existsSync(inboxRoot)) {
|
||||
expect(fs.readdirSync(inboxRoot)).toEqual([]);
|
||||
}
|
||||
});
|
||||
|
||||
it('should still save inbound attachments with safe basenames', () => {
|
||||
initSessionFolder('ag-1', 'sess-test');
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
|
||||
writeSessionMessage('ag-1', session.id, {
|
||||
id: 'msg-ok',
|
||||
kind: 'chat',
|
||||
timestamp: now(),
|
||||
content: JSON.stringify({
|
||||
text: 'ok',
|
||||
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
|
||||
}),
|
||||
});
|
||||
|
||||
const expected = path.join(sessionDir('ag-1', session.id), 'inbox', 'msg-ok', 'photo.png');
|
||||
expect(fs.existsSync(expected)).toBe(true);
|
||||
expect(fs.readFileSync(expected, 'utf-8')).toBe('PNGBYTES');
|
||||
});
|
||||
|
||||
it('should resolve to existing session (shared mode)', () => {
|
||||
const { session: s1, created: c1 } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
expect(c1).toBe(true);
|
||||
|
||||
@@ -3,9 +3,17 @@
|
||||
* ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we
|
||||
* don't have to mock the filesystem or the container runner.
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js';
|
||||
import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js';
|
||||
import {
|
||||
ABSOLUTE_CEILING_MS,
|
||||
CLAIM_STUCK_MS,
|
||||
_resetStuckProcessingRowsForTesting,
|
||||
decideStuckAction,
|
||||
} from './host-sweep.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const BASE = Date.parse('2026-04-20T12:00:00.000Z');
|
||||
|
||||
@@ -144,3 +152,143 @@ describe('decideStuckAction', () => {
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
});
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Orphan claim cleanup (regression test for the SIGKILL → claim-stuck loop)
|
||||
//
|
||||
// Repro of the production bug seen 2026-04-30: container A claimed message M
|
||||
// (writes processing_ack row with status='processing'). Host kills A by
|
||||
// absolute-ceiling. Old behavior: messages_in.M was reset to pending but
|
||||
// processing_ack.M survived. On the next sweep tick, wakeContainer spawned B,
|
||||
// the same-tick SLA check saw M's stale claim age (hours), and SIGKILL'd B
|
||||
// before agent-runner could run clearStaleProcessingAcks(). Loop. The fix
|
||||
// deletes processing_ack 'processing' rows when the host kills/cleans the
|
||||
// container, breaking the loop atomically.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
function makeSessionDbs(): { inDb: Database.Database; outDb: Database.Database } {
|
||||
const inDb = new Database(':memory:');
|
||||
inDb.exec(`
|
||||
CREATE TABLE messages_in (
|
||||
id TEXT PRIMARY KEY,
|
||||
seq INTEGER UNIQUE,
|
||||
kind TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
process_after TEXT,
|
||||
recurrence TEXT,
|
||||
series_id TEXT,
|
||||
tries INTEGER DEFAULT 0,
|
||||
trigger INTEGER NOT NULL DEFAULT 1,
|
||||
platform_id TEXT,
|
||||
channel_type TEXT,
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
const outDb = new Database(':memory:');
|
||||
outDb.exec(`
|
||||
CREATE TABLE processing_ack (
|
||||
message_id TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL,
|
||||
status_changed TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
return { inDb, outDb };
|
||||
}
|
||||
|
||||
function fakeSession(): Session {
|
||||
return {
|
||||
id: 'sess-test',
|
||||
agent_group_id: 'ag-test',
|
||||
messaging_group_id: null,
|
||||
thread_id: null,
|
||||
agent_provider: null,
|
||||
status: 'active',
|
||||
container_status: 'stopped',
|
||||
last_active: null,
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
describe('deleteOrphanProcessingClaims', () => {
|
||||
it('removes only processing rows, leaves completed/failed alone', () => {
|
||||
const { outDb } = makeSessionDbs();
|
||||
const ts = new Date().toISOString();
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-proc', 'processing', ?)").run(ts);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-done', 'completed', ?)").run(ts);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-fail', 'failed', ?)").run(ts);
|
||||
|
||||
const removed = deleteOrphanProcessingClaims(outDb);
|
||||
|
||||
expect(removed).toBe(1);
|
||||
const remaining = outDb.prepare('SELECT message_id, status FROM processing_ack ORDER BY message_id').all();
|
||||
expect(remaining).toEqual([
|
||||
{ message_id: 'm-done', status: 'completed' },
|
||||
{ message_id: 'm-fail', status: 'failed' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('returns 0 when nothing to clear', () => {
|
||||
const { outDb } = makeSessionDbs();
|
||||
expect(deleteOrphanProcessingClaims(outDb)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('resetStuckProcessingRows — orphan claim cleanup', () => {
|
||||
it('deletes orphan processing_ack rows so next sweep tick does not see them', () => {
|
||||
const { inDb, outDb } = makeSessionDbs();
|
||||
const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); // 2h ago
|
||||
|
||||
// messages_in.status stays 'pending' during processing — only the
|
||||
// container's processing_ack moves to 'processing'. See
|
||||
// src/db/schema.ts header comment on processing_ack.
|
||||
inDb
|
||||
.prepare(
|
||||
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES ('m-1', 1, 'chat', ?, 'pending', '{}')",
|
||||
)
|
||||
.run(claimedAt);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-1', 'processing', ?)").run(claimedAt);
|
||||
|
||||
// Sanity: the orphan claim is what would trip claim-stuck.
|
||||
expect(getProcessingClaims(outDb)).toHaveLength(1);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling');
|
||||
|
||||
// Regression assertion: orphan claim is gone — next sweep tick will see
|
||||
// an empty claims list and not kill the freshly respawned container.
|
||||
expect(getProcessingClaims(outDb)).toEqual([]);
|
||||
|
||||
// And the message itself was rescheduled with backoff (existing behavior).
|
||||
const row = inDb.prepare('SELECT status, tries, process_after FROM messages_in WHERE id = ?').get('m-1') as {
|
||||
status: string;
|
||||
tries: number;
|
||||
process_after: string | null;
|
||||
};
|
||||
expect(row.status).toBe('pending');
|
||||
expect(row.tries).toBe(1);
|
||||
expect(row.process_after).not.toBeNull();
|
||||
});
|
||||
|
||||
it('still clears orphan claims even when the inbound message has already been retried (skip path)', () => {
|
||||
// Edge case: the inbound row was already rescheduled (process_after in
|
||||
// future), so the per-message retry loop skips it. The orphan in
|
||||
// processing_ack must still be removed — otherwise the bug remains.
|
||||
const { inDb, outDb } = makeSessionDbs();
|
||||
const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
|
||||
const future = new Date(Date.now() + 60_000).toISOString();
|
||||
|
||||
inDb
|
||||
.prepare(
|
||||
"INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, tries, content) VALUES ('m-2', 2, 'chat', ?, 'pending', ?, 1, '{}')",
|
||||
)
|
||||
.run(claimedAt, future);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck');
|
||||
|
||||
expect(getProcessingClaims(outDb)).toEqual([]);
|
||||
const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number };
|
||||
expect(row.tries).toBe(1); // not bumped, the skip path held
|
||||
});
|
||||
});
|
||||
|
||||
@@ -33,6 +33,7 @@ import { getActiveSessions } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import {
|
||||
countDueMessages,
|
||||
deleteOrphanProcessingClaims,
|
||||
getContainerState,
|
||||
getMessageForRetry,
|
||||
getProcessingClaims,
|
||||
@@ -42,7 +43,7 @@ import {
|
||||
type ContainerState,
|
||||
} from './db/session-db.js';
|
||||
import { log } from './log.js';
|
||||
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { openInboundDb, openOutboundDb, openOutboundDbRw, inboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
@@ -168,6 +169,8 @@ async function sweepSession(session: Session): Promise<void> {
|
||||
const dueCount = countDueMessages(inDb);
|
||||
if (dueCount > 0 && !isContainerRunning(session.id)) {
|
||||
log.info('Waking container for due messages', { sessionId: session.id, count: dueCount });
|
||||
// wakeContainer never throws — transient spawn failures (OneCLI down,
|
||||
// etc.) return false and leave messages pending for the next tick.
|
||||
await wakeContainer(session);
|
||||
}
|
||||
|
||||
@@ -247,11 +250,21 @@ function enforceRunningContainerSla(
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
|
||||
}
|
||||
|
||||
export function _resetStuckProcessingRowsForTesting(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
): void {
|
||||
resetStuckProcessingRows(inDb, outDb, session, reason, outDb);
|
||||
}
|
||||
|
||||
function resetStuckProcessingRows(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
writableOutDb?: Database.Database,
|
||||
): void {
|
||||
const claims = getProcessingClaims(outDb);
|
||||
const now = Date.now();
|
||||
@@ -283,4 +296,22 @@ function resetStuckProcessingRows(
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the orphan 'processing' rows. Without this, the next sweep tick
|
||||
// would re-read them, see the old status_changed timestamp, conclude the
|
||||
// freshly respawned container is stuck, and SIGKILL it before its
|
||||
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
||||
const ownsDb = !writableOutDb;
|
||||
let useDb: Database.Database | null = writableOutDb ?? null;
|
||||
try {
|
||||
if (!useDb) useDb = openOutboundDbRw(session.agent_group_id, session.id);
|
||||
const cleared = deleteOrphanProcessingClaims(useDb);
|
||||
if (cleared > 0) {
|
||||
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err });
|
||||
} finally {
|
||||
if (ownsDb) useDb?.close();
|
||||
}
|
||||
}
|
||||
|
||||
15
src/index.ts
15
src/index.ts
@@ -7,6 +7,7 @@
|
||||
import path from 'path';
|
||||
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { enforceStartupBackoff, resetCircuitBreaker } from './circuit-breaker.js';
|
||||
import { migrateGroupsToClaudeLocal } from './claude-md-compose.js';
|
||||
import { initDb } from './db/connection.js';
|
||||
import { runMigrations } from './db/migrations/index.js';
|
||||
@@ -63,6 +64,9 @@ import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from
|
||||
async function main(): Promise<void> {
|
||||
log.info('NanoClaw starting');
|
||||
|
||||
// 0. Circuit breaker — backoff on rapid restarts
|
||||
await enforceStartupBackoff();
|
||||
|
||||
// 1. Init central DB
|
||||
const dbPath = path.join(DATA_DIR, 'v2.db');
|
||||
const db = initDb(dbPath);
|
||||
@@ -183,8 +187,15 @@ async function shutdown(signal: string): Promise<void> {
|
||||
stopDeliveryPolls();
|
||||
stopHostSweep();
|
||||
await stopCliServer();
|
||||
await teardownChannelAdapters();
|
||||
process.exit(0);
|
||||
try {
|
||||
await teardownChannelAdapters();
|
||||
} finally {
|
||||
// Always reset on graceful shutdown — even if teardown threw, we got here
|
||||
// via SIGTERM/SIGINT, not a crash, so the next start shouldn't be counted
|
||||
// as one.
|
||||
resetCircuitBreaker();
|
||||
process.exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
||||
|
||||
@@ -153,8 +153,10 @@ describe('unknown-channel registration flow', () => {
|
||||
expect(kind).toBe('chat-sdk');
|
||||
const payload = JSON.parse(content as string);
|
||||
expect(payload.type).toBe('ask_question');
|
||||
// Card names the target agent so the owner knows what they're wiring to.
|
||||
expect(payload.question).toContain('Andy');
|
||||
// Single-agent card offers a direct "Connect to <name>" button.
|
||||
const connectOption = payload.options.find((o: { value: string }) => o.value.startsWith('connect:'));
|
||||
expect(connectOption).toBeDefined();
|
||||
expect(connectOption.label).toContain('Andy');
|
||||
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const rows = getDb().prepare('SELECT * FROM pending_channel_approvals').all() as Array<{
|
||||
@@ -202,11 +204,11 @@ describe('unknown-channel registration flow', () => {
|
||||
};
|
||||
expect(pending).toBeDefined();
|
||||
|
||||
// Owner clicks approve.
|
||||
// Owner clicks "Connect to Andy" (single-agent card).
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'approve',
|
||||
value: 'connect:ag-1',
|
||||
userId: 'owner', // raw platform id — handler namespaces it
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-owner',
|
||||
@@ -215,7 +217,7 @@ describe('unknown-channel registration flow', () => {
|
||||
if (claimed) break;
|
||||
}
|
||||
|
||||
// Wiring created with MVP defaults.
|
||||
// Wiring created with defaults.
|
||||
const mga = getDb()
|
||||
.prepare('SELECT * FROM messaging_group_agents WHERE messaging_group_id = ?')
|
||||
.get(pending.messaging_group_id) as {
|
||||
@@ -261,7 +263,7 @@ describe('unknown-channel registration flow', () => {
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'approve',
|
||||
value: 'connect:ag-1',
|
||||
userId: 'owner',
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-owner',
|
||||
|
||||
@@ -5,24 +5,32 @@
|
||||
* addressed to the bot (SDK-confirmed mention or DM), it calls
|
||||
* `requestChannelApproval` instead of silently dropping. The flow:
|
||||
*
|
||||
* 1. Pick the target agent group we'd wire to (MVP: first by name).
|
||||
* Multi-agent picker is a follow-up — see ACTION-ITEMS.
|
||||
* 1. Gather all existing agent groups.
|
||||
* 2. Pick an eligible approver (owner / admin) and a reachable DM for
|
||||
* them, reusing the same primitives the sender-approval flow uses.
|
||||
* 3. Deliver an Approve / Ignore card that names the target agent
|
||||
* explicitly so the owner knows what they're wiring to.
|
||||
* 3. Deliver a card with three action families:
|
||||
* a. Connect to [agent] — one button per existing agent group.
|
||||
* Single-agent installs get a one-click connect.
|
||||
* b. Connect new agent — prompts for a free-text name, creates
|
||||
* the agent immediately on reply.
|
||||
* c. Reject — deny the channel.
|
||||
* 4. Record a `pending_channel_approvals` row holding the original event
|
||||
* so it can be re-routed on approve.
|
||||
* so it can be re-routed on connect/create.
|
||||
*
|
||||
* On approve (handler in index.ts):
|
||||
* - Create `messaging_group_agents` with MVP defaults
|
||||
* On connect (handler in index.ts):
|
||||
* - Create `messaging_group_agents` with defaults
|
||||
* (mention-sticky for groups / pattern='.' for DMs,
|
||||
* sender_scope='known', ignored_message_policy='accumulate')
|
||||
* - Add the triggering sender to `agent_group_members` so sender_scope
|
||||
* doesn't bounce the replayed message into a sender-approval cascade
|
||||
* - Delete the pending row, replay the original event
|
||||
*
|
||||
* On ignore:
|
||||
* On connect new agent (handler in index.ts):
|
||||
* - Prompt for a free-text agent name via DM
|
||||
* - On reply: create the agent group + filesystem, then wire
|
||||
* and replay as above
|
||||
*
|
||||
* On reject:
|
||||
* - Set `messaging_groups.denied_at = now()` so the router stops
|
||||
* escalating on this channel until an admin explicitly re-wires
|
||||
* - Delete the pending row
|
||||
@@ -36,19 +44,81 @@
|
||||
* - Approver has no reachable DM.
|
||||
* - Delivery adapter missing.
|
||||
*/
|
||||
import { normalizeOptions, type RawOption } from '../../channels/ask-question.js';
|
||||
import { getAllAgentGroups } from '../../db/agent-groups.js';
|
||||
import { getMessagingGroup } from '../../db/messaging-groups.js';
|
||||
import { normalizeOptions, type NormalizedOption, type RawOption } from '../../channels/ask-question.js';
|
||||
import { createAgentGroup, getAgentGroup, getAgentGroupByFolder, getAllAgentGroups } from '../../db/agent-groups.js';
|
||||
import { getChannelAdapter } from '../../channels/channel-registry.js';
|
||||
import { getMessagingGroup, updateMessagingGroup } from '../../db/messaging-groups.js';
|
||||
import { getDeliveryAdapter } from '../../delivery.js';
|
||||
import { initGroupFilesystem } from '../../group-init.js';
|
||||
import { log } from '../../log.js';
|
||||
import type { InboundEvent } from '../../channels/adapter.js';
|
||||
import type { AgentGroup } from '../../types.js';
|
||||
import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js';
|
||||
import { createPendingChannelApproval, hasInFlightChannelApproval } from './db/pending-channel-approvals.js';
|
||||
|
||||
const APPROVAL_OPTIONS: RawOption[] = [
|
||||
{ label: 'Approve', selectedLabel: '✅ Wired', value: 'approve' },
|
||||
{ label: 'Ignore', selectedLabel: '🙅 Ignored', value: 'reject' },
|
||||
];
|
||||
// ── Value constants (response handler in index.ts parses these) ──
|
||||
|
||||
export const CONNECT_PREFIX = 'connect:';
|
||||
export const NEW_AGENT_VALUE = 'new_agent';
|
||||
export const CHOOSE_EXISTING_VALUE = 'choose_existing';
|
||||
export const REJECT_VALUE = 'reject';
|
||||
|
||||
// ── Utilities ──
|
||||
|
||||
function toFolder(name: string): string {
|
||||
return (
|
||||
name
|
||||
.toLowerCase()
|
||||
.replace(/[^a-z0-9]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '') || 'unnamed'
|
||||
);
|
||||
}
|
||||
|
||||
// ── Card builders ──
|
||||
|
||||
function buildApprovalOptions(agentGroups: AgentGroup[]): RawOption[] {
|
||||
const options: RawOption[] = [];
|
||||
if (agentGroups.length === 1) {
|
||||
options.push({
|
||||
label: `Connect to ${agentGroups[0].name}`,
|
||||
selectedLabel: `✅ Connected to ${agentGroups[0].name}`,
|
||||
value: `${CONNECT_PREFIX}${agentGroups[0].id}`,
|
||||
});
|
||||
} else {
|
||||
options.push({
|
||||
label: 'Choose existing agent',
|
||||
selectedLabel: '📋 Choosing…',
|
||||
value: CHOOSE_EXISTING_VALUE,
|
||||
});
|
||||
}
|
||||
options.push({
|
||||
label: 'Connect new agent',
|
||||
selectedLabel: '🆕 Connecting new agent…',
|
||||
value: NEW_AGENT_VALUE,
|
||||
});
|
||||
options.push({
|
||||
label: 'Reject',
|
||||
selectedLabel: '🙅 Rejected',
|
||||
value: REJECT_VALUE,
|
||||
});
|
||||
return options;
|
||||
}
|
||||
|
||||
function buildQuestionText(
|
||||
isGroup: boolean,
|
||||
senderName: string | undefined,
|
||||
channelName: string | null,
|
||||
channelType: string,
|
||||
): string {
|
||||
const who = senderName ?? 'Someone';
|
||||
if (isGroup) {
|
||||
const where = channelName ? `${channelName} on ${channelType}` : `a ${channelType} channel`;
|
||||
return `${who} mentioned your bot in ${where}. How would you like to handle this channel?`;
|
||||
}
|
||||
return `${who} sent your bot a DM on ${channelType}. How would you like to handle it?`;
|
||||
}
|
||||
|
||||
// ── Main flow ──
|
||||
|
||||
export interface RequestChannelApprovalInput {
|
||||
messagingGroupId: string;
|
||||
@@ -58,17 +128,11 @@ export interface RequestChannelApprovalInput {
|
||||
export async function requestChannelApproval(input: RequestChannelApprovalInput): Promise<void> {
|
||||
const { messagingGroupId, event } = input;
|
||||
|
||||
// In-flight dedup: don't spam the owner if the same unwired channel
|
||||
// gets more mentions / DMs while a card is already pending.
|
||||
if (hasInFlightChannelApproval(messagingGroupId)) {
|
||||
log.debug('Channel registration already in flight — dropping retry', {
|
||||
messagingGroupId,
|
||||
});
|
||||
log.debug('Channel registration already in flight — dropping retry', { messagingGroupId });
|
||||
return;
|
||||
}
|
||||
|
||||
// MVP: pick the first agent group by name. Multi-agent systems will get
|
||||
// a richer card later (user picks the target from a list).
|
||||
const agentGroups = getAllAgentGroups();
|
||||
if (agentGroups.length === 0) {
|
||||
log.warn('Channel registration skipped — no agent groups configured. Run /init-first-agent.', {
|
||||
@@ -76,55 +140,65 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
|
||||
});
|
||||
return;
|
||||
}
|
||||
const target = agentGroups[0];
|
||||
// Use first agent group for approver resolution — owners and global admins
|
||||
// are returned regardless of which group we pass.
|
||||
const referenceGroup = agentGroups[0];
|
||||
|
||||
// pickApprover takes the target agent group's id — gets scoped admins +
|
||||
// global admins + owners. For fresh installs with only an owner, the
|
||||
// owner is returned.
|
||||
const approvers = pickApprover(target.id);
|
||||
const approvers = pickApprover(referenceGroup.id);
|
||||
if (approvers.length === 0) {
|
||||
log.warn('Channel registration skipped — no owner or admin configured', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
targetAgentGroupId: referenceGroup.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const originMg = getMessagingGroup(messagingGroupId);
|
||||
const originChannelType = originMg?.channel_type ?? '';
|
||||
|
||||
// Resolve channel name if not yet persisted.
|
||||
if (originMg && !originMg.name) {
|
||||
const channelAdapter = getChannelAdapter(originChannelType);
|
||||
if (channelAdapter?.resolveChannelName) {
|
||||
try {
|
||||
const name = await channelAdapter.resolveChannelName(originMg.platform_id);
|
||||
if (name) {
|
||||
updateMessagingGroup(originMg.id, { name });
|
||||
originMg.name = name;
|
||||
}
|
||||
} catch {
|
||||
/* non-critical */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = await pickApprovalDelivery(approvers, originChannelType);
|
||||
if (!delivery) {
|
||||
log.warn('Channel registration skipped — no DM channel for any approver', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
targetAgentGroupId: referenceGroup.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const isGroup = event.message?.isGroup ?? originMg?.is_group === 1;
|
||||
|
||||
// Extract sender name from the event content for a human-readable card.
|
||||
let senderName: string | undefined;
|
||||
try {
|
||||
const parsed = JSON.parse(event.message.content) as Record<string, unknown>;
|
||||
senderName = (parsed.senderName ?? parsed.sender) as string | undefined;
|
||||
} catch {
|
||||
// non-critical — fall through to generic wording
|
||||
// non-critical
|
||||
}
|
||||
|
||||
const title = isGroup ? '📣 Bot mentioned in new chat' : '💬 New direct message';
|
||||
const question = isGroup
|
||||
? senderName
|
||||
? `${senderName} mentioned your agent in a ${originChannelType} channel. Wire it to ${target.name} and let it engage?`
|
||||
: `Your agent was mentioned in a ${originChannelType} channel. Wire it to ${target.name} and let it engage?`
|
||||
: senderName
|
||||
? `${senderName} DM'd your agent on ${originChannelType}. Wire it to ${target.name} and let it respond?`
|
||||
: `Someone DM'd your agent on ${originChannelType}. Wire it to ${target.name} and let it respond?`;
|
||||
const options = normalizeOptions(APPROVAL_OPTIONS);
|
||||
const channelName = originMg?.name ?? null;
|
||||
const title = isGroup ? '📣 Bot mentioned in new channel' : '💬 New direct message';
|
||||
const question = buildQuestionText(isGroup, senderName, channelName, originChannelType);
|
||||
const options = normalizeOptions(buildApprovalOptions(agentGroups));
|
||||
|
||||
createPendingChannelApproval({
|
||||
messaging_group_id: messagingGroupId,
|
||||
agent_group_id: target.id,
|
||||
agent_group_id: referenceGroup.id,
|
||||
original_message: JSON.stringify(event),
|
||||
approver_user_id: delivery.userId,
|
||||
created_at: new Date().toISOString(),
|
||||
@@ -134,9 +208,7 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
|
||||
|
||||
const adapter = getDeliveryAdapter();
|
||||
if (!adapter) {
|
||||
log.error('Channel registration row created but no delivery adapter is wired', {
|
||||
messagingGroupId,
|
||||
});
|
||||
log.error('Channel registration row created but no delivery adapter is wired', { messagingGroupId });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -148,9 +220,6 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
|
||||
'chat-sdk',
|
||||
JSON.stringify({
|
||||
type: 'ask_question',
|
||||
// Use messaging_group_id as the questionId — it's unique per card
|
||||
// (PK on pending table dedups) and lets the response handler look
|
||||
// up the pending row directly without another index.
|
||||
questionId: messagingGroupId,
|
||||
title,
|
||||
question,
|
||||
@@ -159,16 +228,56 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
|
||||
);
|
||||
log.info('Channel registration card delivered', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
agentGroupCount: agentGroups.length,
|
||||
approver: delivery.userId,
|
||||
});
|
||||
} catch (err) {
|
||||
log.error('Channel registration card delivery failed', {
|
||||
messagingGroupId,
|
||||
err,
|
||||
});
|
||||
log.error('Channel registration card delivery failed', { messagingGroupId, err });
|
||||
}
|
||||
}
|
||||
|
||||
export const APPROVE_VALUE = 'approve';
|
||||
export const REJECT_VALUE = 'reject';
|
||||
// ── Helpers for the response handler (index.ts) ──
|
||||
|
||||
/**
|
||||
* Build normalized options for the agent-selection follow-up card.
|
||||
*/
|
||||
export function buildAgentSelectionOptions(agentGroups: AgentGroup[]): NormalizedOption[] {
|
||||
const options: RawOption[] = agentGroups.map((ag) => ({
|
||||
label: ag.name,
|
||||
selectedLabel: `✅ Connected to ${ag.name}`,
|
||||
value: `${CONNECT_PREFIX}${ag.id}`,
|
||||
}));
|
||||
options.push({
|
||||
label: 'Cancel',
|
||||
selectedLabel: '🙅 Cancelled',
|
||||
value: REJECT_VALUE,
|
||||
});
|
||||
return normalizeOptions(options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new agent group and initialize its filesystem. Handles
|
||||
* folder-name collisions with numeric suffixes.
|
||||
*/
|
||||
export function createNewAgentGroup(name: string): AgentGroup {
|
||||
let folder = toFolder(name);
|
||||
const baseFolder = folder;
|
||||
let suffix = 2;
|
||||
while (getAgentGroupByFolder(folder)) {
|
||||
folder = `${baseFolder}-${suffix}`;
|
||||
suffix++;
|
||||
}
|
||||
|
||||
const agId = `ag-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
createAgentGroup({
|
||||
id: agId,
|
||||
name,
|
||||
folder,
|
||||
agent_provider: null,
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
|
||||
const ag = getAgentGroup(agId)!;
|
||||
initGroupFilesystem(ag);
|
||||
return ag;
|
||||
}
|
||||
|
||||
@@ -51,6 +51,12 @@ export function hasInFlightChannelApproval(messagingGroupId: string): boolean {
|
||||
return row !== undefined;
|
||||
}
|
||||
|
||||
export function updatePendingChannelApprovalCard(messagingGroupId: string, title: string, optionsJson: string): void {
|
||||
getDb()
|
||||
.prepare('UPDATE pending_channel_approvals SET title = ?, options_json = ? WHERE messaging_group_id = ?')
|
||||
.run(title, optionsJson, messagingGroupId);
|
||||
}
|
||||
|
||||
export function deletePendingChannelApproval(messagingGroupId: string): void {
|
||||
getDb().prepare('DELETE FROM pending_channel_approvals WHERE messaging_group_id = ?').run(messagingGroupId);
|
||||
}
|
||||
|
||||
@@ -16,27 +16,53 @@
|
||||
* access gate is not registered and core defaults to allow-all.
|
||||
*/
|
||||
import { recordDroppedMessage } from '../../db/dropped-messages.js';
|
||||
import { getAgentGroup, getAllAgentGroups } from '../../db/agent-groups.js';
|
||||
import { createMessagingGroupAgent, setMessagingGroupDeniedAt } from '../../db/messaging-groups.js';
|
||||
import {
|
||||
routeInbound,
|
||||
setAccessGate,
|
||||
setChannelRequestGate,
|
||||
setMessageInterceptor,
|
||||
setSenderResolver,
|
||||
setSenderScopeGate,
|
||||
type AccessGateResult,
|
||||
} from '../../router.js';
|
||||
import type { InboundEvent } from '../../channels/adapter.js';
|
||||
import { registerResponseHandler, type ResponsePayload } from '../../response-registry.js';
|
||||
import { getDeliveryAdapter } from '../../delivery.js';
|
||||
import { log } from '../../log.js';
|
||||
import type { MessagingGroup, MessagingGroupAgent } from '../../types.js';
|
||||
import { canAccessAgentGroup } from './access.js';
|
||||
import { requestChannelApproval } from './channel-approval.js';
|
||||
import {
|
||||
buildAgentSelectionOptions,
|
||||
CHOOSE_EXISTING_VALUE,
|
||||
CONNECT_PREFIX,
|
||||
createNewAgentGroup,
|
||||
NEW_AGENT_VALUE,
|
||||
REJECT_VALUE,
|
||||
requestChannelApproval,
|
||||
} from './channel-approval.js';
|
||||
import { addMember } from './db/agent-group-members.js';
|
||||
import { deletePendingChannelApproval, getPendingChannelApproval } from './db/pending-channel-approvals.js';
|
||||
import {
|
||||
deletePendingChannelApproval,
|
||||
getPendingChannelApproval,
|
||||
updatePendingChannelApprovalCard,
|
||||
} from './db/pending-channel-approvals.js';
|
||||
import { deletePendingSenderApproval, getPendingSenderApproval } from './db/pending-sender-approvals.js';
|
||||
import { hasAdminPrivilege } from './db/user-roles.js';
|
||||
import { getUser, upsertUser } from './db/users.js';
|
||||
import { requestSenderApproval } from './sender-approval.js';
|
||||
import { ensureUserDm } from './user-dm.js';
|
||||
|
||||
// ── Free-text name input state ──
|
||||
// Tracks approvers waiting for a text reply with the agent name. Keyed by
|
||||
// namespaced userId (e.g. "slack:U0ABC"). Cleared on receipt or restart.
|
||||
interface PendingNameInput {
|
||||
channelMgId: string;
|
||||
dmChannelType: string;
|
||||
dmPlatformId: string;
|
||||
}
|
||||
const awaitingNameInput = new Map<string, PendingNameInput>();
|
||||
|
||||
function extractAndUpsertUser(event: InboundEvent): string | null {
|
||||
let content: Record<string, unknown>;
|
||||
@@ -271,22 +297,17 @@ setChannelRequestGate(async (mg, event) => {
|
||||
* by messaging_group_id). If no such row, return false so downstream
|
||||
* handlers get a shot.
|
||||
*
|
||||
* Approve: create the wiring with MVP defaults (mention-sticky for
|
||||
* groups / pattern='.' for DMs; sender_scope='known';
|
||||
* ignored_message_policy='accumulate'), add the triggering sender as a
|
||||
* member so sender_scope doesn't immediately bounce them into a
|
||||
* sender-approval card, then replay the original event.
|
||||
*
|
||||
* Deny: set `messaging_groups.denied_at = now()` so future mentions on
|
||||
* this channel drop silently until an admin explicitly wires it.
|
||||
* Value dispatch:
|
||||
* connect:<id> — wire to an existing agent group, replay the message
|
||||
* choose_existing — send a follow-up card listing all agents
|
||||
* new_agent — prompt for a free-text agent name (interceptor
|
||||
* captures the reply and creates immediately)
|
||||
* reject — set denied_at, delete pending row
|
||||
*/
|
||||
async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<boolean> {
|
||||
const row = getPendingChannelApproval(payload.questionId);
|
||||
if (!row) return false;
|
||||
|
||||
// Click-auth: same pattern as sender-approval (see commit 68058cb).
|
||||
// Raw platform userId → namespace with channelType → must match the
|
||||
// designated approver OR have admin privilege over the target agent.
|
||||
const clickerId = payload.userId ? `${payload.channelType}:${payload.userId}` : null;
|
||||
const isAuthorized =
|
||||
clickerId !== null && (clickerId === row.approver_user_id || hasAdminPrivilege(clickerId, row.agent_group_id));
|
||||
@@ -296,25 +317,129 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
|
||||
clickerId,
|
||||
expectedApprover: row.approver_user_id,
|
||||
});
|
||||
return true; // claim but take no action
|
||||
return true;
|
||||
}
|
||||
const approverId = clickerId;
|
||||
const approved = payload.value === 'approve';
|
||||
|
||||
if (!approved) {
|
||||
// ── Reject / Cancel ──
|
||||
if (payload.value === REJECT_VALUE) {
|
||||
setMessagingGroupDeniedAt(row.messaging_group_id, new Date().toISOString());
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
log.info('Channel registration denied', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: row.agent_group_id,
|
||||
approverId,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
// Rehydrate the original event to know (a) whether it was a DM or group
|
||||
// (chooses engage_mode default), and (b) who the triggering sender was
|
||||
// (auto-member-add so sender_scope='known' doesn't bounce the replay).
|
||||
// ── Choose existing agent — send agent-selection follow-up card ──
|
||||
if (payload.value === CHOOSE_EXISTING_VALUE) {
|
||||
const approverDm = await ensureUserDm(row.approver_user_id);
|
||||
if (!approverDm) {
|
||||
log.error('Channel registration: no DM channel for approver', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
approverUserId: row.approver_user_id,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
const adapter = getDeliveryAdapter();
|
||||
if (!adapter) return true;
|
||||
|
||||
const agentGroups = getAllAgentGroups();
|
||||
const options = buildAgentSelectionOptions(agentGroups);
|
||||
const title = '📋 Choose an agent';
|
||||
updatePendingChannelApprovalCard(row.messaging_group_id, title, JSON.stringify(options));
|
||||
|
||||
try {
|
||||
await adapter.deliver(
|
||||
approverDm.channel_type,
|
||||
approverDm.platform_id,
|
||||
null,
|
||||
'chat-sdk',
|
||||
JSON.stringify({
|
||||
type: 'ask_question',
|
||||
questionId: row.messaging_group_id,
|
||||
title,
|
||||
question: 'Which agent should handle this channel?',
|
||||
options,
|
||||
}),
|
||||
);
|
||||
} catch (err) {
|
||||
log.error('Channel registration: agent-selection card delivery failed', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// ── Create new agent — prompt for free-text name ──
|
||||
if (payload.value === NEW_AGENT_VALUE) {
|
||||
const approverDm = await ensureUserDm(row.approver_user_id);
|
||||
if (!approverDm) {
|
||||
log.error('Channel registration: no DM channel for approver', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
approverUserId: row.approver_user_id,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
const adapter = getDeliveryAdapter();
|
||||
if (!adapter) {
|
||||
log.error('Channel registration: no delivery adapter for name prompt', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
awaitingNameInput.set(row.approver_user_id, {
|
||||
channelMgId: row.messaging_group_id,
|
||||
dmChannelType: approverDm.channel_type,
|
||||
dmPlatformId: approverDm.platform_id,
|
||||
});
|
||||
|
||||
try {
|
||||
await adapter.deliver(
|
||||
approverDm.channel_type,
|
||||
approverDm.platform_id,
|
||||
null,
|
||||
'chat-sdk',
|
||||
JSON.stringify({ text: 'Reply with the name for your new agent:' }),
|
||||
);
|
||||
} catch (err) {
|
||||
log.error('Channel registration: name prompt delivery failed', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
awaitingNameInput.delete(row.approver_user_id);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// ── Resolve target agent group (connect to existing or create new) ──
|
||||
let targetAgentGroupId: string;
|
||||
|
||||
if (payload.value.startsWith(CONNECT_PREFIX)) {
|
||||
targetAgentGroupId = payload.value.slice(CONNECT_PREFIX.length);
|
||||
const ag = getAgentGroup(targetAgentGroupId);
|
||||
if (!ag) {
|
||||
log.error('Channel registration: target agent group no longer exists', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
targetAgentGroupId,
|
||||
});
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
log.warn('Channel registration: unknown response value', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
value: payload.value,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
// ── Wire + replay (shared path for connect and create) ──
|
||||
let event: InboundEvent;
|
||||
try {
|
||||
event = JSON.parse(row.original_message) as InboundEvent;
|
||||
@@ -327,15 +452,6 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
|
||||
return true;
|
||||
}
|
||||
|
||||
// Decide engage_mode from the original event. DMs (`isMention=true` &
|
||||
// not in a group) get `pattern='.'` (always respond). Group mentions
|
||||
// get `mention-sticky` (respond now + follow the thread).
|
||||
//
|
||||
// We can't read `mg.is_group` reliably here because we only auto-create
|
||||
// the mg with `is_group=0` on first sight — the adapter hasn't told us
|
||||
// yet whether it's actually a group. Fall back to the InboundEvent's
|
||||
// `threadId`: a non-null threadId implies a threaded platform (Slack
|
||||
// channel thread, Discord thread), which we treat as a group.
|
||||
const isGroup = event.threadId !== null;
|
||||
const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern';
|
||||
const engagePattern = isGroup ? null : '.';
|
||||
@@ -344,7 +460,7 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
|
||||
createMessagingGroupAgent({
|
||||
id: mgaId,
|
||||
messaging_group_id: row.messaging_group_id,
|
||||
agent_group_id: row.agent_group_id,
|
||||
agent_group_id: targetAgentGroupId,
|
||||
engage_mode: engageMode,
|
||||
engage_pattern: engagePattern,
|
||||
sender_scope: 'known',
|
||||
@@ -355,28 +471,22 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
|
||||
});
|
||||
log.info('Channel registration approved — wiring created', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: row.agent_group_id,
|
||||
agentGroupId: targetAgentGroupId,
|
||||
mgaId,
|
||||
engageMode,
|
||||
approverId,
|
||||
});
|
||||
|
||||
// Auto-admit the triggering sender. Without this, the replay below
|
||||
// would bounce through sender-approval (sender_scope='known' +
|
||||
// sender-is-not-a-member).
|
||||
const senderUserId = extractAndUpsertUser(event);
|
||||
if (senderUserId) {
|
||||
addMember({
|
||||
user_id: senderUserId,
|
||||
agent_group_id: row.agent_group_id,
|
||||
agent_group_id: targetAgentGroupId,
|
||||
added_by: approverId,
|
||||
added_at: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// Clear the pending row BEFORE replay so the gate check on the second
|
||||
// attempt sees a wired channel (agentCount > 0) and takes the fan-out
|
||||
// path normally.
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
|
||||
try {
|
||||
@@ -391,3 +501,117 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
|
||||
}
|
||||
|
||||
registerResponseHandler(handleChannelApprovalResponse);
|
||||
|
||||
// ── Free-text name interceptor ──
|
||||
// Captures the next DM from an approver who clicked "Create new agent",
|
||||
// creates the agent immediately, wires the channel, and replays.
|
||||
|
||||
setMessageInterceptor(async (event: InboundEvent): Promise<boolean> => {
|
||||
const userId = extractAndUpsertUser(event);
|
||||
if (!userId) return false;
|
||||
|
||||
const pending = awaitingNameInput.get(userId);
|
||||
if (!pending) return false;
|
||||
if (event.channelType !== pending.dmChannelType || event.platformId !== pending.dmPlatformId) return false;
|
||||
|
||||
awaitingNameInput.delete(userId);
|
||||
|
||||
let text: string | undefined;
|
||||
try {
|
||||
const parsed = JSON.parse(event.message.content) as Record<string, unknown>;
|
||||
text = (typeof parsed.text === 'string' ? parsed.text : undefined)?.trim();
|
||||
} catch {
|
||||
/* fall through */
|
||||
}
|
||||
|
||||
if (!text) {
|
||||
log.warn('Channel registration: empty name reply, ignoring', { userId });
|
||||
return true;
|
||||
}
|
||||
|
||||
const row = getPendingChannelApproval(pending.channelMgId);
|
||||
if (!row) return true;
|
||||
|
||||
const ag = createNewAgentGroup(text);
|
||||
log.info('Channel registration: new agent group created', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: ag.id,
|
||||
agentName: ag.name,
|
||||
folder: ag.folder,
|
||||
});
|
||||
|
||||
let originalEvent: InboundEvent;
|
||||
try {
|
||||
originalEvent = JSON.parse(row.original_message) as InboundEvent;
|
||||
} catch (err) {
|
||||
log.error('Channel registration: failed to parse stored event', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
return true;
|
||||
}
|
||||
|
||||
const isGroup = originalEvent.threadId !== null;
|
||||
const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern';
|
||||
const engagePattern = isGroup ? null : '.';
|
||||
|
||||
const mgaId = `mga-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
createMessagingGroupAgent({
|
||||
id: mgaId,
|
||||
messaging_group_id: row.messaging_group_id,
|
||||
agent_group_id: ag.id,
|
||||
engage_mode: engageMode,
|
||||
engage_pattern: engagePattern,
|
||||
sender_scope: 'known',
|
||||
ignored_message_policy: 'accumulate',
|
||||
session_mode: 'shared',
|
||||
priority: 0,
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
log.info('Channel registration approved — wiring created', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: ag.id,
|
||||
mgaId,
|
||||
engageMode,
|
||||
approverId: userId,
|
||||
});
|
||||
|
||||
const senderUserId = extractAndUpsertUser(originalEvent);
|
||||
if (senderUserId) {
|
||||
addMember({
|
||||
user_id: senderUserId,
|
||||
agent_group_id: ag.id,
|
||||
added_by: userId,
|
||||
added_at: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
|
||||
try {
|
||||
await routeInbound(originalEvent);
|
||||
} catch (err) {
|
||||
log.error('Failed to replay message after channel approval', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
}
|
||||
|
||||
const adapter = getDeliveryAdapter();
|
||||
if (adapter) {
|
||||
const dm = await ensureUserDm(row.approver_user_id);
|
||||
if (dm) {
|
||||
adapter
|
||||
.deliver(
|
||||
dm.channel_type,
|
||||
dm.platform_id,
|
||||
null,
|
||||
'chat-sdk',
|
||||
JSON.stringify({ text: `✅ Agent "${ag.name}" created and connected.` }),
|
||||
)
|
||||
.catch(() => {});
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
@@ -9,15 +9,17 @@
|
||||
* will later emit as event.platformId, or router lookups miss and messages
|
||||
* get silently dropped.
|
||||
*
|
||||
* Native adapters (Signal, WhatsApp, iMessage) use their own ID formats and
|
||||
* send them as-is — no channel prefix. WhatsApp/iMessage emit JIDs/emails
|
||||
* containing '@'. Signal emits raw phone numbers ('+15551234567') for DMs
|
||||
* and 'group:<id>' for group chats. Prefixing any of these would cause a
|
||||
* mismatch with what the adapter later emits.
|
||||
* Native adapters (Signal, WhatsApp, iMessage, DeltaChat) use their own ID
|
||||
* formats and send them as-is — no channel prefix. WhatsApp/iMessage emit
|
||||
* JIDs/emails containing '@'. Signal emits raw phone numbers ('+15551234567')
|
||||
* for DMs and 'group:<id>' for group chats. DeltaChat emits numeric chat IDs
|
||||
* ('12'). Prefixing any of these would cause a mismatch with what the adapter
|
||||
* later emits.
|
||||
*/
|
||||
export function namespacedPlatformId(channel: string, raw: string): string {
|
||||
if (raw.startsWith(`${channel}:`)) return raw;
|
||||
if (raw.includes('@')) return raw;
|
||||
if (raw.startsWith('+') || raw.startsWith('group:')) return raw;
|
||||
if (channel === 'deltachat') return raw;
|
||||
return `${channel}:${raw}`;
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ import {
|
||||
getMessagingGroupWithAgentCount,
|
||||
} from './db/messaging-groups.js';
|
||||
import { findSessionForAgent } from './db/sessions.js';
|
||||
import { startTypingRefresh } from './modules/typing/index.js';
|
||||
import { startTypingRefresh, stopTypingRefresh } from './modules/typing/index.js';
|
||||
import { log } from './log.js';
|
||||
import { resolveSession, writeSessionMessage, writeOutboundDirect } from './session-manager.js';
|
||||
import { wakeContainer } from './container-runner.js';
|
||||
@@ -108,6 +108,20 @@ export function setSenderScopeGate(fn: SenderScopeGateFn): void {
|
||||
senderScopeGate = fn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Message-interceptor hook. Runs at the very top of routeInbound, before
|
||||
* messaging-group resolution. When the interceptor returns true the message
|
||||
* is consumed and routing stops. Used by the permissions module to capture
|
||||
* free-text replies during multi-step approval flows (e.g. agent naming).
|
||||
*/
|
||||
export type MessageInterceptorFn = (event: InboundEvent) => Promise<boolean>;
|
||||
|
||||
let messageInterceptor: MessageInterceptorFn | null = null;
|
||||
|
||||
export function setMessageInterceptor(fn: MessageInterceptorFn): void {
|
||||
messageInterceptor = fn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Channel-registration hook. Runs when the router sees a mention/DM on a
|
||||
* messaging group that has no wirings AND hasn't been denied. The hook is
|
||||
@@ -142,6 +156,10 @@ function safeParseContent(raw: string): { text?: string; sender?: string; sender
|
||||
* Creates messaging group + session if they don't exist yet.
|
||||
*/
|
||||
export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
// Pre-route interceptor — lets modules consume messages before any routing
|
||||
// (e.g. free-text replies during multi-step approval flows).
|
||||
if (messageInterceptor && (await messageInterceptor(event))) return;
|
||||
|
||||
// 0. Apply the adapter's thread policy. Non-threaded adapters (Telegram,
|
||||
// WhatsApp, iMessage, email) collapse threads to the channel.
|
||||
const adapter = getChannelAdapter(event.channelType);
|
||||
@@ -457,7 +475,11 @@ async function deliverToAgent(
|
||||
startTypingRefresh(session.id, session.agent_group_id, event.channelType, event.platformId, event.threadId);
|
||||
const freshSession = getSession(session.id);
|
||||
if (freshSession) {
|
||||
await wakeContainer(freshSession);
|
||||
const woke = await wakeContainer(freshSession);
|
||||
// wakeContainer never throws — it returns false on transient spawn
|
||||
// failure (host-sweep retries). Stop the typing indicator we just
|
||||
// started so it doesn't leak; the inbound row stays pending.
|
||||
if (!woke) stopTypingRefresh(freshSession.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,13 +14,13 @@ import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { deriveAttachmentName } from './attachment-naming.js';
|
||||
import { isSafeAttachmentName } from './attachment-safety.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { getMessagingGroup } from './db/messaging-groups.js';
|
||||
import {
|
||||
createSession,
|
||||
findSession,
|
||||
findSessionByAgentGroup,
|
||||
findSessionForAgent,
|
||||
getSession,
|
||||
@@ -30,6 +30,7 @@ import {
|
||||
ensureSchema,
|
||||
openInboundDb as openInboundDbRaw,
|
||||
openOutboundDb as openOutboundDbRaw,
|
||||
openOutboundDbRw as openOutboundDbRwRaw,
|
||||
upsertSessionRouting,
|
||||
insertMessage,
|
||||
migrateMessagesInTable,
|
||||
@@ -37,6 +38,11 @@ import {
|
||||
import { log } from './log.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
function isPathInside(parent: string, child: string): boolean {
|
||||
const relative = path.relative(parent, child);
|
||||
return relative === '' || (!relative.startsWith('..') && !path.isAbsolute(relative));
|
||||
}
|
||||
|
||||
/** Root directory for all session data. */
|
||||
export function sessionsBaseDir(): string {
|
||||
return path.join(DATA_DIR, 'v2-sessions');
|
||||
@@ -233,6 +239,20 @@ export function writeSessionMessage(
|
||||
/**
|
||||
* If message content has attachments with base64 `data`, save them to
|
||||
* the session's inbox directory and replace with `localPath`.
|
||||
*
|
||||
* Both `messageId` and `att.name` originate in untrusted input. WhatsApp
|
||||
* passes `msg.key.id` through raw (and that field is client generated, so a
|
||||
* peer can craft it), and other adapters may follow. The session dir is
|
||||
* mounted writable into the container, so a compromised agent can also
|
||||
* pre-place a symlink at `inbox/<future msgId>/` and wait for a chat message
|
||||
* with a matching id to redirect the host's write.
|
||||
*
|
||||
* Defenses, mirrored from the outbound side:
|
||||
* 1. basename check on `messageId` and `filename`.
|
||||
* 2. lstat of the inbox dir to refuse pre-placed symlinks.
|
||||
* 3. realpath-based containment under the session inbox root.
|
||||
* 4. `wx` flag on writeFileSync to refuse following a pre-existing symlink
|
||||
* at the target file path or overwriting any existing file.
|
||||
*/
|
||||
function extractAttachmentFiles(
|
||||
agentGroupId: string,
|
||||
@@ -250,34 +270,75 @@ function extractAttachmentFiles(
|
||||
const attachments = parsed.attachments as Array<Record<string, unknown>> | undefined;
|
||||
if (!Array.isArray(attachments)) return contentStr;
|
||||
|
||||
if (!isSafeAttachmentName(messageId)) {
|
||||
log.warn('Rejecting unsafe inbound message id', { messageId });
|
||||
return contentStr;
|
||||
}
|
||||
|
||||
let changed = false;
|
||||
for (const att of attachments) {
|
||||
if (typeof att.data === 'string') {
|
||||
// The name field is attacker-controlled: chat platforms with E2E
|
||||
// attachment encryption (WhatsApp, Matrix) cannot sanitize filename
|
||||
// server-side, and other adapters pass att.name through raw. Without
|
||||
// this guard, `path.join(inboxDir, '../../...')` writes anywhere the
|
||||
// host process has fs permission — see Signal Desktop's Nov 2025
|
||||
// attachment-fileName advisory for the same archetype.
|
||||
const rawName = (att.name as string | undefined) ?? `attachment-${Date.now()}`;
|
||||
const filename = isSafeAttachmentName(rawName) ? rawName : `attachment-${Date.now()}`;
|
||||
if (filename !== rawName) {
|
||||
log.warn('Refused unsafe attachment filename — would escape inbox', {
|
||||
messageId,
|
||||
rawName,
|
||||
replacement: filename,
|
||||
});
|
||||
}
|
||||
const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId);
|
||||
fs.mkdirSync(inboxDir, { recursive: true });
|
||||
const filePath = path.join(inboxDir, filename);
|
||||
fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64'));
|
||||
att.name = filename;
|
||||
att.localPath = `inbox/${messageId}/${filename}`;
|
||||
delete att.data;
|
||||
changed = true;
|
||||
log.debug('Saved attachment to inbox', { messageId, filename, size: att.size });
|
||||
if (typeof att.data !== 'string') continue;
|
||||
|
||||
const rawName = deriveAttachmentName(att);
|
||||
const filename = isSafeAttachmentName(rawName) ? rawName : `attachment-${Date.now()}`;
|
||||
if (filename !== rawName) {
|
||||
log.warn('Refused unsafe attachment filename, would escape inbox', {
|
||||
messageId,
|
||||
rawName,
|
||||
replacement: filename,
|
||||
});
|
||||
}
|
||||
|
||||
const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId);
|
||||
|
||||
// Refuse to mkdir through a symlink that the container may have pre placed
|
||||
// at inboxDir. With recursive:true, mkdirSync would silently no op on a
|
||||
// pre existing symlink and the subsequent writeFileSync would follow it.
|
||||
if (fs.existsSync(inboxDir)) {
|
||||
const stat = fs.lstatSync(inboxDir);
|
||||
if (stat.isSymbolicLink() || !stat.isDirectory()) {
|
||||
log.warn('Rejecting unsafe inbox directory', { messageId, inboxDir });
|
||||
continue;
|
||||
}
|
||||
}
|
||||
fs.mkdirSync(inboxDir, { recursive: true });
|
||||
|
||||
let realInboxDir: string;
|
||||
try {
|
||||
realInboxDir = fs.realpathSync(inboxDir);
|
||||
} catch (err) {
|
||||
log.warn('Failed to resolve inbox directory', { messageId, err });
|
||||
continue;
|
||||
}
|
||||
const inboxRoot = path.join(sessionDir(agentGroupId, sessionId), 'inbox');
|
||||
if (!isPathInside(fs.realpathSync(inboxRoot), realInboxDir)) {
|
||||
log.warn('Inbox directory escaped session inbox root', { messageId, inboxDir });
|
||||
continue;
|
||||
}
|
||||
|
||||
const filePath = path.join(inboxDir, filename);
|
||||
try {
|
||||
// wx = exclusive create. Refuses to follow a pre existing symlink or
|
||||
// overwrite any existing file. The host expects to be the sole writer
|
||||
// of these attachments.
|
||||
fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64'), { flag: 'wx' });
|
||||
} catch (err: unknown) {
|
||||
const e = err as NodeJS.ErrnoException;
|
||||
if (e.code === 'EEXIST') {
|
||||
log.warn('Inbox attachment target already exists, refusing to overwrite', {
|
||||
messageId,
|
||||
filename,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
att.name = filename;
|
||||
att.localPath = `inbox/${messageId}/${filename}`;
|
||||
delete att.data;
|
||||
changed = true;
|
||||
log.debug('Saved attachment to inbox', { messageId, filename, size: att.size });
|
||||
}
|
||||
|
||||
return changed ? JSON.stringify(parsed) : contentStr;
|
||||
@@ -295,6 +356,11 @@ export function openOutboundDb(agentGroupId: string, sessionId: string): Databas
|
||||
return openOutboundDbRaw(outboundDbPath(agentGroupId, sessionId));
|
||||
}
|
||||
|
||||
/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
|
||||
export function openOutboundDbRw(agentGroupId: string, sessionId: string): Database.Database {
|
||||
return openOutboundDbRwRaw(outboundDbPath(agentGroupId, sessionId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a message directly to a session's outbound DB so the host delivery
|
||||
* loop picks it up. Used by the command gate to send denial responses
|
||||
@@ -368,14 +434,48 @@ export function readOutboxFiles(
|
||||
messageId: string,
|
||||
filenames: string[],
|
||||
): OutboundFile[] | undefined {
|
||||
if (!isSafeAttachmentName(messageId)) {
|
||||
log.warn('Rejecting unsafe outbox message id', { messageId });
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
|
||||
if (!fs.existsSync(outboxDir)) return undefined;
|
||||
|
||||
let realOutboxDir: string;
|
||||
try {
|
||||
const stat = fs.lstatSync(outboxDir);
|
||||
if (!stat.isDirectory() || stat.isSymbolicLink()) {
|
||||
log.warn('Rejecting unsafe outbox directory', { messageId, outboxDir });
|
||||
return undefined;
|
||||
}
|
||||
realOutboxDir = fs.realpathSync(outboxDir);
|
||||
} catch (err) {
|
||||
log.warn('Failed to inspect outbox directory', { messageId, err });
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const files: OutboundFile[] = [];
|
||||
for (const filename of filenames) {
|
||||
if (!isSafeAttachmentName(filename)) {
|
||||
log.warn('Refused unsafe outbox filename, would escape outbox', { messageId, filename });
|
||||
continue;
|
||||
}
|
||||
|
||||
const filePath = path.join(outboxDir, filename);
|
||||
if (fs.existsSync(filePath)) {
|
||||
files.push({ filename, data: fs.readFileSync(filePath) });
|
||||
} else {
|
||||
try {
|
||||
const stat = fs.lstatSync(filePath);
|
||||
if (!stat.isFile() || stat.isSymbolicLink()) {
|
||||
log.warn('Rejecting unsafe outbox file', { messageId, filename });
|
||||
continue;
|
||||
}
|
||||
const realFilePath = fs.realpathSync(filePath);
|
||||
if (!isPathInside(realOutboxDir, realFilePath)) {
|
||||
log.warn('Rejecting outbox file outside message directory', { messageId, filename });
|
||||
continue;
|
||||
}
|
||||
files.push({ filename, data: fs.readFileSync(realFilePath) });
|
||||
} catch {
|
||||
log.warn('Outbox file not found', { messageId, filename });
|
||||
}
|
||||
}
|
||||
@@ -389,10 +489,26 @@ export function readOutboxFiles(
|
||||
* thrown error would trigger the delivery retry path and deliver twice.
|
||||
*/
|
||||
export function clearOutbox(agentGroupId: string, sessionId: string, messageId: string): void {
|
||||
if (!isSafeAttachmentName(messageId)) {
|
||||
log.warn('Rejecting unsafe outbox cleanup message id', { messageId });
|
||||
return;
|
||||
}
|
||||
|
||||
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
|
||||
if (!fs.existsSync(outboxDir)) return;
|
||||
try {
|
||||
fs.rmSync(outboxDir, { recursive: true, force: true });
|
||||
const stat = fs.lstatSync(outboxDir);
|
||||
if (!stat.isDirectory() || stat.isSymbolicLink()) {
|
||||
log.warn('Rejecting unsafe outbox cleanup directory', { messageId, outboxDir });
|
||||
return;
|
||||
}
|
||||
const realOutboxBase = fs.realpathSync(path.join(sessionDir(agentGroupId, sessionId), 'outbox'));
|
||||
const realOutboxDir = fs.realpathSync(outboxDir);
|
||||
if (!isPathInside(realOutboxBase, realOutboxDir)) {
|
||||
log.warn('Rejecting outbox cleanup outside session outbox', { messageId, outboxDir });
|
||||
return;
|
||||
}
|
||||
fs.rmSync(realOutboxDir, { recursive: true, force: true });
|
||||
} catch (err) {
|
||||
log.warn('Outbox cleanup failed (message already delivered)', { messageId, err });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user