refactor(modules): extract agent-to-agent as registry-based module
Last extraction of Phase 3. Moves inter-agent messaging + create_agent +
destination projection into src/modules/agent-to-agent/. Core retains:
- `channel_type === 'agent'` dispatch in delivery.ts, guarded by
hasTable('agent_destinations') + dynamic import into module.
- Channel-permission ACL in delivery.ts, guarded by hasTable, with
inlined SQL (no module import from core).
- writeDestinations call in container-runner.ts, guarded by hasTable +
dynamic import into module.
- createMessagingGroupAgent's destination side effect in db/messaging-groups.ts,
guarded by hasTable. This is a documented transitional tier violation
(core imports from optional module), analogous to src/access.ts.
Migration `004-agent-destinations.ts` renamed to `module-agent-to-agent-
destinations.ts` preserving `name: 'agent-destinations'` so existing DBs
don't re-run it.
delivery.ts: 600 → 449 lines. handleSystemAction's last switch case gone
(just registry + default log-and-drop). notifyAgent helper removed (only
create_agent used it).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
201
src/delivery.ts
201
src/delivery.ts
@@ -11,10 +11,8 @@ import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { GROUPS_DIR } from './config.js';
|
||||
import { getRunningSessions, getActiveSessions, createPendingQuestion, getSession } from './db/sessions.js';
|
||||
import { getAgentGroup, createAgentGroup, updateAgentGroup, getAgentGroupByFolder } from './db/agent-groups.js';
|
||||
import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { getDb, hasTable } from './db/connection.js';
|
||||
import { getMessagingGroupByPlatform } from './db/messaging-groups.js';
|
||||
import {
|
||||
@@ -26,19 +24,11 @@ import {
|
||||
} from './db/session-db.js';
|
||||
import { log } from './log.js';
|
||||
import { normalizeOptions } from './channels/ask-question.js';
|
||||
import {
|
||||
openInboundDb,
|
||||
openOutboundDb,
|
||||
sessionDir,
|
||||
resolveSession,
|
||||
writeDestinations,
|
||||
writeSessionMessage,
|
||||
} from './session-manager.js';
|
||||
import { openInboundDb, openOutboundDb, sessionDir, writeSessionMessage } from './session-manager.js';
|
||||
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
|
||||
import { initGroupFilesystem } from './group-init.js';
|
||||
import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import type { AgentGroup, Session } from './types.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const ACTIVE_POLL_MS = 1000;
|
||||
const SWEEP_POLL_MS = 60_000;
|
||||
@@ -117,29 +107,6 @@ export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deliver a system notification to an agent as a regular chat message.
|
||||
* Used for fire-and-forget responses from host actions (create_agent result,
|
||||
* approval outcomes, etc.). The agent sees it as an inbound chat message
|
||||
* with sender="system".
|
||||
*/
|
||||
function notifyAgent(session: Session, text: string): void {
|
||||
writeSessionMessage(session.agent_group_id, session.id, {
|
||||
id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: session.agent_group_id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: JSON.stringify({ text, sender: 'system', senderId: 'system' }),
|
||||
});
|
||||
// Wake the container so it picks up the notification promptly
|
||||
const fresh = getSession(session.id);
|
||||
if (fresh) {
|
||||
wakeContainer(fresh).catch((err) => log.error('Failed to wake container after notification', { err }));
|
||||
}
|
||||
}
|
||||
|
||||
/** Start the active container poll loop (~1s). */
|
||||
export function startActiveDeliveryPoll(): void {
|
||||
if (activePolling) return;
|
||||
@@ -293,45 +260,16 @@ async function deliverMessage(
|
||||
return;
|
||||
}
|
||||
|
||||
// Agent-to-agent — route to target session (with permission check).
|
||||
// Permission is enforced via agent_destinations — the source agent must have
|
||||
// a row for the target. Content is copied verbatim; the target's formatter
|
||||
// will look up the source agent in its own local map to display a name.
|
||||
// Agent-to-agent — route to target session via the agent-to-agent module.
|
||||
// Guarded by the channel_type check. If the module isn't installed the
|
||||
// `agent_destinations` table won't exist and `routeAgentMessage`'s permission
|
||||
// check will throw, which falls into the normal retry → mark-failed path.
|
||||
if (msg.channel_type === 'agent') {
|
||||
const targetAgentGroupId = msg.platform_id;
|
||||
if (!targetAgentGroupId) {
|
||||
throw new Error(`agent-to-agent message ${msg.id} is missing a target agent group id`);
|
||||
if (!hasTable(getDb(), 'agent_destinations')) {
|
||||
throw new Error(`agent-to-agent module not installed — cannot route message ${msg.id}`);
|
||||
}
|
||||
// Self-messages are always allowed — used for system notes injected back
|
||||
// into an agent's own session (e.g. post-approval follow-up prompts).
|
||||
if (
|
||||
targetAgentGroupId !== session.agent_group_id &&
|
||||
!hasDestination(session.agent_group_id, 'agent', targetAgentGroupId)
|
||||
) {
|
||||
throw new Error(
|
||||
`unauthorized agent-to-agent: ${session.agent_group_id} has no destination for ${targetAgentGroupId}`,
|
||||
);
|
||||
}
|
||||
if (!getAgentGroup(targetAgentGroupId)) {
|
||||
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
|
||||
}
|
||||
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
|
||||
writeSessionMessage(targetAgentGroupId, targetSession.id, {
|
||||
id: `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
kind: 'chat',
|
||||
timestamp: new Date().toISOString(),
|
||||
platformId: session.agent_group_id,
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: msg.content,
|
||||
});
|
||||
log.info('Agent message routed', {
|
||||
from: session.agent_group_id,
|
||||
to: targetAgentGroupId,
|
||||
targetSession: targetSession.id,
|
||||
});
|
||||
const fresh = getSession(targetSession.id);
|
||||
if (fresh) await wakeContainer(fresh);
|
||||
const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js');
|
||||
await routeAgentMessage(msg, session);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -359,12 +297,19 @@ async function deliverMessage(
|
||||
const isOriginChat = session.messaging_group_id === mg.id;
|
||||
// Guarded: without the agent-to-agent module, `agent_destinations`
|
||||
// doesn't exist and we permit all non-origin channel sends (the
|
||||
// origin-chat case is always allowed regardless).
|
||||
const checkDestinations = hasTable(getDb(), 'agent_destinations');
|
||||
if (!isOriginChat && checkDestinations && !hasDestination(session.agent_group_id, 'channel', mg.id)) {
|
||||
throw new Error(
|
||||
`unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`,
|
||||
);
|
||||
// origin-chat case is always allowed regardless). Inlined SQL instead
|
||||
// of importing `hasDestination` so core doesn't depend on the module.
|
||||
if (!isOriginChat && hasTable(getDb(), 'agent_destinations')) {
|
||||
const row = getDb()
|
||||
.prepare(
|
||||
'SELECT 1 FROM agent_destinations WHERE agent_group_id = ? AND target_type = ? AND target_id = ? LIMIT 1',
|
||||
)
|
||||
.get(session.agent_group_id, 'channel', mg.id);
|
||||
if (!row) {
|
||||
throw new Error(
|
||||
`unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,103 +440,7 @@ async function handleSystemAction(
|
||||
return;
|
||||
}
|
||||
|
||||
switch (action) {
|
||||
case 'create_agent': {
|
||||
const requestId = content.requestId as string;
|
||||
const name = content.name as string;
|
||||
const instructions = content.instructions as string | null;
|
||||
|
||||
const sourceGroup = getAgentGroup(session.agent_group_id);
|
||||
if (!sourceGroup) {
|
||||
notifyAgent(session, `create_agent failed: source agent group not found.`);
|
||||
log.warn('create_agent failed: missing source group', { sessionAgentGroup: session.agent_group_id, name });
|
||||
break;
|
||||
}
|
||||
|
||||
const localName = normalizeName(name);
|
||||
|
||||
// Collision in the creator's destination namespace
|
||||
if (getDestinationByName(sourceGroup.id, localName)) {
|
||||
notifyAgent(session, `Cannot create agent "${name}": you already have a destination named "${localName}".`);
|
||||
break;
|
||||
}
|
||||
|
||||
// Derive a safe folder name, deduplicated globally across agent_groups.folder
|
||||
let folder = localName;
|
||||
let suffix = 2;
|
||||
while (getAgentGroupByFolder(folder)) {
|
||||
folder = `${localName}-${suffix}`;
|
||||
suffix++;
|
||||
}
|
||||
|
||||
const groupPath = path.join(GROUPS_DIR, folder);
|
||||
const resolvedPath = path.resolve(groupPath);
|
||||
const resolvedGroupsDir = path.resolve(GROUPS_DIR);
|
||||
if (!resolvedPath.startsWith(resolvedGroupsDir + path.sep)) {
|
||||
notifyAgent(session, `Cannot create agent "${name}": invalid folder path.`);
|
||||
log.error('create_agent path traversal attempt', { folder, resolvedPath });
|
||||
break;
|
||||
}
|
||||
|
||||
const agentGroupId = `ag-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const newGroup: AgentGroup = {
|
||||
id: agentGroupId,
|
||||
name,
|
||||
folder,
|
||||
agent_provider: null,
|
||||
created_at: now,
|
||||
};
|
||||
createAgentGroup(newGroup);
|
||||
initGroupFilesystem(newGroup, { instructions: instructions ?? undefined });
|
||||
|
||||
// Insert bidirectional destination rows (= ACL grants).
|
||||
// Creator refers to child by the name it chose; child refers to creator as "parent".
|
||||
createDestination({
|
||||
agent_group_id: sourceGroup.id,
|
||||
local_name: localName,
|
||||
target_type: 'agent',
|
||||
target_id: agentGroupId,
|
||||
created_at: now,
|
||||
});
|
||||
// Handle the unlikely case where the child already has a "parent" destination
|
||||
// (shouldn't happen for a brand-new agent, but be safe).
|
||||
let parentName = 'parent';
|
||||
let parentSuffix = 2;
|
||||
while (getDestinationByName(agentGroupId, parentName)) {
|
||||
parentName = `parent-${parentSuffix}`;
|
||||
parentSuffix++;
|
||||
}
|
||||
createDestination({
|
||||
agent_group_id: agentGroupId,
|
||||
local_name: parentName,
|
||||
target_type: 'agent',
|
||||
target_id: sourceGroup.id,
|
||||
created_at: now,
|
||||
});
|
||||
|
||||
// REQUIRED: project the new destination into the running
|
||||
// container's inbound.db. See the top-of-file invariant in
|
||||
// src/db/agent-destinations.ts — forgetting this causes
|
||||
// "dropped: unknown destination" when the parent tries to send
|
||||
// to the newly-created child.
|
||||
writeDestinations(session.agent_group_id, session.id);
|
||||
|
||||
// Fire-and-forget notification back to the creator
|
||||
notifyAgent(
|
||||
session,
|
||||
`Agent "${localName}" created. You can now message it with <message to="${localName}">...</message>.`,
|
||||
);
|
||||
log.info('Agent group created', { agentGroupId, name, localName, folder, parent: sourceGroup.id });
|
||||
// Note: requestId is unused — this is fire-and-forget, not request/response.
|
||||
void requestId;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
log.warn('Unknown system action', { action });
|
||||
}
|
||||
log.warn('Unknown system action', { action });
|
||||
}
|
||||
|
||||
export function stopDeliveryPolls(): void {
|
||||
|
||||
Reference in New Issue
Block a user