refactor(channels,router): move all policy to router; bridge is transport

Follow-up to b159722. That shrank the bridge's shouldEngage to a flood
gate + coarse sticky-subscribe signal. This completes the move —
policy lives exclusively in the router, the bridge is transport-only,
and the conversations map + ChannelSetup.conversations +
ChannelAdapter.updateConversations are all gone.

Key shifts:

1. Subscribe moves from bridge to router.

   Bridge used to call `thread.subscribe()` from its onNewMention /
   onDirectMessage handlers based on a coarse "any mention-sticky wiring
   exists on this channel" check. That forced the decision before the
   router could apply per-wiring engage logic, and it relied on the
   conversations map being current (staleness risk).

   ChannelAdapter gains `subscribe?(platformId, threadId)`. The Chat
   SDK bridge implements it via SqliteStateAdapter.subscribe(threadId)
   (idempotent — a repeat call on an already-subscribed thread is a
   no-op). The router's fan-out loop calls it once per message when
   the first mention-sticky wiring actually engages. Precise, not
   coarse.

2. Short-circuit the drop path with one combined query.

   New `getMessagingGroupWithAgentCount(channelType, platformId)` does
   the messaging_groups lookup AND counts wirings in a single SELECT,
   using the existing UNIQUE(channel_type, platform_id) index on
   messaging_groups and UNIQUE(messaging_group_id, agent_group_id) on
   messaging_group_agents for the JOIN. No new indexes needed.

   routeInbound now short-circuits:
     - No messaging_groups row AND not addressed (no mention/DM)
       → return silently. One DB read, nothing written. This is the
       Discord-bot-in-a-big-guild case; we no longer auto-create rows
       for every plain message in every channel the bot can see.
     - Messaging group exists but no wirings AND not addressed
       → return silently. One DB read.
     - Otherwise fall through to sender resolution + fan-out as before.

   Behavioral change: plain chatter on unwired channels no longer gets
   dropped_messages audit rows, which used to bloat the table. Audit
   still fires on addressed-to-bot drops where the admin cares
   ("someone @-mentioned us but nobody's wired").

3. Bridge is now purely transport.

   Deleted entirely: ConversationConfig, ChannelSetup.conversations,
   ChannelAdapter.updateConversations?, bridge's `conversations` map,
   buildConversationMap, shouldEngage, EngageSource, engageDecision,
   bridge.updateConversations method, src/index.ts
   buildConversationConfigs. Four handlers reduce to "resolve channel
   id, build InboundMessage with isMention, call onInbound". Net
   ~130 LOC deleted from the bridge.

   Collateral: the conversations-map staleness problem is gone. The
   upcoming channel-registration feature doesn't need any map-refresh
   plumbing — when an approval creates a new wiring, the next message
   hits the DB fresh and just works.

Bridge tests prune to the narrow platform-adjacent surface (openDM
delegation, subscribe presence). Host-core test that asserted the
old "auto-create on every unknown message" behavior updates to
reflect the new escalation-gated semantics: plain messages on
unknown channels don't auto-create, mentions do.

159 tests pass (was 172 — net -13, almost entirely from
bridge-engage-mode tests that covered logic now owned by the router
and exercised through host-core.test.ts).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-20 13:55:49 +03:00
parent b15972284b
commit a4061a0012
8 changed files with 173 additions and 309 deletions

View File

@@ -5,45 +5,8 @@
* Two patterns: native adapters (implement directly) or Chat SDK bridge (wrap a Chat SDK adapter).
*/
/** Configuration for a registered conversation (messaging group + agent wiring). */
export interface ConversationConfig {
platformId: string;
agentGroupId: string;
/**
* When does the agent engage on messages from this conversation?
*
* 'pattern' — regex test against message text; engagePattern='.'
* means "always" (match everything)
* 'mention' — fires only on @mention
* 'mention-sticky' — fires on @mention, then auto-subscribes to the thread
* and treats subsequent messages as engage-all.
* Threaded platforms only (Slack/Discord/Linear).
*/
engageMode: 'pattern' | 'mention' | 'mention-sticky';
/** Regex source when engageMode='pattern'. '.' is the "always" sentinel. */
engagePattern?: string | null;
/**
* What to do with messages this wiring doesn't engage on.
*
* 'drop' — discard silently
* 'accumulate' — still forward to the host so the router can store the
* message in this agent's session with trigger=0. It
* rides along as context when the agent next wakes, but
* doesn't wake it on its own.
*
* The bridge reads this to decide whether to forward a non-engaging
* message at all — if any wiring on a conversation has 'accumulate', the
* bridge forwards and lets the router apply the per-wiring decision.
*/
ignoredMessagePolicy?: 'drop' | 'accumulate';
sessionMode: 'shared' | 'per-thread' | 'agent-shared';
}
/** Passed to the adapter at setup time. */
export interface ChannelSetup {
/** Known conversations from central DB. */
conversations: ConversationConfig[];
/** Called when an inbound message arrives from the platform. */
onInbound(platformId: string, threadId: string | null, message: InboundMessage): void | Promise<void>;
@@ -125,7 +88,17 @@ export interface ChannelAdapter {
// Optional
setTyping?(platformId: string, threadId: string | null): Promise<void>;
syncConversations?(): Promise<ConversationInfo[]>;
updateConversations?(conversations: ConversationConfig[]): void;
/**
* Subscribe the bot to a thread so follow-up messages route via the
* platform's "subscribed message" path (onSubscribedMessage in Chat SDK).
* Called by the router when a mention-sticky wiring first engages in a
* thread. Idempotent: calling twice on the same thread is a no-op.
*
* Platforms without a subscription concept can omit this; the router
* treats absence as a no-op.
*/
subscribe?(platformId: string, threadId: string): Promise<void>;
/**
* Open (or fetch) a DM with this user, returning the platform_id of the

View File

@@ -64,8 +64,6 @@ function createMockAdapter(
},
async setTyping() {},
updateConversations() {},
};
}

View File

@@ -2,37 +2,19 @@ import { describe, expect, it } from 'vitest';
import type { Adapter } from 'chat';
import type { ConversationConfig } from './adapter.js';
import { createChatSdkBridge, shouldEngage, type EngageSource } from './chat-sdk-bridge.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
function stubAdapter(partial: Partial<Adapter>): Adapter {
return { name: 'stub', ...partial } as unknown as Adapter;
}
function cfg(
partial: Partial<ConversationConfig> & { engageMode: ConversationConfig['engageMode'] },
): ConversationConfig {
return {
platformId: partial.platformId ?? 'C1',
agentGroupId: partial.agentGroupId ?? 'ag-1',
engageMode: partial.engageMode,
engagePattern: partial.engagePattern ?? null,
ignoredMessagePolicy: partial.ignoredMessagePolicy ?? 'drop',
sessionMode: partial.sessionMode ?? 'shared',
};
}
function mapFor(...configs: ConversationConfig[]): Map<string, ConversationConfig[]> {
const map = new Map<string, ConversationConfig[]>();
for (const c of configs) {
const existing = map.get(c.platformId);
if (existing) existing.push(c);
else map.set(c.platformId, [c]);
}
return map;
}
describe('createChatSdkBridge', () => {
// The bridge is now transport-only: forward inbound events, relay outbound
// ops. All per-wiring engage / accumulate / drop / subscribe decisions live
// in the router (src/router.ts routeInbound / evaluateEngage) and are
// exercised by host-core.test.ts end-to-end. These tests only cover the
// bridge's narrow, platform-adjacent surface.
it('omits openDM when the underlying Chat SDK adapter has none', () => {
const bridge = createChatSdkBridge({
adapter: stubAdapter({}),
@@ -59,76 +41,12 @@ describe('createChatSdkBridge', () => {
expect(openDMCalls).toEqual(['user-42']);
expect(platformId).toBe('stub:user-42');
});
});
describe('shouldEngage (bridge-level flood gate + subscribe signal)', () => {
// Per-wiring engage_mode / engage_pattern / ignored_message_policy
// semantics live in the router (evaluateEngage / routeInbound fan-out).
// These tests only cover the bridge's two responsibilities: should we
// forward at all, and should we call thread.subscribe().
describe('flood gate — unknown conversation', () => {
const empty = new Map<string, ConversationConfig[]>();
const carriedSources: EngageSource[] = ['subscribed', 'mention', 'dm'];
for (const source of carriedSources) {
it(`forwards for source='${source}' (may be a newly-auto-created channel or a channel-registration trigger)`, () => {
expect(shouldEngage(empty, 'C-new', source)).toEqual({ forward: true, stickySubscribe: false });
});
}
it("DROPS for source='new-message' (onNewMessage(/./) fires for every unsubscribed thread the bot can see — would flood)", () => {
expect(shouldEngage(empty, 'C-unwired', 'new-message')).toEqual({
forward: false,
stickySubscribe: false,
});
});
});
describe('known conversation — bridge forwards regardless of engage mode', () => {
// Policy lives in the router now. The bridge only knows "has any wiring".
const conv = mapFor(cfg({ engageMode: 'mention' }));
for (const source of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
it(`forwards for source='${source}' — router will decide engage / accumulate / drop per wiring`, () => {
expect(shouldEngage(conv, 'C1', source).forward).toBe(true);
});
}
});
describe('stickySubscribe signal', () => {
it('true when any mention-sticky wiring exists AND source is mention', () => {
const conv = mapFor(cfg({ engageMode: 'mention-sticky' }));
expect(shouldEngage(conv, 'C1', 'mention').stickySubscribe).toBe(true);
});
it('true when any mention-sticky wiring exists AND source is dm', () => {
const conv = mapFor(cfg({ engageMode: 'mention-sticky' }));
expect(shouldEngage(conv, 'C1', 'dm').stickySubscribe).toBe(true);
});
it('false on subscribed — thread is already subscribed, no need to re-subscribe', () => {
const conv = mapFor(cfg({ engageMode: 'mention-sticky' }));
expect(shouldEngage(conv, 'C1', 'subscribed').stickySubscribe).toBe(false);
});
it('false on new-message — mention-sticky requires an explicit mention to start', () => {
const conv = mapFor(cfg({ engageMode: 'mention-sticky' }));
expect(shouldEngage(conv, 'C1', 'new-message').stickySubscribe).toBe(false);
});
it('false for plain mention / pattern wirings (not sticky)', () => {
const mentionConv = mapFor(cfg({ engageMode: 'mention' }));
const patternConv = mapFor(cfg({ engageMode: 'pattern', engagePattern: '.' }));
for (const s of ['subscribed', 'mention', 'dm', 'new-message'] as EngageSource[]) {
expect(shouldEngage(mentionConv, 'C1', s).stickySubscribe).toBe(false);
expect(shouldEngage(patternConv, 'C1', s).stickySubscribe).toBe(false);
}
});
it('fires on coarse union — mixed wirings where any one is mention-sticky', () => {
const conv = mapFor(
cfg({ agentGroupId: 'ag-a', engageMode: 'mention' }),
cfg({ agentGroupId: 'ag-b', engageMode: 'mention-sticky' }),
);
expect(shouldEngage(conv, 'C1', 'mention').stickySubscribe).toBe(true);
it('exposes subscribe (lets the router initiate thread subscription on mention-sticky engage)', () => {
const bridge = createChatSdkBridge({
adapter: stubAdapter({}),
supportsThreads: true,
});
expect(typeof bridge.subscribe).toBe('function');
});
});

View File

@@ -21,7 +21,7 @@ import { SqliteStateAdapter } from '../state-sqlite.js';
import { registerWebhookAdapter } from '../webhook-server.js';
import { getAskQuestionRender } from '../db/sessions.js';
import { normalizeOptions, type NormalizedOption } from './ask-question.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
/** Adapter with optional gateway support (e.g., Discord). */
interface GatewayAdapter extends Adapter {
@@ -65,99 +65,14 @@ export interface ChatSdkBridgeConfig {
transformOutboundText?: (text: string) => string;
}
/**
* Which Chat SDK handler delivered this message. Determines which engage modes
* can fire.
*
* - `subscribed` — `onSubscribedMessage`. Thread is already subscribed.
* Every wiring mode (mention / mention-sticky / pattern)
* evaluates normally.
* - `mention` — `onNewMention`. Bot was @-mentioned in an unsubscribed
* thread. mention + mention-sticky engage; pattern runs
* the regex.
* - `dm` — `onDirectMessage`. Unsubscribed DM. Treated like a
* mention for engagement purposes.
* - `new-message` — `onNewMessage(/./, …)`. Plain non-mention non-DM
* message in an unsubscribed thread. Only `pattern`
* wirings can fire here. mention / mention-sticky ignore
* this source (they require an explicit mention).
*/
export type EngageSource = 'subscribed' | 'mention' | 'dm' | 'new-message';
/**
* Bridge-level forwarding decision — a coarse flood gate, not policy.
*
* The router owns per-wiring engage_mode / engage_pattern / sender_scope /
* ignored_message_policy (see `evaluateEngage` in src/router.ts). The bridge
* only answers two questions:
*
* 1. `forward` — is this message worth sending to the host at all?
* - Known channel (any wiring): yes. Router will decide what engages /
* accumulates / drops per wiring.
* - Unknown channel: yes for subscribed / mention / DM (triggers the
* router's auto-create or channel-registration flow); no for
* `new-message`. onNewMessage(/./, …) fires for every message in
* every unsubscribed thread the bot can see, including channels the
* bot merely joined but was never wired to — forwarding everything
* would flood the host.
*
* 2. `stickySubscribe` — should the bridge call `thread.subscribe()`?
* - Yes if ANY wiring on this channel is mention-sticky AND the
* source is an actual mention / DM. Coarse (no per-wiring picking)
* but harmless: subscription is idempotent and one call serves
* every mention-sticky wiring on the channel. Once subscribed,
* follow-ups route through onSubscribedMessage.
*
* Exported for testability — see `chat-sdk-bridge.test.ts`.
*/
export function shouldEngage(
conversations: Map<string, ConversationConfig[]>,
channelId: string,
source: EngageSource,
): { forward: boolean; stickySubscribe: boolean } {
const configs = conversations.get(channelId);
if (!configs || configs.length === 0) {
return { forward: source !== 'new-message', stickySubscribe: false };
}
const stickySubscribe =
(source === 'mention' || source === 'dm') && configs.some((cfg) => cfg.engageMode === 'mention-sticky');
return { forward: true, stickySubscribe };
}
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
const { adapter } = config;
const transformText = (t: string): string => (config.transformOutboundText ? config.transformOutboundText(t) : t);
let chat: Chat;
let state: SqliteStateAdapter;
let setupConfig: ChannelSetup;
// Keyed by platformId. Multiple agents may be wired to the same
// conversation — this holds all their configs so the bridge can apply the
// most-permissive engage rule at gate time and only subscribe when at
// least one wiring requested 'mention-sticky'.
//
// STALENESS: populated at setup() and updateConversations(). If wirings
// change after setup, updateConversations() must be called to refresh
// (ACTION-ITEMS item 17).
let conversations: Map<string, ConversationConfig[]>;
let gatewayAbort: AbortController | null = null;
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig[]> {
const map = new Map<string, ConversationConfig[]>();
for (const conv of configs) {
const existing = map.get(conv.platformId);
if (existing) existing.push(conv);
else map.set(conv.platformId, [conv]);
}
return map;
}
function engageDecision(channelId: string, source: EngageSource): { forward: boolean; stickySubscribe: boolean } {
return shouldEngage(conversations, channelId, source);
}
async function messageToInbound(message: ChatMessage, isMention: boolean): Promise<InboundMessage> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const serialized = message.toJSON() as Record<string, any>;
@@ -225,7 +140,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
async setup(hostConfig: ChannelSetup) {
setupConfig = hostConfig;
conversations = buildConversationMap(hostConfig.conversations);
state = new SqliteStateAdapter();
@@ -237,29 +151,25 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
logger: 'silent',
});
// Four SDK dispatch paths — bridge just forwards; router does all
// per-wiring engage / accumulate / drop decisions. isMention is the
// load-bearing signal (see evaluateEngage in src/router.ts).
// Four SDK dispatch paths — bridge just forwards. All per-wiring
// engage / accumulate / drop / subscribe decisions live in the host
// router (src/router.ts routeInbound / evaluateEngage). The bridge
// only resolves channel ids and sets the platform-confirmed isMention
// flag that routeInbound evaluates; the router calls back into
// bridge.subscribe(...) when a mention-sticky wiring engages.
// Subscribed threads — every message in a thread we've previously
// engaged. Carry the SDK's `message.isMention` through so mention-mode
// wirings still fire on in-thread mentions.
chat.onSubscribedMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const decision = engageDecision(channelId, 'subscribed');
if (!decision.forward) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, message.isMention === true));
});
// @mention in an unsubscribed thread — SDK-confirmed bot mention.
chat.onNewMention(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const decision = engageDecision(channelId, 'mention');
if (!decision.forward) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true));
if (decision.stickySubscribe) {
await thread.subscribe();
}
});
// DMs — by definition addressed to the bot. Thread id flows through
@@ -268,19 +178,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
// is_group=0 short-circuit.
chat.onDirectMessage(async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const decision = engageDecision(channelId, 'dm');
log.info('Inbound DM received', {
adapter: adapter.name,
channelId,
sender: (message.author as any)?.fullName ?? (message.author as any)?.userId ?? 'unknown',
threadId: thread.id,
forward: decision.forward,
});
if (!decision.forward) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, true));
if (decision.stickySubscribe) {
await thread.subscribe();
}
});
// Plain messages in unsubscribed threads.
@@ -288,14 +192,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
// Chat SDK dispatch (handling-events.mdx §"Handler dispatch order") is
// exclusive: subscribed → onSubscribedMessage; unsubscribed+mention →
// onNewMention; unsubscribed+pattern-match → onNewMessage. Registering
// with `/./` lets the router see every plain message on wired channels
// (needed for engage_mode='pattern' + ignored_message_policy='accumulate'
// wirings). `shouldEngage` drops unknown channels on this source
// specifically so we don't flood from channels the bot merely joined.
// with `/./` lets the router see every plain message on every
// unsubscribed thread the bot can see. The router short-circuits via
// getMessagingGroupWithAgentCount (~1 DB read) for unwired channels,
// so forwarding every one is cheap enough to not need a bridge-side
// flood gate.
chat.onNewMessage(/./, async (thread, message) => {
const channelId = adapter.channelIdFromThreadId(thread.id);
const decision = engageDecision(channelId, 'new-message');
if (!decision.forward) return;
await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false));
});
@@ -468,8 +371,13 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
return true;
},
updateConversations(configs: ConversationConfig[]) {
conversations = buildConversationMap(configs);
async subscribe(_platformId: string, threadId: string) {
// Chat SDK's subscription state lives on the StateAdapter (not on the
// Chat instance itself). SqliteStateAdapter.subscribe is idempotent —
// a second call on an already-subscribed thread is a no-op. threadId
// is the SDK's thread id, which is what the router already has from
// the original inbound event.
await state.subscribe(threadId);
},
};