Revert "fix(channels): pre-subscribe group threads for pattern / accumulate wirings"

This reverts commit 73b20880ff.
This commit is contained in:
gavrielc
2026-04-20 10:35:33 +03:00
parent 73b20880ff
commit 57e0cda9e5
3 changed files with 22 additions and 101 deletions

View File

@@ -22,13 +22,6 @@ export interface ConversationConfig {
engageMode: 'pattern' | 'mention' | 'mention-sticky';
/** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */
engagePattern?: string | null;
/**
* What to do with non-engaging messages. Projected from the wiring so the
* adapter can decide whether to pre-subscribe to group threads — `accumulate`
* means "store everything as context even when not engaging", which requires
* seeing every message in the thread.
*/
ignoredMessagePolicy?: 'drop' | 'accumulate';
sessionMode: 'shared' | 'per-thread' | 'agent-shared';
}

View File

@@ -93,83 +93,27 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
}
/**
* Does any wiring for this conversation need Chat SDK subscription to see
* every message? `pattern` and `accumulate` both require it; plain `mention`
* and `mention-sticky` don't (mentions fire their own event, sticky
* subscribes lazily on first fire).
*/
function needsPreSubscribe(configs: ConversationConfig[]): boolean {
return configs.some(
(c) => c.engageMode === 'pattern' || c.ignoredMessagePolicy === 'accumulate',
);
}
/**
* Subscribe to every conversation whose wiring needs to see every message
* (pattern gate or accumulate context). Runs once after `chat.initialize()`.
* Failures are logged and swallowed per-conversation — one un-subscribable
* channel (no permission, not in it yet) shouldn't block startup.
* Should a message from (channelId, kind) engage any of the wired agents?
*
* `threadId` for subscription = the platformId we stored in ConversationConfig.
* This matches the deliver path's `tid = threadId ?? platformId` pattern
* where adapters treat their encoded channel id as the top-level thread id.
*/
async function preSubscribeNeededConversations(
chatInstance: Chat,
conversationsMap: Map<string, ConversationConfig[]>,
): Promise<void> {
for (const [platformId, configs] of conversationsMap.entries()) {
if (!needsPreSubscribe(configs)) continue;
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const chatAny = chatInstance as any;
if (typeof chatAny.isSubscribed === 'function') {
const already = await chatAny.isSubscribed(platformId);
if (already) continue;
}
await chatAny.subscribe(platformId);
log.info('Pre-subscribed conversation', { adapter: adapter.name, platformId });
} catch (err) {
log.warn('Pre-subscribe failed — pattern/accumulate wirings won\'t see non-mention messages here', {
adapter: adapter.name,
platformId,
err,
});
}
}
}
/**
* Should a message from this conversation engage any of the wired agents?
* - `mention` — engages only when the message actually @-mentions
* the bot (the bridge already sees it here because
* Chat SDK only forwards subscribed / mentioned /
* DM messages)
* - `mention-sticky` — same as `mention` for gating, PLUS we subscribe
* the thread so later messages arrive via the
* subscribed path and fall through to an
* engage-all style treatment
* - `pattern` — regex test against message text; `.` = always
*
* Source meaning:
* - `dm` — Chat SDK `onDirectMessage`
* - `mention-new` — Chat SDK `onNewMention` (mention in an unsubscribed
* thread; SDK never fires this once the thread is
* subscribed — see SDK types :1468)
* - `subscribed` — Chat SDK `onSubscribedMessage` (plain message in a
* subscribed thread). In this case `isMention` is
* distinct: set to true when `message.isMention` is,
* so `mention` wirings keep firing even if a
* co-resident `pattern` wiring subscribed the thread.
*
* Mode semantics:
* - `pattern` — regex test against text; `.` = always
* - `mention` — fire iff mentioned (covers dm, mention-new, and
* subscribed-with-isMention)
* - `mention-sticky` — fire on dm / mention-new (and subscribe the thread);
* in subscribed source always fire — the thread was
* already activated on a prior mention
*
* Result is the union across wired agents — if any engages, the message
* goes through. Per-agent filtering happens host-side (src/router.ts pickAgents).
* We take the union across wired agents — if any one of them would engage,
* the message goes through. Per-agent filtering after that happens in the
* host router (see src/router.ts pickAgents).
*/
function shouldEngage(
channelId: string,
source: 'subscribed' | 'mention-new' | 'dm',
opts: { text: string; isMention: boolean },
source: 'subscribed' | 'mention' | 'dm',
text: string,
): { engage: boolean; stickySubscribe: boolean } {
const { text, isMention } = opts;
const configs = conversations.get(channelId);
// Unknown conversation — forward anyway (may be a new group that
// hasn't been registered yet; central routing will log + drop cleanly).
@@ -178,19 +122,18 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
let engage = false;
let stickySubscribe = false;
const mentionMatch = source === 'dm' || source === 'mention-new' || (source === 'subscribed' && isMention);
for (const cfg of configs) {
switch (cfg.engageMode) {
case 'mention':
if (mentionMatch) engage = true;
if (source === 'mention' || source === 'dm') engage = true;
break;
case 'mention-sticky':
if (source === 'dm' || source === 'mention-new') {
if (source === 'mention' || source === 'dm') {
engage = true;
stickySubscribe = true;
} else if (source === 'subscribed') {
// Thread already activated on a prior mention — engage all messages.
// Thread was already subscribed on a prior mention — treat as
// engage-all so follow-ups in the thread reach the agent.
engage = true;
}
break;
@@ -295,8 +238,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const isMention = Boolean((message as unknown as { isMention?: boolean }).isMention);
const decision = shouldEngage(channelId, 'subscribed', { text, isMention });
const decision = shouldEngage(channelId, 'subscribed', text);
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
});
@@ -306,7 +248,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const decision = shouldEngage(channelId, 'mention-new', { text, isMention: true });
const decision = shouldEngage(channelId, 'mention', text);
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
if (decision.stickySubscribe) {
@@ -325,7 +267,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const decision = shouldEngage(channelId, 'dm', { text, isMention: true });
const decision = shouldEngage(channelId, 'dm', text);
log.info('Inbound DM received', {
adapter: adapter.name,
channelId,
@@ -370,19 +312,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
await chat.initialize();
// Pre-subscribe to threads where the wiring needs to see every message
// (pattern mode or accumulate policy). Without this, Chat SDK only
// surfaces mentions + DMs for unsubscribed threads — silently dropping
// every plain message in a group, which breaks `engage_mode='pattern'`
// and `ignored_message_policy='accumulate'` for group chats.
//
// For subscription purposes, platformId is used as the thread id. This
// matches the deliver path (see `tid = threadId ?? platformId` below)
// where adapters treat their encoded channel id as the top-level thread.
// Failures are logged and swallowed so one un-subscribable channel
// (e.g., no permission) doesn't block startup.
await preSubscribeNeededConversations(chat, conversations);
// Start Gateway listener for adapters that support it (e.g., Discord)
const gatewayAdapter = adapter as GatewayAdapter;
if (gatewayAdapter.startGatewayListener) {

View File

@@ -163,7 +163,6 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] {
agentGroupId: agent.agent_group_id,
engageMode: agent.engage_mode,
engagePattern: agent.engage_pattern,
ignoredMessagePolicy: agent.ignored_message_policy,
sessionMode: agent.session_mode,
});
}