diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts index b03f5bd..2e90720 100644 --- a/container/agent-runner/src/formatter.ts +++ b/container/agent-runner/src/formatter.ts @@ -12,7 +12,7 @@ import { TIMEZONE, formatLocalTime } from './timezone.js'; export type CommandCategory = 'admin' | 'filtered' | 'passthrough' | 'none'; const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact', '/context', '/cost', '/files']); -const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config']); +const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config', '/start']); export interface CommandInfo { category: CommandCategory; diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index bbf7f37..9343258 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -5,45 +5,8 @@ * Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter). */ -/** Configuration for a registered conversation (messaging group + agent wiring). */ -export interface ConversationConfig { - platformId: string; - agentGroupId: string; - /** - * When does the agent engage on messages from this conversation? - * - * 'pattern' — regex test against message text; engagePattern='.' - * means "always" (match everything) - * 'mention' — fires only on @mention - * 'mention-sticky' — fires on @mention, then auto-subscribes to the thread - * and treats subsequent messages as engage-all. - * Threaded platforms only (Slack/Discord/Linear). - */ - engageMode: 'pattern' | 'mention' | 'mention-sticky'; - /** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */ - engagePattern?: string | null; - /** - * What to do with messages this wiring doesn't engage on. - * - * 'drop' — discard silently - * 'accumulate' — still forward to the host so the router can store the - * message in this agent's session with trigger=0. It - * rides along as context when the agent next wakes, but - * doesn't wake it on its own. - * - * The bridge reads this to decide whether to forward a non-engaging - * message at all — if any wiring on a conversation has 'accumulate', the - * bridge forwards and lets the router apply the per-wiring decision. - */ - ignoredMessagePolicy?: 'drop' | 'accumulate'; - sessionMode: 'shared' | 'per-thread' | 'agent-shared'; -} - /** Passed to the adapter at setup time. */ export interface ChannelSetup { - /** Known conversations from central DB. */ - conversations: ConversationConfig[]; - /** Called when an inbound message arrives from the platform. */ onInbound(platformId: string, threadId: string | null, message: InboundMessage): void | Promise; @@ -125,7 +88,17 @@ export interface ChannelAdapter { // Optional setTyping?(platformId: string, threadId: string | null): Promise; syncConversations?(): Promise; - updateConversations?(conversations: ConversationConfig[]): void; + + /** + * Subscribe the bot to a thread so follow-up messages route via the + * platform's "subscribed message" path (onSubscribedMessage in Chat SDK). + * Called by the router when a mention-sticky wiring first engages in a + * thread. Idempotent: calling twice on the same thread is a no-op. + * + * Platforms without a subscription concept can omit this; the router + * treats absence as a no-op. + */ + subscribe?(platformId: string, threadId: string): Promise; /** * Open (or fetch) a DM with this user, returning the platform_id of the diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index 265a372..5121c64 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -64,8 +64,6 @@ function createMockAdapter( }, async setTyping() {}, - - updateConversations() {}, }; } diff --git a/src/channels/chat-sdk-bridge.test.ts b/src/channels/chat-sdk-bridge.test.ts index aad8d0a..7ddad4f 100644 --- a/src/channels/chat-sdk-bridge.test.ts +++ b/src/channels/chat-sdk-bridge.test.ts @@ -2,37 +2,19 @@ import { describe, expect, it } from 'vitest'; import type { Adapter } from 'chat'; -import type { ConversationConfig } from './adapter.js'; -import { createChatSdkBridge, shouldEngage, type EngageSource } from './chat-sdk-bridge.js'; +import { createChatSdkBridge } from './chat-sdk-bridge.js'; function stubAdapter(partial: Partial): Adapter { return { name: 'stub', ...partial } as unknown as Adapter; } -function cfg( - partial: Partial & { engageMode: ConversationConfig['engageMode'] }, -): ConversationConfig { - return { - platformId: partial.platformId ?? 'C1', - agentGroupId: partial.agentGroupId ?? 'ag-1', - engageMode: partial.engageMode, - engagePattern: partial.engagePattern ?? null, - ignoredMessagePolicy: partial.ignoredMessagePolicy ?? 'drop', - sessionMode: partial.sessionMode ?? 'shared', - }; -} - -function mapFor(...configs: ConversationConfig[]): Map { - const map = new Map(); - for (const c of configs) { - const existing = map.get(c.platformId); - if (existing) existing.push(c); - else map.set(c.platformId, [c]); - } - return map; -} - describe('createChatSdkBridge', () => { + // The bridge is now transport-only: forward inbound events, relay outbound + // ops. All per-wiring engage / accumulate / drop / subscribe decisions live + // in the router (src/router.ts routeInbound / evaluateEngage) and are + // exercised by host-core.test.ts end-to-end. These tests only cover the + // bridge's narrow, platform-adjacent surface. + it('omits openDM when the underlying Chat SDK adapter has none', () => { const bridge = createChatSdkBridge({ adapter: stubAdapter({}), @@ -59,162 +41,12 @@ describe('createChatSdkBridge', () => { expect(openDMCalls).toEqual(['user-42']); expect(platformId).toBe('stub:user-42'); }); -}); -describe('shouldEngage', () => { - describe('unknown conversation', () => { - const empty = new Map(); - const sources: EngageSource[] = ['subscribed', 'mention', 'dm']; - for (const source of sources) { - it(`forwards for source='${source}' (may be a not-yet-wired group)`, () => { - expect(shouldEngage(empty, 'C1', source, '')).toEqual({ forward: true, stickySubscribe: false }); - }); - } - it("DROPS for source='new-message' (would flood from unwired channels)", () => { - expect(shouldEngage(empty, 'C1', 'new-message', 'hello')).toEqual({ - forward: false, - stickySubscribe: false, - }); - }); - }); - - describe("engageMode='mention' + ignoredMessagePolicy='drop' (default)", () => { - const conv = mapFor(cfg({ engageMode: 'mention' })); - it('forwards on mention + dm', () => { - expect(shouldEngage(conv, 'C1', 'mention', '').forward).toBe(true); - expect(shouldEngage(conv, 'C1', 'dm', '').forward).toBe(true); - }); - it('does NOT forward on subscribed or new-message (prevents keep-firing + flooding)', () => { - expect(shouldEngage(conv, 'C1', 'subscribed', '').forward).toBe(false); - expect(shouldEngage(conv, 'C1', 'new-message', '').forward).toBe(false); - }); - it('never asks to subscribe', () => { - for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - expect(shouldEngage(conv, 'C1', s, '').stickySubscribe).toBe(false); - } - }); - }); - - describe("engageMode='mention-sticky'", () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky' })); - it('forwards on mention + dm with stickySubscribe=true', () => { - expect(shouldEngage(conv, 'C1', 'mention', '')).toEqual({ forward: true, stickySubscribe: true }); - expect(shouldEngage(conv, 'C1', 'dm', '')).toEqual({ forward: true, stickySubscribe: true }); - }); - it('forwards subscribed follow-ups without re-subscribing', () => { - expect(shouldEngage(conv, 'C1', 'subscribed', '')).toEqual({ forward: true, stickySubscribe: false }); - }); - it('does NOT forward on new-message (explicit mention required to start)', () => { - expect(shouldEngage(conv, 'C1', 'new-message', '').forward).toBe(false); - }); - }); - - describe("engageMode='pattern'", () => { - it('pattern="." forwards on every source (when conversation is known)', () => { - const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' })); - for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - expect(shouldEngage(conv, 'C1', s, 'anything').forward).toBe(true); - } - }); - - it('tests regex against text on new-message (the main bug fix)', () => { - const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '^!report' })); - expect(shouldEngage(conv, 'C1', 'new-message', '!report now').forward).toBe(true); - expect(shouldEngage(conv, 'C1', 'new-message', 'nothing to see').forward).toBe(false); - }); - - it('pattern regex applies on every source (symmetry)', () => { - const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: 'deploy' })); - for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - expect(shouldEngage(conv, 'C1', s, 'time to deploy').forward).toBe(true); - expect(shouldEngage(conv, 'C1', s, 'weather today').forward).toBe(false); - } - }); - - it('pattern never triggers sticky-subscribe', () => { - const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' })); - for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - expect(shouldEngage(conv, 'C1', s, 'hi').stickySubscribe).toBe(false); - } - }); - - it('invalid regex fails open (admin sees something rather than silent drop)', () => { - const conv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '[unclosed' })); - expect(shouldEngage(conv, 'C1', 'new-message', 'x').forward).toBe(true); - }); - }); - - describe("ignoredMessagePolicy='accumulate'", () => { - it('forwards non-engaging new-message so the router can store it as context (trigger=0)', () => { - const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'accumulate' })); - // Plain message in unsubscribed group — mention rule says no engage, - // but accumulate says forward anyway. - expect(shouldEngage(conv, 'C1', 'new-message', 'chit chat')).toEqual({ - forward: true, - stickySubscribe: false, - }); - }); - - it('forwards non-engaging subscribed messages for accumulation', () => { - // mention wiring in a subscribed thread: the mention handler already - // fired once, thread is now subscribed, follow-ups route here. The - // base 'mention' rule wouldn't engage without an @-mention, but - // accumulate says capture the context. - const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'accumulate' })); - expect(shouldEngage(conv, 'C1', 'subscribed', 'follow up talk').forward).toBe(true); - }); - - it('does NOT set stickySubscribe purely from accumulate (avoid misleading bot presence)', () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky', ignoredMessagePolicy: 'accumulate' })); - expect(shouldEngage(conv, 'C1', 'new-message', 'plain').stickySubscribe).toBe(false); - }); - - it("accumulate doesn't override the 'unknown conversation → drop new-message' rule", () => { - // Unknown conversation (not in map): accumulate can't be read because - // there's no config to read from. We still drop. - const empty = new Map(); - expect(shouldEngage(empty, 'C-unknown', 'new-message', 'x').forward).toBe(false); - }); - - it("drop policy + non-engaging message → doesn't forward", () => { - const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'drop' })); - expect(shouldEngage(conv, 'C1', 'new-message', 'plain').forward).toBe(false); - }); - - it('engaging message with drop policy still forwards (engage wins regardless)', () => { - const conv = mapFor(cfg({ engageMode: 'mention', ignoredMessagePolicy: 'drop' })); - expect(shouldEngage(conv, 'C1', 'mention', '@bot hi').forward).toBe(true); - }); - }); - - describe('multiple wirings on one conversation', () => { - it('takes the union across wirings (any-engage wins)', () => { - // mention wiring + pattern wiring on the same channel. A plain message - // should engage via the pattern wiring even though the mention wiring - // alone would reject it. - const conv = mapFor( - cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }), - cfg({ agentGroupId: 'ag-b', engageMode: 'pattern', engagePattern: '^hi' }), - ); - expect(shouldEngage(conv, 'C1', 'new-message', 'hi there').forward).toBe(true); - expect(shouldEngage(conv, 'C1', 'new-message', 'something else').forward).toBe(false); - }); - - it('any accumulate wiring causes forward even if all others would drop', () => { - const conv = mapFor( - cfg({ agentGroupId: 'ag-a', engageMode: 'mention', ignoredMessagePolicy: 'drop' }), - cfg({ agentGroupId: 'ag-b', engageMode: 'mention', ignoredMessagePolicy: 'accumulate' }), - ); - // Plain message: ag-a would drop, ag-b would accumulate → forward. - expect(shouldEngage(conv, 'C1', 'new-message', 'plain').forward).toBe(true); - }); - - it('stickySubscribe from any mention-sticky wiring wins', () => { - const conv = mapFor( - cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }), - cfg({ agentGroupId: 'ag-b', engageMode: 'mention-sticky' }), - ); - expect(shouldEngage(conv, 'C1', 'mention', '')).toEqual({ forward: true, stickySubscribe: true }); + it('exposes subscribe (lets the router initiate thread subscription on mention-sticky engage)', () => { + const bridge = createChatSdkBridge({ + adapter: stubAdapter({}), + supportsThreads: true, }); + expect(typeof bridge.subscribe).toBe('function'); }); }); diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index bea4c16..ef2195e 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -21,7 +21,7 @@ import { SqliteStateAdapter } from '../state-sqlite.js'; import { registerWebhookAdapter } from '../webhook-server.js'; import { getAskQuestionRender } from '../db/sessions.js'; import { normalizeOptions, type NormalizedOption } from './ask-question.js'; -import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js'; +import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js'; /** Adapter with optional gateway support (e.g., Discord). */ interface GatewayAdapter extends Adapter { @@ -65,151 +65,14 @@ export interface ChatSdkBridgeConfig { transformOutboundText?: (text: string) => string; } -/** - * Which Chat SDK handler delivered this message. Determines which engage modes - * can fire. - * - * - `subscribed` — `onSubscribedMessage`. Thread is already subscribed. - * Every wiring mode (mention / mention-sticky / pattern) - * evaluates normally. - * - `mention` — `onNewMention`. Bot was @-mentioned in an unsubscribed - * thread. mention + mention-sticky engage; pattern runs - * the regex. - * - `dm` — `onDirectMessage`. Unsubscribed DM. Treated like a - * mention for engagement purposes. - * - `new-message` — `onNewMessage(/./, …)`. Plain non-mention non-DM - * message in an unsubscribed thread. Only `pattern` - * wirings can fire here. mention / mention-sticky ignore - * this source (they require an explicit mention). - */ -export type EngageSource = 'subscribed' | 'mention' | 'dm' | 'new-message'; - -/** - * Should a message from (channelId, source, text) be forwarded to the host, - * and if so, should the bridge subscribe the thread? - * - * Exported for testability — see `chat-sdk-bridge.test.ts`. - * - * We take the union across wired agents: if any wiring would engage OR any - * wiring has `ignoredMessagePolicy='accumulate'`, the message is forwarded. - * The host router then does the per-wiring decision in `deliverToAgent` — - * engaging agents get `trigger=1` (wake), accumulating agents get - * `trigger=0` (store as context, don't wake), drop-policy agents are - * skipped (see `src/router.ts` routeInbound fan-out). - * - * `stickySubscribe` is only set when an actual engage happens (not just - * accumulate) — subscribing a thread we'd only silently accumulate on would - * misrepresent the bot's presence to other users. - */ -export function shouldEngage( - conversations: Map, - channelId: string, - source: EngageSource, - text: string, -): { forward: boolean; stickySubscribe: boolean } { - const configs = conversations.get(channelId); - - // Unknown conversation — behavior diverges by source: - // - subscribed/mention/dm: forward anyway. These paths imply some - // prior engagement (subscribe, @mention, DM open) and may be a new - // group that hasn't been registered yet; central routing will log + - // drop cleanly. - // - new-message: DROP. `onNewMessage(/./, …)` fires for every message - // in every unsubscribed thread the bot can see — including channels - // the bot is merely *present* in but not wired to. Forwarding - // everything would flood the host. - if (!configs || configs.length === 0) { - return { forward: source !== 'new-message', stickySubscribe: false }; - } - - let engage = false; - let accumulate = false; - let stickySubscribe = false; - - for (const cfg of configs) { - let cfgEngages = false; - switch (cfg.engageMode) { - case 'mention': - if (source === 'mention' || source === 'dm') cfgEngages = true; - break; - case 'mention-sticky': - if (source === 'mention' || source === 'dm') { - cfgEngages = true; - stickySubscribe = true; - } else if (source === 'subscribed') { - // Thread was already subscribed on a prior mention — treat as - // engage-all so follow-ups in the thread reach the agent. - cfgEngages = true; - } - // source='new-message' → does not engage (requires explicit mention - // to start). Accumulate policy is evaluated below if set. - break; - case 'pattern': { - // Pattern evaluates on any source that delivers a plain message — - // including new-message, which is the whole reason we registered - // onNewMessage(/./). For mention/dm-delivered messages we still - // test the regex (historical behavior), so pattern='foo' wirings - // only fire on mentions whose text contains 'foo'. - const pattern = cfg.engagePattern ?? '.'; - try { - if (pattern === '.' || new RegExp(pattern).test(text)) cfgEngages = true; - } catch { - // Invalid regex → fail open so the admin can see something is - // happening and fix the pattern. - cfgEngages = true; - } - break; - } - } - - if (cfgEngages) { - engage = true; - } else if (cfg.ignoredMessagePolicy === 'accumulate') { - // Wiring doesn't engage on this message but wants it captured as - // context for its session — forward so the router can write it with - // trigger=0. - accumulate = true; - } - } - - return { forward: engage || accumulate, stickySubscribe }; -} - export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { const { adapter } = config; const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t); let chat: Chat; let state: SqliteStateAdapter; let setupConfig: ChannelSetup; - // Keyed by platformId. Multiple agents may be wired to the same - // conversation — this holds all their configs so the bridge can apply the - // most-permissive engage rule at gate time and only subscribe when at - // least one wiring requested 'mention-sticky'. - // - // STALENESS: populated at setup() and updateConversations(). If wirings - // change after setup, updateConversations() must be called to refresh - // (ACTION-ITEMS item 17). - let conversations: Map; let gatewayAbort: AbortController | null = null; - function buildConversationMap(configs: ConversationConfig[]): Map { - const map = new Map(); - for (const conv of configs) { - const existing = map.get(conv.platformId); - if (existing) existing.push(conv); - else map.set(conv.platformId, [conv]); - } - return map; - } - - function engageDecision( - channelId: string, - source: EngageSource, - text: string, - ): { forward: boolean; stickySubscribe: boolean } { - return shouldEngage(conversations, channelId, source, text); - } - async function messageToInbound(message: ChatMessage, isMention: boolean): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any const serialized = message.toJSON() as Record; @@ -277,7 +140,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter async setup(hostConfig: ChannelSetup) { setupConfig = hostConfig; - conversations = buildConversationMap(hostConfig.conversations); state = new SqliteStateAdapter(); @@ -289,83 +151,54 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter logger: 'silent', }); - // Subscribed threads — the conversation is already active (via prior - // mention-sticky engagement or admin wiring). Gate on engageMode so a - // plain 'mention' wiring doesn't keep firing after a one-off mention. + // Four SDK dispatch paths — bridge just forwards. All per-wiring + // engage / accumulate / drop / subscribe decisions live in the host + // router (src/router.ts routeInbound / evaluateEngage). The bridge + // only resolves channel ids and sets the platform-confirmed isMention + // flag that routeInbound evaluates; the router calls back into + // bridge.subscribe(...) when a mention-sticky wiring engages. + + // Subscribed threads — every message in a thread we've previously + // engaged. Carry the SDK's `message.isMention` through so mention-mode + // wirings still fire on in-thread mentions. chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const text = typeof message.text === 'string' ? message.text : ''; - const decision = engageDecision(channelId, 'subscribed', text); - if (!decision.forward) return; - // Subscribed path: the SDK sets message.isMention when the bot was - // @-mentioned in an already-subscribed thread (docs at - // handling-events.mdx). Forward it verbatim. await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, message.isMention === true)); }); - // @mention in an unsubscribed thread — always engage; subscribe only - // if the wiring is 'mention-sticky'. + // @mention in an unsubscribed thread — SDK-confirmed bot mention. chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const text = typeof message.text === 'string' ? message.text : ''; - const decision = engageDecision(channelId, 'mention', text); - if (!decision.forward) return; - // onNewMention only fires when the SDK confirms the bot was mentioned. await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); - if (decision.stickySubscribe) { - await thread.subscribe(); - } }); - // DMs — apply engage rules too, but DMs typically default to pattern='.' - // at setup time so this is a pass-through in practice. sticky subscribe - // follows the same rule as a group mention. - // - // Thread id is passed through so sub-thread context reaches delivery - // (Slack users can open threads inside a DM). The router collapses DM - // sub-threads to one session (is_group=0 short-circuits the per-thread - // escalation). + // DMs — by definition addressed to the bot. Thread id flows through + // so sub-thread context reaches delivery (Slack users can open threads + // inside a DM). Router collapses DM sub-threads to one session via + // is_group=0 short-circuit. chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const text = typeof message.text === 'string' ? message.text : ''; - const decision = engageDecision(channelId, 'dm', text); log.info('Inbound DM received', { adapter: adapter.name, channelId, sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown', threadId: thread.id, - forward: decision.forward, }); - if (!decision.forward) return; - // A DM is by definition addressed to the bot — treat as a mention - // for routing purposes. `mention` / `mention-sticky` wirings fire. await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); - if (decision.stickySubscribe) { - await thread.subscribe(); - } }); - // Plain (non-mention, non-DM) messages in unsubscribed threads. + // Plain messages in unsubscribed threads. // - // Chat SDK dispatch (handling-events.mdx §"Handler dispatch order"): - // subscribed threads → onSubscribedMessage; unsubscribed + mention → - // onNewMention; unsubscribed + pattern match → onNewMessage. Dispatch - // is exclusive — at most one handler fires per message. - // - // Without this handler, `engage_mode='pattern'` is silently dropped in - // unsubscribed group threads because the SDK never surfaces the - // message anywhere else. Registering with `/./` lets every wired - // conversation's regex be evaluated in our `shouldEngage` — unknown - // conversations are dropped there (see the source='new-message' - // branch) so this doesn't flood the host on channels the bot isn't - // wired to. + // Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is + // exclusive: subscribed → onSubscribedMessage; unsubscribed+mention → + // onNewMention; unsubscribed+pattern-match → onNewMessage. Registering + // with `/./` lets the router see every plain message on every + // unsubscribed thread the bot can see. The router short-circuits via + // getMessagingGroupWithAgentCount (~1 DB read) for unwired channels, + // so forwarding every one is cheap enough to not need a bridge-side + // flood gate. chat.onNewMessage(/./, async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const text = typeof message.text === 'string' ? message.text : ''; - const decision = engageDecision(channelId, 'new-message', text); - if (!decision.forward) return; - // SDK dispatch guarantees this is a non-mention non-DM message in an - // unsubscribed thread — isMention is definitively false here. await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false)); }); @@ -538,8 +371,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return true; }, - updateConversations(configs: ConversationConfig[]) { - conversations = buildConversationMap(configs); + async subscribe(_platformId: string, threadId: string) { + // Chat SDK's subscription state lives on the StateAdapter (not on the + // Chat instance itself). SqliteStateAdapter.subscribe is idempotent — + // a second call on an already-subscribed thread is a no-op. threadId + // is the SDK's thread id, which is what the router already has from + // the original inbound event. + await state.subscribe(threadId); }, }; diff --git a/src/db/messaging-groups.ts b/src/db/messaging-groups.ts index db12583..33c8715 100644 --- a/src/db/messaging-groups.ts +++ b/src/db/messaging-groups.ts @@ -37,6 +37,37 @@ export function getMessagingGroupByPlatform(channelType: string, platformId: str .get(channelType, platformId) as MessagingGroup | undefined; } +/** + * Combined lookup for the router's fast-drop path. Returns the messaging + * group (if it exists) and a count of wired agents in one query — lets + * `routeInbound` short-circuit messages for unwired / unknown channels + * with a single DB read instead of four (mg lookup, sender upsert, agents + * lookup, dropped_messages insert). + * + * Returns `null` when no messaging_groups row exists for this channel. + * Returns `{ mg, agentCount: 0 }` when the row exists but has no wired + * agents. Uses the `UNIQUE(channel_type, platform_id)` index plus the + * `UNIQUE(messaging_group_id, agent_group_id)` index for the JOIN — both + * covered by existing SQLite auto-indexes from the UNIQUE constraints. + */ +export function getMessagingGroupWithAgentCount( + channelType: string, + platformId: string, +): { mg: MessagingGroup; agentCount: number } | null { + const row = getDb() + .prepare( + `SELECT mg.*, COUNT(mga.id) AS agent_count + FROM messaging_groups mg + LEFT JOIN messaging_group_agents mga ON mga.messaging_group_id = mg.id + WHERE mg.channel_type = ? AND mg.platform_id = ? + GROUP BY mg.id`, + ) + .get(channelType, platformId) as (MessagingGroup & { agent_count: number }) | undefined; + if (!row) return null; + const { agent_count, ...mg } = row; + return { mg: mg as MessagingGroup, agentCount: agent_count }; +} + export function getAllMessagingGroups(): MessagingGroup[] { return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[]; } @@ -69,6 +100,20 @@ export function deleteMessagingGroup(id: string): void { getDb().prepare('DELETE FROM messaging_groups WHERE id = ?').run(id); } +/** + * Mark a messaging group as denied by the owner (channel-registration flow). + * Future mentions on this channel silently drop until an admin explicitly + * wires it via `createMessagingGroupAgent`, which implicitly clears the + * denied state by making `agentCount > 0` — the router's denied-channel + * check sits on the `agentCount === 0` branch. + * + * Passing null unsets the flag (used by tests or a future "unblock channel" + * admin command). + */ +export function setMessagingGroupDeniedAt(id: string, deniedAt: string | null): void { + getDb().prepare('UPDATE messaging_groups SET denied_at = ? WHERE id = ?').run(deniedAt, id); +} + // ── Messaging Group Agents ── /** diff --git a/src/db/migrations/012-channel-registration.ts b/src/db/migrations/012-channel-registration.ts new file mode 100644 index 0000000..eca8911 --- /dev/null +++ b/src/db/migrations/012-channel-registration.ts @@ -0,0 +1,48 @@ +/** + * Unknown-channel registration flow. + * + * When a channel that isn't wired to any agent group receives a mention or + * DM, the router escalates to the owner for approval before wiring. Approve + * creates a `messaging_group_agents` row (with conservative defaults) and + * replays the triggering event. Deny marks the channel denied forever + * (stored as a timestamp on `messaging_groups.denied_at`) so future + * messages on that channel drop silently without re-prompting. + * + * Two changes: + * 1. `messaging_groups.denied_at TEXT NULL` — set on deny, checked in the + * router before re-escalating. ALTER TABLE ADD COLUMN is FK-safe + * unlike the table rebuild that bit us in migration 011. + * 2. `pending_channel_approvals` table. PRIMARY KEY on + * `messaging_group_id` gives free in-flight dedup — a second mention + * while the card is pending is silently dropped by INSERT OR IGNORE, + * preventing card spam. + */ +import type Database from 'better-sqlite3'; +import type { Migration } from './index.js'; + +export const migration012: Migration = { + version: 12, + name: 'channel-registration', + up: (db: Database.Database) => { + // 1. Add denied_at to messaging_groups. Idempotent guard in case the + // column was added by some other path before this migration ran. + const cols = db.prepare("PRAGMA table_info('messaging_groups')").all() as Array<{ name: string }>; + if (!cols.some((c) => c.name === 'denied_at')) { + db.exec(`ALTER TABLE messaging_groups ADD COLUMN denied_at TEXT`); + } + + // 2. pending_channel_approvals. + db.exec(` + CREATE TABLE IF NOT EXISTS pending_channel_approvals ( + messaging_group_id TEXT PRIMARY KEY REFERENCES messaging_groups(id), + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + -- The agent the approved wiring will target. + -- Picked at request time (currently: earliest + -- agent_group by created_at). + original_message TEXT NOT NULL, -- JSON serialized InboundEvent + approver_user_id TEXT NOT NULL, + created_at TEXT NOT NULL + ); + `); + }, +}; diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index 1015f40..33e6963 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -8,6 +8,7 @@ import { migration008 } from './008-dropped-messages.js'; import { migration009 } from './009-drop-pending-credentials.js'; import { migration010 } from './010-engage-modes.js'; import { migration011 } from './011-pending-sender-approvals.js'; +import { migration012 } from './012-channel-registration.js'; import { moduleApprovalsPendingApprovals } from './module-approvals-pending-approvals.js'; import { moduleApprovalsTitleOptions } from './module-approvals-title-options.js'; @@ -27,6 +28,7 @@ const migrations: Migration[] = [ migration009, migration010, migration011, + migration012, ]; export function runMigrations(db: Database.Database): void { diff --git a/src/host-core.test.ts b/src/host-core.test.ts index 33d37ff..da2fd37 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -244,26 +244,42 @@ describe('router', () => { expect(wakeContainer).toHaveBeenCalled(); }); - it('should auto-create messaging group for unknown platform', async () => { + it('auto-creates messaging group only when the bot is addressed (mention/DM)', async () => { + // The router's no-mg branch is escalation-gated: plain chatter on an + // unknown channel stays silent (no DB writes) so a bot that sits in + // many unwired channels doesn't bloat messaging_groups. Only explicit + // mentions and DMs trigger auto-create. const { routeInbound } = await import('./router.js'); + const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js'); - const event: InboundEvent = { + // Plain message on unknown channel — should NOT auto-create. + await routeInbound({ channelType: 'slack', - platformId: 'C-NEW-CHANNEL', + platformId: 'C-PLAIN', threadId: null, message: { - id: 'msg-2', + id: 'msg-plain', kind: 'chat', content: JSON.stringify({ sender: 'User', text: 'Hi' }), timestamp: now(), }, - }; + }); + expect(getMessagingGroupByPlatform('slack', 'C-PLAIN')).toBeUndefined(); - await routeInbound(event); - - const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js'); - const mg = getMessagingGroupByPlatform('slack', 'C-NEW-CHANNEL'); - expect(mg).toBeDefined(); + // Mention on unknown channel — SHOULD auto-create (next step: channel-registration flow). + await routeInbound({ + channelType: 'slack', + platformId: 'C-MENTIONED', + threadId: null, + message: { + id: 'msg-mentioned', + kind: 'chat', + content: JSON.stringify({ sender: 'User', text: '@bot hi' }), + timestamp: now(), + isMention: true, + }, + }); + expect(getMessagingGroupByPlatform('slack', 'C-MENTIONED')).toBeDefined(); }); it('should route multiple messages to the same session', async () => { diff --git a/src/index.ts b/src/index.ts index 595ba1b..7c5ab24 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,6 @@ import path from 'path'; import { DATA_DIR } from './config.js'; import { initDb } from './db/connection.js'; import { runMigrations } from './db/migrations/index.js'; -import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js'; import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js'; import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; import { startHostSweep, stopHostSweep } from './host-sweep.js'; @@ -52,7 +51,7 @@ import './channels/index.js'; // append registry-based modules. Imported for side effects (registrations). import './modules/index.js'; -import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js'; +import type { ChannelAdapter, ChannelSetup } from './channels/adapter.js'; import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js'; async function main(): Promise { @@ -70,9 +69,7 @@ async function main(): Promise { // 3. Channel adapters await initChannelAdapters((adapter: ChannelAdapter): ChannelSetup => { - const conversations = buildConversationConfigs(adapter.channelType); return { - conversations, onInbound(platformId, threadId, message) { routeInbound({ channelType: adapter.channelType, @@ -151,28 +148,6 @@ async function main(): Promise { log.info('NanoClaw running'); } -/** Build ConversationConfig[] for a channel type from the central DB. */ -function buildConversationConfigs(channelType: string): ConversationConfig[] { - const groups = getMessagingGroupsByChannel(channelType); - const configs: ConversationConfig[] = []; - - for (const mg of groups) { - const agents = getMessagingGroupAgents(mg.id); - for (const agent of agents) { - configs.push({ - platformId: mg.platform_id, - agentGroupId: agent.agent_group_id, - engageMode: agent.engage_mode, - engagePattern: agent.engage_pattern, - ignoredMessagePolicy: agent.ignored_message_policy, - sessionMode: agent.session_mode, - }); - } - } - - return configs; -} - /** Graceful shutdown. */ async function shutdown(signal: string): Promise { log.info('Shutdown signal received', { signal }); diff --git a/src/modules/permissions/channel-approval.test.ts b/src/modules/permissions/channel-approval.test.ts new file mode 100644 index 0000000..f3ea7e9 --- /dev/null +++ b/src/modules/permissions/channel-approval.test.ts @@ -0,0 +1,392 @@ +/** + * Integration tests for the unknown-channel registration flow (ACTION-ITEMS + * item 22). + * + * Covers: + * - Mention on an unwired channel fires an owner-approval card + * - DM on an unwired channel fires a card (engage_mode will default to pattern='.') + * - In-flight dedup: second mention while a card is pending doesn't spam + * - Approve: wiring created with correct defaults, triggering sender added + * as member, replay wakes the container + * - Deny: messaging_groups.denied_at set, future mentions drop silently + * - Unauthorized clicker is rejected (same pattern as sender-approval) + * - No-owner install: no card, no row + * - No agent groups configured: no card, no row + */ +import fs from 'fs'; +import { beforeEach, afterEach, describe, expect, it, vi } from 'vitest'; + +import { initTestDb, closeDb, runMigrations } from '../../db/index.js'; +import { createAgentGroup } from '../../db/agent-groups.js'; +import { createMessagingGroup, getMessagingGroupByPlatform } from '../../db/messaging-groups.js'; +import { upsertUser } from './db/users.js'; +import { grantRole } from './db/user-roles.js'; + +// Mock container runner — prevent actual docker spawn. +vi.mock('../../container-runner.js', () => ({ + wakeContainer: vi.fn().mockResolvedValue(undefined), + isContainerRunning: vi.fn().mockReturnValue(false), + getActiveContainerCount: vi.fn().mockReturnValue(0), + killContainer: vi.fn(), +})); + +// Mock delivery adapter. +const deliverMock = vi.fn().mockResolvedValue('plat-msg-id'); +vi.mock('../../delivery.js', () => ({ + getDeliveryAdapter: () => ({ deliver: deliverMock }), +})); + +// Mock ensureUserDm — look up the owner's preconfigured DM row instead of +// hitting a real openDM RPC. +vi.mock('./user-dm.js', () => ({ + ensureUserDm: vi.fn(async (userId: string) => { + const { getDb } = await import('../../db/connection.js'); + const row = getDb() + .prepare( + `SELECT mg.* FROM messaging_groups mg + JOIN user_dms ud ON ud.messaging_group_id = mg.id + WHERE ud.user_id = ?`, + ) + .get(userId); + return row; + }), +})); + +vi.mock('../../config.js', async () => { + const actual = await vi.importActual('../../config.js'); + return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-channel-approval' }; +}); + +const TEST_DIR = '/tmp/nanoclaw-test-channel-approval'; + +function now() { + return new Date().toISOString(); +} + +beforeEach(async () => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + const db = initTestDb(); + runMigrations(db); + + await import('./index.js'); // register hooks + + // Base fixtures: one agent group + owner with a DM on 'telegram'. + createAgentGroup({ id: 'ag-1', name: 'Andy', folder: 'andy', agent_provider: null, created_at: now() }); + + upsertUser({ id: 'telegram:owner', kind: 'telegram', display_name: 'Owner', created_at: now() }); + grantRole({ + user_id: 'telegram:owner', + role: 'owner', + agent_group_id: null, + granted_by: null, + granted_at: now(), + }); + + // Pre-seed owner's DM messaging group + user_dms mapping. + createMessagingGroup({ + id: 'mg-dm-owner', + channel_type: 'telegram', + platform_id: 'dm-owner', + name: 'Owner DM', + is_group: 0, + unknown_sender_policy: 'public', + created_at: now(), + }); + const { getDb } = await import('../../db/connection.js'); + getDb() + .prepare( + `INSERT INTO user_dms (user_id, channel_type, messaging_group_id, resolved_at) + VALUES (?, ?, ?, ?)`, + ) + .run('telegram:owner', 'telegram', 'mg-dm-owner', now()); + + deliverMock.mockClear(); +}); + +afterEach(() => { + closeDb(); + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); +}); + +function groupMention(platformId: string, text = '@bot hello') { + return { + channelType: 'telegram', + platformId, + threadId: 'thread-1', // non-null → is_group=true per channel-approval default-picker logic + message: { + id: `msg-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat' as const, + content: JSON.stringify({ senderId: 'caller', senderName: 'Caller', text }), + timestamp: now(), + isMention: true, + }, + }; +} + +function dmEvent(platformId: string, text = 'hello') { + return { + channelType: 'telegram', + platformId, + threadId: null, + message: { + id: `msg-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat' as const, + content: JSON.stringify({ senderId: 'stranger', senderName: 'Stranger', text }), + timestamp: now(), + isMention: true, // DM bridge sets isMention=true + }, + }; +} + +describe('unknown-channel registration flow', () => { + it('delivers an approval card on mention into an unwired group', async () => { + const { routeInbound } = await import('../../router.js'); + await routeInbound(groupMention('chat-new')); + await new Promise((r) => setTimeout(r, 10)); + + expect(deliverMock).toHaveBeenCalledTimes(1); + const [channel, platformId, thread, kind, content] = deliverMock.mock.calls[0]; + expect(channel).toBe('telegram'); + expect(platformId).toBe('dm-owner'); // delivered to owner's DM + expect(thread).toBeNull(); + expect(kind).toBe('chat-sdk'); + const payload = JSON.parse(content as string); + expect(payload.type).toBe('ask_question'); + // Card names the target agent so the owner knows what they're wiring to. + expect(payload.question).toContain('Andy'); + + const { getDb } = await import('../../db/connection.js'); + const rows = getDb().prepare('SELECT * FROM pending_channel_approvals').all() as Array<{ + messaging_group_id: string; + }>; + expect(rows).toHaveLength(1); + }); + + it('delivers a card on DM too (non-threaded event)', async () => { + const { routeInbound } = await import('../../router.js'); + await routeInbound(dmEvent('dm-new-user')); + await new Promise((r) => setTimeout(r, 10)); + + expect(deliverMock).toHaveBeenCalledTimes(1); + const { getDb } = await import('../../db/connection.js'); + const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c; + expect(count).toBe(1); + }); + + it('dedups a second mention while the card is pending', async () => { + const { routeInbound } = await import('../../router.js'); + await routeInbound(groupMention('chat-busy')); + await new Promise((r) => setTimeout(r, 10)); + await routeInbound(groupMention('chat-busy', '@bot still here')); + await new Promise((r) => setTimeout(r, 10)); + + expect(deliverMock).toHaveBeenCalledTimes(1); + const { getDb } = await import('../../db/connection.js'); + const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c; + expect(count).toBe(1); + }); + + it('approve → creates wiring, admits triggering sender, replays', async () => { + const { routeInbound } = await import('../../router.js'); + const { getResponseHandlers } = await import('../../response-registry.js'); + const { wakeContainer } = await import('../../container-runner.js'); + (wakeContainer as unknown as ReturnType).mockClear(); + + await routeInbound(groupMention('chat-approve')); + await new Promise((r) => setTimeout(r, 10)); + + const { getDb } = await import('../../db/connection.js'); + const pending = getDb() + .prepare('SELECT messaging_group_id FROM pending_channel_approvals') + .get() as { messaging_group_id: string }; + expect(pending).toBeDefined(); + + // Owner clicks approve. + for (const handler of getResponseHandlers()) { + const claimed = await handler({ + questionId: pending.messaging_group_id, + value: 'approve', + userId: 'owner', // raw platform id — handler namespaces it + channelType: 'telegram', + platformId: 'dm-owner', + threadId: null, + }); + if (claimed) break; + } + + // Wiring created with MVP defaults. + const mga = getDb() + .prepare('SELECT * FROM messaging_group_agents WHERE messaging_group_id = ?') + .get(pending.messaging_group_id) as { + engage_mode: string; + engage_pattern: string | null; + sender_scope: string; + ignored_message_policy: string; + agent_group_id: string; + }; + expect(mga).toBeDefined(); + expect(mga.engage_mode).toBe('mention-sticky'); // group (threadId != null) + expect(mga.engage_pattern).toBeNull(); + expect(mga.sender_scope).toBe('known'); + expect(mga.ignored_message_policy).toBe('accumulate'); + expect(mga.agent_group_id).toBe('ag-1'); + + // Triggering sender auto-admitted so sender_scope='known' doesn't + // bounce the replay into sender-approval. + const member = getDb() + .prepare('SELECT 1 AS x FROM agent_group_members WHERE user_id = ? AND agent_group_id = ?') + .get('telegram:caller', 'ag-1'); + expect(member).toBeDefined(); + + // Pending row cleared and container woken via replay. + const stillPending = ( + getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number } + ).c; + expect(stillPending).toBe(0); + expect(wakeContainer).toHaveBeenCalled(); + }); + + it('approve on a DM wires with pattern="." defaults', async () => { + const { routeInbound } = await import('../../router.js'); + const { getResponseHandlers } = await import('../../response-registry.js'); + + await routeInbound(dmEvent('dm-approve-user')); + await new Promise((r) => setTimeout(r, 10)); + + const { getDb } = await import('../../db/connection.js'); + const pending = getDb() + .prepare('SELECT messaging_group_id FROM pending_channel_approvals') + .get() as { messaging_group_id: string }; + + for (const handler of getResponseHandlers()) { + const claimed = await handler({ + questionId: pending.messaging_group_id, + value: 'approve', + userId: 'owner', + channelType: 'telegram', + platformId: 'dm-owner', + threadId: null, + }); + if (claimed) break; + } + + const mga = getDb() + .prepare('SELECT engage_mode, engage_pattern FROM messaging_group_agents WHERE messaging_group_id = ?') + .get(pending.messaging_group_id) as { engage_mode: string; engage_pattern: string }; + expect(mga.engage_mode).toBe('pattern'); + expect(mga.engage_pattern).toBe('.'); + }); + + it('deny → sets denied_at; future mentions drop silently without a second card', async () => { + const { routeInbound } = await import('../../router.js'); + const { getResponseHandlers } = await import('../../response-registry.js'); + + await routeInbound(groupMention('chat-deny')); + await new Promise((r) => setTimeout(r, 10)); + const { getDb } = await import('../../db/connection.js'); + const pending = getDb() + .prepare('SELECT messaging_group_id FROM pending_channel_approvals') + .get() as { messaging_group_id: string }; + + for (const handler of getResponseHandlers()) { + const claimed = await handler({ + questionId: pending.messaging_group_id, + value: 'reject', + userId: 'owner', + channelType: 'telegram', + platformId: 'dm-owner', + threadId: null, + }); + if (claimed) break; + } + + // denied_at set, pending row cleared, no wiring. + const mg = getMessagingGroupByPlatform('telegram', 'chat-deny'); + expect(mg?.denied_at).not.toBeNull(); + expect(mg?.denied_at).toBeTruthy(); + const mgaCount = ( + getDb() + .prepare('SELECT COUNT(*) AS c FROM messaging_group_agents WHERE messaging_group_id = ?') + .get(pending.messaging_group_id) as { c: number } + ).c; + expect(mgaCount).toBe(0); + + // A follow-up mention on the denied channel: no new card, no new pending row. + deliverMock.mockClear(); + await routeInbound(groupMention('chat-deny', '@bot please')); + await new Promise((r) => setTimeout(r, 10)); + expect(deliverMock).not.toHaveBeenCalled(); + const stillPending = ( + getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number } + ).c; + expect(stillPending).toBe(0); + }); + + it('rejects clicks from an unauthorized user (prevents self-admit via forwarded card)', async () => { + const { routeInbound } = await import('../../router.js'); + const { getResponseHandlers } = await import('../../response-registry.js'); + + await routeInbound(groupMention('chat-unauth')); + await new Promise((r) => setTimeout(r, 10)); + const { getDb } = await import('../../db/connection.js'); + const pending = getDb() + .prepare('SELECT messaging_group_id FROM pending_channel_approvals') + .get() as { messaging_group_id: string }; + + for (const handler of getResponseHandlers()) { + const claimed = await handler({ + questionId: pending.messaging_group_id, + value: 'approve', + userId: 'random-bystander', + channelType: 'telegram', + platformId: 'dm-random', + threadId: null, + }); + if (claimed) break; + } + + // No wiring created, pending row preserved so a real approver can act on it. + const mgaCount = ( + getDb() + .prepare('SELECT COUNT(*) AS c FROM messaging_group_agents WHERE messaging_group_id = ?') + .get(pending.messaging_group_id) as { c: number } + ).c; + expect(mgaCount).toBe(0); + const stillPending = ( + getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number } + ).c; + expect(stillPending).toBe(1); + }); +}); + +describe('no-owner / no-agent failure modes', () => { + it('no owner → no card, no pending row (fresh-install bootstrap path)', async () => { + // Wipe the owner grant set up in the outer beforeEach. + const { getDb } = await import('../../db/connection.js'); + getDb().prepare('DELETE FROM user_roles').run(); + + const { routeInbound } = await import('../../router.js'); + await routeInbound(groupMention('chat-noowner')); + await new Promise((r) => setTimeout(r, 10)); + + expect(deliverMock).not.toHaveBeenCalled(); + const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c; + expect(count).toBe(0); + }); + + it('no agent groups → no card, no pending row', async () => { + const { getDb } = await import('../../db/connection.js'); + // Drop foreign-key-dependent rows first, then the agent group itself. + getDb().prepare('DELETE FROM user_roles').run(); + getDb().prepare('DELETE FROM agent_groups').run(); + + const { routeInbound } = await import('../../router.js'); + await routeInbound(groupMention('chat-noagent')); + await new Promise((r) => setTimeout(r, 10)); + + expect(deliverMock).not.toHaveBeenCalled(); + const count = (getDb().prepare('SELECT COUNT(*) AS c FROM pending_channel_approvals').get() as { c: number }).c; + expect(count).toBe(0); + }); +}); diff --git a/src/modules/permissions/channel-approval.ts b/src/modules/permissions/channel-approval.ts new file mode 100644 index 0000000..9c65f8e --- /dev/null +++ b/src/modules/permissions/channel-approval.ts @@ -0,0 +1,159 @@ +/** + * Unknown-channel registration flow. + * + * When the router hits an unwired messaging group AND the message was + * addressed to the bot (SDK-confirmed mention or DM), it calls + * `requestChannelApproval` instead of silently dropping. The flow: + * + * 1. Pick the target agent group we'd wire to (MVP: first by name). + * Multi-agent picker is a follow-up — see ACTION-ITEMS. + * 2. Pick an eligible approver (owner / admin) and a reachable DM for + * them, reusing the same primitives the sender-approval flow uses. + * 3. Deliver an Approve / Ignore card that names the target agent + * explicitly so the owner knows what they're wiring to. + * 4. Record a `pending_channel_approvals` row holding the original event + * so it can be re-routed on approve. + * + * On approve (handler in index.ts): + * - Create `messaging_group_agents` with MVP defaults + * (mention-sticky for groups / pattern='.' for DMs, + * sender_scope='known', ignored_message_policy='accumulate') + * - Add the triggering sender to `agent_group_members` so sender_scope + * doesn't bounce the replayed message into a sender-approval cascade + * - Delete the pending row, replay the original event + * + * On ignore: + * - Set `messaging_groups.denied_at = now()` so the router stops + * escalating on this channel until an admin explicitly re-wires + * - Delete the pending row + * + * Dedup: `pending_channel_approvals` PK on messaging_group_id. Second + * mention while pending silently dropped. + * + * Failure modes (log + no row, so a future attempt can try again): + * - No agent groups exist (install never set up a first agent). + * - No eligible approver in user_roles (no owner yet). + * - Approver has no reachable DM. + * - Delivery adapter missing. + */ +import { normalizeOptions, type RawOption } from '../../channels/ask-question.js'; +import { getAllAgentGroups } from '../../db/agent-groups.js'; +import { getMessagingGroup } from '../../db/messaging-groups.js'; +import { getDeliveryAdapter } from '../../delivery.js'; +import { log } from '../../log.js'; +import type { InboundEvent } from '../../router.js'; +import { pickApprovalDelivery, pickApprover } from '../approvals/primitive.js'; +import { createPendingChannelApproval, hasInFlightChannelApproval } from './db/pending-channel-approvals.js'; + +const APPROVAL_OPTIONS: RawOption[] = [ + { label: 'Approve', selectedLabel: '✅ Wired', value: 'approve' }, + { label: 'Ignore', selectedLabel: '🙅 Ignored', value: 'reject' }, +]; + +export interface RequestChannelApprovalInput { + messagingGroupId: string; + event: InboundEvent; +} + +export async function requestChannelApproval(input: RequestChannelApprovalInput): Promise { + const { messagingGroupId, event } = input; + + // In-flight dedup: don't spam the owner if the same unwired channel + // gets more mentions / DMs while a card is already pending. + if (hasInFlightChannelApproval(messagingGroupId)) { + log.debug('Channel registration already in flight — dropping retry', { + messagingGroupId, + }); + return; + } + + // MVP: pick the first agent group by name. Multi-agent systems will get + // a richer card later (user picks the target from a list). + const agentGroups = getAllAgentGroups(); + if (agentGroups.length === 0) { + log.warn('Channel registration skipped — no agent groups configured. Run /init-first-agent.', { + messagingGroupId, + }); + return; + } + const target = agentGroups[0]; + + // pickApprover takes the target agent group's id — gets scoped admins + + // global admins + owners. For fresh installs with only an owner, the + // owner is returned. + const approvers = pickApprover(target.id); + if (approvers.length === 0) { + log.warn('Channel registration skipped — no owner or admin configured', { + messagingGroupId, + targetAgentGroupId: target.id, + }); + return; + } + + const originMg = getMessagingGroup(messagingGroupId); + const originChannelType = originMg?.channel_type ?? ''; + const delivery = await pickApprovalDelivery(approvers, originChannelType); + if (!delivery) { + log.warn('Channel registration skipped — no DM channel for any approver', { + messagingGroupId, + targetAgentGroupId: target.id, + }); + return; + } + + const originName = originMg?.name ?? originMg?.platform_id ?? 'an unfamiliar chat'; + const isGroup = originMg?.is_group === 1; + + const title = isGroup ? '📣 Bot mentioned in new chat' : '💬 New direct message'; + const question = isGroup + ? `Your agent was mentioned in ${originName} on ${originChannelType}. Wire it to ${target.name} and let it engage?` + : `Someone DM'd your agent on ${originChannelType} (${originName}). Wire it to ${target.name} and let it respond?`; + + createPendingChannelApproval({ + messaging_group_id: messagingGroupId, + agent_group_id: target.id, + original_message: JSON.stringify(event), + approver_user_id: delivery.userId, + created_at: new Date().toISOString(), + }); + + const adapter = getDeliveryAdapter(); + if (!adapter) { + log.error('Channel registration row created but no delivery adapter is wired', { + messagingGroupId, + }); + return; + } + + try { + await adapter.deliver( + delivery.messagingGroup.channel_type, + delivery.messagingGroup.platform_id, + null, + 'chat-sdk', + JSON.stringify({ + type: 'ask_question', + // Use messaging_group_id as the questionId — it's unique per card + // (PK on pending table dedups) and lets the response handler look + // up the pending row directly without another index. + questionId: messagingGroupId, + title, + question, + options: normalizeOptions(APPROVAL_OPTIONS), + }), + ); + log.info('Channel registration card delivered', { + messagingGroupId, + targetAgentGroupId: target.id, + approver: delivery.userId, + }); + } catch (err) { + log.error('Channel registration card delivery failed', { + messagingGroupId, + err, + }); + } +} + +export const APPROVE_VALUE = 'approve'; +export const REJECT_VALUE = 'reject'; diff --git a/src/modules/permissions/db/pending-channel-approvals.ts b/src/modules/permissions/db/pending-channel-approvals.ts new file mode 100644 index 0000000..d3e665a --- /dev/null +++ b/src/modules/permissions/db/pending-channel-approvals.ts @@ -0,0 +1,52 @@ +/** + * CRUD for pending_channel_approvals — the in-flight state for the + * unknown-channel registration flow. A row exists while an owner-approval + * card is outstanding; it's deleted on approve (after wiring is created) + * or deny (after denied_at is set on the messaging_group). + * + * PRIMARY KEY on messaging_group_id gives free in-flight dedup. A second + * mention/DM while a card is pending resolves via + * `hasInFlightChannelApproval` in the request flow and drops silently + * instead of spamming the owner. + */ +import { getDb } from '../../../db/connection.js'; + +export interface PendingChannelApproval { + messaging_group_id: string; + agent_group_id: string; + original_message: string; + approver_user_id: string; + created_at: string; +} + +export function createPendingChannelApproval(row: PendingChannelApproval): void { + getDb() + .prepare( + `INSERT INTO pending_channel_approvals ( + messaging_group_id, agent_group_id, original_message, + approver_user_id, created_at + ) + VALUES ( + @messaging_group_id, @agent_group_id, @original_message, + @approver_user_id, @created_at + )`, + ) + .run(row); +} + +export function getPendingChannelApproval(messagingGroupId: string): PendingChannelApproval | undefined { + return getDb() + .prepare('SELECT * FROM pending_channel_approvals WHERE messaging_group_id = ?') + .get(messagingGroupId) as PendingChannelApproval | undefined; +} + +export function hasInFlightChannelApproval(messagingGroupId: string): boolean { + const row = getDb() + .prepare('SELECT 1 AS x FROM pending_channel_approvals WHERE messaging_group_id = ?') + .get(messagingGroupId) as { x: number } | undefined; + return row !== undefined; +} + +export function deletePendingChannelApproval(messagingGroupId: string): void { + getDb().prepare('DELETE FROM pending_channel_approvals WHERE messaging_group_id = ?').run(messagingGroupId); +} diff --git a/src/modules/permissions/index.ts b/src/modules/permissions/index.ts index d13797b..e2f100c 100644 --- a/src/modules/permissions/index.ts +++ b/src/modules/permissions/index.ts @@ -16,9 +16,14 @@ * access gate is not registered and core defaults to allow-all. */ import { recordDroppedMessage } from '../../db/dropped-messages.js'; +import { + createMessagingGroupAgent, + setMessagingGroupDeniedAt, +} from '../../db/messaging-groups.js'; import { routeInbound, setAccessGate, + setChannelRequestGate, setSenderResolver, setSenderScopeGate, type AccessGateResult, @@ -28,7 +33,12 @@ import { registerResponseHandler, type ResponsePayload } from '../../response-re import { log } from '../../log.js'; import type { MessagingGroup, MessagingGroupAgent } from '../../types.js'; import { canAccessAgentGroup } from './access.js'; +import { requestChannelApproval } from './channel-approval.js'; import { addMember } from './db/agent-group-members.js'; +import { + deletePendingChannelApproval, + getPendingChannelApproval, +} from './db/pending-channel-approvals.js'; import { deletePendingSenderApproval, getPendingSenderApproval } from './db/pending-sender-approvals.js'; import { hasAdminPrivilege } from './db/user-roles.js'; import { getUser, upsertUser } from './db/users.js'; @@ -253,3 +263,137 @@ async function handleSenderApprovalResponse(payload: ResponsePayload): Promise { + await requestChannelApproval({ messagingGroupId: mg.id, event }); +}); + +/** + * Response handler for the unknown-channel registration card. + * + * Claim rule: questionId matches a pending_channel_approvals row (keyed + * by messaging_group_id). If no such row, return false so downstream + * handlers get a shot. + * + * Approve: create the wiring with MVP defaults (mention-sticky for + * groups / pattern='.' for DMs; sender_scope='known'; + * ignored_message_policy='accumulate'), add the triggering sender as a + * member so sender_scope doesn't immediately bounce them into a + * sender-approval card, then replay the original event. + * + * Deny: set `messaging_groups.denied_at = now()` so future mentions on + * this channel drop silently until an admin explicitly wires it. + */ +async function handleChannelApprovalResponse(payload: ResponsePayload): Promise { + const row = getPendingChannelApproval(payload.questionId); + if (!row) return false; + + // Click-auth: same pattern as sender-approval (see commit 68058cb). + // Raw platform userId → namespace with channelType → must match the + // designated approver OR have admin privilege over the target agent. + const clickerId = payload.userId ? `${payload.channelType}:${payload.userId}` : null; + const isAuthorized = + clickerId !== null && (clickerId === row.approver_user_id || hasAdminPrivilege(clickerId, row.agent_group_id)); + if (!isAuthorized) { + log.warn('Channel registration click rejected — unauthorized clicker', { + messagingGroupId: row.messaging_group_id, + clickerId, + expectedApprover: row.approver_user_id, + }); + return true; // claim but take no action + } + const approverId = clickerId; + const approved = payload.value === 'approve'; + + if (!approved) { + setMessagingGroupDeniedAt(row.messaging_group_id, new Date().toISOString()); + deletePendingChannelApproval(row.messaging_group_id); + log.info('Channel registration denied', { + messagingGroupId: row.messaging_group_id, + agentGroupId: row.agent_group_id, + approverId, + }); + return true; + } + + // Rehydrate the original event to know (a) whether it was a DM or group + // (chooses engage_mode default), and (b) who the triggering sender was + // (auto-member-add so sender_scope='known' doesn't bounce the replay). + let event: InboundEvent; + try { + event = JSON.parse(row.original_message) as InboundEvent; + } catch (err) { + log.error('Channel registration: failed to parse stored event', { + messagingGroupId: row.messaging_group_id, + err, + }); + deletePendingChannelApproval(row.messaging_group_id); + return true; + } + + // Decide engage_mode from the original event. DMs (`isMention=true` & + // not in a group) get `pattern='.'` (always respond). Group mentions + // get `mention-sticky` (respond now + follow the thread). + // + // We can't read `mg.is_group` reliably here because we only auto-create + // the mg with `is_group=0` on first sight — the adapter hasn't told us + // yet whether it's actually a group. Fall back to the InboundEvent's + // `threadId`: a non-null threadId implies a threaded platform (Slack + // channel thread, Discord thread), which we treat as a group. + const isGroup = event.threadId !== null; + const engageMode: MessagingGroupAgent['engage_mode'] = isGroup ? 'mention-sticky' : 'pattern'; + const engagePattern = isGroup ? null : '.'; + + const mgaId = `mga-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + createMessagingGroupAgent({ + id: mgaId, + messaging_group_id: row.messaging_group_id, + agent_group_id: row.agent_group_id, + engage_mode: engageMode, + engage_pattern: engagePattern, + sender_scope: 'known', + ignored_message_policy: 'accumulate', + session_mode: 'shared', + priority: 0, + created_at: new Date().toISOString(), + }); + log.info('Channel registration approved — wiring created', { + messagingGroupId: row.messaging_group_id, + agentGroupId: row.agent_group_id, + mgaId, + engageMode, + approverId, + }); + + // Auto-admit the triggering sender. Without this, the replay below + // would bounce through sender-approval (sender_scope='known' + + // sender-is-not-a-member). + const senderUserId = extractAndUpsertUser(event); + if (senderUserId) { + addMember({ + user_id: senderUserId, + agent_group_id: row.agent_group_id, + added_by: approverId, + added_at: new Date().toISOString(), + }); + } + + // Clear the pending row BEFORE replay so the gate check on the second + // attempt sees a wired channel (agentCount > 0) and takes the fan-out + // path normally. + deletePendingChannelApproval(row.messaging_group_id); + + try { + await routeInbound(event); + } catch (err) { + log.error('Failed to replay message after channel approval', { + messagingGroupId: row.messaging_group_id, + err, + }); + } + return true; +} + +registerResponseHandler(handleChannelApprovalResponse); diff --git a/src/router.ts b/src/router.ts index a3e8f06..4289f1f 100644 --- a/src/router.ts +++ b/src/router.ts @@ -20,7 +20,11 @@ import { getChannelAdapter } from './channels/channel-registry.js'; import { getAgentGroup } from './db/agent-groups.js'; import { recordDroppedMessage } from './db/dropped-messages.js'; -import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js'; +import { + createMessagingGroup, + getMessagingGroupAgents, + getMessagingGroupWithAgentCount, +} from './db/messaging-groups.js'; import { findSessionForAgent } from './db/sessions.js'; import { startTypingRefresh } from './modules/typing/index.js'; import { log } from './log.js'; @@ -123,6 +127,27 @@ export function setSenderScopeGate(fn: SenderScopeGateFn): void { senderScopeGate = fn; } +/** + * Channel-registration hook. Runs when the router sees a mention/DM on a + * messaging group that has no wirings AND hasn't been denied. The hook is + * expected to escalate to an owner (card, etc.) and arrange for future + * replay via routeInbound after approval. Fire-and-forget from the + * router's perspective. + * + * Registered by the permissions module. Without the module the router + * silently records the drop with reason='no_agent_wired' and moves on. + */ +export type ChannelRequestGateFn = (mg: MessagingGroup, event: InboundEvent) => Promise; + +let channelRequestGate: ChannelRequestGateFn | null = null; + +export function setChannelRequestGate(fn: ChannelRequestGateFn): void { + if (channelRequestGate) { + log.warn('Channel-request gate overwritten'); + } + channelRequestGate = fn; +} + function safeParseContent(raw: string): { text?: string; sender?: string; senderId?: string } { try { return JSON.parse(raw); @@ -143,10 +168,21 @@ export async function routeInbound(event: InboundEvent): Promise { event = { ...event, threadId: null }; } - // 1. Resolve messaging group - let mg = getMessagingGroupByPlatform(event.channelType, event.platformId); + const isMention = event.message.isMention === true; - if (!mg) { + // 1. Combined lookup: messaging_group row + count of wired agents in a + // single query. Cheap short-circuit for the common "unwired channel" + // case — one DB read and we're out, no auto-create, no sender + // resolution, no log spam. + const found = getMessagingGroupWithAgentCount(event.channelType, event.platformId); + + let mg: MessagingGroup; + let agentCount: number; + if (!found) { + // No messaging_groups row. Auto-create only when the message warrants + // attention (the bot was addressed — @mention or DM). Plain chatter in + // channels we merely sit in stays silent — no row, no DB writes. + if (!isMention) return; const mgId = `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; mg = { id: mgId, @@ -154,10 +190,8 @@ export async function routeInbound(event: InboundEvent): Promise { platform_id: event.platformId, name: null, is_group: 0, - // Let the schema default (currently 'request_approval') apply rather - // than hardcoding 'strict' — the schema is the source of truth for - // the default policy. See migration 011. unknown_sender_policy: 'request_approval', + denied_at: null, created_at: new Date().toISOString(), }; createMessagingGroup(mg); @@ -166,6 +200,51 @@ export async function routeInbound(event: InboundEvent): Promise { channelType: event.channelType, platformId: event.platformId, }); + agentCount = 0; + } else { + mg = found.mg; + agentCount = found.agentCount; + } + + // 1b. No wirings — either silent drop (plain chatter / denied channel) or + // escalate to owner for channel-registration approval. + if (agentCount === 0) { + if (!isMention) return; + if (mg.denied_at) { + log.debug('Message dropped — channel was denied by owner', { + messagingGroupId: mg.id, + deniedAt: mg.denied_at, + }); + return; + } + + const parsed = safeParseContent(event.message.content); + recordDroppedMessage({ + channel_type: event.channelType, + platform_id: event.platformId, + user_id: null, + sender_name: parsed.sender ?? null, + reason: 'no_agent_wired', + messaging_group_id: mg.id, + agent_group_id: null, + }); + + if (channelRequestGate) { + // Fire-and-forget escalation. The gate is expected to build a card, + // persist pending_channel_approvals, and replay the event via + // routeInbound after approval. Errors are logged internally — the + // user's message still stays dropped here either way. + void channelRequestGate(mg, event).catch((err) => + log.error('Channel-request gate threw', { messagingGroupId: mg.id, err }), + ); + } else { + log.warn('MESSAGE DROPPED — no agent groups wired and no channel-request gate registered', { + messagingGroupId: mg.id, + channelType: event.channelType, + platformId: event.platformId, + }); + } + return; } // 2. Sender resolution (permissions module upserts the users row as a @@ -173,27 +252,9 @@ export async function routeInbound(event: InboundEvent): Promise { // Without the module, userId is null — downstream tolerates it. const userId: string | null = senderResolver ? senderResolver(event) : null; - // 3. Resolve agent groups wired to this messaging group. Structural - // drops record to dropped_messages for audit. + // 3. Fetch wired agents in full (we already know the count is > 0; now + // we need their actual rows for fan-out). const agents = getMessagingGroupAgents(mg.id); - if (agents.length === 0) { - log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', { - messagingGroupId: mg.id, - channelType: event.channelType, - platformId: event.platformId, - }); - const parsed = safeParseContent(event.message.content); - recordDroppedMessage({ - channel_type: event.channelType, - platform_id: event.platformId, - user_id: userId, - sender_name: parsed.sender ?? null, - reason: 'no_agent_wired', - messaging_group_id: mg.id, - agent_group_id: null, - }); - return; - } // 4. Fan-out: evaluate each wired agent independently against engage_mode, // sender_scope, and access gate. An agent that engages gets its own @@ -201,12 +262,18 @@ export async function routeInbound(event: InboundEvent): Promise { // ignored_message_policy='accumulate' still gets the message stored in // its session (trigger=0) so the context is available when it does // engage later. Drop policy = skip silently. + // + // Subscribe (for mention-sticky wirings on threaded platforms) fires + // once per message from this loop — the first engaging mention-sticky + // wiring triggers adapter.subscribe(...); subsequent wirings don't + // re-subscribe (chat.subscribe is idempotent anyway, but the flag + // avoids the extra await). const parsed = safeParseContent(event.message.content); const messageText = parsed.text ?? ''; - const isMention = event.message.isMention === true; let engagedCount = 0; let accumulatedCount = 0; + let subscribed = false; for (const agent of agents) { const agentGroup = getAgentGroup(agent.agent_group_id); @@ -220,6 +287,27 @@ export async function routeInbound(event: InboundEvent): Promise { if (engages && accessOk && scopeOk) { await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, true); engagedCount++; + + // Mention-sticky: ask the adapter to subscribe the thread so the + // platform's subscribed-message path carries follow-ups without + // requiring another @mention. Threaded-adapter only; DMs and + // non-threaded platforms skip. + if ( + !subscribed && + agent.engage_mode === 'mention-sticky' && + adapter?.supportsThreads && + adapter.subscribe && + event.threadId !== null && + mg.is_group !== 0 + ) { + subscribed = true; + // Fire-and-forget — subscribe is platform-side bookkeeping and + // shouldn't block message routing. Errors are logged inside the + // adapter (or by the promise rejection handler below). + void adapter.subscribe(event.platformId, event.threadId).catch((err) => { + log.warn('adapter.subscribe failed', { channelType: event.channelType, threadId: event.threadId, err }); + }); + } } else if (agent.ignored_message_policy === 'accumulate') { await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, false); accumulatedCount++; diff --git a/src/types.ts b/src/types.ts index b2674da..b3e2470 100644 --- a/src/types.ts +++ b/src/types.ts @@ -17,6 +17,16 @@ export interface MessagingGroup { name: string | null; is_group: number; // 0 | 1 unknown_sender_policy: UnknownSenderPolicy; + /** + * When set, the owner explicitly denied registering this channel — the + * router drops silently and does not re-escalate. Cleared by any explicit + * wiring mutation (admin command). See migration 012. + * + * Optional on the TS type so pre-migration-012 callers that build + * MessagingGroup objects in code (fixtures, etc.) don't need to update; + * the column itself defaults to NULL in SQLite. + */ + denied_at?: string | null; created_at: string; }