Replaces the opaque trigger_rules JSON + response_scope enum on
messaging_group_agents with four explicit orthogonal columns:
  - engage_mode: 'pattern' | 'mention' | 'mention-sticky'
  - engage_pattern: regex source; required when engage_mode='pattern';
    '.' is the "always" sentinel
  - sender_scope: 'all' | 'known'
  - ignored_message_policy: 'drop' | 'accumulate'
Inbound routing becomes a fan-out — every wired agent is evaluated
independently. A match gets its own session + container wake. A miss
with accumulate keeps the message as context-only (trigger=0) in that
agent's session, so when the agent does eventually engage it sees the
prior chatter.
## Schema
- Migration 010 (`engage-modes`): adds the 4 new columns, backfills
from trigger_rules.pattern + requiresTrigger + response_scope, drops
the legacy columns.
- messages_in gains `trigger INTEGER NOT NULL DEFAULT 1` (session DB
schema + `migrateMessagesInTable` forward-compat).
- countDueMessages gates waking on `trigger = 1`.
## Routing
- `pickAgent` (returns one) → loop over all wired agents. Per agent:
evaluate engage_mode; run access gate + sender-scope gate; on full
match → resolveSession + writeSessionMessage(trigger=1) + wake. On
miss with accumulate → writeSessionMessage(trigger=0), no wake. On
miss with drop → skip.
- New `findSessionForAgent(agentGroupId, mgId, threadId)` scopes
session lookup by agent so fan-out doesn't cross sessions.
- `messageIdForAgent` namespaces inbound message ids by agent_group_id
so PRIMARY KEY doesn't collide across per-agent session DBs.
## Adapter layer
- `ConversationConfig` replaces `triggerPattern` + `requiresTrigger`
with `engageMode` + `engagePattern`.
- Chat SDK bridge stores `Map<platformId, ConversationConfig[]>` (multi-
agent per conversation) and applies union gating pre-onInbound:
* onSubscribedMessage: engage if any wiring keeps firing in
subscribed state (mention-sticky or pattern)
* onNewMention: engage on mention; only subscribes the thread if
at least one wiring is `mention-sticky`
* onDirectMessage: engage per mode; sticky follows same rule
- Bridge no longer unconditionally calls `thread.subscribe()`.
## Sender scope
- Permissions module registers a second hook `setSenderScopeGate` that
runs per-wiring after the existing access gate. `sender_scope='known'`
requires canAccessAgentGroup(); `'all'` is a no-op. Not installed →
no-op everywhere (default allow).
## Container side
- Host passes `NANOCLAW_MAX_MESSAGES_PER_PROMPT` (reuses existing
MAX_MESSAGES_PER_PROMPT config; was dead code from v1).
- `getPendingMessages` queries `ORDER BY seq DESC LIMIT N`, reverses to
chronological order for the prompt — accumulated context rides along
with trigger rows up to the cap.
- `MessageInRow` gains `trigger: number` so downstream container code can
tell trigger rows from context-only rows (the container still processes
both; only the host uses `trigger=0` to mean don't-wake).
## Defaults (per ACTION-ITEMS item 1 decision)
- DM (is_group=0): `engage_mode='pattern'`, `engage_pattern='.'` (always)
- Threaded group: `engage_mode='mention-sticky'` (seed-discord)
- Non-threaded group / CLI: pattern '.' in bootstrap scripts
## Tests
- src/host-core.test.ts: 3 new cases — fan-out (2 agents, 2 sessions,
2 wakes), accumulate (trigger=0 + no wake), drop (no session created).
- Existing 10 host-core tests still pass.
- Migration 010 verified on an empty DB (0-row backfill path).
Closes: ACTION-ITEMS items 1, 4.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
198 lines
7.3 KiB
TypeScript
198 lines
7.3 KiB
TypeScript
import type { PendingApproval, PendingQuestion, Session } from '../types.js';
|
|
import { getDb } from './connection.js';
|
|
|
|
// ── Sessions ──
|
|
|
|
/**
 * Insert a new row into `sessions`.
 *
 * The fields of `session` are bound by name to the `@…` placeholders of
 * the INSERT statement.
 */
export function createSession(session: Session): void {
  const insertSession = getDb().prepare(
    `INSERT INTO sessions (id, agent_group_id, messaging_group_id, thread_id, agent_provider, status, container_status, last_active, created_at)
     VALUES (@id, @agent_group_id, @messaging_group_id, @thread_id, @agent_provider, @status, @container_status, @last_active, @created_at)`,
  );
  insertSession.run(session);
}
|
|
|
|
/** Fetch the session whose `id` column equals `id`, or `undefined` if there is none. */
export function getSession(id: string): Session | undefined {
  const byId = getDb().prepare('SELECT * FROM sessions WHERE id = ?');
  return byId.get(id) as Session | undefined;
}
|
|
|
|
/**
 * Find an active session for a messaging group and thread, without regard
 * to which agent group owns it.
 *
 * A falsy `threadId` (null or empty string) selects rows whose `thread_id`
 * IS NULL; otherwise `thread_id` must equal `threadId`.
 */
export function findSession(messagingGroupId: string, threadId: string | null): Session | undefined {
  const db = getDb();

  // Unthreaded case first: match on a NULL thread_id.
  if (!threadId) {
    const unthreaded = db.prepare(
      'SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id IS NULL AND status = ?',
    );
    return unthreaded.get(messagingGroupId, 'active') as Session | undefined;
  }

  const threaded = db.prepare('SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id = ? AND status = ?');
  return threaded.get(messagingGroupId, threadId, 'active') as Session | undefined;
}
|
|
|
|
/**
 * Agent-scoped variant of `findSession`.
 *
 * With fan-out, several agents can be wired to the same messaging group +
 * thread. Filtering on `agent_group_id` as well ensures the lookup returns
 * this agent's active session rather than whichever agent's session the
 * unscoped query would hit first (which would route to the wrong
 * container).
 *
 * A falsy `threadId` (null or empty string) matches rows whose `thread_id`
 * IS NULL.
 */
export function findSessionForAgent(
  agentGroupId: string,
  messagingGroupId: string,
  threadId: string | null,
): Session | undefined {
  const db = getDb();

  if (!threadId) {
    const unthreaded = db.prepare(
      "SELECT * FROM sessions WHERE agent_group_id = ? AND messaging_group_id = ? AND thread_id IS NULL AND status = 'active'",
    );
    return unthreaded.get(agentGroupId, messagingGroupId) as Session | undefined;
  }

  const threaded = db.prepare(
    "SELECT * FROM sessions WHERE agent_group_id = ? AND messaging_group_id = ? AND thread_id = ? AND status = 'active'",
  );
  return threaded.get(agentGroupId, messagingGroupId, threadId) as Session | undefined;
}
|
|
|
|
/**
 * Return the most recently created active session of an agent group,
 * regardless of messaging group or thread.
 */
export function findSessionByAgentGroup(agentGroupId: string): Session | undefined {
  const newestActive = getDb().prepare(
    "SELECT * FROM sessions WHERE agent_group_id = ? AND status = 'active' ORDER BY created_at DESC LIMIT 1",
  );
  return newestActive.get(agentGroupId) as Session | undefined;
}
|
|
|
|
/** List every session of an agent group, whatever its status. */
export function getSessionsByAgentGroup(agentGroupId: string): Session[] {
  const byAgentGroup = getDb().prepare('SELECT * FROM sessions WHERE agent_group_id = ?');
  return byAgentGroup.all(agentGroupId) as Session[];
}
|
|
|
|
/** List all sessions whose `status` is 'active'. */
export function getActiveSessions(): Session[] {
  const activeOnly = getDb().prepare("SELECT * FROM sessions WHERE status = 'active'");
  return activeOnly.all() as Session[];
}
|
|
|
|
/** List all sessions whose `container_status` is 'running' or 'idle'. */
export function getRunningSessions(): Session[] {
  const withLiveContainer = getDb().prepare("SELECT * FROM sessions WHERE container_status IN ('running', 'idle')");
  return withLiveContainer.all() as Session[];
}
|
|
|
|
/**
 * Patch selected columns of a session row.
 *
 * Only `status`, `container_status`, `last_active` and `agent_provider`
 * can be written. Fields that are absent or `undefined` are left
 * untouched; if nothing remains to update, no statement is executed.
 *
 * @param id - `id` of the session row to update.
 * @param updates - New values for any subset of the updatable columns.
 */
export function updateSession(
  id: string,
  updates: Partial<Pick<Session, 'status' | 'container_status' | 'last_active' | 'agent_provider'>>,
): void {
  // Column names are interpolated into the SQL text, so take them from a
  // fixed allow-list rather than from the runtime keys of `updates`. A wider
  // object (e.g. a whole Session passed through a variable) type-checks, and
  // its extra keys — including `id`, which would clobber the WHERE binding —
  // must never reach the statement.
  const updatableColumns = ['status', 'container_status', 'last_active', 'agent_provider'] as const;

  const assignments: string[] = [];
  const params: Record<string, unknown> = { id };

  for (const column of updatableColumns) {
    const value = updates[column];
    if (value === undefined) continue;
    assignments.push(`${column} = @${column}`);
    params[column] = value;
  }

  if (assignments.length === 0) return;

  getDb()
    .prepare(`UPDATE sessions SET ${assignments.join(', ')} WHERE id = @id`)
    .run(params);
}
|
|
|
|
/** Delete the session row with the given `id`. */
export function deleteSession(id: string): void {
  const removeById = getDb().prepare('DELETE FROM sessions WHERE id = ?');
  removeById.run(id);
}
|
|
|
|
// ── Pending Questions ──
|
|
|
|
/**
 * Insert a row into `pending_questions`.
 *
 * `pq.options` is serialized with `JSON.stringify` and stored in the
 * `options_json` column; every other column is copied from `pq` as-is.
 */
export function createPendingQuestion(pq: PendingQuestion): void {
  const params = {
    question_id: pq.question_id,
    session_id: pq.session_id,
    message_out_id: pq.message_out_id,
    platform_id: pq.platform_id,
    channel_type: pq.channel_type,
    thread_id: pq.thread_id,
    title: pq.title,
    options_json: JSON.stringify(pq.options),
    created_at: pq.created_at,
  };

  const insertQuestion = getDb().prepare(
    `INSERT INTO pending_questions (question_id, session_id, message_out_id, platform_id, channel_type, thread_id, title, options_json, created_at)
     VALUES (@question_id, @session_id, @message_out_id, @platform_id, @channel_type, @thread_id, @title, @options_json, @created_at)`,
  );
  insertQuestion.run(params);
}
|
|
|
|
/**
 * Load a pending question by `question_id`.
 *
 * The stored `options_json` text is parsed with `JSON.parse` and returned
 * as `options`. Returns `undefined` when no row matches.
 */
export function getPendingQuestion(questionId: string): PendingQuestion | undefined {
  type StoredQuestion = Omit<PendingQuestion, 'options'> & { options_json: string };

  const byQuestionId = getDb().prepare('SELECT * FROM pending_questions WHERE question_id = ?');
  const stored = byQuestionId.get(questionId) as StoredQuestion | undefined;
  if (!stored) return undefined;

  // Swap the serialized column for its parsed form.
  const { options_json: serializedOptions, ...columns } = stored;
  return { ...columns, options: JSON.parse(serializedOptions) };
}
|
|
|
|
/** Delete the pending question with the given `question_id`. */
export function deletePendingQuestion(questionId: string): void {
  const removeQuestion = getDb().prepare('DELETE FROM pending_questions WHERE question_id = ?');
  removeQuestion.run(questionId);
}
|
|
|
|
// ── Pending Approvals ──
|
|
|
|
/**
 * Insert a row into `pending_approvals`.
 *
 * Callers must supply `approval_id`, `request_id`, `action`, `payload`,
 * `created_at`, `title` and `options_json`. The remaining columns are
 * optional: `session_id`, `agent_group_id`, `channel_type`, `platform_id`,
 * `platform_message_id` and `expires_at` default to `null`, and `status`
 * defaults to `'pending'`.
 *
 * Fields of `pa` that are explicitly `undefined` are treated as absent, so
 * they fall back to these defaults instead of overriding them.
 */
export function createPendingApproval(
  pa: Partial<PendingApproval> &
    Pick<
      PendingApproval,
      'approval_id' | 'request_id' | 'action' | 'payload' | 'created_at' | 'title' | 'options_json'
    >,
): void {
  const row: Record<string, unknown> = {
    session_id: null,
    agent_group_id: null,
    channel_type: null,
    platform_id: null,
    platform_message_id: null,
    expires_at: null,
    status: 'pending',
  };

  // `Partial<>` lets callers pass `{ status: undefined }`; a plain spread
  // would let that undefined shadow the default above.
  for (const [column, value] of Object.entries(pa)) {
    if (value !== undefined) row[column] = value;
  }

  getDb()
    .prepare(
      `INSERT INTO pending_approvals
         (approval_id, session_id, request_id, action, payload, created_at,
          agent_group_id, channel_type, platform_id, platform_message_id, expires_at, status,
          title, options_json)
       VALUES
         (@approval_id, @session_id, @request_id, @action, @payload, @created_at,
          @agent_group_id, @channel_type, @platform_id, @platform_message_id, @expires_at, @status,
          @title, @options_json)`,
    )
    .run(row);
}
|
|
|
|
/** Fetch the pending approval with the given `approval_id`, or `undefined` if there is none. */
export function getPendingApproval(approvalId: string): PendingApproval | undefined {
  const byApprovalId = getDb().prepare('SELECT * FROM pending_approvals WHERE approval_id = ?');
  return byApprovalId.get(approvalId) as PendingApproval | undefined;
}
|
|
|
|
/** Set the `status` column of the pending approval identified by `approvalId`. */
export function updatePendingApprovalStatus(approvalId: string, status: PendingApproval['status']): void {
  const setStatus = getDb().prepare('UPDATE pending_approvals SET status = ? WHERE approval_id = ?');
  setStatus.run(status, approvalId);
}
|
|
|
|
/** Delete the pending approval with the given `approval_id`. */
export function deletePendingApproval(approvalId: string): void {
  const removeApproval = getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?');
  removeApproval.run(approvalId);
}
|
|
|
|
/** List every pending approval whose `action` column equals `action`. */
export function getPendingApprovalsByAction(action: string): PendingApproval[] {
  const byAction = getDb().prepare('SELECT * FROM pending_approvals WHERE action = ?');
  return byAction.all(action) as PendingApproval[];
}
|
|
|
|
/**
 * Look up the render metadata (title + options) of an ask_question card by
 * id, whichever table it was persisted in.
 *
 * `pending_questions` is consulted first (generic ask_user_question cards);
 * if nothing is found there, `pending_approvals` is tried (self-mod /
 * OneCLI credential cards). Returns `undefined` when neither table has the
 * id, or when the matching approval row has an empty title.
 */
export function getAskQuestionRender(
  id: string,
): { title: string; options: import('../channels/ask-question.js').NormalizedOption[] } | undefined {
  const question = getPendingQuestion(id);
  if (question) {
    return { title: question.title, options: question.options };
  }

  const approvalLookup = getDb().prepare('SELECT title, options_json FROM pending_approvals WHERE approval_id = ?');
  const approval = approvalLookup.get(id) as { title: string; options_json: string } | undefined;
  if (!approval?.title) return undefined;

  const { title, options_json: serializedOptions } = approval;
  return { title, options: JSON.parse(serializedOptions) };
}
|