From 73b20880ffa27e48e95076ee8ae33cc37a3bd1a5 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Mon, 20 Apr 2026 10:34:15 +0300 Subject: [PATCH] fix(channels): pre-subscribe group threads for pattern / accumulate wirings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The engage modes shipped in #1869 included `pattern` (regex match any message) and the `accumulate` ignored-message policy, but neither could fire in group chats because Chat SDK only surfaces: - DMs (onDirectMessage) - @mentions in unsubscribed threads (onNewMention) - every message in subscribed threads (onSubscribedMessage) A bot sitting in a Discord/Slack channel hears *nothing* from a plain message unless the thread is already subscribed. So `pattern '.'` on a group wiring → silent. `pattern /urgent/i` → silent. `mention + accumulate` → the non-mention messages that should be stored as context were never received, so nothing to accumulate. Fix: call `chat.subscribe(platformId)` at setup time for every wiring whose `engageMode === 'pattern'` or `ignoredMessagePolicy === 'accumulate'`. Failures logged + swallowed per-conversation so one un-subscribable channel doesn't crash startup. ## Knock-on: SDK stops firing onNewMention once subscribed Per SDK types:1468, `onNewMention` only fires in unsubscribed threads. Once we pre-subscribe a channel for a pattern wiring, subsequent mentions arrive as `onSubscribedMessage` with `message.isMention === true`. Before: a `mention` wiring coexisting with a `pattern` wiring in the same channel would silently stop firing after pre-subscribe. After: `shouldEngage` accepts the `isMention` flag independently from `source`, so the `mention` mode matches on (dm OR mention-new OR subscribed-with-isMention). Source shape changed `'subscribed' | 'mention' | 'dm'` → `'subscribed' | 'mention-new' | 'dm'` to make the "unsubscribed-mention event" distinction explicit. ## New fields - `ConversationConfig.ignoredMessagePolicy` — projected from the messaging_group_agents row so the bridge knows which wirings need pre-subscription. buildConversationConfigs in src/index.ts populates it. Tests: host 153/153, container 46/46. No new tests yet — the subscribe call path needs a Chat mock, deferred. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/channels/adapter.ts | 7 ++ src/channels/chat-sdk-bridge.ts | 115 ++++++++++++++++++++++++++------ src/index.ts | 1 + 3 files changed, 101 insertions(+), 22 deletions(-) diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 33f3825..8786061 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -22,6 +22,13 @@ export interface ConversationConfig { engageMode: 'pattern' | 'mention' | 'mention-sticky'; /** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */ engagePattern?: string | null; + /** + * What to do with non-engaging messages. Projected from the wiring so the + * adapter can decide whether to pre-subscribe to group threads — `accumulate` + * means "store everything as context even when not engaging", which requires + * seeing every message in the thread. + */ + ignoredMessagePolicy?: 'drop' | 'accumulate'; sessionMode: 'shared' | 'per-thread' | 'agent-shared'; } diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 6a480c4..4e33696 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -93,27 +93,83 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter } /** - * Should a message from (channelId, kind) engage any of the wired agents? + * Does any wiring for this conversation need Chat SDK subscription to see + * every message? `pattern` and `accumulate` both require it; plain `mention` + * and `mention-sticky` don't (mentions fire their own event, sticky + * subscribes lazily on first fire). + */ + function needsPreSubscribe(configs: ConversationConfig[]): boolean { + return configs.some( + (c) => c.engageMode === 'pattern' || c.ignoredMessagePolicy === 'accumulate', + ); + } + + /** + * Subscribe to every conversation whose wiring needs to see every message + * (pattern gate or accumulate context). Runs once after `chat.initialize()`. + * Failures are logged and swallowed per-conversation — one un-subscribable + * channel (no permission, not in it yet) shouldn't block startup. * - * - `mention` — engages only when the message actually @-mentions - * the bot (the bridge already sees it here because - * Chat SDK only forwards subscribed / mentioned / - * DM messages) - * - `mention-sticky` — same as `mention` for gating, PLUS we subscribe - * the thread so later messages arrive via the - * subscribed path and fall through to an - * engage-all style treatment - * - `pattern` — regex test against message text; `.` = always + * `threadId` for subscription = the platformId we stored in ConversationConfig. + * This matches the deliver path's `tid = threadId ?? platformId` pattern + * where adapters treat their encoded channel id as the top-level thread id. + */ + async function preSubscribeNeededConversations( + chatInstance: Chat, + conversationsMap: Map, + ): Promise { + for (const [platformId, configs] of conversationsMap.entries()) { + if (!needsPreSubscribe(configs)) continue; + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const chatAny = chatInstance as any; + if (typeof chatAny.isSubscribed === 'function') { + const already = await chatAny.isSubscribed(platformId); + if (already) continue; + } + await chatAny.subscribe(platformId); + log.info('Pre-subscribed conversation', { adapter: adapter.name, platformId }); + } catch (err) { + log.warn('Pre-subscribe failed — pattern/accumulate wirings won\'t see non-mention messages here', { + adapter: adapter.name, + platformId, + err, + }); + } + } + } + + /** + * Should a message from this conversation engage any of the wired agents? * - * We take the union across wired agents — if any one of them would engage, - * the message goes through. Per-agent filtering after that happens in the - * host router (see src/router.ts pickAgents). + * Source meaning: + * - `dm` — Chat SDK `onDirectMessage` + * - `mention-new` — Chat SDK `onNewMention` (mention in an unsubscribed + * thread; SDK never fires this once the thread is + * subscribed — see SDK types :1468) + * - `subscribed` — Chat SDK `onSubscribedMessage` (plain message in a + * subscribed thread). In this case `isMention` is + * distinct: set to true when `message.isMention` is, + * so `mention` wirings keep firing even if a + * co-resident `pattern` wiring subscribed the thread. + * + * Mode semantics: + * - `pattern` — regex test against text; `.` = always + * - `mention` — fire iff mentioned (covers dm, mention-new, and + * subscribed-with-isMention) + * - `mention-sticky` — fire on dm / mention-new (and subscribe the thread); + * in subscribed source always fire — the thread was + * already activated on a prior mention + * + * Result is the union across wired agents — if any engages, the message + * goes through. Per-agent filtering happens host-side (src/router.ts pickAgents). */ function shouldEngage( channelId: string, - source: 'subscribed' | 'mention' | 'dm', - text: string, + source: 'subscribed' | 'mention-new' | 'dm', + opts: { text: string; isMention: boolean }, ): { engage: boolean; stickySubscribe: boolean } { + const { text, isMention } = opts; const configs = conversations.get(channelId); // Unknown conversation — forward anyway (may be a new group that // hasn't been registered yet; central routing will log + drop cleanly). @@ -122,18 +178,19 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter let engage = false; let stickySubscribe = false; + const mentionMatch = source === 'dm' || source === 'mention-new' || (source === 'subscribed' && isMention); + for (const cfg of configs) { switch (cfg.engageMode) { case 'mention': - if (source === 'mention' || source === 'dm') engage = true; + if (mentionMatch) engage = true; break; case 'mention-sticky': - if (source === 'mention' || source === 'dm') { + if (source === 'dm' || source === 'mention-new') { engage = true; stickySubscribe = true; } else if (source === 'subscribed') { - // Thread was already subscribed on a prior mention — treat as - // engage-all so follow-ups in the thread reach the agent. + // Thread already activated on a prior mention — engage all messages. engage = true; } break; @@ -238,7 +295,8 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const decision = shouldEngage(channelId, 'subscribed', text); + const isMention = Boolean((message as unknown as { isMention?: boolean }).isMention); + const decision = shouldEngage(channelId, 'subscribed', { text, isMention }); if (!decision.engage) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); }); @@ -248,7 +306,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const decision = shouldEngage(channelId, 'mention', text); + const decision = shouldEngage(channelId, 'mention-new', { text, isMention: true }); if (!decision.engage) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); if (decision.stickySubscribe) { @@ -267,7 +325,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const decision = shouldEngage(channelId, 'dm', text); + const decision = shouldEngage(channelId, 'dm', { text, isMention: true }); log.info('Inbound DM received', { adapter: adapter.name, channelId, @@ -312,6 +370,19 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await chat.initialize(); + // Pre-subscribe to threads where the wiring needs to see every message + // (pattern mode or accumulate policy). Without this, Chat SDK only + // surfaces mentions + DMs for unsubscribed threads — silently dropping + // every plain message in a group, which breaks `engage_mode='pattern'` + // and `ignored_message_policy='accumulate'` for group chats. + // + // For subscription purposes, platformId is used as the thread id. This + // matches the deliver path (see `tid = threadId ?? platformId` below) + // where adapters treat their encoded channel id as the top-level thread. + // Failures are logged and swallowed so one un-subscribable channel + // (e.g., no permission) doesn't block startup. + await preSubscribeNeededConversations(chat, conversations); + // Start Gateway listener for adapters that support it (e.g., Discord) const gatewayAdapter = adapter as GatewayAdapter; if (gatewayAdapter.startGatewayListener) { diff --git a/src/index.ts b/src/index.ts index 9bb51be..4958eef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -163,6 +163,7 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] { agentGroupId: agent.agent_group_id, engageMode: agent.engage_mode, engagePattern: agent.engage_pattern, + ignoredMessagePolicy: agent.ignored_message_policy, sessionMode: agent.session_mode, }); }