refactor: scaffold module registries and default-module layout
Additive change — existing code paths still run via inline fallbacks. Prepares core for per-module extractions in PR #3 onward. Four registries added with empty defaults: - delivery action handlers (delivery.ts) - router inbound gate (router.ts) - response dispatcher (index.ts) - MCP tool self-registration (container/agent-runner/src/mcp-tools/server.ts) Default modules moved to src/modules/ for signaling: - src/modules/typing/ (extracted from delivery.ts) - src/modules/mount-security/ (moved from src/mount-security.ts) Both are imported directly by core — no hook, no registry. Removal requires editing core imports. Migrator now keys applied rows by name (uniqueness) so module migrations can pick arbitrary version numbers. Stored version column is auto-assigned as an applied-order sequence. sqlite_master guards added around core calls into module-owned tables (user_roles, agent_destinations, pending_questions). No-ops today; load-bearing after the owning modules are extracted. MODULE-HOOK markers placed at scheduling's two skill-edit sites (host-sweep.ts recurrence call, poll-loop.ts pre-task gate). PR #4 replaces the marked blocks when scheduling moves to its module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
197
src/delivery.ts
197
src/delivery.ts
@@ -21,6 +21,7 @@ import {
|
||||
} from './db/sessions.js';
|
||||
import { getAgentGroup, createAgentGroup, updateAgentGroup, getAgentGroupByFolder } from './db/agent-groups.js';
|
||||
import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js';
|
||||
import { getDb, hasTable } from './db/connection.js';
|
||||
import { getMessagingGroup, getMessagingGroupByPlatform } from './db/messaging-groups.js';
|
||||
import { pickApprovalDelivery, pickApprover } from './access.js';
|
||||
import {
|
||||
@@ -38,18 +39,16 @@ import {
|
||||
import { log } from './log.js';
|
||||
import { normalizeOptions, type RawOption } from './channels/ask-question.js';
|
||||
import {
|
||||
heartbeatPath,
|
||||
openInboundDb,
|
||||
openOutboundDb,
|
||||
sessionDir,
|
||||
inboundDbPath,
|
||||
resolveSession,
|
||||
writeDestinations,
|
||||
writeSessionMessage,
|
||||
writeSystemResponse,
|
||||
} from './session-manager.js';
|
||||
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
|
||||
import { initGroupFilesystem } from './group-init.js';
|
||||
import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import type { AgentGroup, Session } from './types.js';
|
||||
|
||||
@@ -93,6 +92,9 @@ let sweepPolling = false;
|
||||
|
||||
export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void {
|
||||
deliveryAdapter = adapter;
|
||||
// Forward to the typing module so it can fire setTyping on its own
|
||||
// interval. Direct call, not a registry — typing is a default module.
|
||||
setTypingAdapter(adapter);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -194,148 +196,6 @@ async function requestApproval(
|
||||
log.info('Approval requested', { action, approvalId, agentName, approver: target.userId });
|
||||
}
|
||||
|
||||
/** Show typing indicator on a channel. Called when a message is routed to the agent. */
|
||||
export async function triggerTyping(channelType: string, platformId: string, threadId: string | null): Promise<void> {
|
||||
try {
|
||||
await deliveryAdapter?.setTyping?.(channelType, platformId, threadId);
|
||||
} catch {
|
||||
// Typing is best-effort — don't fail routing if it errors
|
||||
}
|
||||
}
|
||||
|
||||
// ── Typing refresh ──
|
||||
// Most platforms expire a typing indicator after 5–10s, so a one-shot call
|
||||
// on message arrival goes stale long before the agent finishes thinking.
|
||||
// We keep it alive by re-firing setTyping on a short interval — but only
|
||||
// while the agent is actually WORKING, not just while the container is
|
||||
// alive. The agent-runner touches `heartbeat` on every SDK event, so we
|
||||
// gate each tick on "is the heartbeat file fresh?". If it goes stale (agent
|
||||
// finished its turn and is idle-polling), the refresh stops on its own
|
||||
// without waiting for the container to exit.
|
||||
//
|
||||
// After delivering a user-facing message, the refresh is paused for
|
||||
// POST_DELIVERY_PAUSE_MS — long enough for the client-side typing
|
||||
// indicator to visually clear (Discord ~10s, Telegram ~5s). If the agent
|
||||
// keeps touching heartbeat past the pause window, typing resumes
|
||||
// naturally on the next refresh tick.
|
||||
//
|
||||
// `startTypingRefresh` is idempotent per session. `stopTypingRefresh` is
|
||||
// called from container-runner.ts on container exit as a fast-path cleanup
|
||||
// (the heartbeat-staleness path would catch it within one tick anyway).
|
||||
const TYPING_REFRESH_MS = 4000;
|
||||
// Grace window from startTypingRefresh: fire typing unconditionally for
|
||||
// this long regardless of heartbeat state. Covers container spawn/wake
|
||||
// latency, which can be 5–12s on a cold start before the first heartbeat
|
||||
// touch lands.
|
||||
const TYPING_GRACE_MS = 15000;
|
||||
// After the grace window, a heartbeat must be mtimed within this many
|
||||
// milliseconds of now to count as "agent is working." Heartbeats are
|
||||
// touched on every SDK event (tool calls, result chunks), so during
|
||||
// active work they land every few hundred ms. 6s is well above that
|
||||
// while still being small enough to stop typing quickly when the agent
|
||||
// goes idle.
|
||||
const HEARTBEAT_FRESH_MS = 6000;
|
||||
// After we deliver a user-facing message, pause typing for this long so
|
||||
// the client-side indicator has time to visually clear. Tuned for the
|
||||
// longest common client expiry (Discord ~10s). The interval stays
|
||||
// running; ticks inside the pause just skip the setTyping call.
|
||||
const POST_DELIVERY_PAUSE_MS = 10000;
|
||||
|
||||
interface TypingTarget {
|
||||
agentGroupId: string;
|
||||
channelType: string;
|
||||
platformId: string;
|
||||
threadId: string | null;
|
||||
interval: NodeJS.Timeout;
|
||||
startedAt: number;
|
||||
pausedUntil: number; // epoch ms; 0 = not paused
|
||||
}
|
||||
|
||||
const typingRefreshers = new Map<string, TypingTarget>();
|
||||
|
||||
function isHeartbeatFresh(agentGroupId: string, sessionId: string): boolean {
|
||||
const hbPath = heartbeatPath(agentGroupId, sessionId);
|
||||
try {
|
||||
const stat = fs.statSync(hbPath);
|
||||
return Date.now() - stat.mtimeMs < HEARTBEAT_FRESH_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export function startTypingRefresh(
|
||||
sessionId: string,
|
||||
agentGroupId: string,
|
||||
channelType: string,
|
||||
platformId: string,
|
||||
threadId: string | null,
|
||||
): void {
|
||||
const existing = typingRefreshers.get(sessionId);
|
||||
if (existing) {
|
||||
// Already refreshing. Fire an immediate tick for the new inbound
|
||||
// event and reset the grace window — the new message restarts the
|
||||
// container-wake latency budget. Also clear any lingering
|
||||
// post-delivery pause: a new inbound means the user expects typing
|
||||
// to show immediately.
|
||||
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
||||
existing.startedAt = Date.now();
|
||||
existing.pausedUntil = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
// Immediate tick + periodic refresh.
|
||||
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
||||
const startedAt = Date.now();
|
||||
const interval = setInterval(() => {
|
||||
const entry = typingRefreshers.get(sessionId);
|
||||
if (!entry) return; // stopped externally since this tick was scheduled
|
||||
|
||||
// Inside a post-delivery pause: skip setTyping but keep the interval
|
||||
// running so we resume automatically once the pause expires.
|
||||
if (entry.pausedUntil > Date.now()) return;
|
||||
|
||||
const withinGrace = Date.now() - entry.startedAt < TYPING_GRACE_MS;
|
||||
if (withinGrace || isHeartbeatFresh(entry.agentGroupId, sessionId)) {
|
||||
triggerTyping(entry.channelType, entry.platformId, entry.threadId).catch(() => {});
|
||||
return;
|
||||
}
|
||||
|
||||
// Out of grace AND heartbeat stale — agent is idle, stop refreshing.
|
||||
clearInterval(entry.interval);
|
||||
typingRefreshers.delete(sessionId);
|
||||
}, TYPING_REFRESH_MS);
|
||||
// unref so a stale refresher can't hold the event loop alive.
|
||||
interval.unref();
|
||||
typingRefreshers.set(sessionId, {
|
||||
agentGroupId,
|
||||
channelType,
|
||||
platformId,
|
||||
threadId,
|
||||
interval,
|
||||
startedAt,
|
||||
pausedUntil: 0,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause the typing refresh for POST_DELIVERY_PAUSE_MS. Called after a
|
||||
* user-facing message is delivered so the client-side indicator has a
|
||||
* chance to visually clear before the agent's next SDK event pushes it
|
||||
* back on. No-op if no refresh is active for this session.
|
||||
*/
|
||||
export function pauseTypingRefreshAfterDelivery(sessionId: string): void {
|
||||
const entry = typingRefreshers.get(sessionId);
|
||||
if (!entry) return;
|
||||
entry.pausedUntil = Date.now() + POST_DELIVERY_PAUSE_MS;
|
||||
}
|
||||
|
||||
export function stopTypingRefresh(sessionId: string): void {
|
||||
const entry = typingRefreshers.get(sessionId);
|
||||
if (!entry) return;
|
||||
clearInterval(entry.interval);
|
||||
typingRefreshers.delete(sessionId);
|
||||
}
|
||||
|
||||
/** Start the active container poll loop (~1s). */
|
||||
export function startActiveDeliveryPoll(): void {
|
||||
if (activePolling) return;
|
||||
@@ -553,15 +413,22 @@ async function deliverMessage(
|
||||
throw new Error(`unknown messaging group for ${msg.channel_type}/${msg.platform_id} (message ${msg.id})`);
|
||||
}
|
||||
const isOriginChat = session.messaging_group_id === mg.id;
|
||||
if (!isOriginChat && !hasDestination(session.agent_group_id, 'channel', mg.id)) {
|
||||
// Guarded: without the agent-to-agent module, `agent_destinations`
|
||||
// doesn't exist and we permit all non-origin channel sends (the
|
||||
// origin-chat case is always allowed regardless).
|
||||
const checkDestinations = hasTable(getDb(), 'agent_destinations');
|
||||
if (!isOriginChat && checkDestinations && !hasDestination(session.agent_group_id, 'channel', mg.id)) {
|
||||
throw new Error(
|
||||
`unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Track pending questions for ask_user_question flow
|
||||
if (content.type === 'ask_question' && content.questionId) {
|
||||
// Track pending questions for ask_user_question flow.
|
||||
// Guarded: without the interactive module, `pending_questions` doesn't
|
||||
// exist and we skip persistence — the card still delivers to the user,
|
||||
// but the response path has nowhere to land and will log unclaimed.
|
||||
if (content.type === 'ask_question' && content.questionId && hasTable(getDb(), 'pending_questions')) {
|
||||
const title = content.title as string | undefined;
|
||||
const rawOptions = content.options as unknown;
|
||||
if (!title || !Array.isArray(rawOptions)) {
|
||||
@@ -637,6 +504,34 @@ async function deliverMessage(
|
||||
return platformMsgId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delivery action registry.
|
||||
*
|
||||
* Modules register handlers for system-kind outbound message actions via
|
||||
* `registerDeliveryAction`. Core checks the registry first in
|
||||
* `handleSystemAction` and falls through to the inline switch when no
|
||||
* handler is registered. The switch will shrink as modules are extracted
|
||||
* (scheduling, approvals, agent-to-agent) and eventually only its default
|
||||
* branch remains.
|
||||
*
|
||||
* Default when no handler registered and the switch doesn't match: log
|
||||
* "Unknown system action" and return.
|
||||
*/
|
||||
export type DeliveryActionHandler = (
|
||||
content: Record<string, unknown>,
|
||||
session: Session,
|
||||
inDb: Database.Database,
|
||||
) => Promise<void>;
|
||||
|
||||
const actionHandlers = new Map<string, DeliveryActionHandler>();
|
||||
|
||||
export function registerDeliveryAction(action: string, handler: DeliveryActionHandler): void {
|
||||
if (actionHandlers.has(action)) {
|
||||
log.warn('Delivery action handler overwritten', { action });
|
||||
}
|
||||
actionHandlers.set(action, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle system actions from the container agent.
|
||||
* These are written to messages_out because the container can't write to inbound.db.
|
||||
@@ -650,6 +545,12 @@ async function handleSystemAction(
|
||||
const action = content.action as string;
|
||||
log.info('System action from agent', { sessionId: session.id, action });
|
||||
|
||||
const registered = actionHandlers.get(action);
|
||||
if (registered) {
|
||||
await registered(content, session, inDb);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (action) {
|
||||
case 'schedule_task': {
|
||||
const taskId = content.taskId as string;
|
||||
|
||||
Reference in New Issue
Block a user