diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index bbf7f37..9343258 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -5,45 +5,8 @@ * Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter). */ -/** Configuration for a registered conversation (messaging group + agent wiring). */ -export interface ConversationConfig { - platformId: string; - agentGroupId: string; - /** - * When does the agent engage on messages from this conversation? - * - * 'pattern' — regex test against message text; engagePattern='.' - * means "always" (match everything) - * 'mention' — fires only on @mention - * 'mention-sticky' — fires on @mention, then auto-subscribes to the thread - * and treats subsequent messages as engage-all. - * Threaded platforms only (Slack/Discord/Linear). - */ - engageMode: 'pattern' | 'mention' | 'mention-sticky'; - /** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */ - engagePattern?: string | null; - /** - * What to do with messages this wiring doesn't engage on. - * - * 'drop' — discard silently - * 'accumulate' — still forward to the host so the router can store the - * message in this agent's session with trigger=0. It - * rides along as context when the agent next wakes, but - * doesn't wake it on its own. - * - * The bridge reads this to decide whether to forward a non-engaging - * message at all — if any wiring on a conversation has 'accumulate', the - * bridge forwards and lets the router apply the per-wiring decision. - */ - ignoredMessagePolicy?: 'drop' | 'accumulate'; - sessionMode: 'shared' | 'per-thread' | 'agent-shared'; -} - /** Passed to the adapter at setup time. */ export interface ChannelSetup { - /** Known conversations from central DB. */ - conversations: ConversationConfig[]; - /** Called when an inbound message arrives from the platform. */ onInbound(platformId: string, threadId: string | null, message: InboundMessage): void | Promise; @@ -125,7 +88,17 @@ export interface ChannelAdapter { // Optional setTyping?(platformId: string, threadId: string | null): Promise; syncConversations?(): Promise; - updateConversations?(conversations: ConversationConfig[]): void; + + /** + * Subscribe the bot to a thread so follow-up messages route via the + * platform's "subscribed message" path (onSubscribedMessage in Chat SDK). + * Called by the router when a mention-sticky wiring first engages in a + * thread. Idempotent: calling twice on the same thread is a no-op. + * + * Platforms without a subscription concept can omit this; the router + * treats absence as a no-op. + */ + subscribe?(platformId: string, threadId: string): Promise; /** * Open (or fetch) a DM with this user, returning the platform_id of the diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index 265a372..5121c64 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -64,8 +64,6 @@ function createMockAdapter( }, async setTyping() {}, - - updateConversations() {}, }; } diff --git a/src/channels/chat-sdk-bridge.test.ts b/src/channels/chat-sdk-bridge.test.ts index 3c6caa8..7ddad4f 100644 --- a/src/channels/chat-sdk-bridge.test.ts +++ b/src/channels/chat-sdk-bridge.test.ts @@ -2,37 +2,19 @@ import { describe, expect, it } from 'vitest'; import type { Adapter } from 'chat'; -import type { ConversationConfig } from './adapter.js'; -import { createChatSdkBridge, shouldEngage, type EngageSource } from './chat-sdk-bridge.js'; +import { createChatSdkBridge } from './chat-sdk-bridge.js'; function stubAdapter(partial: Partial): Adapter { return { name: 'stub', ...partial } as unknown as Adapter; } -function cfg( - partial: Partial & { engageMode: ConversationConfig['engageMode'] }, -): ConversationConfig { - return { - platformId: partial.platformId ?? 'C1', - agentGroupId: partial.agentGroupId ?? 'ag-1', - engageMode: partial.engageMode, - engagePattern: partial.engagePattern ?? null, - ignoredMessagePolicy: partial.ignoredMessagePolicy ?? 'drop', - sessionMode: partial.sessionMode ?? 'shared', - }; -} - -function mapFor(...configs: ConversationConfig[]): Map { - const map = new Map(); - for (const c of configs) { - const existing = map.get(c.platformId); - if (existing) existing.push(c); - else map.set(c.platformId, [c]); - } - return map; -} - describe('createChatSdkBridge', () => { + // The bridge is now transport-only: forward inbound events, relay outbound + // ops. All per-wiring engage / accumulate / drop / subscribe decisions live + // in the router (src/router.ts routeInbound / evaluateEngage) and are + // exercised by host-core.test.ts end-to-end. These tests only cover the + // bridge's narrow, platform-adjacent surface. + it('omits openDM when the underlying Chat SDK adapter has none', () => { const bridge = createChatSdkBridge({ adapter: stubAdapter({}), @@ -59,76 +41,12 @@ describe('createChatSdkBridge', () => { expect(openDMCalls).toEqual(['user-42']); expect(platformId).toBe('stub:user-42'); }); -}); -describe('shouldEngage (bridge-level flood gate + subscribe signal)', () => { - // Per-wiring engage_mode / engage_pattern / ignored_message_policy - // semantics live in the router (evaluateEngage / routeInbound fan-out). - // These tests only cover the bridge's two responsibilities: should we - // forward at all, and should we call thread.subscribe(). - - describe('flood gate — unknown conversation', () => { - const empty = new Map(); - const carriedSources: EngageSource[] = ['subscribed', 'mention', 'dm']; - for (const source of carriedSources) { - it(`forwards for source='${source}' (may be a newly-auto-created channel or a channel-registration trigger)`, () => { - expect(shouldEngage(empty, 'C-new', source)).toEqual({ forward: true, stickySubscribe: false }); - }); - } - it("DROPS for source='new-message' (onNewMessage(/./) fires for every unsubscribed thread the bot can see — would flood)", () => { - expect(shouldEngage(empty, 'C-unwired', 'new-message')).toEqual({ - forward: false, - stickySubscribe: false, - }); - }); - }); - - describe('known conversation — bridge forwards regardless of engage mode', () => { - // Policy lives in the router now. The bridge only knows "has any wiring". - const conv = mapFor(cfg({ engageMode: 'mention' })); - for (const source of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - it(`forwards for source='${source}' — router will decide engage / accumulate / drop per wiring`, () => { - expect(shouldEngage(conv, 'C1', source).forward).toBe(true); - }); - } - }); - - describe('stickySubscribe signal', () => { - it('true when any mention-sticky wiring exists AND source is mention', () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky' })); - expect(shouldEngage(conv, 'C1', 'mention').stickySubscribe).toBe(true); - }); - - it('true when any mention-sticky wiring exists AND source is dm', () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky' })); - expect(shouldEngage(conv, 'C1', 'dm').stickySubscribe).toBe(true); - }); - - it('false on subscribed — thread is already subscribed, no need to re-subscribe', () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky' })); - expect(shouldEngage(conv, 'C1', 'subscribed').stickySubscribe).toBe(false); - }); - - it('false on new-message — mention-sticky requires an explicit mention to start', () => { - const conv = mapFor(cfg({ engageMode: 'mention-sticky' })); - expect(shouldEngage(conv, 'C1', 'new-message').stickySubscribe).toBe(false); - }); - - it('false for plain mention / pattern wirings (not sticky)', () => { - const mentionConv = mapFor(cfg({ engageMode: 'mention' })); - const patternConv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' })); - for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) { - expect(shouldEngage(mentionConv, 'C1', s).stickySubscribe).toBe(false); - expect(shouldEngage(patternConv, 'C1', s).stickySubscribe).toBe(false); - } - }); - - it('fires on coarse union — mixed wirings where any one is mention-sticky', () => { - const conv = mapFor( - cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }), - cfg({ agentGroupId: 'ag-b', engageMode: 'mention-sticky' }), - ); - expect(shouldEngage(conv, 'C1', 'mention').stickySubscribe).toBe(true); + it('exposes subscribe (lets the router initiate thread subscription on mention-sticky engage)', () => { + const bridge = createChatSdkBridge({ + adapter: stubAdapter({}), + supportsThreads: true, }); + expect(typeof bridge.subscribe).toBe('function'); }); }); diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 9bed968..ef2195e 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -21,7 +21,7 @@ import { SqliteStateAdapter } from '../state-sqlite.js'; import { registerWebhookAdapter } from '../webhook-server.js'; import { getAskQuestionRender } from '../db/sessions.js'; import { normalizeOptions, type NormalizedOption } from './ask-question.js'; -import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js'; +import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js'; /** Adapter with optional gateway support (e.g., Discord). */ interface GatewayAdapter extends Adapter { @@ -65,99 +65,14 @@ export interface ChatSdkBridgeConfig { transformOutboundText?: (text: string) => string; } -/** - * Which Chat SDK handler delivered this message. Determines which engage modes - * can fire. - * - * - `subscribed` — `onSubscribedMessage`. Thread is already subscribed. - * Every wiring mode (mention / mention-sticky / pattern) - * evaluates normally. - * - `mention` — `onNewMention`. Bot was @-mentioned in an unsubscribed - * thread. mention + mention-sticky engage; pattern runs - * the regex. - * - `dm` — `onDirectMessage`. Unsubscribed DM. Treated like a - * mention for engagement purposes. - * - `new-message` — `onNewMessage(/./, …)`. Plain non-mention non-DM - * message in an unsubscribed thread. Only `pattern` - * wirings can fire here. mention / mention-sticky ignore - * this source (they require an explicit mention). - */ -export type EngageSource = 'subscribed' | 'mention' | 'dm' | 'new-message'; - -/** - * Bridge-level forwarding decision — a coarse flood gate, not policy. - * - * The router owns per-wiring engage_mode / engage_pattern / sender_scope / - * ignored_message_policy (see `evaluateEngage` in src/router.ts). The bridge - * only answers two questions: - * - * 1. `forward` — is this message worth sending to the host at all? - * - Known channel (any wiring): yes. Router will decide what engages / - * accumulates / drops per wiring. - * - Unknown channel: yes for subscribed / mention / DM (triggers the - * router's auto-create or channel-registration flow); no for - * `new-message`. onNewMessage(/./, …) fires for every message in - * every unsubscribed thread the bot can see, including channels the - * bot merely joined but was never wired to — forwarding everything - * would flood the host. - * - * 2. `stickySubscribe` — should the bridge call `thread.subscribe()`? - * - Yes if ANY wiring on this channel is mention-sticky AND the - * source is an actual mention / DM. Coarse (no per-wiring picking) - * but harmless: subscription is idempotent and one call serves - * every mention-sticky wiring on the channel. Once subscribed, - * follow-ups route through onSubscribedMessage. - * - * Exported for testability — see `chat-sdk-bridge.test.ts`. - */ -export function shouldEngage( - conversations: Map, - channelId: string, - source: EngageSource, -): { forward: boolean; stickySubscribe: boolean } { - const configs = conversations.get(channelId); - - if (!configs || configs.length === 0) { - return { forward: source !== 'new-message', stickySubscribe: false }; - } - - const stickySubscribe = - (source === 'mention' || source === 'dm') && configs.some((cfg) => cfg.engageMode === 'mention-sticky'); - - return { forward: true, stickySubscribe }; -} - export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { const { adapter } = config; const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t); let chat: Chat; let state: SqliteStateAdapter; let setupConfig: ChannelSetup; - // Keyed by platformId. Multiple agents may be wired to the same - // conversation — this holds all their configs so the bridge can apply the - // most-permissive engage rule at gate time and only subscribe when at - // least one wiring requested 'mention-sticky'. - // - // STALENESS: populated at setup() and updateConversations(). If wirings - // change after setup, updateConversations() must be called to refresh - // (ACTION-ITEMS item 17). - let conversations: Map; let gatewayAbort: AbortController | null = null; - function buildConversationMap(configs: ConversationConfig[]): Map { - const map = new Map(); - for (const conv of configs) { - const existing = map.get(conv.platformId); - if (existing) existing.push(conv); - else map.set(conv.platformId, [conv]); - } - return map; - } - - function engageDecision(channelId: string, source: EngageSource): { forward: boolean; stickySubscribe: boolean } { - return shouldEngage(conversations, channelId, source); - } - async function messageToInbound(message: ChatMessage, isMention: boolean): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any const serialized = message.toJSON() as Record; @@ -225,7 +140,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter async setup(hostConfig: ChannelSetup) { setupConfig = hostConfig; - conversations = buildConversationMap(hostConfig.conversations); state = new SqliteStateAdapter(); @@ -237,29 +151,25 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter logger: 'silent', }); - // Four SDK dispatch paths — bridge just forwards; router does all - // per-wiring engage / accumulate / drop decisions. isMention is the - // load-bearing signal (see evaluateEngage in src/router.ts). + // Four SDK dispatch paths — bridge just forwards. All per-wiring + // engage / accumulate / drop / subscribe decisions live in the host + // router (src/router.ts routeInbound / evaluateEngage). The bridge + // only resolves channel ids and sets the platform-confirmed isMention + // flag that routeInbound evaluates; the router calls back into + // bridge.subscribe(...) when a mention-sticky wiring engages. // Subscribed threads — every message in a thread we've previously // engaged. Carry the SDK's `message.isMention` through so mention-mode // wirings still fire on in-thread mentions. chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const decision = engageDecision(channelId, 'subscribed'); - if (!decision.forward) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, message.isMention === true)); }); // @mention in an unsubscribed thread — SDK-confirmed bot mention. chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const decision = engageDecision(channelId, 'mention'); - if (!decision.forward) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); - if (decision.stickySubscribe) { - await thread.subscribe(); - } }); // DMs — by definition addressed to the bot. Thread id flows through @@ -268,19 +178,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // is_group=0 short-circuit. chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const decision = engageDecision(channelId, 'dm'); log.info('Inbound DM received', { adapter: adapter.name, channelId, sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown', threadId: thread.id, - forward: decision.forward, }); - if (!decision.forward) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true)); - if (decision.stickySubscribe) { - await thread.subscribe(); - } }); // Plain messages in unsubscribed threads. @@ -288,14 +192,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is // exclusive: subscribed → onSubscribedMessage; unsubscribed+mention → // onNewMention; unsubscribed+pattern-match → onNewMessage. Registering - // with `/./` lets the router see every plain message on wired channels - // (needed for engage_mode='pattern' + ignored_message_policy='accumulate' - // wirings). `shouldEngage` drops unknown channels on this source - // specifically so we don't flood from channels the bot merely joined. + // with `/./` lets the router see every plain message on every + // unsubscribed thread the bot can see. The router short-circuits via + // getMessagingGroupWithAgentCount (~1 DB read) for unwired channels, + // so forwarding every one is cheap enough to not need a bridge-side + // flood gate. chat.onNewMessage(/./, async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); - const decision = engageDecision(channelId, 'new-message'); - if (!decision.forward) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false)); }); @@ -468,8 +371,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter return true; }, - updateConversations(configs: ConversationConfig[]) { - conversations = buildConversationMap(configs); + async subscribe(_platformId: string, threadId: string) { + // Chat SDK's subscription state lives on the StateAdapter (not on the + // Chat instance itself). SqliteStateAdapter.subscribe is idempotent — + // a second call on an already-subscribed thread is a no-op. threadId + // is the SDK's thread id, which is what the router already has from + // the original inbound event. + await state.subscribe(threadId); }, }; diff --git a/src/db/messaging-groups.ts b/src/db/messaging-groups.ts index db12583..3f9b2c4 100644 --- a/src/db/messaging-groups.ts +++ b/src/db/messaging-groups.ts @@ -37,6 +37,37 @@ export function getMessagingGroupByPlatform(channelType: string, platformId: str .get(channelType, platformId) as MessagingGroup | undefined; } +/** + * Combined lookup for the router's fast-drop path. Returns the messaging + * group (if it exists) and a count of wired agents in one query — lets + * `routeInbound` short-circuit messages for unwired / unknown channels + * with a single DB read instead of four (mg lookup, sender upsert, agents + * lookup, dropped_messages insert). + * + * Returns `null` when no messaging_groups row exists for this channel. + * Returns `{ mg, agentCount: 0 }` when the row exists but has no wired + * agents. Uses the `UNIQUE(channel_type, platform_id)` index plus the + * `UNIQUE(messaging_group_id, agent_group_id)` index for the JOIN — both + * covered by existing SQLite auto-indexes from the UNIQUE constraints. + */ +export function getMessagingGroupWithAgentCount( + channelType: string, + platformId: string, +): { mg: MessagingGroup; agentCount: number } | null { + const row = getDb() + .prepare( + `SELECT mg.*, COUNT(mga.id) AS agent_count + FROM messaging_groups mg + LEFT JOIN messaging_group_agents mga ON mga.messaging_group_id = mg.id + WHERE mg.channel_type = ? AND mg.platform_id = ? + GROUP BY mg.id`, + ) + .get(channelType, platformId) as (MessagingGroup & { agent_count: number }) | undefined; + if (!row) return null; + const { agent_count, ...mg } = row; + return { mg: mg as MessagingGroup, agentCount: agent_count }; +} + export function getAllMessagingGroups(): MessagingGroup[] { return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[]; } diff --git a/src/host-core.test.ts b/src/host-core.test.ts index 33d37ff..da2fd37 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -244,26 +244,42 @@ describe('router', () => { expect(wakeContainer).toHaveBeenCalled(); }); - it('should auto-create messaging group for unknown platform', async () => { + it('auto-creates messaging group only when the bot is addressed (mention/DM)', async () => { + // The router's no-mg branch is escalation-gated: plain chatter on an + // unknown channel stays silent (no DB writes) so a bot that sits in + // many unwired channels doesn't bloat messaging_groups. Only explicit + // mentions and DMs trigger auto-create. const { routeInbound } = await import('./router.js'); + const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js'); - const event: InboundEvent = { + // Plain message on unknown channel — should NOT auto-create. + await routeInbound({ channelType: 'slack', - platformId: 'C-NEW-CHANNEL', + platformId: 'C-PLAIN', threadId: null, message: { - id: 'msg-2', + id: 'msg-plain', kind: 'chat', content: JSON.stringify({ sender: 'User', text: 'Hi' }), timestamp: now(), }, - }; + }); + expect(getMessagingGroupByPlatform('slack', 'C-PLAIN')).toBeUndefined(); - await routeInbound(event); - - const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js'); - const mg = getMessagingGroupByPlatform('slack', 'C-NEW-CHANNEL'); - expect(mg).toBeDefined(); + // Mention on unknown channel — SHOULD auto-create (next step: channel-registration flow). + await routeInbound({ + channelType: 'slack', + platformId: 'C-MENTIONED', + threadId: null, + message: { + id: 'msg-mentioned', + kind: 'chat', + content: JSON.stringify({ sender: 'User', text: '@bot hi' }), + timestamp: now(), + isMention: true, + }, + }); + expect(getMessagingGroupByPlatform('slack', 'C-MENTIONED')).toBeDefined(); }); it('should route multiple messages to the same session', async () => { diff --git a/src/index.ts b/src/index.ts index 595ba1b..7c5ab24 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,6 @@ import path from 'path'; import { DATA_DIR } from './config.js'; import { initDb } from './db/connection.js'; import { runMigrations } from './db/migrations/index.js'; -import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js'; import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js'; import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js'; import { startHostSweep, stopHostSweep } from './host-sweep.js'; @@ -52,7 +51,7 @@ import './channels/index.js'; // append registry-based modules. Imported for side effects (registrations). import './modules/index.js'; -import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js'; +import type { ChannelAdapter, ChannelSetup } from './channels/adapter.js'; import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js'; async function main(): Promise { @@ -70,9 +69,7 @@ async function main(): Promise { // 3. Channel adapters await initChannelAdapters((adapter: ChannelAdapter): ChannelSetup => { - const conversations = buildConversationConfigs(adapter.channelType); return { - conversations, onInbound(platformId, threadId, message) { routeInbound({ channelType: adapter.channelType, @@ -151,28 +148,6 @@ async function main(): Promise { log.info('NanoClaw running'); } -/** Build ConversationConfig[] for a channel type from the central DB. */ -function buildConversationConfigs(channelType: string): ConversationConfig[] { - const groups = getMessagingGroupsByChannel(channelType); - const configs: ConversationConfig[] = []; - - for (const mg of groups) { - const agents = getMessagingGroupAgents(mg.id); - for (const agent of agents) { - configs.push({ - platformId: mg.platform_id, - agentGroupId: agent.agent_group_id, - engageMode: agent.engage_mode, - engagePattern: agent.engage_pattern, - ignoredMessagePolicy: agent.ignored_message_policy, - sessionMode: agent.session_mode, - }); - } - } - - return configs; -} - /** Graceful shutdown. */ async function shutdown(signal: string): Promise { log.info('Shutdown signal received', { signal }); diff --git a/src/router.ts b/src/router.ts index a3e8f06..1d819c0 100644 --- a/src/router.ts +++ b/src/router.ts @@ -20,7 +20,11 @@ import { getChannelAdapter } from './channels/channel-registry.js'; import { getAgentGroup } from './db/agent-groups.js'; import { recordDroppedMessage } from './db/dropped-messages.js'; -import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js'; +import { + createMessagingGroup, + getMessagingGroupAgents, + getMessagingGroupWithAgentCount, +} from './db/messaging-groups.js'; import { findSessionForAgent } from './db/sessions.js'; import { startTypingRefresh } from './modules/typing/index.js'; import { log } from './log.js'; @@ -143,10 +147,21 @@ export async function routeInbound(event: InboundEvent): Promise { event = { ...event, threadId: null }; } - // 1. Resolve messaging group - let mg = getMessagingGroupByPlatform(event.channelType, event.platformId); + const isMention = event.message.isMention === true; + + // 1. Combined lookup: messaging_group row + count of wired agents in a + // single query. Cheap short-circuit for the common "unwired channel" + // case — one DB read and we're out, no auto-create, no sender + // resolution, no log spam. + const found = getMessagingGroupWithAgentCount(event.channelType, event.platformId); + + let mg: MessagingGroup; + if (!found) { + // No messaging_groups row. Auto-create only when the message warrants + // attention (the bot was addressed — @mention or DM). Plain chatter in + // channels we merely sit in stays silent — no row, no DB writes. + if (!isMention) return; - if (!mg) { const mgId = `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; mg = { id: mgId, @@ -154,9 +169,6 @@ export async function routeInbound(event: InboundEvent): Promise { platform_id: event.platformId, name: null, is_group: 0, - // Let the schema default (currently 'request_approval') apply rather - // than hardcoding 'strict' — the schema is the source of truth for - // the default policy. See migration 011. unknown_sender_policy: 'request_approval', created_at: new Date().toISOString(), }; @@ -166,6 +178,30 @@ export async function routeInbound(event: InboundEvent): Promise { channelType: event.channelType, platformId: event.platformId, }); + } else { + mg = found.mg; + if (found.agentCount === 0) { + // Messaging group exists but has no wirings. Stay silent for plain + // messages; only log + record on explicit mention/DM so admins can + // see that someone tried to reach the bot on an unwired channel. + if (!isMention) return; + log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', { + messagingGroupId: mg.id, + channelType: event.channelType, + platformId: event.platformId, + }); + const parsed = safeParseContent(event.message.content); + recordDroppedMessage({ + channel_type: event.channelType, + platform_id: event.platformId, + user_id: null, + sender_name: parsed.sender ?? null, + reason: 'no_agent_wired', + messaging_group_id: mg.id, + agent_group_id: null, + }); + return; + } } // 2. Sender resolution (permissions module upserts the users row as a @@ -173,27 +209,9 @@ export async function routeInbound(event: InboundEvent): Promise { // Without the module, userId is null — downstream tolerates it. const userId: string | null = senderResolver ? senderResolver(event) : null; - // 3. Resolve agent groups wired to this messaging group. Structural - // drops record to dropped_messages for audit. + // 3. Fetch wired agents in full (we already know the count is > 0; now + // we need their actual rows for fan-out). const agents = getMessagingGroupAgents(mg.id); - if (agents.length === 0) { - log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', { - messagingGroupId: mg.id, - channelType: event.channelType, - platformId: event.platformId, - }); - const parsed = safeParseContent(event.message.content); - recordDroppedMessage({ - channel_type: event.channelType, - platform_id: event.platformId, - user_id: userId, - sender_name: parsed.sender ?? null, - reason: 'no_agent_wired', - messaging_group_id: mg.id, - agent_group_id: null, - }); - return; - } // 4. Fan-out: evaluate each wired agent independently against engage_mode, // sender_scope, and access gate. An agent that engages gets its own @@ -201,12 +219,18 @@ export async function routeInbound(event: InboundEvent): Promise { // ignored_message_policy='accumulate' still gets the message stored in // its session (trigger=0) so the context is available when it does // engage later. Drop policy = skip silently. + // + // Subscribe (for mention-sticky wirings on threaded platforms) fires + // once per message from this loop — the first engaging mention-sticky + // wiring triggers adapter.subscribe(...); subsequent wirings don't + // re-subscribe (chat.subscribe is idempotent anyway, but the flag + // avoids the extra await). const parsed = safeParseContent(event.message.content); const messageText = parsed.text ?? ''; - const isMention = event.message.isMention === true; let engagedCount = 0; let accumulatedCount = 0; + let subscribed = false; for (const agent of agents) { const agentGroup = getAgentGroup(agent.agent_group_id); @@ -220,6 +244,27 @@ export async function routeInbound(event: InboundEvent): Promise { if (engages && accessOk && scopeOk) { await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, true); engagedCount++; + + // Mention-sticky: ask the adapter to subscribe the thread so the + // platform's subscribed-message path carries follow-ups without + // requiring another @mention. Threaded-adapter only; DMs and + // non-threaded platforms skip. + if ( + !subscribed && + agent.engage_mode === 'mention-sticky' && + adapter?.supportsThreads && + adapter.subscribe && + event.threadId !== null && + mg.is_group !== 0 + ) { + subscribed = true; + // Fire-and-forget — subscribe is platform-side bookkeeping and + // shouldn't block message routing. Errors are logged inside the + // adapter (or by the promise rejection handler below). + void adapter.subscribe(event.platformId, event.threadId).catch((err) => { + log.warn('adapter.subscribe failed', { channelType: event.channelType, threadId: event.threadId, err }); + }); + } } else if (agent.ignored_message_policy === 'accumulate') { await deliverToAgent(agent, agentGroup, mg, event, userId, adapter?.supportsThreads === true, false); accumulatedCount++;