From 57e0cda9e5d8347b93d767222f052b025d3f4572 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Mon, 20 Apr 2026 10:35:33 +0300 Subject: [PATCH] Revert "fix(channels): pre-subscribe group threads for pattern / accumulate wirings" This reverts commit 73b20880ffa27e48e95076ee8ae33cc37a3bd1a5. --- src/channels/adapter.ts | 7 -- src/channels/chat-sdk-bridge.ts | 115 ++++++-------------------------- src/index.ts | 1 - 3 files changed, 22 insertions(+), 101 deletions(-) diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 8786061..33f3825 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -22,13 +22,6 @@ export interface ConversationConfig { engageMode: 'pattern' | 'mention' | 'mention-sticky'; /** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */ engagePattern?: string | null; - /** - * What to do with non-engaging messages. Projected from the wiring so the - * adapter can decide whether to pre-subscribe to group threads — `accumulate` - * means "store everything as context even when not engaging", which requires - * seeing every message in the thread. - */ - ignoredMessagePolicy?: 'drop' | 'accumulate'; sessionMode: 'shared' | 'per-thread' | 'agent-shared'; } diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 4e33696..6a480c4 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -93,83 +93,27 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter } /** - * Does any wiring for this conversation need Chat SDK subscription to see - * every message? `pattern` and `accumulate` both require it; plain `mention` - * and `mention-sticky` don't (mentions fire their own event, sticky - * subscribes lazily on first fire). - */ - function needsPreSubscribe(configs: ConversationConfig[]): boolean { - return configs.some( - (c) => c.engageMode === 'pattern' || c.ignoredMessagePolicy === 'accumulate', - ); - } - - /** - * Subscribe to every conversation whose wiring needs to see every message - * (pattern gate or accumulate context). Runs once after `chat.initialize()`. - * Failures are logged and swallowed per-conversation — one un-subscribable - * channel (no permission, not in it yet) shouldn't block startup. + * Should a message from (channelId, kind) engage any of the wired agents? * - * `threadId` for subscription = the platformId we stored in ConversationConfig. - * This matches the deliver path's `tid = threadId ?? platformId` pattern - * where adapters treat their encoded channel id as the top-level thread id. - */ - async function preSubscribeNeededConversations( - chatInstance: Chat, - conversationsMap: Map, - ): Promise { - for (const [platformId, configs] of conversationsMap.entries()) { - if (!needsPreSubscribe(configs)) continue; - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const chatAny = chatInstance as any; - if (typeof chatAny.isSubscribed === 'function') { - const already = await chatAny.isSubscribed(platformId); - if (already) continue; - } - await chatAny.subscribe(platformId); - log.info('Pre-subscribed conversation', { adapter: adapter.name, platformId }); - } catch (err) { - log.warn('Pre-subscribe failed — pattern/accumulate wirings won\'t see non-mention messages here', { - adapter: adapter.name, - platformId, - err, - }); - } - } - } - - /** - * Should a message from this conversation engage any of the wired agents? + * - `mention` — engages only when the message actually @-mentions + * the bot (the bridge already sees it here because + * Chat SDK only forwards subscribed / mentioned / + * DM messages) + * - `mention-sticky` — same as `mention` for gating, PLUS we subscribe + * the thread so later messages arrive via the + * subscribed path and fall through to an + * engage-all style treatment + * - `pattern` — regex test against message text; `.` = always * - * Source meaning: - * - `dm` — Chat SDK `onDirectMessage` - * - `mention-new` — Chat SDK `onNewMention` (mention in an unsubscribed - * thread; SDK never fires this once the thread is - * subscribed — see SDK types :1468) - * - `subscribed` — Chat SDK `onSubscribedMessage` (plain message in a - * subscribed thread). In this case `isMention` is - * distinct: set to true when `message.isMention` is, - * so `mention` wirings keep firing even if a - * co-resident `pattern` wiring subscribed the thread. - * - * Mode semantics: - * - `pattern` — regex test against text; `.` = always - * - `mention` — fire iff mentioned (covers dm, mention-new, and - * subscribed-with-isMention) - * - `mention-sticky` — fire on dm / mention-new (and subscribe the thread); - * in subscribed source always fire — the thread was - * already activated on a prior mention - * - * Result is the union across wired agents — if any engages, the message - * goes through. Per-agent filtering happens host-side (src/router.ts pickAgents). + * We take the union across wired agents — if any one of them would engage, + * the message goes through. Per-agent filtering after that happens in the + * host router (see src/router.ts pickAgents). */ function shouldEngage( channelId: string, - source: 'subscribed' | 'mention-new' | 'dm', - opts: { text: string; isMention: boolean }, + source: 'subscribed' | 'mention' | 'dm', + text: string, ): { engage: boolean; stickySubscribe: boolean } { - const { text, isMention } = opts; const configs = conversations.get(channelId); // Unknown conversation — forward anyway (may be a new group that // hasn't been registered yet; central routing will log + drop cleanly). @@ -178,19 +122,18 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter let engage = false; let stickySubscribe = false; - const mentionMatch = source === 'dm' || source === 'mention-new' || (source === 'subscribed' && isMention); - for (const cfg of configs) { switch (cfg.engageMode) { case 'mention': - if (mentionMatch) engage = true; + if (source === 'mention' || source === 'dm') engage = true; break; case 'mention-sticky': - if (source === 'dm' || source === 'mention-new') { + if (source === 'mention' || source === 'dm') { engage = true; stickySubscribe = true; } else if (source === 'subscribed') { - // Thread already activated on a prior mention — engage all messages. + // Thread was already subscribed on a prior mention — treat as + // engage-all so follow-ups in the thread reach the agent. engage = true; } break; @@ -295,8 +238,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onSubscribedMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const isMention = Boolean((message as unknown as { isMention?: boolean }).isMention); - const decision = shouldEngage(channelId, 'subscribed', { text, isMention }); + const decision = shouldEngage(channelId, 'subscribed', text); if (!decision.engage) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); }); @@ -306,7 +248,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onNewMention(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const decision = shouldEngage(channelId, 'mention-new', { text, isMention: true }); + const decision = shouldEngage(channelId, 'mention', text); if (!decision.engage) return; await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message)); if (decision.stickySubscribe) { @@ -325,7 +267,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter chat.onDirectMessage(async (thread, message) => { const channelId = adapter.channelIdFromThreadId(thread.id); const text = typeof message.text === 'string' ? message.text : ''; - const decision = shouldEngage(channelId, 'dm', { text, isMention: true }); + const decision = shouldEngage(channelId, 'dm', text); log.info('Inbound DM received', { adapter: adapter.name, channelId, @@ -370,19 +312,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await chat.initialize(); - // Pre-subscribe to threads where the wiring needs to see every message - // (pattern mode or accumulate policy). Without this, Chat SDK only - // surfaces mentions + DMs for unsubscribed threads — silently dropping - // every plain message in a group, which breaks `engage_mode='pattern'` - // and `ignored_message_policy='accumulate'` for group chats. - // - // For subscription purposes, platformId is used as the thread id. This - // matches the deliver path (see `tid = threadId ?? platformId` below) - // where adapters treat their encoded channel id as the top-level thread. - // Failures are logged and swallowed so one un-subscribable channel - // (e.g., no permission) doesn't block startup. - await preSubscribeNeededConversations(chat, conversations); - // Start Gateway listener for adapters that support it (e.g., Discord) const gatewayAdapter = adapter as GatewayAdapter; if (gatewayAdapter.startGatewayListener) { diff --git a/src/index.ts b/src/index.ts index 4958eef..9bb51be 100644 --- a/src/index.ts +++ b/src/index.ts @@ -163,7 +163,6 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] { agentGroupId: agent.agent_group_id, engageMode: agent.engage_mode, engagePattern: agent.engage_pattern, - ignoredMessagePolicy: agent.ignored_message_policy, sessionMode: agent.session_mode, }); }