Merge branch 'main' into fix/host-sweep-orphan-processing-ack

This commit is contained in:
gavrielc
2026-05-01 18:42:04 +03:00
committed by GitHub
37 changed files with 1284 additions and 391 deletions

View File

@@ -135,6 +135,7 @@ export interface ChannelAdapter {
// Optional
setTyping?(platformId: string, threadId: string | null): Promise<void>;
syncConversations?(): Promise<ConversationInfo[]>;
resolveChannelName?(platformId: string): Promise<string | null>;
/**
* Subscribe the bot to a thread so follow-up messages route via the

View File

@@ -23,6 +23,8 @@ import {
sessionDir,
inboundDbPath,
outboundDbPath,
readOutboxFiles,
clearOutbox,
} from './session-manager.js';
import { getSession, findSession } from './db/sessions.js';
import type { InboundEvent } from './channels/adapter.js';
@@ -108,6 +110,147 @@ describe('session manager', () => {
outDb.close();
});
it('should reject outbound attachment filenames that escape the message outbox', () => {
initSessionFolder('ag-1', 'sess-test');
const dir = sessionDir('ag-1', 'sess-test');
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
fs.mkdirSync(msgOutbox, { recursive: true });
const outside = path.join(TEST_DIR, 'outside.txt');
fs.writeFileSync(outside, 'outside secret');
expect(readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['../../../../../outside.txt'])).toBeUndefined();
});
it('should reject outbound attachment symlinks that escape the message outbox', () => {
initSessionFolder('ag-1', 'sess-test');
const dir = sessionDir('ag-1', 'sess-test');
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
fs.mkdirSync(msgOutbox, { recursive: true });
const outside = path.join(TEST_DIR, 'outside.txt');
fs.writeFileSync(outside, 'outside secret');
fs.symlinkSync('../../../../../outside.txt', path.join(msgOutbox, 'safe-name.txt'));
expect(readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['safe-name.txt'])).toBeUndefined();
});
it('should not recursively delete outside the outbox for unsafe message ids', () => {
initSessionFolder('ag-1', 'sess-test');
const victimDir = path.join(TEST_DIR, 'victim-dir');
fs.mkdirSync(victimDir, { recursive: true });
fs.writeFileSync(path.join(victimDir, 'keep.txt'), 'do not delete');
clearOutbox('ag-1', 'sess-test', '../../../../victim-dir');
expect(fs.existsSync(path.join(victimDir, 'keep.txt'))).toBe(true);
});
it('should still read and clear normal basename outbox files', () => {
initSessionFolder('ag-1', 'sess-test');
const dir = sessionDir('ag-1', 'sess-test');
const msgOutbox = path.join(dir, 'outbox', 'msg-1');
fs.mkdirSync(msgOutbox, { recursive: true });
fs.writeFileSync(path.join(msgOutbox, 'result.txt'), 'ok');
const files = readOutboxFiles('ag-1', 'sess-test', 'msg-1', ['result.txt']);
expect(files).toHaveLength(1);
expect(files?.[0]?.filename).toBe('result.txt');
expect(files?.[0]?.data.toString()).toBe('ok');
clearOutbox('ag-1', 'sess-test', 'msg-1');
expect(fs.existsSync(msgOutbox)).toBe(false);
});
it('should reject inbound attachment writes through a pre-placed symlinked inbox dir', () => {
initSessionFolder('ag-1', 'sess-test');
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
// The container has /workspace write access, so it can pre create
// inbox/<msgId> as a symlink to escape.
const inboxRoot = path.join(sessionDir('ag-1', session.id), 'inbox');
fs.mkdirSync(inboxRoot, { recursive: true });
const evilTarget = path.join(TEST_DIR, 'evil-target');
fs.mkdirSync(evilTarget, { recursive: true });
fs.symlinkSync(evilTarget, path.join(inboxRoot, 'msg-evil'));
writeSessionMessage('ag-1', session.id, {
id: 'msg-evil',
kind: 'chat',
timestamp: now(),
content: JSON.stringify({
text: 'evil',
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
}),
});
expect(fs.existsSync(path.join(evilTarget, 'photo.png'))).toBe(false);
});
it('should refuse to follow a pre-existing symlink at the inbound attachment path', () => {
initSessionFolder('ag-1', 'sess-test');
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
// The container pre creates inbox/<msgId>/photo.png as a symlink to a
// host file. Without the wx flag, writeFileSync would follow it.
const inboxDir = path.join(sessionDir('ag-1', session.id), 'inbox', 'msg-sym');
fs.mkdirSync(inboxDir, { recursive: true });
const outside = path.join(TEST_DIR, 'outside.txt');
fs.writeFileSync(outside, 'ORIGINAL');
fs.symlinkSync(outside, path.join(inboxDir, 'photo.png'));
writeSessionMessage('ag-1', session.id, {
id: 'msg-sym',
kind: 'chat',
timestamp: now(),
content: JSON.stringify({
text: 'sym',
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
}),
});
expect(fs.readFileSync(outside, 'utf-8')).toBe('ORIGINAL');
});
it('should reject inbound attachments when messageId is unsafe', () => {
initSessionFolder('ag-1', 'sess-test');
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
writeSessionMessage('ag-1', session.id, {
id: '../../escape',
kind: 'chat',
timestamp: now(),
content: JSON.stringify({
text: 'msgid',
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
}),
});
const inboxRoot = path.join(sessionDir('ag-1', session.id), 'inbox');
if (fs.existsSync(inboxRoot)) {
expect(fs.readdirSync(inboxRoot)).toEqual([]);
}
});
it('should still save inbound attachments with safe basenames', () => {
initSessionFolder('ag-1', 'sess-test');
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
writeSessionMessage('ag-1', session.id, {
id: 'msg-ok',
kind: 'chat',
timestamp: now(),
content: JSON.stringify({
text: 'ok',
attachments: [{ name: 'photo.png', data: Buffer.from('PNGBYTES').toString('base64'), size: 8 }],
}),
});
const expected = path.join(sessionDir('ag-1', session.id), 'inbox', 'msg-ok', 'photo.png');
expect(fs.existsSync(expected)).toBe(true);
expect(fs.readFileSync(expected, 'utf-8')).toBe('PNGBYTES');
});
it('should resolve to existing session (shared mode)', () => {
const { session: s1, created: c1 } = resolveSession('ag-1', 'mg-1', null, 'shared');
expect(c1).toBe(true);

View File

@@ -153,8 +153,10 @@ describe('unknown-channel registration flow', () => {
expect(kind).toBe('chat-sdk');
const payload = JSON.parse(content as string);
expect(payload.type).toBe('ask_question');
// Card names the target agent so the owner knows what they're wiring to.
expect(payload.question).toContain('Andy');
// Single-agent card offers a direct "Connect to <name>" button.
const connectOption = payload.options.find((o: { value: string }) => o.value.startsWith('connect:'));
expect(connectOption).toBeDefined();
expect(connectOption.label).toContain('Andy');
const { getDb } = await import('../../db/connection.js');
const rows = getDb().prepare('SELECT * FROM pending_channel_approvals').all() as Array<{
@@ -202,11 +204,11 @@ describe('unknown-channel registration flow', () => {
};
expect(pending).toBeDefined();
// Owner clicks approve.
// Owner clicks "Connect to Andy" (single-agent card).
for (const handler of getResponseHandlers()) {
const claimed = await handler({
questionId: pending.messaging_group_id,
value: 'approve',
value: 'connect:ag-1',
userId: 'owner', // raw platform id — handler namespaces it
channelType: 'telegram',
platformId: 'dm-owner',
@@ -215,7 +217,7 @@ describe('unknown-channel registration flow', () => {
if (claimed) break;
}
// Wiring created with MVP defaults.
// Wiring created with defaults.
const mga = getDb()
.prepare('SELECT * FROM messaging_group_agents WHERE messaging_group_id = ?')
.get(pending.messaging_group_id) as {
@@ -261,7 +263,7 @@ describe('unknown-channel registration flow', () => {
for (const handler of getResponseHandlers()) {
const claimed = await handler({
questionId: pending.messaging_group_id,
value: 'approve',
value: 'connect:ag-1',
userId: 'owner',
channelType: 'telegram',
platformId: 'dm-owner',

View File

@@ -5,24 +5,32 @@
* addressed to the bot (SDK-confirmed mention or DM), it calls
* `requestChannelApproval` instead of silently dropping. The flow:
*
* 1. Pick the target agent group we'd wire to (MVP: first by name).
* Multi-agent picker is a follow-up — see ACTION-ITEMS.
* 1. Gather all existing agent groups.
* 2. Pick an eligible approver (owner / admin) and a reachable DM for
* them, reusing the same primitives the sender-approval flow uses.
* 3. Deliver an Approve / Ignore card that names the target agent
* explicitly so the owner knows what they're wiring to.
* 3. Deliver a card with three action families:
* a. Connect to [agent] — one button per existing agent group.
* Single-agent installs get a one-click connect.
* b. Connect new agent — prompts for a free-text name, creates
* the agent immediately on reply.
* c. Reject — deny the channel.
* 4. Record a `pending_channel_approvals` row holding the original event
* so it can be re-routed on approve.
* so it can be re-routed on connect/create.
*
* On approve (handler in index.ts):
* - Create `messaging_group_agents` with MVP defaults
* On connect (handler in index.ts):
* - Create `messaging_group_agents` with defaults
* (mention-sticky for groups / pattern='.' for DMs,
* sender_scope='known', ignored_message_policy='accumulate')
* - Add the triggering sender to `agent_group_members` so sender_scope
* doesn't bounce the replayed message into a sender-approval cascade
* - Delete the pending row, replay the original event
*
* On ignore:
* On connect new agent (handler in index.ts):
* - Prompt for a free-text agent name via DM
* - On reply: create the agent group + filesystem, then wire
* and replay as above
*
* On reject:
* - Set `messaging_groups.denied_at = now()` so the router stops
* escalating on this channel until an admin explicitly re-wires
* - Delete the pending row
@@ -36,19 +44,81 @@
* - Approver has no reachable DM.
* - Delivery adapter missing.
*/
import { normalizeOptions, type RawOption } from '../../channels/ask-question.js';
import { getAllAgentGroups } from '../../db/agent-groups.js';
import { getMessagingGroup } from '../../db/messaging-groups.js';
import { normalizeOptions, type NormalizedOption, type RawOption } from '../../channels/ask-question.js';
import { createAgentGroup, getAgentGroup, getAgentGroupByFolder, getAllAgentGroups } from '../../db/agent-groups.js';
import { getChannelAdapter } from '../../channels/channel-registry.js';
import { getMessagingGroup, updateMessagingGroup } from '../../db/messaging-groups.js';
import { getDeliveryAdapter } from '../../delivery.js';
import { initGroupFilesystem } from '../../group-init.js';
import { log } from '../../log.js';
import type { InboundEvent } from '../../channels/adapter.js';
import type { AgentGroup } from '../../types.js';
import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js';
import { createPendingChannelApproval, hasInFlightChannelApproval } from './db/pending-channel-approvals.js';
const APPROVAL_OPTIONS: RawOption[] = [
{ label: 'Approve', selectedLabel: '✅ Wired', value: 'approve' },
{ label: 'Ignore', selectedLabel: '🙅 Ignored', value: 'reject' },
];
// ── Value constants (response handler in index.ts parses these) ──
export const CONNECT_PREFIX = 'connect:';
export const NEW_AGENT_VALUE = 'new_agent';
export const CHOOSE_EXISTING_VALUE = 'choose_existing';
export const REJECT_VALUE = 'reject';
// ── Utilities ──
function toFolder(name: string): string {
return (
name
.toLowerCase()
.replace(/[^a-z0-9]+/g, '-')
.replace(/^-+|-+$/g, '') || 'unnamed'
);
}
// ── Card builders ──
function buildApprovalOptions(agentGroups: AgentGroup[]): RawOption[] {
const options: RawOption[] = [];
if (agentGroups.length === 1) {
options.push({
label: `Connect to ${agentGroups[0].name}`,
selectedLabel: `✅ Connected to ${agentGroups[0].name}`,
value: `${CONNECT_PREFIX}${agentGroups[0].id}`,
});
} else {
options.push({
label: 'Choose existing agent',
selectedLabel: '📋 Choosing…',
value: CHOOSE_EXISTING_VALUE,
});
}
options.push({
label: 'Connect new agent',
selectedLabel: '🆕 Connecting new agent…',
value: NEW_AGENT_VALUE,
});
options.push({
label: 'Reject',
selectedLabel: '🙅 Rejected',
value: REJECT_VALUE,
});
return options;
}
function buildQuestionText(
isGroup: boolean,
senderName: string | undefined,
channelName: string | null,
channelType: string,
): string {
const who = senderName ?? 'Someone';
if (isGroup) {
const where = channelName ? `${channelName} on ${channelType}` : `a ${channelType} channel`;
return `${who} mentioned your bot in ${where}. How would you like to handle this channel?`;
}
return `${who} sent your bot a DM on ${channelType}. How would you like to handle it?`;
}
// ── Main flow ──
export interface RequestChannelApprovalInput {
messagingGroupId: string;
@@ -58,17 +128,11 @@ export interface RequestChannelApprovalInput {
export async function requestChannelApproval(input: RequestChannelApprovalInput): Promise<void> {
const { messagingGroupId, event } = input;
// In-flight dedup: don't spam the owner if the same unwired channel
// gets more mentions / DMs while a card is already pending.
if (hasInFlightChannelApproval(messagingGroupId)) {
log.debug('Channel registration already in flight — dropping retry', {
messagingGroupId,
});
log.debug('Channel registration already in flight — dropping retry', { messagingGroupId });
return;
}
// MVP: pick the first agent group by name. Multi-agent systems will get
// a richer card later (user picks the target from a list).
const agentGroups = getAllAgentGroups();
if (agentGroups.length === 0) {
log.warn('Channel registration skipped — no agent groups configured. Run /init-first-agent.', {
@@ -76,55 +140,65 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
});
return;
}
const target = agentGroups[0];
// Use first agent group for approver resolution — owners and global admins
// are returned regardless of which group we pass.
const referenceGroup = agentGroups[0];
// pickApprover takes the target agent group's id — gets scoped admins +
// global admins + owners. For fresh installs with only an owner, the
// owner is returned.
const approvers = pickApprover(target.id);
const approvers = pickApprover(referenceGroup.id);
if (approvers.length === 0) {
log.warn('Channel registration skipped — no owner or admin configured', {
messagingGroupId,
targetAgentGroupId: target.id,
targetAgentGroupId: referenceGroup.id,
});
return;
}
const originMg = getMessagingGroup(messagingGroupId);
const originChannelType = originMg?.channel_type ?? '';
// Resolve channel name if not yet persisted.
if (originMg && !originMg.name) {
const channelAdapter = getChannelAdapter(originChannelType);
if (channelAdapter?.resolveChannelName) {
try {
const name = await channelAdapter.resolveChannelName(originMg.platform_id);
if (name) {
updateMessagingGroup(originMg.id, { name });
originMg.name = name;
}
} catch {
/* non-critical */
}
}
}
const delivery = await pickApprovalDelivery(approvers, originChannelType);
if (!delivery) {
log.warn('Channel registration skipped — no DM channel for any approver', {
messagingGroupId,
targetAgentGroupId: target.id,
targetAgentGroupId: referenceGroup.id,
});
return;
}
const isGroup = event.message?.isGroup ?? originMg?.is_group === 1;
// Extract sender name from the event content for a human-readable card.
let senderName: string | undefined;
try {
const parsed = JSON.parse(event.message.content) as Record<string, unknown>;
senderName = (parsed.senderName ?? parsed.sender) as string | undefined;
} catch {
// non-critical — fall through to generic wording
// non-critical
}
const title = isGroup ? '📣 Bot mentioned in new chat' : '💬 New direct message';
const question = isGroup
? senderName
? `${senderName} mentioned your agent in a ${originChannelType} channel. Wire it to ${target.name} and let it engage?`
: `Your agent was mentioned in a ${originChannelType} channel. Wire it to ${target.name} and let it engage?`
: senderName
? `${senderName} DM'd your agent on ${originChannelType}. Wire it to ${target.name} and let it respond?`
: `Someone DM'd your agent on ${originChannelType}. Wire it to ${target.name} and let it respond?`;
const options = normalizeOptions(APPROVAL_OPTIONS);
const channelName = originMg?.name ?? null;
const title = isGroup ? '📣 Bot mentioned in new channel' : '💬 New direct message';
const question = buildQuestionText(isGroup, senderName, channelName, originChannelType);
const options = normalizeOptions(buildApprovalOptions(agentGroups));
createPendingChannelApproval({
messaging_group_id: messagingGroupId,
agent_group_id: target.id,
agent_group_id: referenceGroup.id,
original_message: JSON.stringify(event),
approver_user_id: delivery.userId,
created_at: new Date().toISOString(),
@@ -134,9 +208,7 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
const adapter = getDeliveryAdapter();
if (!adapter) {
log.error('Channel registration row created but no delivery adapter is wired', {
messagingGroupId,
});
log.error('Channel registration row created but no delivery adapter is wired', { messagingGroupId });
return;
}
@@ -148,9 +220,6 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
'chat-sdk',
JSON.stringify({
type: 'ask_question',
// Use messaging_group_id as the questionId — it's unique per card
// (PK on pending table dedups) and lets the response handler look
// up the pending row directly without another index.
questionId: messagingGroupId,
title,
question,
@@ -159,16 +228,56 @@ export async function requestChannelApproval(input: RequestChannelApprovalInput)
);
log.info('Channel registration card delivered', {
messagingGroupId,
targetAgentGroupId: target.id,
agentGroupCount: agentGroups.length,
approver: delivery.userId,
});
} catch (err) {
log.error('Channel registration card delivery failed', {
messagingGroupId,
err,
});
log.error('Channel registration card delivery failed', { messagingGroupId, err });
}
}
export const APPROVE_VALUE = 'approve';
export const REJECT_VALUE = 'reject';
// ── Helpers for the response handler (index.ts) ──
/**
* Build normalized options for the agent-selection follow-up card.
*/
export function buildAgentSelectionOptions(agentGroups: AgentGroup[]): NormalizedOption[] {
const options: RawOption[] = agentGroups.map((ag) => ({
label: ag.name,
selectedLabel: `✅ Connected to ${ag.name}`,
value: `${CONNECT_PREFIX}${ag.id}`,
}));
options.push({
label: 'Cancel',
selectedLabel: '🙅 Cancelled',
value: REJECT_VALUE,
});
return normalizeOptions(options);
}
/**
* Create a new agent group and initialize its filesystem. Handles
* folder-name collisions with numeric suffixes.
*/
export function createNewAgentGroup(name: string): AgentGroup {
let folder = toFolder(name);
const baseFolder = folder;
let suffix = 2;
while (getAgentGroupByFolder(folder)) {
folder = `${baseFolder}-${suffix}`;
suffix++;
}
const agId = `ag-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
createAgentGroup({
id: agId,
name,
folder,
agent_provider: null,
created_at: new Date().toISOString(),
});
const ag = getAgentGroup(agId)!;
initGroupFilesystem(ag);
return ag;
}

View File

@@ -51,6 +51,12 @@ export function hasInFlightChannelApproval(messagingGroupId: string): boolean {
return row !== undefined;
}
export function updatePendingChannelApprovalCard(messagingGroupId: string, title: string, optionsJson: string): void {
getDb()
.prepare('UPDATE pending_channel_approvals SET title = ?, options_json = ? WHERE messaging_group_id = ?')
.run(title, optionsJson, messagingGroupId);
}
export function deletePendingChannelApproval(messagingGroupId: string): void {
getDb().prepare('DELETE FROM pending_channel_approvals WHERE messaging_group_id = ?').run(messagingGroupId);
}

View File

@@ -16,27 +16,53 @@
* access gate is not registered and core defaults to allow-all.
*/
import { recordDroppedMessage } from '../../db/dropped-messages.js';
import { getAgentGroup, getAllAgentGroups } from '../../db/agent-groups.js';
import { createMessagingGroupAgent, setMessagingGroupDeniedAt } from '../../db/messaging-groups.js';
import {
routeInbound,
setAccessGate,
setChannelRequestGate,
setMessageInterceptor,
setSenderResolver,
setSenderScopeGate,
type AccessGateResult,
} from '../../router.js';
import type { InboundEvent } from '../../channels/adapter.js';
import { registerResponseHandler, type ResponsePayload } from '../../response-registry.js';
import { getDeliveryAdapter } from '../../delivery.js';
import { log } from '../../log.js';
import type { MessagingGroup, MessagingGroupAgent } from '../../types.js';
import { canAccessAgentGroup } from './access.js';
import { requestChannelApproval } from './channel-approval.js';
import {
buildAgentSelectionOptions,
CHOOSE_EXISTING_VALUE,
CONNECT_PREFIX,
createNewAgentGroup,
NEW_AGENT_VALUE,
REJECT_VALUE,
requestChannelApproval,
} from './channel-approval.js';
import { addMember } from './db/agent-group-members.js';
import { deletePendingChannelApproval, getPendingChannelApproval } from './db/pending-channel-approvals.js';
import {
deletePendingChannelApproval,
getPendingChannelApproval,
updatePendingChannelApprovalCard,
} from './db/pending-channel-approvals.js';
import { deletePendingSenderApproval, getPendingSenderApproval } from './db/pending-sender-approvals.js';
import { hasAdminPrivilege } from './db/user-roles.js';
import { getUser, upsertUser } from './db/users.js';
import { requestSenderApproval } from './sender-approval.js';
import { ensureUserDm } from './user-dm.js';
// ── Free-text name input state ──
// Tracks approvers waiting for a text reply with the agent name. Keyed by
// namespaced userId (e.g. "slack:U0ABC"). Cleared on receipt or restart.
interface PendingNameInput {
channelMgId: string;
dmChannelType: string;
dmPlatformId: string;
}
const awaitingNameInput = new Map<string, PendingNameInput>();
function extractAndUpsertUser(event: InboundEvent): string | null {
let content: Record<string, unknown>;
@@ -271,22 +297,17 @@ setChannelRequestGate(async (mg, event) => {
* by messaging_group_id). If no such row, return false so downstream
* handlers get a shot.
*
* Approve: create the wiring with MVP defaults (mention-sticky for
* groups / pattern='.' for DMs; sender_scope='known';
* ignored_message_policy='accumulate'), add the triggering sender as a
* member so sender_scope doesn't immediately bounce them into a
* sender-approval card, then replay the original event.
*
* Deny: set `messaging_groups.denied_at = now()` so future mentions on
* this channel drop silently until an admin explicitly wires it.
* Value dispatch:
* connect:<id> — wire to an existing agent group, replay the message
* choose_existing — send a follow-up card listing all agents
* new_agent — prompt for a free-text agent name (interceptor
* captures the reply and creates immediately)
* reject — set denied_at, delete pending row
*/
async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<boolean> {
const row = getPendingChannelApproval(payload.questionId);
if (!row) return false;
// Click-auth: same pattern as sender-approval (see commit 68058cb).
// Raw platform userId → namespace with channelType → must match the
// designated approver OR have admin privilege over the target agent.
const clickerId = payload.userId ? `${payload.channelType}:${payload.userId}` : null;
const isAuthorized =
clickerId !== null && (clickerId === row.approver_user_id || hasAdminPrivilege(clickerId, row.agent_group_id));
@@ -296,25 +317,129 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
clickerId,
expectedApprover: row.approver_user_id,
});
return true; // claim but take no action
return true;
}
const approverId = clickerId;
const approved = payload.value === 'approve';
if (!approved) {
// ── Reject / Cancel ──
if (payload.value === REJECT_VALUE) {
setMessagingGroupDeniedAt(row.messaging_group_id, new Date().toISOString());
deletePendingChannelApproval(row.messaging_group_id);
log.info('Channel registration denied', {
messagingGroupId: row.messaging_group_id,
agentGroupId: row.agent_group_id,
approverId,
});
return true;
}
// Rehydrate the original event to know (a) whether it was a DM or group
// (chooses engage_mode default), and (b) who the triggering sender was
// (auto-member-add so sender_scope='known' doesn't bounce the replay).
// ── Choose existing agent — send agent-selection follow-up card ──
if (payload.value === CHOOSE_EXISTING_VALUE) {
const approverDm = await ensureUserDm(row.approver_user_id);
if (!approverDm) {
log.error('Channel registration: no DM channel for approver', {
messagingGroupId: row.messaging_group_id,
approverUserId: row.approver_user_id,
});
return true;
}
const adapter = getDeliveryAdapter();
if (!adapter) return true;
const agentGroups = getAllAgentGroups();
const options = buildAgentSelectionOptions(agentGroups);
const title = '📋 Choose an agent';
updatePendingChannelApprovalCard(row.messaging_group_id, title, JSON.stringify(options));
try {
await adapter.deliver(
approverDm.channel_type,
approverDm.platform_id,
null,
'chat-sdk',
JSON.stringify({
type: 'ask_question',
questionId: row.messaging_group_id,
title,
question: 'Which agent should handle this channel?',
options,
}),
);
} catch (err) {
log.error('Channel registration: agent-selection card delivery failed', {
messagingGroupId: row.messaging_group_id,
err,
});
}
return true;
}
// ── Create new agent — prompt for free-text name ──
if (payload.value === NEW_AGENT_VALUE) {
const approverDm = await ensureUserDm(row.approver_user_id);
if (!approverDm) {
log.error('Channel registration: no DM channel for approver', {
messagingGroupId: row.messaging_group_id,
approverUserId: row.approver_user_id,
});
return true;
}
const adapter = getDeliveryAdapter();
if (!adapter) {
log.error('Channel registration: no delivery adapter for name prompt', {
messagingGroupId: row.messaging_group_id,
});
return true;
}
awaitingNameInput.set(row.approver_user_id, {
channelMgId: row.messaging_group_id,
dmChannelType: approverDm.channel_type,
dmPlatformId: approverDm.platform_id,
});
try {
await adapter.deliver(
approverDm.channel_type,
approverDm.platform_id,
null,
'chat-sdk',
JSON.stringify({ text: 'Reply with the name for your new agent:' }),
);
} catch (err) {
log.error('Channel registration: name prompt delivery failed', {
messagingGroupId: row.messaging_group_id,
err,
});
awaitingNameInput.delete(row.approver_user_id);
}
return true;
}
// ── Resolve target agent group (connect to existing or create new) ──
let targetAgentGroupId: string;
if (payload.value.startsWith(CONNECT_PREFIX)) {
targetAgentGroupId = payload.value.slice(CONNECT_PREFIX.length);
const ag = getAgentGroup(targetAgentGroupId);
if (!ag) {
log.error('Channel registration: target agent group no longer exists', {
messagingGroupId: row.messaging_group_id,
targetAgentGroupId,
});
deletePendingChannelApproval(row.messaging_group_id);
return true;
}
} else {
log.warn('Channel registration: unknown response value', {
messagingGroupId: row.messaging_group_id,
value: payload.value,
});
return true;
}
// ── Wire + replay (shared path for connect and create) ──
let event: InboundEvent;
try {
event = JSON.parse(row.original_message) as InboundEvent;
@@ -327,15 +452,6 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
return true;
}
// Decide engage_mode from the original event. DMs (`isMention=true` &
// not in a group) get `pattern='.'` (always respond). Group mentions
// get `mention-sticky` (respond now + follow the thread).
//
// We can't read `mg.is_group` reliably here because we only auto-create
// the mg with `is_group=0` on first sight — the adapter hasn't told us
// yet whether it's actually a group. Fall back to the InboundEvent's
// `threadId`: a non-null threadId implies a threaded platform (Slack
// channel thread, Discord thread), which we treat as a group.
const isGroup = event.threadId !== null;
const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern';
const engagePattern = isGroup ? null : '.';
@@ -344,7 +460,7 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
createMessagingGroupAgent({
id: mgaId,
messaging_group_id: row.messaging_group_id,
agent_group_id: row.agent_group_id,
agent_group_id: targetAgentGroupId,
engage_mode: engageMode,
engage_pattern: engagePattern,
sender_scope: 'known',
@@ -355,28 +471,22 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
});
log.info('Channel registration approved — wiring created', {
messagingGroupId: row.messaging_group_id,
agentGroupId: row.agent_group_id,
agentGroupId: targetAgentGroupId,
mgaId,
engageMode,
approverId,
});
// Auto-admit the triggering sender. Without this, the replay below
// would bounce through sender-approval (sender_scope='known' +
// sender-is-not-a-member).
const senderUserId = extractAndUpsertUser(event);
if (senderUserId) {
addMember({
user_id: senderUserId,
agent_group_id: row.agent_group_id,
agent_group_id: targetAgentGroupId,
added_by: approverId,
added_at: new Date().toISOString(),
});
}
// Clear the pending row BEFORE replay so the gate check on the second
// attempt sees a wired channel (agentCount > 0) and takes the fan-out
// path normally.
deletePendingChannelApproval(row.messaging_group_id);
try {
@@ -391,3 +501,117 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
}
registerResponseHandler(handleChannelApprovalResponse);
// ── Free-text name interceptor ──
// Captures the next DM from an approver who clicked "Create new agent",
// creates the agent immediately, wires the channel, and replays.
setMessageInterceptor(async (event: InboundEvent): Promise<boolean> => {
const userId = extractAndUpsertUser(event);
if (!userId) return false;
const pending = awaitingNameInput.get(userId);
if (!pending) return false;
if (event.channelType !== pending.dmChannelType || event.platformId !== pending.dmPlatformId) return false;
awaitingNameInput.delete(userId);
let text: string | undefined;
try {
const parsed = JSON.parse(event.message.content) as Record<string, unknown>;
text = (typeof parsed.text === 'string' ? parsed.text : undefined)?.trim();
} catch {
/* fall through */
}
if (!text) {
log.warn('Channel registration: empty name reply, ignoring', { userId });
return true;
}
const row = getPendingChannelApproval(pending.channelMgId);
if (!row) return true;
const ag = createNewAgentGroup(text);
log.info('Channel registration: new agent group created', {
messagingGroupId: row.messaging_group_id,
agentGroupId: ag.id,
agentName: ag.name,
folder: ag.folder,
});
let originalEvent: InboundEvent;
try {
originalEvent = JSON.parse(row.original_message) as InboundEvent;
} catch (err) {
log.error('Channel registration: failed to parse stored event', {
messagingGroupId: row.messaging_group_id,
err,
});
deletePendingChannelApproval(row.messaging_group_id);
return true;
}
const isGroup = originalEvent.threadId !== null;
const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern';
const engagePattern = isGroup ? null : '.';
const mgaId = `mga-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
createMessagingGroupAgent({
id: mgaId,
messaging_group_id: row.messaging_group_id,
agent_group_id: ag.id,
engage_mode: engageMode,
engage_pattern: engagePattern,
sender_scope: 'known',
ignored_message_policy: 'accumulate',
session_mode: 'shared',
priority: 0,
created_at: new Date().toISOString(),
});
log.info('Channel registration approved — wiring created', {
messagingGroupId: row.messaging_group_id,
agentGroupId: ag.id,
mgaId,
engageMode,
approverId: userId,
});
const senderUserId = extractAndUpsertUser(originalEvent);
if (senderUserId) {
addMember({
user_id: senderUserId,
agent_group_id: ag.id,
added_by: userId,
added_at: new Date().toISOString(),
});
}
deletePendingChannelApproval(row.messaging_group_id);
try {
await routeInbound(originalEvent);
} catch (err) {
log.error('Failed to replay message after channel approval', {
messagingGroupId: row.messaging_group_id,
err,
});
}
const adapter = getDeliveryAdapter();
if (adapter) {
const dm = await ensureUserDm(row.approver_user_id);
if (dm) {
adapter
.deliver(
dm.channel_type,
dm.platform_id,
null,
'chat-sdk',
JSON.stringify({ text: `✅ Agent "${ag.name}" created and connected.` }),
)
.catch(() => {});
}
}
return true;
});

View File

@@ -108,6 +108,20 @@ export function setSenderScopeGate(fn: SenderScopeGateFn): void {
senderScopeGate = fn;
}
/**
* Message-interceptor hook. Runs at the very top of routeInbound, before
* messaging-group resolution. When the interceptor returns true the message
* is consumed and routing stops. Used by the permissions module to capture
* free-text replies during multi-step approval flows (e.g. agent naming).
*/
export type MessageInterceptorFn = (event: InboundEvent) => Promise<boolean>;
let messageInterceptor: MessageInterceptorFn | null = null;
export function setMessageInterceptor(fn: MessageInterceptorFn): void {
messageInterceptor = fn;
}
/**
* Channel-registration hook. Runs when the router sees a mention/DM on a
* messaging group that has no wirings AND hasn't been denied. The hook is
@@ -142,6 +156,10 @@ function safeParseContent(raw: string): { text?: string; sender?: string; sender
* Creates messaging group + session if they don't exist yet.
*/
export async function routeInbound(event: InboundEvent): Promise<void> {
// Pre-route interceptor — lets modules consume messages before any routing
// (e.g. free-text replies during multi-step approval flows).
if (messageInterceptor && (await messageInterceptor(event))) return;
// 0. Apply the adapter's thread policy. Non-threaded adapters (Telegram,
// WhatsApp, iMessage, email) collapse threads to the channel.
const adapter = getChannelAdapter(event.channelType);

View File

@@ -21,7 +21,6 @@ import { DATA_DIR } from './config.js';
import { getMessagingGroup } from './db/messaging-groups.js';
import {
createSession,
findSession,
findSessionByAgentGroup,
findSessionForAgent,
getSession,
@@ -38,6 +37,11 @@ import {
import { log } from './log.js';
import type { Session } from './types.js';
function isPathInside(parent: string, child: string): boolean {
const relative = path.relative(parent, child);
return relative === '' || (!relative.startsWith('..') && !path.isAbsolute(relative));
}
/** Root directory for all session data. */
export function sessionsBaseDir(): string {
return path.join(DATA_DIR, 'v2-sessions');
@@ -234,6 +238,20 @@ export function writeSessionMessage(
/**
* If message content has attachments with base64 `data`, save them to
* the session's inbox directory and replace with `localPath`.
*
* Both `messageId` and `att.name` originate in untrusted input. WhatsApp
* passes `msg.key.id` through raw (and that field is client generated, so a
* peer can craft it), and other adapters may follow. The session dir is
* mounted writable into the container, so a compromised agent can also
* pre-place a symlink at `inbox/<future msgId>/` and wait for a chat message
* with a matching id to redirect the host's write.
*
* Defenses, mirrored from the outbound side:
* 1. basename check on `messageId` and `filename`.
* 2. lstat of the inbox dir to refuse pre-placed symlinks.
* 3. realpath-based containment under the session inbox root.
* 4. `wx` flag on writeFileSync to refuse following a pre-existing symlink
* at the target file path or overwriting any existing file.
*/
function extractAttachmentFiles(
agentGroupId: string,
@@ -251,34 +269,75 @@ function extractAttachmentFiles(
const attachments = parsed.attachments as Array<Record<string, unknown>> | undefined;
if (!Array.isArray(attachments)) return contentStr;
if (!isSafeAttachmentName(messageId)) {
log.warn('Rejecting unsafe inbound message id', { messageId });
return contentStr;
}
let changed = false;
for (const att of attachments) {
if (typeof att.data === 'string') {
// The name field is attacker-controlled: chat platforms with E2E
// attachment encryption (WhatsApp, Matrix) cannot sanitize filename
// server-side, and other adapters pass att.name through raw. Without
// this guard, `path.join(inboxDir, '../../...')` writes anywhere the
// host process has fs permission — see Signal Desktop's Nov 2025
// attachment-fileName advisory for the same archetype.
const rawName = deriveAttachmentName(att);
const filename = isSafeAttachmentName(rawName) ? rawName : `attachment-${Date.now()}`;
if (filename !== rawName) {
log.warn('Refused unsafe attachment filename — would escape inbox', {
messageId,
rawName,
replacement: filename,
});
}
const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId);
fs.mkdirSync(inboxDir, { recursive: true });
const filePath = path.join(inboxDir, filename);
fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64'));
att.name = filename;
att.localPath = `inbox/${messageId}/${filename}`;
delete att.data;
changed = true;
log.debug('Saved attachment to inbox', { messageId, filename, size: att.size });
if (typeof att.data !== 'string') continue;
const rawName = deriveAttachmentName(att);
const filename = isSafeAttachmentName(rawName) ? rawName : `attachment-${Date.now()}`;
if (filename !== rawName) {
log.warn('Refused unsafe attachment filename, would escape inbox', {
messageId,
rawName,
replacement: filename,
});
}
const inboxDir = path.join(sessionDir(agentGroupId, sessionId), 'inbox', messageId);
// Refuse to mkdir through a symlink that the container may have pre placed
// at inboxDir. With recursive:true, mkdirSync would silently no op on a
// pre existing symlink and the subsequent writeFileSync would follow it.
if (fs.existsSync(inboxDir)) {
const stat = fs.lstatSync(inboxDir);
if (stat.isSymbolicLink() || !stat.isDirectory()) {
log.warn('Rejecting unsafe inbox directory', { messageId, inboxDir });
continue;
}
}
fs.mkdirSync(inboxDir, { recursive: true });
let realInboxDir: string;
try {
realInboxDir = fs.realpathSync(inboxDir);
} catch (err) {
log.warn('Failed to resolve inbox directory', { messageId, err });
continue;
}
const inboxRoot = path.join(sessionDir(agentGroupId, sessionId), 'inbox');
if (!isPathInside(fs.realpathSync(inboxRoot), realInboxDir)) {
log.warn('Inbox directory escaped session inbox root', { messageId, inboxDir });
continue;
}
const filePath = path.join(inboxDir, filename);
try {
// wx = exclusive create. Refuses to follow a pre existing symlink or
// overwrite any existing file. The host expects to be the sole writer
// of these attachments.
fs.writeFileSync(filePath, Buffer.from(att.data as string, 'base64'), { flag: 'wx' });
} catch (err: unknown) {
const e = err as NodeJS.ErrnoException;
if (e.code === 'EEXIST') {
log.warn('Inbox attachment target already exists, refusing to overwrite', {
messageId,
filename,
});
continue;
}
throw err;
}
att.name = filename;
att.localPath = `inbox/${messageId}/${filename}`;
delete att.data;
changed = true;
log.debug('Saved attachment to inbox', { messageId, filename, size: att.size });
}
return changed ? JSON.stringify(parsed) : contentStr;
@@ -369,19 +428,48 @@ export function readOutboxFiles(
messageId: string,
filenames: string[],
): OutboundFile[] | undefined {
if (!isSafeAttachmentName(messageId)) {
log.warn('Rejecting unsafe outbox message id', { messageId });
return undefined;
}
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
if (!fs.existsSync(outboxDir)) return undefined;
let realOutboxDir: string;
try {
const stat = fs.lstatSync(outboxDir);
if (!stat.isDirectory() || stat.isSymbolicLink()) {
log.warn('Rejecting unsafe outbox directory', { messageId, outboxDir });
return undefined;
}
realOutboxDir = fs.realpathSync(outboxDir);
} catch (err) {
log.warn('Failed to inspect outbox directory', { messageId, err });
return undefined;
}
const files: OutboundFile[] = [];
for (const filename of filenames) {
// Reject any name that isn't a bare basename before touching the filesystem.
if (!isSafeAttachmentName(filename)) {
log.warn('Refused unsafe outbox filename would escape outbox', { messageId, filename });
log.warn('Refused unsafe outbox filename, would escape outbox', { messageId, filename });
continue;
}
const filePath = path.join(outboxDir, filename);
if (fs.existsSync(filePath)) {
files.push({ filename, data: fs.readFileSync(filePath) });
} else {
try {
const stat = fs.lstatSync(filePath);
if (!stat.isFile() || stat.isSymbolicLink()) {
log.warn('Rejecting unsafe outbox file', { messageId, filename });
continue;
}
const realFilePath = fs.realpathSync(filePath);
if (!isPathInside(realOutboxDir, realFilePath)) {
log.warn('Rejecting outbox file outside message directory', { messageId, filename });
continue;
}
files.push({ filename, data: fs.readFileSync(realFilePath) });
} catch {
log.warn('Outbox file not found', { messageId, filename });
}
}
@@ -395,10 +483,26 @@ export function readOutboxFiles(
* thrown error would trigger the delivery retry path and deliver twice.
*/
export function clearOutbox(agentGroupId: string, sessionId: string, messageId: string): void {
if (!isSafeAttachmentName(messageId)) {
log.warn('Rejecting unsafe outbox cleanup message id', { messageId });
return;
}
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
if (!fs.existsSync(outboxDir)) return;
try {
fs.rmSync(outboxDir, { recursive: true, force: true });
const stat = fs.lstatSync(outboxDir);
if (!stat.isDirectory() || stat.isSymbolicLink()) {
log.warn('Rejecting unsafe outbox cleanup directory', { messageId, outboxDir });
return;
}
const realOutboxBase = fs.realpathSync(path.join(sessionDir(agentGroupId, sessionId), 'outbox'));
const realOutboxDir = fs.realpathSync(outboxDir);
if (!isPathInside(realOutboxBase, realOutboxDir)) {
log.warn('Rejecting outbox cleanup outside session outbox', { messageId, outboxDir });
return;
}
fs.rmSync(realOutboxDir, { recursive: true, force: true });
} catch (err) {
log.warn('Outbox cleanup failed (message already delivered)', { messageId, err });
}