fix(channels): pre-subscribe group threads for pattern / accumulate wirings

The engage modes shipped in #1869 included `pattern` (regex match any
message) and the `accumulate` ignored-message policy, but neither could
fire in group chats because Chat SDK only surfaces:

  - DMs (onDirectMessage)
  - @mentions in unsubscribed threads (onNewMention)
  - every message in subscribed threads (onSubscribedMessage)

A bot sitting in a Discord/Slack channel hears *nothing* from a plain
message unless the thread is already subscribed. So `pattern '.'` on a
group wiring → silent. `pattern /urgent/i` → silent. `mention +
accumulate` → the non-mention messages that should be stored as context
were never received, so nothing to accumulate.

Fix: call `chat.subscribe(platformId)` at setup time for every wiring
whose `engageMode === 'pattern'` or `ignoredMessagePolicy === 'accumulate'`.
Failures logged + swallowed per-conversation so one un-subscribable
channel doesn't crash startup.

## Knock-on: SDK stops firing onNewMention once subscribed

Per SDK types:1468, `onNewMention` only fires in unsubscribed threads.
Once we pre-subscribe a channel for a pattern wiring, subsequent mentions
arrive as `onSubscribedMessage` with `message.isMention === true`.

Before: a `mention` wiring coexisting with a `pattern` wiring in the
same channel would silently stop firing after pre-subscribe.

After: `shouldEngage` accepts the `isMention` flag independently from
`source`, so the `mention` mode matches on (dm OR mention-new OR
subscribed-with-isMention). Source shape changed
`'subscribed' | 'mention' | 'dm'` → `'subscribed' | 'mention-new' | 'dm'`
to make the "unsubscribed-mention event" distinction explicit.

## New fields

- `ConversationConfig.ignoredMessagePolicy` — projected from the
  messaging_group_agents row so the bridge knows which wirings need
  pre-subscription. buildConversationConfigs in src/index.ts populates
  it.

Tests: host 153/153, container 46/46. No new tests yet — the subscribe
call path needs a Chat mock, deferred.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-20 10:34:15 +03:00
parent fca3d8de70
commit 73b20880ff
3 changed files with 101 additions and 22 deletions

View File

@@ -22,6 +22,13 @@ export interface ConversationConfig {
engageMode: 'pattern' | 'mention' | 'mention-sticky';
/** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */
engagePattern?: string | null;
/**
* What to do with non-engaging messages. Projected from the wiring so the
* adapter can decide whether to pre-subscribe to group threads — `accumulate`
* means "store everything as context even when not engaging", which requires
* seeing every message in the thread.
*/
ignoredMessagePolicy?: 'drop' | 'accumulate';
sessionMode: 'shared' | 'per-thread' | 'agent-shared';
}

View File

@@ -93,27 +93,83 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
}
/**
* Should a message from (channelId, kind) engage any of the wired agents?
* Does any wiring for this conversation need Chat SDK subscription to see
* every message? `pattern` and `accumulate` both require it; plain `mention`
* and `mention-sticky` don't (mentions fire their own event, sticky
* subscribes lazily on first fire).
*/
function needsPreSubscribe(configs: ConversationConfig[]): boolean {
return configs.some(
(c) => c.engageMode === 'pattern' || c.ignoredMessagePolicy === 'accumulate',
);
}
/**
* Subscribe to every conversation whose wiring needs to see every message
* (pattern gate or accumulate context). Runs once after `chat.initialize()`.
* Failures are logged and swallowed per-conversation — one un-subscribable
* channel (no permission, not in it yet) shouldn't block startup.
*
* - `mention` — engages only when the message actually @-mentions
* the bot (the bridge already sees it here because
* Chat SDK only forwards subscribed / mentioned /
* DM messages)
* - `mention-sticky` — same as `mention` for gating, PLUS we subscribe
* the thread so later messages arrive via the
* subscribed path and fall through to an
* engage-all style treatment
* - `pattern` — regex test against message text; `.` = always
* `threadId` for subscription = the platformId we stored in ConversationConfig.
* This matches the deliver path's `tid = threadId ?? platformId` pattern
* where adapters treat their encoded channel id as the top-level thread id.
*/
async function preSubscribeNeededConversations(
chatInstance: Chat,
conversationsMap: Map<string, ConversationConfig[]>,
): Promise<void> {
for (const [platformId, configs] of conversationsMap.entries()) {
if (!needsPreSubscribe(configs)) continue;
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const chatAny = chatInstance as any;
if (typeof chatAny.isSubscribed === 'function') {
const already = await chatAny.isSubscribed(platformId);
if (already) continue;
}
await chatAny.subscribe(platformId);
log.info('Pre-subscribed conversation', { adapter: adapter.name, platformId });
} catch (err) {
log.warn('Pre-subscribe failed — pattern/accumulate wirings won\'t see non-mention messages here', {
adapter: adapter.name,
platformId,
err,
});
}
}
}
/**
* Should a message from this conversation engage any of the wired agents?
*
* We take the union across wired agents — if any one of them would engage,
* the message goes through. Per-agent filtering after that happens in the
* host router (see src/router.ts pickAgents).
* Source meaning:
* - `dm` — Chat SDK `onDirectMessage`
* - `mention-new` — Chat SDK `onNewMention` (mention in an unsubscribed
* thread; SDK never fires this once the thread is
* subscribed — see SDK types :1468)
* - `subscribed` — Chat SDK `onSubscribedMessage` (plain message in a
* subscribed thread). In this case `isMention` is
* distinct: set to true when `message.isMention` is,
* so `mention` wirings keep firing even if a
* co-resident `pattern` wiring subscribed the thread.
*
* Mode semantics:
* - `pattern` — regex test against text; `.` = always
* - `mention` — fire iff mentioned (covers dm, mention-new, and
* subscribed-with-isMention)
* - `mention-sticky` — fire on dm / mention-new (and subscribe the thread);
* in subscribed source always fire — the thread was
* already activated on a prior mention
*
* Result is the union across wired agents — if any engages, the message
* goes through. Per-agent filtering happens host-side (src/router.ts pickAgents).
*/
function shouldEngage(
channelId: string,
source: 'subscribed' | 'mention' | 'dm',
text: string,
source: 'subscribed' | 'mention-new' | 'dm',
opts: { text: string; isMention: boolean },
): { engage: boolean; stickySubscribe: boolean } {
const { text, isMention } = opts;
const configs = conversations.get(channelId);
// Unknown conversation — forward anyway (may be a new group that
// hasn't been registered yet; central routing will log + drop cleanly).
@@ -122,18 +178,19 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
let engage = false;
let stickySubscribe = false;
const mentionMatch = source === 'dm' || source === 'mention-new' || (source === 'subscribed' && isMention);
for (const cfg of configs) {
switch (cfg.engageMode) {
case 'mention':
if (source === 'mention' || source === 'dm') engage = true;
if (mentionMatch) engage = true;
break;
case 'mention-sticky':
if (source === 'mention' || source === 'dm') {
if (source === 'dm' || source === 'mention-new') {
engage = true;
stickySubscribe = true;
} else if (source === 'subscribed') {
// Thread was already subscribed on a prior mention — treat as
// engage-all so follow-ups in the thread reach the agent.
// Thread already activated on a prior mention — engage all messages.
engage = true;
}
break;
@@ -238,7 +295,8 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const decision = shouldEngage(channelId, 'subscribed', text);
const isMention = Boolean((message as unknown as { isMention?: boolean }).isMention);
const decision = shouldEngage(channelId, 'subscribed', { text, isMention });
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
});
@@ -248,7 +306,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const decision = shouldEngage(channelId, 'mention', text);
const decision = shouldEngage(channelId, 'mention-new', { text, isMention: true });
if (!decision.engage) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message));
if (decision.stickySubscribe) {
@@ -267,7 +325,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const text = typeof message.text === 'string' ? message.text : '';
const decision = shouldEngage(channelId, 'dm', text);
const decision = shouldEngage(channelId, 'dm', { text, isMention: true });
log.info('Inbound DM received', {
adapter: adapter.name,
channelId,
@@ -312,6 +370,19 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
await chat.initialize();
// Pre-subscribe to threads where the wiring needs to see every message
// (pattern mode or accumulate policy). Without this, Chat SDK only
// surfaces mentions + DMs for unsubscribed threads — silently dropping
// every plain message in a group, which breaks `engage_mode='pattern'`
// and `ignored_message_policy='accumulate'` for group chats.
//
// For subscription purposes, platformId is used as the thread id. This
// matches the deliver path (see `tid = threadId ?? platformId` below)
// where adapters treat their encoded channel id as the top-level thread.
// Failures are logged and swallowed so one un-subscribable channel
// (e.g., no permission) doesn't block startup.
await preSubscribeNeededConversations(chat, conversations);
// Start Gateway listener for adapters that support it (e.g., Discord)
const gatewayAdapter = adapter as GatewayAdapter;
if (gatewayAdapter.startGatewayListener) {

View File

@@ -163,6 +163,7 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] {
agentGroupId: agent.agent_group_id,
engageMode: agent.engage_mode,
engagePattern: agent.engage_pattern,
ignoredMessagePolicy: agent.ignored_message_policy,
sessionMode: agent.session_mode,
});
}