Merge pull request #1877 from qwibitai/feature/channel-registration
Move engagement policy to router, add unknown-channel registration flow
This commit is contained in:
@@ -12,7 +12,7 @@ import { TIMEZONE, formatLocalTime } from './timezone.js';
|
||||
export type CommandCategory = 'admin' | 'filtered' | 'passthrough' | 'none';
|
||||
|
||||
const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact', '/context', '/cost', '/files']);
|
||||
const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config']);
|
||||
const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config', '/start']);
|
||||
|
||||
export interface CommandInfo {
|
||||
category: CommandCategory;
|
||||
|
||||
@@ -5,45 +5,8 @@
|
||||
* Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter).
|
||||
*/
|
||||
|
||||
/** Configuration for a registered conversation (messaging group + agent wiring). */
|
||||
export interface ConversationConfig {
|
||||
platformId: string;
|
||||
agentGroupId: string;
|
||||
/**
|
||||
* When does the agent engage on messages from this conversation?
|
||||
*
|
||||
* 'pattern' — regex test against message text; engagePattern='.'
|
||||
* means "always" (match everything)
|
||||
* 'mention' — fires only on @mention
|
||||
* 'mention-sticky' — fires on @mention, then auto-subscribes to the thread
|
||||
* and treats subsequent messages as engage-all.
|
||||
* Threaded platforms only (Slack/Discord/Linear).
|
||||
*/
|
||||
engageMode: 'pattern' | 'mention' | 'mention-sticky';
|
||||
/** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */
|
||||
engagePattern?: string | null;
|
||||
/**
|
||||
* What to do with messages this wiring doesn't engage on.
|
||||
*
|
||||
* 'drop' — discard silently
|
||||
* 'accumulate' — still forward to the host so the router can store the
|
||||
* message in this agent's session with trigger=0. It
|
||||
* rides along as context when the agent next wakes, but
|
||||
* doesn't wake it on its own.
|
||||
*
|
||||
* The bridge reads this to decide whether to forward a non-engaging
|
||||
* message at all — if any wiring on a conversation has 'accumulate', the
|
||||
* bridge forwards and lets the router apply the per-wiring decision.
|
||||
*/
|
||||
ignoredMessagePolicy?: 'drop' | 'accumulate';
|
||||
sessionMode: 'shared' | 'per-thread' | 'agent-shared';
|
||||
}
|
||||
|
||||
/** Passed to the adapter at setup time. */
|
||||
export interface ChannelSetup {
|
||||
/** Known conversations from central DB. */
|
||||
conversations: ConversationConfig[];
|
||||
|
||||
/** Called when an inbound message arrives from the platform. */
|
||||
onInbound(platformId: string, threadId: string | null, message: InboundMessage): void | Promise<void>;
|
||||
|
||||
@@ -125,7 +88,17 @@ export interface ChannelAdapter {
|
||||
// Optional
|
||||
setTyping?(platformId: string, threadId: string | null): Promise<void>;
|
||||
syncConversations?(): Promise<ConversationInfo[]>;
|
||||
updateConversations?(conversations: ConversationConfig[]): void;
|
||||
|
||||
/**
|
||||
* Subscribe the bot to a thread so follow-up messages route via the
|
||||
* platform's "subscribed message" path (onSubscribedMessage in Chat SDK).
|
||||
* Called by the router when a mention-sticky wiring first engages in a
|
||||
* thread. Idempotent: calling twice on the same thread is a no-op.
|
||||
*
|
||||
* Platforms without a subscription concept can omit this; the router
|
||||
* treats absence as a no-op.
|
||||
*/
|
||||
subscribe?(platformId: string, threadId: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Open (or fetch) a DM with this user, returning the platform_id of the
|
||||
|
||||
@@ -64,8 +64,6 @@ function createMockAdapter(
|
||||
},
|
||||
|
||||
async setTyping() {},
|
||||
|
||||
updateConversations() {},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -2,37 +2,19 @@ import { describe, expect, it } from 'vitest';
|
||||
|
||||
import type { Adapter } from 'chat';
|
||||
|
||||
import type { ConversationConfig } from './adapter.js';
|
||||
import { createChatSdkBridge, shouldEngage, type EngageSource } from './chat-sdk-bridge.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
|
||||
function stubAdapter(partial: Partial<Adapter>): Adapter {
|
||||
return { name: 'stub', ...partial } as unknown as Adapter;
|
||||
}
|
||||
|
||||
function cfg(
|
||||
partial: Partial<ConversationConfig> & { engageMode: ConversationConfig['engageMode'] },
|
||||
): ConversationConfig {
|
||||
return {
|
||||
platformId: partial.platformId ?? 'C1',
|
||||
agentGroupId: partial.agentGroupId ?? 'ag-1',
|
||||
engageMode: partial.engageMode,
|
||||
engagePattern: partial.engagePattern ?? null,
|
||||
ignoredMessagePolicy: partial.ignoredMessagePolicy ?? 'drop',
|
||||
sessionMode: partial.sessionMode ?? 'shared',
|
||||
};
|
||||
}
|
||||
|
||||
function mapFor(...configs: ConversationConfig[]): Map<string, ConversationConfig[]> {
|
||||
const map = new Map<string, ConversationConfig[]>();
|
||||
for (const c of configs) {
|
||||
const existing = map.get(c.platformId);
|
||||
if (existing) existing.push(c);
|
||||
else map.set(c.platformId, [c]);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
describe('createChatSdkBridge', () => {
|
||||
// The bridge is now transport-only: forward inbound events, relay outbound
|
||||
// ops. All per-wiring engage / accumulate / drop / subscribe decisions live
|
||||
// in the router (src/router.ts routeInbound / evaluateEngage) and are
|
||||
// exercised by host-core.test.ts end-to-end. These tests only cover the
|
||||
// bridge's narrow, platform-adjacent surface.
|
||||
|
||||
it('omits openDM when the underlying Chat SDK adapter has none', () => {
|
||||
const bridge = createChatSdkBridge({
|
||||
adapter: stubAdapter({}),
|
||||
@@ -59,162 +41,12 @@ describe('createChatSdkBridge', () => {
|
||||
expect(openDMCalls).toEqual(['user-42']);
|
||||
expect(platformId).toBe('stub:user-42');
|
||||
});
|
||||
});
|
||||
|
||||
describe('shouldEngage', () => {
|
||||
describe('unknown conversation', () => {
|
||||
const empty = new Map<string, ConversationConfig[]>();
|
||||
const sources: EngageSource[] = ['subscribed', 'mention', 'dm'];
|
||||
for (const source of sources) {
|
||||
it(`forwards for source='${source}' (may be a not-yet-wired group)`, () => {
|
||||
expect(shouldEngage(empty, 'C1', source, '')).toEqual({ forward: true, stickySubscribe: false });
|
||||
});
|
||||
}
|
||||
it("DROPS for source='new-message' (would flood from unwired channels)", () => {
|
||||
expect(shouldEngage(empty, 'C1', 'new-message', 'hello')).toEqual({
|
||||
forward: false,
|
||||
stickySubscribe: false,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("engageMode='mention' + ignoredMessagePolicy='drop' (default)", () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention' }));
|
||||
it('forwards on mention + dm', () => {
|
||||
expect(shouldEngage(conv, 'C1', 'mention', '').forward).toBe(true);
|
||||
expect(shouldEngage(conv, 'C1', 'dm', '').forward).toBe(true);
|
||||
});
|
||||
it('does NOT forward on subscribed or new-message (prevents keep-firing + flooding)', () => {
|
||||
expect(shouldEngage(conv, 'C1', 'subscribed', '').forward).toBe(false);
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', '').forward).toBe(false);
|
||||
});
|
||||
it('never asks to subscribe', () => {
|
||||
for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
|
||||
expect(shouldEngage(conv, 'C1', s, '').stickySubscribe).toBe(false);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("engageMode='mention-sticky'", () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention-sticky' }));
|
||||
it('forwards on mention + dm with stickySubscribe=true', () => {
|
||||
expect(shouldEngage(conv, 'C1', 'mention', '')).toEqual({ forward: true, stickySubscribe: true });
|
||||
expect(shouldEngage(conv, 'C1', 'dm', '')).toEqual({ forward: true, stickySubscribe: true });
|
||||
});
|
||||
it('forwards subscribed follow-ups without re-subscribing', () => {
|
||||
expect(shouldEngage(conv, 'C1', 'subscribed', '')).toEqual({ forward: true, stickySubscribe: false });
|
||||
});
|
||||
it('does NOT forward on new-message (explicit mention required to start)', () => {
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', '').forward).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("engageMode='pattern'", () => {
|
||||
it('pattern="." forwards on every source (when conversation is known)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' }));
|
||||
for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
|
||||
expect(shouldEngage(conv, 'C1', s, 'anything').forward).toBe(true);
|
||||
}
|
||||
});
|
||||
|
||||
it('tests regex against text on new-message (the main bug fix)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '^!report' }));
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', '!report now').forward).toBe(true);
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'nothing to see').forward).toBe(false);
|
||||
});
|
||||
|
||||
it('pattern regex applies on every source (symmetry)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: 'deploy' }));
|
||||
for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
|
||||
expect(shouldEngage(conv, 'C1', s, 'time to deploy').forward).toBe(true);
|
||||
expect(shouldEngage(conv, 'C1', s, 'weather today').forward).toBe(false);
|
||||
}
|
||||
});
|
||||
|
||||
it('pattern never triggers sticky-subscribe', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' }));
|
||||
for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
|
||||
expect(shouldEngage(conv, 'C1', s, 'hi').stickySubscribe).toBe(false);
|
||||
}
|
||||
});
|
||||
|
||||
it('invalid regex fails open (admin sees something rather than silent drop)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '[unclosed' }));
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'x').forward).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ignoredMessagePolicy='accumulate'", () => {
|
||||
it('forwards non-engaging new-message so the router can store it as context (trigger=0)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'accumulate' }));
|
||||
// Plain message in unsubscribed group — mention rule says no engage,
|
||||
// but accumulate says forward anyway.
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'chit chat')).toEqual({
|
||||
forward: true,
|
||||
stickySubscribe: false,
|
||||
});
|
||||
});
|
||||
|
||||
it('forwards non-engaging subscribed messages for accumulation', () => {
|
||||
// mention wiring in a subscribed thread: the mention handler already
|
||||
// fired once, thread is now subscribed, follow-ups route here. The
|
||||
// base 'mention' rule wouldn't engage without an @-mention, but
|
||||
// accumulate says capture the context.
|
||||
const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'accumulate' }));
|
||||
expect(shouldEngage(conv, 'C1', 'subscribed', 'follow up talk').forward).toBe(true);
|
||||
});
|
||||
|
||||
it('does NOT set stickySubscribe purely from accumulate (avoid misleading bot presence)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention-sticky', ignoredMessagePolicy: 'accumulate' }));
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'plain').stickySubscribe).toBe(false);
|
||||
});
|
||||
|
||||
it("accumulate doesn't override the 'unknown conversation → drop new-message' rule", () => {
|
||||
// Unknown conversation (not in map): accumulate can't be read because
|
||||
// there's no config to read from. We still drop.
|
||||
const empty = new Map<string, ConversationConfig[]>();
|
||||
expect(shouldEngage(empty, 'C-unknown', 'new-message', 'x').forward).toBe(false);
|
||||
});
|
||||
|
||||
it("drop policy + non-engaging message → doesn't forward", () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'drop' }));
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'plain').forward).toBe(false);
|
||||
});
|
||||
|
||||
it('engaging message with drop policy still forwards (engage wins regardless)', () => {
|
||||
const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'drop' }));
|
||||
expect(shouldEngage(conv, 'C1', 'mention', '@bot hi').forward).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('multiple wirings on one conversation', () => {
|
||||
it('takes the union across wirings (any-engage wins)', () => {
|
||||
// mention wiring + pattern wiring on the same channel. A plain message
|
||||
// should engage via the pattern wiring even though the mention wiring
|
||||
// alone would reject it.
|
||||
const conv = mapFor(
|
||||
cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }),
|
||||
cfg({ agentGroupId: 'ag-b', engageMode: 'pattern', engagePattern: '^hi' }),
|
||||
);
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'hi there').forward).toBe(true);
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'something else').forward).toBe(false);
|
||||
});
|
||||
|
||||
it('any accumulate wiring causes forward even if all others would drop', () => {
|
||||
const conv = mapFor(
|
||||
cfg({ agentGroupId: 'ag-a', engageMode: 'mention', ignoredMessagePolicy: 'drop' }),
|
||||
cfg({ agentGroupId: 'ag-b', engageMode: 'mention', ignoredMessagePolicy: 'accumulate' }),
|
||||
);
|
||||
// Plain message: ag-a would drop, ag-b would accumulate → forward.
|
||||
expect(shouldEngage(conv, 'C1', 'new-message', 'plain').forward).toBe(true);
|
||||
});
|
||||
|
||||
it('stickySubscribe from any mention-sticky wiring wins', () => {
|
||||
const conv = mapFor(
|
||||
cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }),
|
||||
cfg({ agentGroupId: 'ag-b', engageMode: 'mention-sticky' }),
|
||||
);
|
||||
expect(shouldEngage(conv, 'C1', 'mention', '')).toEqual({ forward: true, stickySubscribe: true });
|
||||
it('exposes subscribe (lets the router initiate thread subscription on mention-sticky engage)', () => {
|
||||
const bridge = createChatSdkBridge({
|
||||
adapter: stubAdapter({}),
|
||||
supportsThreads: true,
|
||||
});
|
||||
expect(typeof bridge.subscribe).toBe('function');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -21,7 +21,7 @@ import { SqliteStateAdapter } from '../state-sqlite.js';
|
||||
import { registerWebhookAdapter } from '../webhook-server.js';
|
||||
import { getAskQuestionRender } from '../db/sessions.js';
|
||||
import { normalizeOptions, type NormalizedOption } from './ask-question.js';
|
||||
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
|
||||
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
|
||||
|
||||
/** Adapter with optional gateway support (e.g., Discord). */
|
||||
interface GatewayAdapter extends Adapter {
|
||||
@@ -65,151 +65,14 @@ export interface ChatSdkBridgeConfig {
|
||||
transformOutboundText?: (text: string) => string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Which Chat SDK handler delivered this message. Determines which engage modes
|
||||
* can fire.
|
||||
*
|
||||
* - `subscribed` — `onSubscribedMessage`. Thread is already subscribed.
|
||||
* Every wiring mode (mention / mention-sticky / pattern)
|
||||
* evaluates normally.
|
||||
* - `mention` — `onNewMention`. Bot was @-mentioned in an unsubscribed
|
||||
* thread. mention + mention-sticky engage; pattern runs
|
||||
* the regex.
|
||||
* - `dm` — `onDirectMessage`. Unsubscribed DM. Treated like a
|
||||
* mention for engagement purposes.
|
||||
* - `new-message` — `onNewMessage(/./, …)`. Plain non-mention non-DM
|
||||
* message in an unsubscribed thread. Only `pattern`
|
||||
* wirings can fire here. mention / mention-sticky ignore
|
||||
* this source (they require an explicit mention).
|
||||
*/
|
||||
export type EngageSource = 'subscribed' | 'mention' | 'dm' | 'new-message';
|
||||
|
||||
/**
|
||||
* Should a message from (channelId, source, text) be forwarded to the host,
|
||||
* and if so, should the bridge subscribe the thread?
|
||||
*
|
||||
* Exported for testability — see `chat-sdk-bridge.test.ts`.
|
||||
*
|
||||
* We take the union across wired agents: if any wiring would engage OR any
|
||||
* wiring has `ignoredMessagePolicy='accumulate'`, the message is forwarded.
|
||||
* The host router then does the per-wiring decision in `deliverToAgent` —
|
||||
* engaging agents get `trigger=1` (wake), accumulating agents get
|
||||
* `trigger=0` (store as context, don't wake), drop-policy agents are
|
||||
* skipped (see `src/router.ts` routeInbound fan-out).
|
||||
*
|
||||
* `stickySubscribe` is only set when an actual engage happens (not just
|
||||
* accumulate) — subscribing a thread we'd only silently accumulate on would
|
||||
* misrepresent the bot's presence to other users.
|
||||
*/
|
||||
export function shouldEngage(
|
||||
conversations: Map<string, ConversationConfig[]>,
|
||||
channelId: string,
|
||||
source: EngageSource,
|
||||
text: string,
|
||||
): { forward: boolean; stickySubscribe: boolean } {
|
||||
const configs = conversations.get(channelId);
|
||||
|
||||
// Unknown conversation — behavior diverges by source:
|
||||
// - subscribed/mention/dm: forward anyway. These paths imply some
|
||||
// prior engagement (subscribe, @mention, DM open) and may be a new
|
||||
// group that hasn't been registered yet; central routing will log +
|
||||
// drop cleanly.
|
||||
// - new-message: DROP. `onNewMessage(/./, …)` fires for every message
|
||||
// in every unsubscribed thread the bot can see — including channels
|
||||
// the bot is merely *present* in but not wired to. Forwarding
|
||||
// everything would flood the host.
|
||||
if (!configs || configs.length === 0) {
|
||||
return { forward: source !== 'new-message', stickySubscribe: false };
|
||||
}
|
||||
|
||||
let engage = false;
|
||||
let accumulate = false;
|
||||
let stickySubscribe = false;
|
||||
|
||||
for (const cfg of configs) {
|
||||
let cfgEngages = false;
|
||||
switch (cfg.engageMode) {
|
||||
case 'mention':
|
||||
if (source === 'mention' || source === 'dm') cfgEngages = true;
|
||||
break;
|
||||
case 'mention-sticky':
|
||||
if (source === 'mention' || source === 'dm') {
|
||||
cfgEngages = true;
|
||||
stickySubscribe = true;
|
||||
} else if (source === 'subscribed') {
|
||||
// Thread was already subscribed on a prior mention — treat as
|
||||
// engage-all so follow-ups in the thread reach the agent.
|
||||
cfgEngages = true;
|
||||
}
|
||||
// source='new-message' → does not engage (requires explicit mention
|
||||
// to start). Accumulate policy is evaluated below if set.
|
||||
break;
|
||||
case 'pattern': {
|
||||
// Pattern evaluates on any source that delivers a plain message —
|
||||
// including new-message, which is the whole reason we registered
|
||||
// onNewMessage(/./). For mention/dm-delivered messages we still
|
||||
// test the regex (historical behavior), so pattern='foo' wirings
|
||||
// only fire on mentions whose text contains 'foo'.
|
||||
const pattern = cfg.engagePattern ?? '.';
|
||||
try {
|
||||
if (pattern === '.' || new RegExp(pattern).test(text)) cfgEngages = true;
|
||||
} catch {
|
||||
// Invalid regex → fail open so the admin can see something is
|
||||
// happening and fix the pattern.
|
||||
cfgEngages = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (cfgEngages) {
|
||||
engage = true;
|
||||
} else if (cfg.ignoredMessagePolicy === 'accumulate') {
|
||||
// Wiring doesn't engage on this message but wants it captured as
|
||||
// context for its session — forward so the router can write it with
|
||||
// trigger=0.
|
||||
accumulate = true;
|
||||
}
|
||||
}
|
||||
|
||||
return { forward: engage || accumulate, stickySubscribe };
|
||||
}
|
||||
|
||||
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
|
||||
const { adapter } = config;
|
||||
const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t);
|
||||
let chat: Chat;
|
||||
let state: SqliteStateAdapter;
|
||||
let setupConfig: ChannelSetup;
|
||||
// Keyed by platformId. Multiple agents may be wired to the same
|
||||
// conversation — this holds all their configs so the bridge can apply the
|
||||
// most-permissive engage rule at gate time and only subscribe when at
|
||||
// least one wiring requested 'mention-sticky'.
|
||||
//
|
||||
// STALENESS: populated at setup() and updateConversations(). If wirings
|
||||
// change after setup, updateConversations() must be called to refresh
|
||||
// (ACTION-ITEMS item 17).
|
||||
let conversations: Map<string, ConversationConfig[]>;
|
||||
let gatewayAbort: AbortController | null = null;
|
||||
|
||||
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig[]> {
|
||||
const map = new Map<string, ConversationConfig[]>();
|
||||
for (const conv of configs) {
|
||||
const existing = map.get(conv.platformId);
|
||||
if (existing) existing.push(conv);
|
||||
else map.set(conv.platformId, [conv]);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
function engageDecision(
|
||||
channelId: string,
|
||||
source: EngageSource,
|
||||
text: string,
|
||||
): { forward: boolean; stickySubscribe: boolean } {
|
||||
return shouldEngage(conversations, channelId, source, text);
|
||||
}
|
||||
|
||||
async function messageToInbound(message: ChatMessage, isMention: boolean): Promise<InboundMessage> {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const serialized = message.toJSON() as Record<string, any>;
|
||||
@@ -277,7 +140,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
|
||||
async setup(hostConfig: ChannelSetup) {
|
||||
setupConfig = hostConfig;
|
||||
conversations = buildConversationMap(hostConfig.conversations);
|
||||
|
||||
state = new SqliteStateAdapter();
|
||||
|
||||
@@ -289,83 +151,54 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
logger: 'silent',
|
||||
});
|
||||
|
||||
// Subscribed threads — the conversation is already active (via prior
|
||||
// mention-sticky engagement or admin wiring). Gate on engageMode so a
|
||||
// plain 'mention' wiring doesn't keep firing after a one-off mention.
|
||||
// Four SDK dispatch paths — bridge just forwards. All per-wiring
|
||||
// engage / accumulate / drop / subscribe decisions live in the host
|
||||
// router (src/router.ts routeInbound / evaluateEngage). The bridge
|
||||
// only resolves channel ids and sets the platform-confirmed isMention
|
||||
// flag that routeInbound evaluates; the router calls back into
|
||||
// bridge.subscribe(...) when a mention-sticky wiring engages.
|
||||
|
||||
// Subscribed threads — every message in a thread we've previously
|
||||
// engaged. Carry the SDK's `message.isMention` through so mention-mode
|
||||
// wirings still fire on in-thread mentions.
|
||||
chat.onSubscribedMessage(async (thread, message) => {
|
||||
const channelId = adapter.channelIdFromThreadId(thread.id);
|
||||
const text = typeof message.text === 'string' ? message.text : '';
|
||||
const decision = engageDecision(channelId, 'subscribed', text);
|
||||
if (!decision.forward) return;
|
||||
// Subscribed path: the SDK sets message.isMention when the bot was
|
||||
// @-mentioned in an already-subscribed thread (docs at
|
||||
// handling-events.mdx). Forward it verbatim.
|
||||
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, message.isMention === true));
|
||||
});
|
||||
|
||||
// @mention in an unsubscribed thread — always engage; subscribe only
|
||||
// if the wiring is 'mention-sticky'.
|
||||
// @mention in an unsubscribed thread — SDK-confirmed bot mention.
|
||||
chat.onNewMention(async (thread, message) => {
|
||||
const channelId = adapter.channelIdFromThreadId(thread.id);
|
||||
const text = typeof message.text === 'string' ? message.text : '';
|
||||
const decision = engageDecision(channelId, 'mention', text);
|
||||
if (!decision.forward) return;
|
||||
// onNewMention only fires when the SDK confirms the bot was mentioned.
|
||||
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true));
|
||||
if (decision.stickySubscribe) {
|
||||
await thread.subscribe();
|
||||
}
|
||||
});
|
||||
|
||||
// DMs — apply engage rules too, but DMs typically default to pattern='.'
|
||||
// at setup time so this is a pass-through in practice. sticky subscribe
|
||||
// follows the same rule as a group mention.
|
||||
//
|
||||
// Thread id is passed through so sub-thread context reaches delivery
|
||||
// (Slack users can open threads inside a DM). The router collapses DM
|
||||
// sub-threads to one session (is_group=0 short-circuits the per-thread
|
||||
// escalation).
|
||||
// DMs — by definition addressed to the bot. Thread id flows through
|
||||
// so sub-thread context reaches delivery (Slack users can open threads
|
||||
// inside a DM). Router collapses DM sub-threads to one session via
|
||||
// is_group=0 short-circuit.
|
||||
chat.onDirectMessage(async (thread, message) => {
|
||||
const channelId = adapter.channelIdFromThreadId(thread.id);
|
||||
const text = typeof message.text === 'string' ? message.text : '';
|
||||
const decision = engageDecision(channelId, 'dm', text);
|
||||
log.info('Inbound DM received', {
|
||||
adapter: adapter.name,
|
||||
channelId,
|
||||
sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown',
|
||||
threadId: thread.id,
|
||||
forward: decision.forward,
|
||||
});
|
||||
if (!decision.forward) return;
|
||||
// A DM is by definition addressed to the bot — treat as a mention
|
||||
// for routing purposes. `mention` / `mention-sticky` wirings fire.
|
||||
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true));
|
||||
if (decision.stickySubscribe) {
|
||||
await thread.subscribe();
|
||||
}
|
||||
});
|
||||
|
||||
// Plain (non-mention, non-DM) messages in unsubscribed threads.
|
||||
// Plain messages in unsubscribed threads.
|
||||
//
|
||||
// Chat SDK dispatch (handling-events.mdx §"Handler dispatch order"):
|
||||
// subscribed threads → onSubscribedMessage; unsubscribed + mention →
|
||||
// onNewMention; unsubscribed + pattern match → onNewMessage. Dispatch
|
||||
// is exclusive — at most one handler fires per message.
|
||||
//
|
||||
// Without this handler, `engage_mode='pattern'` is silently dropped in
|
||||
// unsubscribed group threads because the SDK never surfaces the
|
||||
// message anywhere else. Registering with `/./` lets every wired
|
||||
// conversation's regex be evaluated in our `shouldEngage` — unknown
|
||||
// conversations are dropped there (see the source='new-message'
|
||||
// branch) so this doesn't flood the host on channels the bot isn't
|
||||
// wired to.
|
||||
// Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is
|
||||
// exclusive: subscribed → onSubscribedMessage; unsubscribed+mention →
|
||||
// onNewMention; unsubscribed+pattern-match → onNewMessage. Registering
|
||||
// with `/./` lets the router see every plain message on every
|
||||
// unsubscribed thread the bot can see. The router short-circuits via
|
||||
// getMessagingGroupWithAgentCount (~1 DB read) for unwired channels,
|
||||
// so forwarding every one is cheap enough to not need a bridge-side
|
||||
// flood gate.
|
||||
chat.onNewMessage(/./, async (thread, message) => {
|
||||
const channelId = adapter.channelIdFromThreadId(thread.id);
|
||||
const text = typeof message.text === 'string' ? message.text : '';
|
||||
const decision = engageDecision(channelId, 'new-message', text);
|
||||
if (!decision.forward) return;
|
||||
// SDK dispatch guarantees this is a non-mention non-DM message in an
|
||||
// unsubscribed thread — isMention is definitively false here.
|
||||
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false));
|
||||
});
|
||||
|
||||
@@ -538,8 +371,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
return true;
|
||||
},
|
||||
|
||||
updateConversations(configs: ConversationConfig[]) {
|
||||
conversations = buildConversationMap(configs);
|
||||
async subscribe(_platformId: string, threadId: string) {
|
||||
// Chat SDK's subscription state lives on the StateAdapter (not on the
|
||||
// Chat instance itself). SqliteStateAdapter.subscribe is idempotent —
|
||||
// a second call on an already-subscribed thread is a no-op. threadId
|
||||
// is the SDK's thread id, which is what the router already has from
|
||||
// the original inbound event.
|
||||
await state.subscribe(threadId);
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -37,6 +37,37 @@ export function getMessagingGroupByPlatform(channelType: string, platformId: str
|
||||
.get(channelType, platformId) as MessagingGroup | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Combined lookup for the router's fast-drop path. Returns the messaging
|
||||
* group (if it exists) and a count of wired agents in one query — lets
|
||||
* `routeInbound` short-circuit messages for unwired / unknown channels
|
||||
* with a single DB read instead of four (mg lookup, sender upsert, agents
|
||||
* lookup, dropped_messages insert).
|
||||
*
|
||||
* Returns `null` when no messaging_groups row exists for this channel.
|
||||
* Returns `{ mg, agentCount: 0 }` when the row exists but has no wired
|
||||
* agents. Uses the `UNIQUE(channel_type, platform_id)` index plus the
|
||||
* `UNIQUE(messaging_group_id, agent_group_id)` index for the JOIN — both
|
||||
* covered by existing SQLite auto-indexes from the UNIQUE constraints.
|
||||
*/
|
||||
export function getMessagingGroupWithAgentCount(
|
||||
channelType: string,
|
||||
platformId: string,
|
||||
): { mg: MessagingGroup; agentCount: number } | null {
|
||||
const row = getDb()
|
||||
.prepare(
|
||||
`SELECT mg.*, COUNT(mga.id) AS agent_count
|
||||
FROM messaging_groups mg
|
||||
LEFT JOIN messaging_group_agents mga ON mga.messaging_group_id = mg.id
|
||||
WHERE mg.channel_type = ? AND mg.platform_id = ?
|
||||
GROUP BY mg.id`,
|
||||
)
|
||||
.get(channelType, platformId) as (MessagingGroup & { agent_count: number }) | undefined;
|
||||
if (!row) return null;
|
||||
const { agent_count, ...mg } = row;
|
||||
return { mg: mg as MessagingGroup, agentCount: agent_count };
|
||||
}
|
||||
|
||||
export function getAllMessagingGroups(): MessagingGroup[] {
|
||||
return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[];
|
||||
}
|
||||
@@ -69,6 +100,20 @@ export function deleteMessagingGroup(id: string): void {
|
||||
getDb().prepare('DELETE FROM messaging_groups WHERE id = ?').run(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a messaging group as denied by the owner (channel-registration flow).
|
||||
* Future mentions on this channel silently drop until an admin explicitly
|
||||
* wires it via `createMessagingGroupAgent`, which implicitly clears the
|
||||
* denied state by making `agentCount > 0` — the router's denied-channel
|
||||
* check sits on the `agentCount === 0` branch.
|
||||
*
|
||||
* Passing null unsets the flag (used by tests or a future "unblock channel"
|
||||
* admin command).
|
||||
*/
|
||||
export function setMessagingGroupDeniedAt(id: string, deniedAt: string | null): void {
|
||||
getDb().prepare('UPDATE messaging_groups SET denied_at = ? WHERE id = ?').run(deniedAt, id);
|
||||
}
|
||||
|
||||
// ── Messaging Group Agents ──
|
||||
|
||||
/**
|
||||
|
||||
48
src/db/migrations/012-channel-registration.ts
Normal file
48
src/db/migrations/012-channel-registration.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Unknown-channel registration flow.
|
||||
*
|
||||
* When a channel that isn't wired to any agent group receives a mention or
|
||||
* DM, the router escalates to the owner for approval before wiring. Approve
|
||||
* creates a `messaging_group_agents` row (with conservative defaults) and
|
||||
* replays the triggering event. Deny marks the channel denied forever
|
||||
* (stored as a timestamp on `messaging_groups.denied_at`) so future
|
||||
* messages on that channel drop silently without re-prompting.
|
||||
*
|
||||
* Two changes:
|
||||
* 1. `messaging_groups.denied_at TEXT NULL` — set on deny, checked in the
|
||||
* router before re-escalating. ALTER TABLE ADD COLUMN is FK-safe
|
||||
* unlike the table rebuild that bit us in migration 011.
|
||||
* 2. `pending_channel_approvals` table. PRIMARY KEY on
|
||||
* `messaging_group_id` gives free in-flight dedup — a second mention
|
||||
* while the card is pending is silently dropped by INSERT OR IGNORE,
|
||||
* preventing card spam.
|
||||
*/
|
||||
import type Database from 'better-sqlite3';
|
||||
import type { Migration } from './index.js';
|
||||
|
||||
export const migration012: Migration = {
|
||||
version: 12,
|
||||
name: 'channel-registration',
|
||||
up: (db: Database.Database) => {
|
||||
// 1. Add denied_at to messaging_groups. Idempotent guard in case the
|
||||
// column was added by some other path before this migration ran.
|
||||
const cols = db.prepare("PRAGMA table_info('messaging_groups')").all() as Array<{ name: string }>;
|
||||
if (!cols.some((c) => c.name === 'denied_at')) {
|
||||
db.exec(`ALTER TABLE messaging_groups ADD COLUMN denied_at TEXT`);
|
||||
}
|
||||
|
||||
// 2. pending_channel_approvals.
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS pending_channel_approvals (
|
||||
messaging_group_id TEXT PRIMARY KEY REFERENCES messaging_groups(id),
|
||||
agent_group_id TEXT NOT NULL REFERENCES agent_groups(id),
|
||||
-- The agent the approved wiring will target.
|
||||
-- Picked at request time (currently: earliest
|
||||
-- agent_group by created_at).
|
||||
original_message TEXT NOT NULL, -- JSON serialized InboundEvent
|
||||
approver_user_id TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
},
|
||||
};
|
||||
@@ -8,6 +8,7 @@ import { migration008 } from './008-dropped-messages.js';
|
||||
import { migration009 } from './009-drop-pending-credentials.js';
|
||||
import { migration010 } from './010-engage-modes.js';
|
||||
import { migration011 } from './011-pending-sender-approvals.js';
|
||||
import { migration012 } from './012-channel-registration.js';
|
||||
import { moduleApprovalsPendingApprovals } from './module-approvals-pending-approvals.js';
|
||||
import { moduleApprovalsTitleOptions } from './module-approvals-title-options.js';
|
||||
|
||||
@@ -27,6 +28,7 @@ const migrations: Migration[] = [
|
||||
migration009,
|
||||
migration010,
|
||||
migration011,
|
||||
migration012,
|
||||
];
|
||||
|
||||
export function runMigrations(db: Database.Database): void {
|
||||
|
||||
@@ -244,26 +244,42 @@ describe('router', () => {
|
||||
expect(wakeContainer).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should auto-create messaging group for unknown platform', async () => {
|
||||
it('auto-creates messaging group only when the bot is addressed (mention/DM)', async () => {
|
||||
// The router's no-mg branch is escalation-gated: plain chatter on an
|
||||
// unknown channel stays silent (no DB writes) so a bot that sits in
|
||||
// many unwired channels doesn't bloat messaging_groups. Only explicit
|
||||
// mentions and DMs trigger auto-create.
|
||||
const { routeInbound } = await import('./router.js');
|
||||
const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js');
|
||||
|
||||
const event: InboundEvent = {
|
||||
// Plain message on unknown channel — should NOT auto-create.
|
||||
await routeInbound({
|
||||
channelType: 'slack',
|
||||
platformId: 'C-NEW-CHANNEL',
|
||||
platformId: 'C-PLAIN',
|
||||
threadId: null,
|
||||
message: {
|
||||
id: 'msg-2',
|
||||
id: 'msg-plain',
|
||||
kind: 'chat',
|
||||
content: JSON.stringify({ sender: 'User', text: 'Hi' }),
|
||||
timestamp: now(),
|
||||
},
|
||||
};
|
||||
});
|
||||
expect(getMessagingGroupByPlatform('slack', 'C-PLAIN')).toBeUndefined();
|
||||
|
||||
await routeInbound(event);
|
||||
|
||||
const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js');
|
||||
const mg = getMessagingGroupByPlatform('slack', 'C-NEW-CHANNEL');
|
||||
expect(mg).toBeDefined();
|
||||
// Mention on unknown channel — SHOULD auto-create (next step: channel-registration flow).
|
||||
await routeInbound({
|
||||
channelType: 'slack',
|
||||
platformId: 'C-MENTIONED',
|
||||
threadId: null,
|
||||
message: {
|
||||
id: 'msg-mentioned',
|
||||
kind: 'chat',
|
||||
content: JSON.stringify({ sender: 'User', text: '@bot hi' }),
|
||||
timestamp: now(),
|
||||
isMention: true,
|
||||
},
|
||||
});
|
||||
expect(getMessagingGroupByPlatform('slack', 'C-MENTIONED')).toBeDefined();
|
||||
});
|
||||
|
||||
it('should route multiple messages to the same session', async () => {
|
||||
|
||||
27
src/index.ts
27
src/index.ts
@@ -9,7 +9,6 @@ import path from 'path';
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { initDb } from './db/connection.js';
|
||||
import { runMigrations } from './db/migrations/index.js';
|
||||
import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js';
|
||||
import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js';
|
||||
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
|
||||
import { startHostSweep, stopHostSweep } from './host-sweep.js';
|
||||
@@ -52,7 +51,7 @@ import './channels/index.js';
|
||||
// append registry-based modules. Imported for side effects (registrations).
|
||||
import './modules/index.js';
|
||||
|
||||
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
|
||||
import type { ChannelAdapter, ChannelSetup } from './channels/adapter.js';
|
||||
import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js';
|
||||
|
||||
async function main(): Promise<void> {
|
||||
@@ -70,9 +69,7 @@ async function main(): Promise<void> {
|
||||
|
||||
// 3. Channel adapters
|
||||
await initChannelAdapters((adapter: ChannelAdapter): ChannelSetup => {
|
||||
const conversations = buildConversationConfigs(adapter.channelType);
|
||||
return {
|
||||
conversations,
|
||||
onInbound(platformId, threadId, message) {
|
||||
routeInbound({
|
||||
channelType: adapter.channelType,
|
||||
@@ -151,28 +148,6 @@ async function main(): Promise<void> {
|
||||
log.info('NanoClaw running');
|
||||
}
|
||||
|
||||
/** Build ConversationConfig[] for a channel type from the central DB. */
|
||||
function buildConversationConfigs(channelType: string): ConversationConfig[] {
|
||||
const groups = getMessagingGroupsByChannel(channelType);
|
||||
const configs: ConversationConfig[] = [];
|
||||
|
||||
for (const mg of groups) {
|
||||
const agents = getMessagingGroupAgents(mg.id);
|
||||
for (const agent of agents) {
|
||||
configs.push({
|
||||
platformId: mg.platform_id,
|
||||
agentGroupId: agent.agent_group_id,
|
||||
engageMode: agent.engage_mode,
|
||||
engagePattern: agent.engage_pattern,
|
||||
ignoredMessagePolicy: agent.ignored_message_policy,
|
||||
sessionMode: agent.session_mode,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return configs;
|
||||
}
|
||||
|
||||
/** Graceful shutdown. */
|
||||
async function shutdown(signal: string): Promise<void> {
|
||||
log.info('Shutdown signal received', { signal });
|
||||
|
||||
392
src/modules/permissions/channel-approval.test.ts
Normal file
392
src/modules/permissions/channel-approval.test.ts
Normal file
@@ -0,0 +1,392 @@
|
||||
/**
|
||||
* Integration tests for the unknown-channel registration flow (ACTION-ITEMS
|
||||
* item 22).
|
||||
*
|
||||
* Covers:
|
||||
* - Mention on an unwired channel fires an owner-approval card
|
||||
* - DM on an unwired channel fires a card (engage_mode will default to pattern='.')
|
||||
* - In-flight dedup: second mention while a card is pending doesn't spam
|
||||
* - Approve: wiring created with correct defaults, triggering sender added
|
||||
* as member, replay wakes the container
|
||||
* - Deny: messaging_groups.denied_at set, future mentions drop silently
|
||||
* - Unauthorized clicker is rejected (same pattern as sender-approval)
|
||||
* - No-owner install: no card, no row
|
||||
* - No agent groups configured: no card, no row
|
||||
*/
|
||||
import fs from 'fs';
|
||||
import { beforeEach, afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { initTestDb, closeDb, runMigrations } from '../../db/index.js';
|
||||
import { createAgentGroup } from '../../db/agent-groups.js';
|
||||
import { createMessagingGroup, getMessagingGroupByPlatform } from '../../db/messaging-groups.js';
|
||||
import { upsertUser } from './db/users.js';
|
||||
import { grantRole } from './db/user-roles.js';
|
||||
|
||||
// Mock container runner — prevent actual docker spawn.
|
||||
vi.mock('../../container-runner.js', () => ({
|
||||
wakeContainer: vi.fn().mockResolvedValue(undefined),
|
||||
isContainerRunning: vi.fn().mockReturnValue(false),
|
||||
getActiveContainerCount: vi.fn().mockReturnValue(0),
|
||||
killContainer: vi.fn(),
|
||||
}));
|
||||
|
||||
// Mock delivery adapter.
|
||||
const deliverMock = vi.fn().mockResolvedValue('plat-msg-id');
|
||||
vi.mock('../../delivery.js', () => ({
|
||||
getDeliveryAdapter: () => ({ deliver: deliverMock }),
|
||||
}));
|
||||
|
||||
// Mock ensureUserDm — look up the owner's preconfigured DM row instead of
|
||||
// hitting a real openDM RPC.
|
||||
vi.mock('./user-dm.js', () => ({
|
||||
ensureUserDm: vi.fn(async (userId: string) => {
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const row = getDb()
|
||||
.prepare(
|
||||
`SELECT mg.* FROM messaging_groups mg
|
||||
JOIN user_dms ud ON ud.messaging_group_id = mg.id
|
||||
WHERE ud.user_id = ?`,
|
||||
)
|
||||
.get(userId);
|
||||
return row;
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock('../../config.js', async () => {
|
||||
const actual = await vi.importActual('../../config.js');
|
||||
return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-channel-approval' };
|
||||
});
|
||||
|
||||
const TEST_DIR = '/tmp/nanoclaw-test-channel-approval';
|
||||
|
||||
function now() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||
const db = initTestDb();
|
||||
runMigrations(db);
|
||||
|
||||
await import('./index.js'); // register hooks
|
||||
|
||||
// Base fixtures: one agent group + owner with a DM on 'telegram'.
|
||||
createAgentGroup({ id: 'ag-1', name: 'Andy', folder: 'andy', agent_provider: null, created_at: now() });
|
||||
|
||||
upsertUser({ id: 'telegram:owner', kind: 'telegram', display_name: 'Owner', created_at: now() });
|
||||
grantRole({
|
||||
user_id: 'telegram:owner',
|
||||
role: 'owner',
|
||||
agent_group_id: null,
|
||||
granted_by: null,
|
||||
granted_at: now(),
|
||||
});
|
||||
|
||||
// Pre-seed owner's DM messaging group + user_dms mapping.
|
||||
createMessagingGroup({
|
||||
id: 'mg-dm-owner',
|
||||
channel_type: 'telegram',
|
||||
platform_id: 'dm-owner',
|
||||
name: 'Owner DM',
|
||||
is_group: 0,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
getDb()
|
||||
.prepare(
|
||||
`INSERT INTO user_dms (user_id, channel_type, messaging_group_id, resolved_at)
|
||||
VALUES (?, ?, ?, ?)`,
|
||||
)
|
||||
.run('telegram:owner', 'telegram', 'mg-dm-owner', now());
|
||||
|
||||
deliverMock.mockClear();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
closeDb();
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
});
|
||||
|
||||
function groupMention(platformId: string, text = '@bot hello') {
|
||||
return {
|
||||
channelType: 'telegram',
|
||||
platformId,
|
||||
threadId: 'thread-1', // non-null → is_group=true per channel-approval default-picker logic
|
||||
message: {
|
||||
id: `msg-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat' as const,
|
||||
content: JSON.stringify({ senderId: 'caller', senderName: 'Caller', text }),
|
||||
timestamp: now(),
|
||||
isMention: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function dmEvent(platformId: string, text = 'hello') {
|
||||
return {
|
||||
channelType: 'telegram',
|
||||
platformId,
|
||||
threadId: null,
|
||||
message: {
|
||||
id: `msg-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat' as const,
|
||||
content: JSON.stringify({ senderId: 'stranger', senderName: 'Stranger', text }),
|
||||
timestamp: now(),
|
||||
isMention: true, // DM bridge sets isMention=true
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe('unknown-channel registration flow', () => {
|
||||
it('delivers an approval card on mention into an unwired group', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
await routeInbound(groupMention('chat-new'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(deliverMock).toHaveBeenCalledTimes(1);
|
||||
const [channel, platformId, thread, kind, content] = deliverMock.mock.calls[0];
|
||||
expect(channel).toBe('telegram');
|
||||
expect(platformId).toBe('dm-owner'); // delivered to owner's DM
|
||||
expect(thread).toBeNull();
|
||||
expect(kind).toBe('chat-sdk');
|
||||
const payload = JSON.parse(content as string);
|
||||
expect(payload.type).toBe('ask_question');
|
||||
// Card names the target agent so the owner knows what they're wiring to.
|
||||
expect(payload.question).toContain('Andy');
|
||||
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const rows = getDb().prepare('SELECT * FROM pending_channel_approvals').all() as Array<{
|
||||
messaging_group_id: string;
|
||||
}>;
|
||||
expect(rows).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('delivers a card on DM too (non-threaded event)', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
await routeInbound(dmEvent('dm-new-user'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(deliverMock).toHaveBeenCalledTimes(1);
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c;
|
||||
expect(count).toBe(1);
|
||||
});
|
||||
|
||||
it('dedups a second mention while the card is pending', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
await routeInbound(groupMention('chat-busy'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
await routeInbound(groupMention('chat-busy', '@bot still here'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(deliverMock).toHaveBeenCalledTimes(1);
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c;
|
||||
expect(count).toBe(1);
|
||||
});
|
||||
|
||||
it('approve → creates wiring, admits triggering sender, replays', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
const { getResponseHandlers } = await import('../../response-registry.js');
|
||||
const { wakeContainer } = await import('../../container-runner.js');
|
||||
(wakeContainer as unknown as ReturnType<typeof vi.fn>).mockClear();
|
||||
|
||||
await routeInbound(groupMention('chat-approve'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const pending = getDb()
|
||||
.prepare('SELECT messaging_group_id FROM pending_channel_approvals')
|
||||
.get() as { messaging_group_id: string };
|
||||
expect(pending).toBeDefined();
|
||||
|
||||
// Owner clicks approve.
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'approve',
|
||||
userId: 'owner', // raw platform id — handler namespaces it
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-owner',
|
||||
threadId: null,
|
||||
});
|
||||
if (claimed) break;
|
||||
}
|
||||
|
||||
// Wiring created with MVP defaults.
|
||||
const mga = getDb()
|
||||
.prepare('SELECT * FROM messaging_group_agents WHERE messaging_group_id = ?')
|
||||
.get(pending.messaging_group_id) as {
|
||||
engage_mode: string;
|
||||
engage_pattern: string | null;
|
||||
sender_scope: string;
|
||||
ignored_message_policy: string;
|
||||
agent_group_id: string;
|
||||
};
|
||||
expect(mga).toBeDefined();
|
||||
expect(mga.engage_mode).toBe('mention-sticky'); // group (threadId != null)
|
||||
expect(mga.engage_pattern).toBeNull();
|
||||
expect(mga.sender_scope).toBe('known');
|
||||
expect(mga.ignored_message_policy).toBe('accumulate');
|
||||
expect(mga.agent_group_id).toBe('ag-1');
|
||||
|
||||
// Triggering sender auto-admitted so sender_scope='known' doesn't
|
||||
// bounce the replay into sender-approval.
|
||||
const member = getDb()
|
||||
.prepare('SELECT 1 AS x FROM agent_group_members WHERE user_id = ? AND agent_group_id = ?')
|
||||
.get('telegram:caller', 'ag-1');
|
||||
expect(member).toBeDefined();
|
||||
|
||||
// Pending row cleared and container woken via replay.
|
||||
const stillPending = (
|
||||
getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }
|
||||
).c;
|
||||
expect(stillPending).toBe(0);
|
||||
expect(wakeContainer).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('approve on a DM wires with pattern="." defaults', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
const { getResponseHandlers } = await import('../../response-registry.js');
|
||||
|
||||
await routeInbound(dmEvent('dm-approve-user'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const pending = getDb()
|
||||
.prepare('SELECT messaging_group_id FROM pending_channel_approvals')
|
||||
.get() as { messaging_group_id: string };
|
||||
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'approve',
|
||||
userId: 'owner',
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-owner',
|
||||
threadId: null,
|
||||
});
|
||||
if (claimed) break;
|
||||
}
|
||||
|
||||
const mga = getDb()
|
||||
.prepare('SELECT engage_mode, engage_pattern FROM messaging_group_agents WHERE messaging_group_id = ?')
|
||||
.get(pending.messaging_group_id) as { engage_mode: string; engage_pattern: string };
|
||||
expect(mga.engage_mode).toBe('pattern');
|
||||
expect(mga.engage_pattern).toBe('.');
|
||||
});
|
||||
|
||||
it('deny → sets denied_at; future mentions drop silently without a second card', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
const { getResponseHandlers } = await import('../../response-registry.js');
|
||||
|
||||
await routeInbound(groupMention('chat-deny'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const pending = getDb()
|
||||
.prepare('SELECT messaging_group_id FROM pending_channel_approvals')
|
||||
.get() as { messaging_group_id: string };
|
||||
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'reject',
|
||||
userId: 'owner',
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-owner',
|
||||
threadId: null,
|
||||
});
|
||||
if (claimed) break;
|
||||
}
|
||||
|
||||
// denied_at set, pending row cleared, no wiring.
|
||||
const mg = getMessagingGroupByPlatform('telegram', 'chat-deny');
|
||||
expect(mg?.denied_at).not.toBeNull();
|
||||
expect(mg?.denied_at).toBeTruthy();
|
||||
const mgaCount = (
|
||||
getDb()
|
||||
.prepare('SELECT COUNT(*) AS c FROM messaging_group_agents WHERE messaging_group_id = ?')
|
||||
.get(pending.messaging_group_id) as { c: number }
|
||||
).c;
|
||||
expect(mgaCount).toBe(0);
|
||||
|
||||
// A follow-up mention on the denied channel: no new card, no new pending row.
|
||||
deliverMock.mockClear();
|
||||
await routeInbound(groupMention('chat-deny', '@bot please'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
expect(deliverMock).not.toHaveBeenCalled();
|
||||
const stillPending = (
|
||||
getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }
|
||||
).c;
|
||||
expect(stillPending).toBe(0);
|
||||
});
|
||||
|
||||
it('rejects clicks from an unauthorized user (prevents self-admit via forwarded card)', async () => {
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
const { getResponseHandlers } = await import('../../response-registry.js');
|
||||
|
||||
await routeInbound(groupMention('chat-unauth'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
const pending = getDb()
|
||||
.prepare('SELECT messaging_group_id FROM pending_channel_approvals')
|
||||
.get() as { messaging_group_id: string };
|
||||
|
||||
for (const handler of getResponseHandlers()) {
|
||||
const claimed = await handler({
|
||||
questionId: pending.messaging_group_id,
|
||||
value: 'approve',
|
||||
userId: 'random-bystander',
|
||||
channelType: 'telegram',
|
||||
platformId: 'dm-random',
|
||||
threadId: null,
|
||||
});
|
||||
if (claimed) break;
|
||||
}
|
||||
|
||||
// No wiring created, pending row preserved so a real approver can act on it.
|
||||
const mgaCount = (
|
||||
getDb()
|
||||
.prepare('SELECT COUNT(*) AS c FROM messaging_group_agents WHERE messaging_group_id = ?')
|
||||
.get(pending.messaging_group_id) as { c: number }
|
||||
).c;
|
||||
expect(mgaCount).toBe(0);
|
||||
const stillPending = (
|
||||
getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }
|
||||
).c;
|
||||
expect(stillPending).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('no-owner / no-agent failure modes', () => {
|
||||
it('no owner → no card, no pending row (fresh-install bootstrap path)', async () => {
|
||||
// Wipe the owner grant set up in the outer beforeEach.
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
getDb().prepare('DELETE FROM user_roles').run();
|
||||
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
await routeInbound(groupMention('chat-noowner'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(deliverMock).not.toHaveBeenCalled();
|
||||
const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c;
|
||||
expect(count).toBe(0);
|
||||
});
|
||||
|
||||
it('no agent groups → no card, no pending row', async () => {
|
||||
const { getDb } = await import('../../db/connection.js');
|
||||
// Drop foreign-key-dependent rows first, then the agent group itself.
|
||||
getDb().prepare('DELETE FROM user_roles').run();
|
||||
getDb().prepare('DELETE FROM agent_groups').run();
|
||||
|
||||
const { routeInbound } = await import('../../router.js');
|
||||
await routeInbound(groupMention('chat-noagent'));
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(deliverMock).not.toHaveBeenCalled();
|
||||
const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c;
|
||||
expect(count).toBe(0);
|
||||
});
|
||||
});
|
||||
159
src/modules/permissions/channel-approval.ts
Normal file
159
src/modules/permissions/channel-approval.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
/**
|
||||
* Unknown-channel registration flow.
|
||||
*
|
||||
* When the router hits an unwired messaging group AND the message was
|
||||
* addressed to the bot (SDK-confirmed mention or DM), it calls
|
||||
* `requestChannelApproval` instead of silently dropping. The flow:
|
||||
*
|
||||
* 1. Pick the target agent group we'd wire to (MVP: first by name).
|
||||
* Multi-agent picker is a follow-up — see ACTION-ITEMS.
|
||||
* 2. Pick an eligible approver (owner / admin) and a reachable DM for
|
||||
* them, reusing the same primitives the sender-approval flow uses.
|
||||
* 3. Deliver an Approve / Ignore card that names the target agent
|
||||
* explicitly so the owner knows what they're wiring to.
|
||||
* 4. Record a `pending_channel_approvals` row holding the original event
|
||||
* so it can be re-routed on approve.
|
||||
*
|
||||
* On approve (handler in index.ts):
|
||||
* - Create `messaging_group_agents` with MVP defaults
|
||||
* (mention-sticky for groups / pattern='.' for DMs,
|
||||
* sender_scope='known', ignored_message_policy='accumulate')
|
||||
* - Add the triggering sender to `agent_group_members` so sender_scope
|
||||
* doesn't bounce the replayed message into a sender-approval cascade
|
||||
* - Delete the pending row, replay the original event
|
||||
*
|
||||
* On ignore:
|
||||
* - Set `messaging_groups.denied_at = now()` so the router stops
|
||||
* escalating on this channel until an admin explicitly re-wires
|
||||
* - Delete the pending row
|
||||
*
|
||||
* Dedup: `pending_channel_approvals` PK on messaging_group_id. Second
|
||||
* mention while pending silently dropped.
|
||||
*
|
||||
* Failure modes (log + no row, so a future attempt can try again):
|
||||
* - No agent groups exist (install never set up a first agent).
|
||||
* - No eligible approver in user_roles (no owner yet).
|
||||
* - Approver has no reachable DM.
|
||||
* - Delivery adapter missing.
|
||||
*/
|
||||
import { normalizeOptions, type RawOption } from '../../channels/ask-question.js';
|
||||
import { getAllAgentGroups } from '../../db/agent-groups.js';
|
||||
import { getMessagingGroup } from '../../db/messaging-groups.js';
|
||||
import { getDeliveryAdapter } from '../../delivery.js';
|
||||
import { log } from '../../log.js';
|
||||
import type { InboundEvent } from '../../router.js';
|
||||
import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js';
|
||||
import { createPendingChannelApproval, hasInFlightChannelApproval } from './db/pending-channel-approvals.js';
|
||||
|
||||
const APPROVAL_OPTIONS: RawOption[] = [
|
||||
{ label: 'Approve', selectedLabel: '✅ Wired', value: 'approve' },
|
||||
{ label: 'Ignore', selectedLabel: '🙅 Ignored', value: 'reject' },
|
||||
];
|
||||
|
||||
export interface RequestChannelApprovalInput {
|
||||
messagingGroupId: string;
|
||||
event: InboundEvent;
|
||||
}
|
||||
|
||||
export async function requestChannelApproval(input: RequestChannelApprovalInput): Promise<void> {
|
||||
const { messagingGroupId, event } = input;
|
||||
|
||||
// In-flight dedup: don't spam the owner if the same unwired channel
|
||||
// gets more mentions / DMs while a card is already pending.
|
||||
if (hasInFlightChannelApproval(messagingGroupId)) {
|
||||
log.debug('Channel registration already in flight — dropping retry', {
|
||||
messagingGroupId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// MVP: pick the first agent group by name. Multi-agent systems will get
|
||||
// a richer card later (user picks the target from a list).
|
||||
const agentGroups = getAllAgentGroups();
|
||||
if (agentGroups.length === 0) {
|
||||
log.warn('Channel registration skipped — no agent groups configured. Run /init-first-agent.', {
|
||||
messagingGroupId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const target = agentGroups[0];
|
||||
|
||||
// pickApprover takes the target agent group's id — gets scoped admins +
|
||||
// global admins + owners. For fresh installs with only an owner, the
|
||||
// owner is returned.
|
||||
const approvers = pickApprover(target.id);
|
||||
if (approvers.length === 0) {
|
||||
log.warn('Channel registration skipped — no owner or admin configured', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const originMg = getMessagingGroup(messagingGroupId);
|
||||
const originChannelType = originMg?.channel_type ?? '';
|
||||
const delivery = await pickApprovalDelivery(approvers, originChannelType);
|
||||
if (!delivery) {
|
||||
log.warn('Channel registration skipped — no DM channel for any approver', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const originName = originMg?.name ?? originMg?.platform_id ?? 'an unfamiliar chat';
|
||||
const isGroup = originMg?.is_group === 1;
|
||||
|
||||
const title = isGroup ? '📣 Bot mentioned in new chat' : '💬 New direct message';
|
||||
const question = isGroup
|
||||
? `Your agent was mentioned in ${originName} on ${originChannelType}. Wire it to ${target.name} and let it engage?`
|
||||
: `Someone DM'd your agent on ${originChannelType} (${originName}). Wire it to ${target.name} and let it respond?`;
|
||||
|
||||
createPendingChannelApproval({
|
||||
messaging_group_id: messagingGroupId,
|
||||
agent_group_id: target.id,
|
||||
original_message: JSON.stringify(event),
|
||||
approver_user_id: delivery.userId,
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
|
||||
const adapter = getDeliveryAdapter();
|
||||
if (!adapter) {
|
||||
log.error('Channel registration row created but no delivery adapter is wired', {
|
||||
messagingGroupId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await adapter.deliver(
|
||||
delivery.messagingGroup.channel_type,
|
||||
delivery.messagingGroup.platform_id,
|
||||
null,
|
||||
'chat-sdk',
|
||||
JSON.stringify({
|
||||
type: 'ask_question',
|
||||
// Use messaging_group_id as the questionId — it's unique per card
|
||||
// (PK on pending table dedups) and lets the response handler look
|
||||
// up the pending row directly without another index.
|
||||
questionId: messagingGroupId,
|
||||
title,
|
||||
question,
|
||||
options: normalizeOptions(APPROVAL_OPTIONS),
|
||||
}),
|
||||
);
|
||||
log.info('Channel registration card delivered', {
|
||||
messagingGroupId,
|
||||
targetAgentGroupId: target.id,
|
||||
approver: delivery.userId,
|
||||
});
|
||||
} catch (err) {
|
||||
log.error('Channel registration card delivery failed', {
|
||||
messagingGroupId,
|
||||
err,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export const APPROVE_VALUE = 'approve';
|
||||
export const REJECT_VALUE = 'reject';
|
||||
52
src/modules/permissions/db/pending-channel-approvals.ts
Normal file
52
src/modules/permissions/db/pending-channel-approvals.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
/**
|
||||
* CRUD for pending_channel_approvals — the in-flight state for the
|
||||
* unknown-channel registration flow. A row exists while an owner-approval
|
||||
* card is outstanding; it's deleted on approve (after wiring is created)
|
||||
* or deny (after denied_at is set on the messaging_group).
|
||||
*
|
||||
* PRIMARY KEY on messaging_group_id gives free in-flight dedup. A second
|
||||
* mention/DM while a card is pending resolves via
|
||||
* `hasInFlightChannelApproval` in the request flow and drops silently
|
||||
* instead of spamming the owner.
|
||||
*/
|
||||
import { getDb } from '../../../db/connection.js';
|
||||
|
||||
export interface PendingChannelApproval {
|
||||
messaging_group_id: string;
|
||||
agent_group_id: string;
|
||||
original_message: string;
|
||||
approver_user_id: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export function createPendingChannelApproval(row: PendingChannelApproval): void {
|
||||
getDb()
|
||||
.prepare(
|
||||
`INSERT INTO pending_channel_approvals (
|
||||
messaging_group_id, agent_group_id, original_message,
|
||||
approver_user_id, created_at
|
||||
)
|
||||
VALUES (
|
||||
@messaging_group_id, @agent_group_id, @original_message,
|
||||
@approver_user_id, @created_at
|
||||
)`,
|
||||
)
|
||||
.run(row);
|
||||
}
|
||||
|
||||
export function getPendingChannelApproval(messagingGroupId: string): PendingChannelApproval | undefined {
|
||||
return getDb()
|
||||
.prepare('SELECT * FROM pending_channel_approvals WHERE messaging_group_id = ?')
|
||||
.get(messagingGroupId) as PendingChannelApproval | undefined;
|
||||
}
|
||||
|
||||
export function hasInFlightChannelApproval(messagingGroupId: string): boolean {
|
||||
const row = getDb()
|
||||
.prepare('SELECT 1 AS x FROM pending_channel_approvals WHERE messaging_group_id = ?')
|
||||
.get(messagingGroupId) as { x: number } | undefined;
|
||||
return row !== undefined;
|
||||
}
|
||||
|
||||
export function deletePendingChannelApproval(messagingGroupId: string): void {
|
||||
getDb().prepare('DELETE FROM pending_channel_approvals WHERE messaging_group_id = ?').run(messagingGroupId);
|
||||
}
|
||||
@@ -16,9 +16,14 @@
|
||||
* access gate is not registered and core defaults to allow-all.
|
||||
*/
|
||||
import { recordDroppedMessage } from '../../db/dropped-messages.js';
|
||||
import {
|
||||
createMessagingGroupAgent,
|
||||
setMessagingGroupDeniedAt,
|
||||
} from '../../db/messaging-groups.js';
|
||||
import {
|
||||
routeInbound,
|
||||
setAccessGate,
|
||||
setChannelRequestGate,
|
||||
setSenderResolver,
|
||||
setSenderScopeGate,
|
||||
type AccessGateResult,
|
||||
@@ -28,7 +33,12 @@ import { registerResponseHandler, type ResponsePayload } from '../../response-re
|
||||
import { log } from '../../log.js';
|
||||
import type { MessagingGroup, MessagingGroupAgent } from '../../types.js';
|
||||
import { canAccessAgentGroup } from './access.js';
|
||||
import { requestChannelApproval } from './channel-approval.js';
|
||||
import { addMember } from './db/agent-group-members.js';
|
||||
import {
|
||||
deletePendingChannelApproval,
|
||||
getPendingChannelApproval,
|
||||
} from './db/pending-channel-approvals.js';
|
||||
import { deletePendingSenderApproval, getPendingSenderApproval } from './db/pending-sender-approvals.js';
|
||||
import { hasAdminPrivilege } from './db/user-roles.js';
|
||||
import { getUser, upsertUser } from './db/users.js';
|
||||
@@ -253,3 +263,137 @@ async function handleSenderApprovalResponse(payload: ResponsePayload): Promise<b
|
||||
}
|
||||
|
||||
registerResponseHandler(handleSenderApprovalResponse);
|
||||
|
||||
// ── Unknown-channel registration flow ──
|
||||
|
||||
setChannelRequestGate(async (mg, event) => {
|
||||
await requestChannelApproval({ messagingGroupId: mg.id, event });
|
||||
});
|
||||
|
||||
/**
|
||||
* Response handler for the unknown-channel registration card.
|
||||
*
|
||||
* Claim rule: questionId matches a pending_channel_approvals row (keyed
|
||||
* by messaging_group_id). If no such row, return false so downstream
|
||||
* handlers get a shot.
|
||||
*
|
||||
* Approve: create the wiring with MVP defaults (mention-sticky for
|
||||
* groups / pattern='.' for DMs; sender_scope='known';
|
||||
* ignored_message_policy='accumulate'), add the triggering sender as a
|
||||
* member so sender_scope doesn't immediately bounce them into a
|
||||
* sender-approval card, then replay the original event.
|
||||
*
|
||||
* Deny: set `messaging_groups.denied_at = now()` so future mentions on
|
||||
* this channel drop silently until an admin explicitly wires it.
|
||||
*/
|
||||
async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<boolean> {
|
||||
const row = getPendingChannelApproval(payload.questionId);
|
||||
if (!row) return false;
|
||||
|
||||
// Click-auth: same pattern as sender-approval (see commit 68058cb).
|
||||
// Raw platform userId → namespace with channelType → must match the
|
||||
// designated approver OR have admin privilege over the target agent.
|
||||
const clickerId = payload.userId ? `${payload.channelType}:${payload.userId}` : null;
|
||||
const isAuthorized =
|
||||
clickerId !== null && (clickerId === row.approver_user_id || hasAdminPrivilege(clickerId, row.agent_group_id));
|
||||
if (!isAuthorized) {
|
||||
log.warn('Channel registration click rejected — unauthorized clicker', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
clickerId,
|
||||
expectedApprover: row.approver_user_id,
|
||||
});
|
||||
return true; // claim but take no action
|
||||
}
|
||||
const approverId = clickerId;
|
||||
const approved = payload.value === 'approve';
|
||||
|
||||
if (!approved) {
|
||||
setMessagingGroupDeniedAt(row.messaging_group_id, new Date().toISOString());
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
log.info('Channel registration denied', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: row.agent_group_id,
|
||||
approverId,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
// Rehydrate the original event to know (a) whether it was a DM or group
|
||||
// (chooses engage_mode default), and (b) who the triggering sender was
|
||||
// (auto-member-add so sender_scope='known' doesn't bounce the replay).
|
||||
let event: InboundEvent;
|
||||
try {
|
||||
event = JSON.parse(row.original_message) as InboundEvent;
|
||||
} catch (err) {
|
||||
log.error('Channel registration: failed to parse stored event', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Decide engage_mode from the original event. DMs (`isMention=true` &
|
||||
// not in a group) get `pattern='.'` (always respond). Group mentions
|
||||
// get `mention-sticky` (respond now + follow the thread).
|
||||
//
|
||||
// We can't read `mg.is_group` reliably here because we only auto-create
|
||||
// the mg with `is_group=0` on first sight — the adapter hasn't told us
|
||||
// yet whether it's actually a group. Fall back to the InboundEvent's
|
||||
// `threadId`: a non-null threadId implies a threaded platform (Slack
|
||||
// channel thread, Discord thread), which we treat as a group.
|
||||
const isGroup = event.threadId !== null;
|
||||
const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern';
|
||||
const engagePattern = isGroup ? null : '.';
|
||||
|
||||
const mgaId = `mga-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
createMessagingGroupAgent({
|
||||
id: mgaId,
|
||||
messaging_group_id: row.messaging_group_id,
|
||||
agent_group_id: row.agent_group_id,
|
||||
engage_mode: engageMode,
|
||||
engage_pattern: engagePattern,
|
||||
sender_scope: 'known',
|
||||
ignored_message_policy: 'accumulate',
|
||||
session_mode: 'shared',
|
||||
priority: 0,
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
log.info('Channel registration approved — wiring created', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
agentGroupId: row.agent_group_id,
|
||||
mgaId,
|
||||
engageMode,
|
||||
approverId,
|
||||
});
|
||||
|
||||
// Auto-admit the triggering sender. Without this, the replay below
|
||||
// would bounce through sender-approval (sender_scope='known' +
|
||||
// sender-is-not-a-member).
|
||||
const senderUserId = extractAndUpsertUser(event);
|
||||
if (senderUserId) {
|
||||
addMember({
|
||||
user_id: senderUserId,
|
||||
agent_group_id: row.agent_group_id,
|
||||
added_by: approverId,
|
||||
added_at: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// Clear the pending row BEFORE replay so the gate check on the second
|
||||
// attempt sees a wired channel (agentCount > 0) and takes the fan-out
|
||||
// path normally.
|
||||
deletePendingChannelApproval(row.messaging_group_id);
|
||||
|
||||
try {
|
||||
await routeInbound(event);
|
||||
} catch (err) {
|
||||
log.error('Failed to replay message after channel approval', {
|
||||
messagingGroupId: row.messaging_group_id,
|
||||
err,
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
registerResponseHandler(handleChannelApprovalResponse);
|
||||
|
||||
144
src/router.ts
144
src/router.ts
@@ -20,7 +20,11 @@
|
||||
import { getChannelAdapter } from './channels/channel-registry.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { recordDroppedMessage } from './db/dropped-messages.js';
|
||||
import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js';
|
||||
import {
|
||||
createMessagingGroup,
|
||||
getMessagingGroupAgents,
|
||||
getMessagingGroupWithAgentCount,
|
||||
} from './db/messaging-groups.js';
|
||||
import { findSessionForAgent } from './db/sessions.js';
|
||||
import { startTypingRefresh } from './modules/typing/index.js';
|
||||
import { log } from './log.js';
|
||||
@@ -123,6 +127,27 @@ export function setSenderScopeGate(fn: SenderScopeGateFn): void {
|
||||
senderScopeGate = fn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Channel-registration hook. Runs when the router sees a mention/DM on a
|
||||
* messaging group that has no wirings AND hasn't been denied. The hook is
|
||||
* expected to escalate to an owner (card, etc.) and arrange for future
|
||||
* replay via routeInbound after approval. Fire-and-forget from the
|
||||
* router's perspective.
|
||||
*
|
||||
* Registered by the permissions module. Without the module the router
|
||||
* silently records the drop with reason='no_agent_wired' and moves on.
|
||||
*/
|
||||
export type ChannelRequestGateFn = (mg: MessagingGroup, event: InboundEvent) => Promise<void>;
|
||||
|
||||
let channelRequestGate: ChannelRequestGateFn | null = null;
|
||||
|
||||
export function setChannelRequestGate(fn: ChannelRequestGateFn): void {
|
||||
if (channelRequestGate) {
|
||||
log.warn('Channel-request gate overwritten');
|
||||
}
|
||||
channelRequestGate = fn;
|
||||
}
|
||||
|
||||
function safeParseContent(raw: string): { text?: string; sender?: string; senderId?: string } {
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
@@ -143,10 +168,21 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
event = { ...event, threadId: null };
|
||||
}
|
||||
|
||||
// 1. Resolve messaging group
|
||||
let mg = getMessagingGroupByPlatform(event.channelType, event.platformId);
|
||||
const isMention = event.message.isMention === true;
|
||||
|
||||
if (!mg) {
|
||||
// 1. Combined lookup: messaging_group row + count of wired agents in a
|
||||
// single query. Cheap short-circuit for the common "unwired channel"
|
||||
// case — one DB read and we're out, no auto-create, no sender
|
||||
// resolution, no log spam.
|
||||
const found = getMessagingGroupWithAgentCount(event.channelType, event.platformId);
|
||||
|
||||
let mg: MessagingGroup;
|
||||
let agentCount: number;
|
||||
if (!found) {
|
||||
// No messaging_groups row. Auto-create only when the message warrants
|
||||
// attention (the bot was addressed — @mention or DM). Plain chatter in
|
||||
// channels we merely sit in stays silent — no row, no DB writes.
|
||||
if (!isMention) return;
|
||||
const mgId = `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
mg = {
|
||||
id: mgId,
|
||||
@@ -154,10 +190,8 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
platform_id: event.platformId,
|
||||
name: null,
|
||||
is_group: 0,
|
||||
// Let the schema default (currently 'request_approval') apply rather
|
||||
// than hardcoding 'strict' — the schema is the source of truth for
|
||||
// the default policy. See migration 011.
|
||||
unknown_sender_policy: 'request_approval',
|
||||
denied_at: null,
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
createMessagingGroup(mg);
|
||||
@@ -166,6 +200,51 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
channelType: event.channelType,
|
||||
platformId: event.platformId,
|
||||
});
|
||||
agentCount = 0;
|
||||
} else {
|
||||
mg = found.mg;
|
||||
agentCount = found.agentCount;
|
||||
}
|
||||
|
||||
// 1b. No wirings — either silent drop (plain chatter / denied channel) or
|
||||
// escalate to owner for channel-registration approval.
|
||||
if (agentCount === 0) {
|
||||
if (!isMention) return;
|
||||
if (mg.denied_at) {
|
||||
log.debug('Message dropped — channel was denied by owner', {
|
||||
messagingGroupId: mg.id,
|
||||
deniedAt: mg.denied_at,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const parsed = safeParseContent(event.message.content);
|
||||
recordDroppedMessage({
|
||||
channel_type: event.channelType,
|
||||
platform_id: event.platformId,
|
||||
user_id: null,
|
||||
sender_name: parsed.sender ?? null,
|
||||
reason: 'no_agent_wired',
|
||||
messaging_group_id: mg.id,
|
||||
agent_group_id: null,
|
||||
});
|
||||
|
||||
if (channelRequestGate) {
|
||||
// Fire-and-forget escalation. The gate is expected to build a card,
|
||||
// persist pending_channel_approvals, and replay the event via
|
||||
// routeInbound after approval. Errors are logged internally — the
|
||||
// user's message still stays dropped here either way.
|
||||
void channelRequestGate(mg, event).catch((err) =>
|
||||
log.error('Channel-request gate threw', { messagingGroupId: mg.id, err }),
|
||||
);
|
||||
} else {
|
||||
log.warn('MESSAGE DROPPED — no agent groups wired and no channel-request gate registered', {
|
||||
messagingGroupId: mg.id,
|
||||
channelType: event.channelType,
|
||||
platformId: event.platformId,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. Sender resolution (permissions module upserts the users row as a
|
||||
@@ -173,27 +252,9 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
// Without the module, userId is null — downstream tolerates it.
|
||||
const userId: string | null = senderResolver ? senderResolver(event) : null;
|
||||
|
||||
// 3. Resolve agent groups wired to this messaging group. Structural
|
||||
// drops record to dropped_messages for audit.
|
||||
// 3. Fetch wired agents in full (we already know the count is > 0; now
|
||||
// we need their actual rows for fan-out).
|
||||
const agents = getMessagingGroupAgents(mg.id);
|
||||
if (agents.length === 0) {
|
||||
log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', {
|
||||
messagingGroupId: mg.id,
|
||||
channelType: event.channelType,
|
||||
platformId: event.platformId,
|
||||
});
|
||||
const parsed = safeParseContent(event.message.content);
|
||||
recordDroppedMessage({
|
||||
channel_type: event.channelType,
|
||||
platform_id: event.platformId,
|
||||
user_id: userId,
|
||||
sender_name: parsed.sender ?? null,
|
||||
reason: 'no_agent_wired',
|
||||
messaging_group_id: mg.id,
|
||||
agent_group_id: null,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. Fan-out: evaluate each wired agent independently against engage_mode,
|
||||
// sender_scope, and access gate. An agent that engages gets its own
|
||||
@@ -201,12 +262,18 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
// ignored_message_policy='accumulate' still gets the message stored in
|
||||
// its session (trigger=0) so the context is available when it does
|
||||
// engage later. Drop policy = skip silently.
|
||||
//
|
||||
// Subscribe (for mention-sticky wirings on threaded platforms) fires
|
||||
// once per message from this loop — the first engaging mention-sticky
|
||||
// wiring triggers adapter.subscribe(...); subsequent wirings don't
|
||||
// re-subscribe (chat.subscribe is idempotent anyway, but the flag
|
||||
// avoids the extra await).
|
||||
const parsed = safeParseContent(event.message.content);
|
||||
const messageText = parsed.text ?? '';
|
||||
const isMention = event.message.isMention === true;
|
||||
|
||||
let engagedCount = 0;
|
||||
let accumulatedCount = 0;
|
||||
let subscribed = false;
|
||||
|
||||
for (const agent of agents) {
|
||||
const agentGroup = getAgentGroup(agent.agent_group_id);
|
||||
@@ -220,6 +287,27 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
||||
if (engages && accessOk && scopeOk) {
|
||||
await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, true);
|
||||
engagedCount++;
|
||||
|
||||
// Mention-sticky: ask the adapter to subscribe the thread so the
|
||||
// platform's subscribed-message path carries follow-ups without
|
||||
// requiring another @mention. Threaded-adapter only; DMs and
|
||||
// non-threaded platforms skip.
|
||||
if (
|
||||
!subscribed &&
|
||||
agent.engage_mode === 'mention-sticky' &&
|
||||
adapter?.supportsThreads &&
|
||||
adapter.subscribe &&
|
||||
event.threadId !== null &&
|
||||
mg.is_group !== 0
|
||||
) {
|
||||
subscribed = true;
|
||||
// Fire-and-forget — subscribe is platform-side bookkeeping and
|
||||
// shouldn't block message routing. Errors are logged inside the
|
||||
// adapter (or by the promise rejection handler below).
|
||||
void adapter.subscribe(event.platformId, event.threadId).catch((err) => {
|
||||
log.warn('adapter.subscribe failed', { channelType: event.channelType, threadId: event.threadId, err });
|
||||
});
|
||||
}
|
||||
} else if (agent.ignored_message_policy === 'accumulate') {
|
||||
await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, false);
|
||||
accumulatedCount++;
|
||||
|
||||
10
src/types.ts
10
src/types.ts
@@ -17,6 +17,16 @@ export interface MessagingGroup {
|
||||
name: string | null;
|
||||
is_group: number; // 0 | 1
|
||||
unknown_sender_policy: UnknownSenderPolicy;
|
||||
/**
|
||||
* When set, the owner explicitly denied registering this channel — the
|
||||
* router drops silently and does not re-escalate. Cleared by any explicit
|
||||
* wiring mutation (admin command). See migration 012.
|
||||
*
|
||||
* Optional on the TS type so pre-migration-012 callers that build
|
||||
* MessagingGroup objects in code (fixtures, etc.) don't need to update;
|
||||
* the column itself defaults to NULL in SQLite.
|
||||
*/
|
||||
denied_at?: string | null;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user