Merge pull request #1839 from qwibitai/refactor/pr2-scaffolding
refactor: scaffold module registries + default-module layout (PR #2)
This commit is contained in:
@@ -9,6 +9,7 @@
|
|||||||
* (see mcp-tools/index.ts). The host re-checks permission on receive.
|
* (see mcp-tools/index.ts). The host re-checks permission on receive.
|
||||||
*/
|
*/
|
||||||
import { writeMessageOut } from '../db/messages-out.js';
|
import { writeMessageOut } from '../db/messages-out.js';
|
||||||
|
import { registerTools } from './server.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
@@ -62,4 +63,4 @@ export const createAgent: McpToolDefinition = {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const agentTools: McpToolDefinition[] = [createAgent];
|
registerTools([createAgent]);
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import path from 'path';
|
|||||||
import { findByName, getAllDestinations } from '../destinations.js';
|
import { findByName, getAllDestinations } from '../destinations.js';
|
||||||
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
|
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
|
||||||
import { getSessionRouting } from '../db/session-routing.js';
|
import { getSessionRouting } from '../db/session-routing.js';
|
||||||
|
import { registerTools } from './server.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
@@ -258,4 +259,4 @@ export const addReaction: McpToolDefinition = {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const coreTools: McpToolDefinition[] = [sendMessage, sendFile, editMessage, addReaction];
|
registerTools([sendMessage, sendFile, editMessage, addReaction]);
|
||||||
|
|||||||
@@ -1,59 +1,21 @@
|
|||||||
/**
|
/**
|
||||||
* MCP tools barrel — collects all tool modules and starts the server.
|
* MCP tools barrel — imports each tool module for its side-effect
|
||||||
|
* `registerTools([...])` call, then starts the MCP server.
|
||||||
*
|
*
|
||||||
* Each module exports a McpToolDefinition[] array. This file registers
|
* Adding a new tool module: create the file, call `registerTools([...])`
|
||||||
* them all with the MCP server. Adding a new tool module requires only
|
* at module scope, and append the import here. No central list.
|
||||||
* importing it here and spreading its tools array.
|
|
||||||
*/
|
*/
|
||||||
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
|
import './core.js';
|
||||||
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
|
import './scheduling.js';
|
||||||
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
|
import './interactive.js';
|
||||||
|
import './agents.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import './self-mod.js';
|
||||||
import { coreTools } from './core.js';
|
import { startMcpServer } from './server.js';
|
||||||
import { schedulingTools } from './scheduling.js';
|
|
||||||
import { interactiveTools } from './interactive.js';
|
|
||||||
import { agentTools } from './agents.js';
|
|
||||||
import { selfModTools } from './self-mod.js';
|
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
console.error(`[mcp-tools] ${msg}`);
|
console.error(`[mcp-tools] ${msg}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const allTools: McpToolDefinition[] = [
|
|
||||||
...coreTools,
|
|
||||||
...schedulingTools,
|
|
||||||
...interactiveTools,
|
|
||||||
...agentTools,
|
|
||||||
...selfModTools,
|
|
||||||
];
|
|
||||||
|
|
||||||
const toolMap = new Map<string, McpToolDefinition>();
|
|
||||||
for (const t of allTools) {
|
|
||||||
toolMap.set(t.tool.name, t);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function startMcpServer(): Promise<void> {
|
|
||||||
const server = new Server({ name: 'nanoclaw', version: '2.0.0' }, { capabilities: { tools: {} } });
|
|
||||||
|
|
||||||
server.setRequestHandler(ListToolsRequestSchema, async () => ({
|
|
||||||
tools: allTools.map((t) => t.tool),
|
|
||||||
}));
|
|
||||||
|
|
||||||
server.setRequestHandler(CallToolRequestSchema, async (request) => {
|
|
||||||
const { name, arguments: args } = request.params;
|
|
||||||
const tool = toolMap.get(name);
|
|
||||||
if (!tool) {
|
|
||||||
return { content: [{ type: 'text', text: `Unknown tool: ${name}` }] };
|
|
||||||
}
|
|
||||||
return tool.handler(args ?? {});
|
|
||||||
});
|
|
||||||
|
|
||||||
const transport = new StdioServerTransport();
|
|
||||||
await server.connect(transport);
|
|
||||||
log(`MCP server started with ${allTools.length} tools: ${allTools.map((t) => t.tool.name).join(', ')}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
startMcpServer().catch((err) => {
|
startMcpServer().catch((err) => {
|
||||||
log(`MCP server error: ${err instanceof Error ? err.message : String(err)}`);
|
log(`MCP server error: ${err instanceof Error ? err.message : String(err)}`);
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
import { findQuestionResponse, markCompleted } from '../db/messages-in.js';
|
import { findQuestionResponse, markCompleted } from '../db/messages-in.js';
|
||||||
import { writeMessageOut } from '../db/messages-out.js';
|
import { writeMessageOut } from '../db/messages-out.js';
|
||||||
import { getSessionRouting } from '../db/session-routing.js';
|
import { getSessionRouting } from '../db/session-routing.js';
|
||||||
|
import { registerTools } from './server.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
@@ -165,4 +166,4 @@ export const sendCard: McpToolDefinition = {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const interactiveTools: McpToolDefinition[] = [askUserQuestion, sendCard];
|
registerTools([askUserQuestion, sendCard]);
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
import { getInboundDb } from '../db/connection.js';
|
import { getInboundDb } from '../db/connection.js';
|
||||||
import { writeMessageOut } from '../db/messages-out.js';
|
import { writeMessageOut } from '../db/messages-out.js';
|
||||||
import { getSessionRouting } from '../db/session-routing.js';
|
import { getSessionRouting } from '../db/session-routing.js';
|
||||||
|
import { registerTools } from './server.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
@@ -265,4 +266,4 @@ export const updateTask: McpToolDefinition = {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const schedulingTools: McpToolDefinition[] = [scheduleTask, listTasks, updateTask, cancelTask, pauseTask, resumeTask];
|
registerTools([scheduleTask, listTasks, updateTask, cancelTask, pauseTask, resumeTask]);
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
* the host side (defense in depth).
|
* the host side (defense in depth).
|
||||||
*/
|
*/
|
||||||
import { writeMessageOut } from '../db/messages-out.js';
|
import { writeMessageOut } from '../db/messages-out.js';
|
||||||
|
import { registerTools } from './server.js';
|
||||||
import type { McpToolDefinition } from './types.js';
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
function log(msg: string): void {
|
function log(msg: string): void {
|
||||||
@@ -140,4 +141,4 @@ export const requestRebuild: McpToolDefinition = {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const selfModTools: McpToolDefinition[] = [installPackages, addMcpServer, requestRebuild];
|
registerTools([installPackages, addMcpServer, requestRebuild]);
|
||||||
|
|||||||
54
container/agent-runner/src/mcp-tools/server.ts
Normal file
54
container/agent-runner/src/mcp-tools/server.ts
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
/**
|
||||||
|
* MCP server bootstrap + tool self-registration.
|
||||||
|
*
|
||||||
|
* Each tool module calls `registerTools([...])` at import time. The
|
||||||
|
* barrel (`index.ts`) imports every tool module for side effects, then
|
||||||
|
* calls `startMcpServer()` which uses whatever was registered.
|
||||||
|
*
|
||||||
|
* Default when only `core.ts` is imported: the core `send_message` /
|
||||||
|
* `send_file` / `edit_message` / `add_reaction` tools are available.
|
||||||
|
*/
|
||||||
|
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
|
||||||
|
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
|
||||||
|
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
|
||||||
|
|
||||||
|
import type { McpToolDefinition } from './types.js';
|
||||||
|
|
||||||
|
function log(msg: string): void {
|
||||||
|
console.error(`[mcp-tools] ${msg}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const allTools: McpToolDefinition[] = [];
|
||||||
|
const toolMap = new Map<string, McpToolDefinition>();
|
||||||
|
|
||||||
|
export function registerTools(tools: McpToolDefinition[]): void {
|
||||||
|
for (const t of tools) {
|
||||||
|
if (toolMap.has(t.tool.name)) {
|
||||||
|
log(`Warning: tool "${t.tool.name}" already registered, skipping duplicate`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
allTools.push(t);
|
||||||
|
toolMap.set(t.tool.name, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function startMcpServer(): Promise<void> {
|
||||||
|
const server = new Server({ name: 'nanoclaw', version: '2.0.0' }, { capabilities: { tools: {} } });
|
||||||
|
|
||||||
|
server.setRequestHandler(ListToolsRequestSchema, async () => ({
|
||||||
|
tools: allTools.map((t) => t.tool),
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.setRequestHandler(CallToolRequestSchema, async (request) => {
|
||||||
|
const { name, arguments: args } = request.params;
|
||||||
|
const tool = toolMap.get(name);
|
||||||
|
if (!tool) {
|
||||||
|
return { content: [{ type: 'text', text: `Unknown tool: ${name}` }] };
|
||||||
|
}
|
||||||
|
return tool.handler(args ?? {});
|
||||||
|
});
|
||||||
|
|
||||||
|
const transport = new StdioServerTransport();
|
||||||
|
await server.connect(transport);
|
||||||
|
log(`MCP server started with ${allTools.length} tools: ${allTools.map((t) => t.tool.name).join(', ')}`);
|
||||||
|
}
|
||||||
@@ -156,11 +156,18 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
// Pre-task scripts: for any task rows with a `script`, run it before the
|
// Pre-task scripts: for any task rows with a `script`, run it before the
|
||||||
// provider call. Scripts returning wakeAgent=false (or erroring) gate
|
// provider call. Scripts returning wakeAgent=false (or erroring) gate
|
||||||
// their own task row only — surviving messages still go to the agent.
|
// their own task row only — surviving messages still go to the agent.
|
||||||
|
//
|
||||||
|
// MODULE-HOOK:scheduling-pre-task:start
|
||||||
|
// When scheduling is extracted (PR #4), `applyPreTaskScripts` moves
|
||||||
|
// to the scheduling module and the `/add-scheduling` skill replaces
|
||||||
|
// this block with a call to the module. Without scheduling installed,
|
||||||
|
// the block is empty (no script gating) and `keep = normalMessages`.
|
||||||
const { keep, skipped } = await applyPreTaskScripts(normalMessages);
|
const { keep, skipped } = await applyPreTaskScripts(normalMessages);
|
||||||
if (skipped.length > 0) {
|
if (skipped.length > 0) {
|
||||||
markCompleted(skipped);
|
markCompleted(skipped);
|
||||||
log(`Pre-task script skipped ${skipped.length} task(s): ${skipped.join(', ')}`);
|
log(`Pre-task script skipped ${skipped.length} task(s): ${skipped.join(', ')}`);
|
||||||
}
|
}
|
||||||
|
// MODULE-HOOK:scheduling-pre-task:end
|
||||||
|
|
||||||
if (keep.length === 0) {
|
if (keep.length === 0) {
|
||||||
log(`All ${normalMessages.length} non-command message(s) gated by script, skipping query`);
|
log(`All ${normalMessages.length} non-command message(s) gated by script, skipping query`);
|
||||||
|
|||||||
@@ -13,11 +13,12 @@ import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZO
|
|||||||
import { readContainerConfig, writeContainerConfig } from './container-config.js';
|
import { readContainerConfig, writeContainerConfig } from './container-config.js';
|
||||||
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
|
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
|
||||||
import { getAgentGroup } from './db/agent-groups.js';
|
import { getAgentGroup } from './db/agent-groups.js';
|
||||||
|
import { getDb, hasTable } from './db/connection.js';
|
||||||
import { getAdminsOfAgentGroup, getGlobalAdmins, getOwners } from './db/user-roles.js';
|
import { getAdminsOfAgentGroup, getGlobalAdmins, getOwners } from './db/user-roles.js';
|
||||||
import { initGroupFilesystem } from './group-init.js';
|
import { initGroupFilesystem } from './group-init.js';
|
||||||
import { stopTypingRefresh } from './delivery.js';
|
import { stopTypingRefresh } from './modules/typing/index.js';
|
||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
import { validateAdditionalMounts } from './mount-security.js';
|
import { validateAdditionalMounts } from './modules/mount-security/index.js';
|
||||||
// Provider host-side config barrel — each provider that needs host-side
|
// Provider host-side config barrel — each provider that needs host-side
|
||||||
// container setup self-registers on import.
|
// container setup self-registers on import.
|
||||||
import './providers/index.js';
|
import './providers/index.js';
|
||||||
@@ -286,10 +287,16 @@ async function buildContainerArgs(
|
|||||||
// Users allowed to run admin commands (e.g. /clear) inside this container.
|
// Users allowed to run admin commands (e.g. /clear) inside this container.
|
||||||
// Computed at wake time: owners + global admins + admins scoped to this
|
// Computed at wake time: owners + global admins + admins scoped to this
|
||||||
// agent group. Role changes take effect on next container spawn.
|
// agent group. Role changes take effect on next container spawn.
|
||||||
|
//
|
||||||
|
// Guarded: if the permissions module isn't installed, `user_roles`
|
||||||
|
// doesn't exist and the set stays empty — the formatter treats an
|
||||||
|
// empty admin set as permissionless (every sender is admin).
|
||||||
const adminUserIds = new Set<string>();
|
const adminUserIds = new Set<string>();
|
||||||
|
if (hasTable(getDb(), 'user_roles')) {
|
||||||
for (const r of getOwners()) adminUserIds.add(r.user_id);
|
for (const r of getOwners()) adminUserIds.add(r.user_id);
|
||||||
for (const r of getGlobalAdmins()) adminUserIds.add(r.user_id);
|
for (const r of getGlobalAdmins()) adminUserIds.add(r.user_id);
|
||||||
for (const r of getAdminsOfAgentGroup(agentGroup.id)) adminUserIds.add(r.user_id);
|
for (const r of getAdminsOfAgentGroup(agentGroup.id)) adminUserIds.add(r.user_id);
|
||||||
|
}
|
||||||
if (adminUserIds.size > 0) {
|
if (adminUserIds.size > 0) {
|
||||||
args.push('-e', `NANOCLAW_ADMIN_USER_IDS=${Array.from(adminUserIds).join(',')}`);
|
args.push('-e', `NANOCLAW_ADMIN_USER_IDS=${Array.from(adminUserIds).join(',')}`);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,3 +31,18 @@ export function closeDb(): void {
|
|||||||
_db?.close();
|
_db?.close();
|
||||||
_db = null;
|
_db = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether a table exists. Used by core code that touches
|
||||||
|
* module-owned tables so that an uninstalled module degrades silently
|
||||||
|
* instead of raising SQLite errors. Cheap: a single indexed lookup on
|
||||||
|
* sqlite_master. Results are not cached — a module install adds the
|
||||||
|
* table at runtime (next service start), and callers may run before
|
||||||
|
* or after that boundary.
|
||||||
|
*/
|
||||||
|
export function hasTable(db: Database.Database, name: string): boolean {
|
||||||
|
const row = db.prepare(`SELECT 1 FROM sqlite_master WHERE type='table' AND name = ? LIMIT 1`).get(name) as
|
||||||
|
| { '1': number }
|
||||||
|
| undefined;
|
||||||
|
return row !== undefined;
|
||||||
|
}
|
||||||
|
|||||||
@@ -32,29 +32,34 @@ export function runMigrations(db: Database.Database): void {
|
|||||||
name TEXT NOT NULL,
|
name TEXT NOT NULL,
|
||||||
applied TEXT NOT NULL
|
applied TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_schema_version_name ON schema_version(name);
|
||||||
`);
|
`);
|
||||||
|
|
||||||
const currentVersion =
|
// Uniqueness is keyed on `name`, not `version`. This lets module
|
||||||
(db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number | null })?.v ?? 0;
|
// migrations (added later by install skills) pick arbitrary version
|
||||||
|
// numbers without coordinating across modules. `version` stays on
|
||||||
const pending = migrations.filter((m) => m.version > currentVersion);
|
// the Migration object as an ordering hint within the barrel array;
|
||||||
|
// the stored `version` column is auto-assigned at insert time as an
|
||||||
|
// applied-order number.
|
||||||
|
const applied = new Set<string>(
|
||||||
|
(db.prepare('SELECT name FROM schema_version').all() as { name: string }[]).map((r) => r.name),
|
||||||
|
);
|
||||||
|
const pending = migrations.filter((m) => !applied.has(m.name));
|
||||||
if (pending.length === 0) return;
|
if (pending.length === 0) return;
|
||||||
|
|
||||||
log.info('Running migrations', {
|
log.info('Running migrations', { count: pending.length });
|
||||||
from: currentVersion,
|
|
||||||
to: pending[pending.length - 1].version,
|
|
||||||
count: pending.length,
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const m of pending) {
|
for (const m of pending) {
|
||||||
db.transaction(() => {
|
db.transaction(() => {
|
||||||
m.up(db);
|
m.up(db);
|
||||||
|
const next =
|
||||||
|
(db.prepare('SELECT COALESCE(MAX(version), 0) + 1 AS v FROM schema_version').get() as { v: number }).v;
|
||||||
db.prepare('INSERT INTO schema_version (version, name, applied) VALUES (?, ?, ?)').run(
|
db.prepare('INSERT INTO schema_version (version, name, applied) VALUES (?, ?, ?)').run(
|
||||||
m.version,
|
next,
|
||||||
m.name,
|
m.name,
|
||||||
new Date().toISOString(),
|
new Date().toISOString(),
|
||||||
);
|
);
|
||||||
})();
|
})();
|
||||||
log.info('Migration applied', { version: m.version, name: m.name });
|
log.info('Migration applied', { name: m.name });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
197
src/delivery.ts
197
src/delivery.ts
@@ -21,6 +21,7 @@ import {
|
|||||||
} from './db/sessions.js';
|
} from './db/sessions.js';
|
||||||
import { getAgentGroup, createAgentGroup, updateAgentGroup, getAgentGroupByFolder } from './db/agent-groups.js';
|
import { getAgentGroup, createAgentGroup, updateAgentGroup, getAgentGroupByFolder } from './db/agent-groups.js';
|
||||||
import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js';
|
import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js';
|
||||||
|
import { getDb, hasTable } from './db/connection.js';
|
||||||
import { getMessagingGroup, getMessagingGroupByPlatform } from './db/messaging-groups.js';
|
import { getMessagingGroup, getMessagingGroupByPlatform } from './db/messaging-groups.js';
|
||||||
import { pickApprovalDelivery, pickApprover } from './access.js';
|
import { pickApprovalDelivery, pickApprover } from './access.js';
|
||||||
import {
|
import {
|
||||||
@@ -38,18 +39,16 @@ import {
|
|||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
import { normalizeOptions, type RawOption } from './channels/ask-question.js';
|
import { normalizeOptions, type RawOption } from './channels/ask-question.js';
|
||||||
import {
|
import {
|
||||||
heartbeatPath,
|
|
||||||
openInboundDb,
|
openInboundDb,
|
||||||
openOutboundDb,
|
openOutboundDb,
|
||||||
sessionDir,
|
sessionDir,
|
||||||
inboundDbPath,
|
|
||||||
resolveSession,
|
resolveSession,
|
||||||
writeDestinations,
|
writeDestinations,
|
||||||
writeSessionMessage,
|
writeSessionMessage,
|
||||||
writeSystemResponse,
|
|
||||||
} from './session-manager.js';
|
} from './session-manager.js';
|
||||||
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
|
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
|
||||||
import { initGroupFilesystem } from './group-init.js';
|
import { initGroupFilesystem } from './group-init.js';
|
||||||
|
import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js';
|
||||||
import type { OutboundFile } from './channels/adapter.js';
|
import type { OutboundFile } from './channels/adapter.js';
|
||||||
import type { AgentGroup, Session } from './types.js';
|
import type { AgentGroup, Session } from './types.js';
|
||||||
|
|
||||||
@@ -93,6 +92,9 @@ let sweepPolling = false;
|
|||||||
|
|
||||||
export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void {
|
export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void {
|
||||||
deliveryAdapter = adapter;
|
deliveryAdapter = adapter;
|
||||||
|
// Forward to the typing module so it can fire setTyping on its own
|
||||||
|
// interval. Direct call, not a registry — typing is a default module.
|
||||||
|
setTypingAdapter(adapter);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -194,148 +196,6 @@ async function requestApproval(
|
|||||||
log.info('Approval requested', { action, approvalId, agentName, approver: target.userId });
|
log.info('Approval requested', { action, approvalId, agentName, approver: target.userId });
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Show typing indicator on a channel. Called when a message is routed to the agent. */
|
|
||||||
export async function triggerTyping(channelType: string, platformId: string, threadId: string | null): Promise<void> {
|
|
||||||
try {
|
|
||||||
await deliveryAdapter?.setTyping?.(channelType, platformId, threadId);
|
|
||||||
} catch {
|
|
||||||
// Typing is best-effort — don't fail routing if it errors
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Typing refresh ──
|
|
||||||
// Most platforms expire a typing indicator after 5–10s, so a one-shot call
|
|
||||||
// on message arrival goes stale long before the agent finishes thinking.
|
|
||||||
// We keep it alive by re-firing setTyping on a short interval — but only
|
|
||||||
// while the agent is actually WORKING, not just while the container is
|
|
||||||
// alive. The agent-runner touches `heartbeat` on every SDK event, so we
|
|
||||||
// gate each tick on "is the heartbeat file fresh?". If it goes stale (agent
|
|
||||||
// finished its turn and is idle-polling), the refresh stops on its own
|
|
||||||
// without waiting for the container to exit.
|
|
||||||
//
|
|
||||||
// After delivering a user-facing message, the refresh is paused for
|
|
||||||
// POST_DELIVERY_PAUSE_MS — long enough for the client-side typing
|
|
||||||
// indicator to visually clear (Discord ~10s, Telegram ~5s). If the agent
|
|
||||||
// keeps touching heartbeat past the pause window, typing resumes
|
|
||||||
// naturally on the next refresh tick.
|
|
||||||
//
|
|
||||||
// `startTypingRefresh` is idempotent per session. `stopTypingRefresh` is
|
|
||||||
// called from container-runner.ts on container exit as a fast-path cleanup
|
|
||||||
// (the heartbeat-staleness path would catch it within one tick anyway).
|
|
||||||
const TYPING_REFRESH_MS = 4000;
|
|
||||||
// Grace window from startTypingRefresh: fire typing unconditionally for
|
|
||||||
// this long regardless of heartbeat state. Covers container spawn/wake
|
|
||||||
// latency, which can be 5–12s on a cold start before the first heartbeat
|
|
||||||
// touch lands.
|
|
||||||
const TYPING_GRACE_MS = 15000;
|
|
||||||
// After the grace window, a heartbeat must be mtimed within this many
|
|
||||||
// milliseconds of now to count as "agent is working." Heartbeats are
|
|
||||||
// touched on every SDK event (tool calls, result chunks), so during
|
|
||||||
// active work they land every few hundred ms. 6s is well above that
|
|
||||||
// while still being small enough to stop typing quickly when the agent
|
|
||||||
// goes idle.
|
|
||||||
const HEARTBEAT_FRESH_MS = 6000;
|
|
||||||
// After we deliver a user-facing message, pause typing for this long so
|
|
||||||
// the client-side indicator has time to visually clear. Tuned for the
|
|
||||||
// longest common client expiry (Discord ~10s). The interval stays
|
|
||||||
// running; ticks inside the pause just skip the setTyping call.
|
|
||||||
const POST_DELIVERY_PAUSE_MS = 10000;
|
|
||||||
|
|
||||||
interface TypingTarget {
|
|
||||||
agentGroupId: string;
|
|
||||||
channelType: string;
|
|
||||||
platformId: string;
|
|
||||||
threadId: string | null;
|
|
||||||
interval: NodeJS.Timeout;
|
|
||||||
startedAt: number;
|
|
||||||
pausedUntil: number; // epoch ms; 0 = not paused
|
|
||||||
}
|
|
||||||
|
|
||||||
const typingRefreshers = new Map<string, TypingTarget>();
|
|
||||||
|
|
||||||
function isHeartbeatFresh(agentGroupId: string, sessionId: string): boolean {
|
|
||||||
const hbPath = heartbeatPath(agentGroupId, sessionId);
|
|
||||||
try {
|
|
||||||
const stat = fs.statSync(hbPath);
|
|
||||||
return Date.now() - stat.mtimeMs < HEARTBEAT_FRESH_MS;
|
|
||||||
} catch {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function startTypingRefresh(
|
|
||||||
sessionId: string,
|
|
||||||
agentGroupId: string,
|
|
||||||
channelType: string,
|
|
||||||
platformId: string,
|
|
||||||
threadId: string | null,
|
|
||||||
): void {
|
|
||||||
const existing = typingRefreshers.get(sessionId);
|
|
||||||
if (existing) {
|
|
||||||
// Already refreshing. Fire an immediate tick for the new inbound
|
|
||||||
// event and reset the grace window — the new message restarts the
|
|
||||||
// container-wake latency budget. Also clear any lingering
|
|
||||||
// post-delivery pause: a new inbound means the user expects typing
|
|
||||||
// to show immediately.
|
|
||||||
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
|
||||||
existing.startedAt = Date.now();
|
|
||||||
existing.pausedUntil = 0;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Immediate tick + periodic refresh.
|
|
||||||
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
|
||||||
const startedAt = Date.now();
|
|
||||||
const interval = setInterval(() => {
|
|
||||||
const entry = typingRefreshers.get(sessionId);
|
|
||||||
if (!entry) return; // stopped externally since this tick was scheduled
|
|
||||||
|
|
||||||
// Inside a post-delivery pause: skip setTyping but keep the interval
|
|
||||||
// running so we resume automatically once the pause expires.
|
|
||||||
if (entry.pausedUntil > Date.now()) return;
|
|
||||||
|
|
||||||
const withinGrace = Date.now() - entry.startedAt < TYPING_GRACE_MS;
|
|
||||||
if (withinGrace || isHeartbeatFresh(entry.agentGroupId, sessionId)) {
|
|
||||||
triggerTyping(entry.channelType, entry.platformId, entry.threadId).catch(() => {});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Out of grace AND heartbeat stale — agent is idle, stop refreshing.
|
|
||||||
clearInterval(entry.interval);
|
|
||||||
typingRefreshers.delete(sessionId);
|
|
||||||
}, TYPING_REFRESH_MS);
|
|
||||||
// unref so a stale refresher can't hold the event loop alive.
|
|
||||||
interval.unref();
|
|
||||||
typingRefreshers.set(sessionId, {
|
|
||||||
agentGroupId,
|
|
||||||
channelType,
|
|
||||||
platformId,
|
|
||||||
threadId,
|
|
||||||
interval,
|
|
||||||
startedAt,
|
|
||||||
pausedUntil: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pause the typing refresh for POST_DELIVERY_PAUSE_MS. Called after a
|
|
||||||
* user-facing message is delivered so the client-side indicator has a
|
|
||||||
* chance to visually clear before the agent's next SDK event pushes it
|
|
||||||
* back on. No-op if no refresh is active for this session.
|
|
||||||
*/
|
|
||||||
export function pauseTypingRefreshAfterDelivery(sessionId: string): void {
|
|
||||||
const entry = typingRefreshers.get(sessionId);
|
|
||||||
if (!entry) return;
|
|
||||||
entry.pausedUntil = Date.now() + POST_DELIVERY_PAUSE_MS;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function stopTypingRefresh(sessionId: string): void {
|
|
||||||
const entry = typingRefreshers.get(sessionId);
|
|
||||||
if (!entry) return;
|
|
||||||
clearInterval(entry.interval);
|
|
||||||
typingRefreshers.delete(sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Start the active container poll loop (~1s). */
|
/** Start the active container poll loop (~1s). */
|
||||||
export function startActiveDeliveryPoll(): void {
|
export function startActiveDeliveryPoll(): void {
|
||||||
if (activePolling) return;
|
if (activePolling) return;
|
||||||
@@ -553,15 +413,22 @@ async function deliverMessage(
|
|||||||
throw new Error(`unknown messaging group for ${msg.channel_type}/${msg.platform_id} (message ${msg.id})`);
|
throw new Error(`unknown messaging group for ${msg.channel_type}/${msg.platform_id} (message ${msg.id})`);
|
||||||
}
|
}
|
||||||
const isOriginChat = session.messaging_group_id === mg.id;
|
const isOriginChat = session.messaging_group_id === mg.id;
|
||||||
if (!isOriginChat && !hasDestination(session.agent_group_id, 'channel', mg.id)) {
|
// Guarded: without the agent-to-agent module, `agent_destinations`
|
||||||
|
// doesn't exist and we permit all non-origin channel sends (the
|
||||||
|
// origin-chat case is always allowed regardless).
|
||||||
|
const checkDestinations = hasTable(getDb(), 'agent_destinations');
|
||||||
|
if (!isOriginChat && checkDestinations && !hasDestination(session.agent_group_id, 'channel', mg.id)) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
`unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`,
|
`unauthorized channel destination: ${session.agent_group_id} cannot send to ${mg.channel_type}/${mg.platform_id}`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track pending questions for ask_user_question flow
|
// Track pending questions for ask_user_question flow.
|
||||||
if (content.type === 'ask_question' && content.questionId) {
|
// Guarded: without the interactive module, `pending_questions` doesn't
|
||||||
|
// exist and we skip persistence — the card still delivers to the user,
|
||||||
|
// but the response path has nowhere to land and will log unclaimed.
|
||||||
|
if (content.type === 'ask_question' && content.questionId && hasTable(getDb(), 'pending_questions')) {
|
||||||
const title = content.title as string | undefined;
|
const title = content.title as string | undefined;
|
||||||
const rawOptions = content.options as unknown;
|
const rawOptions = content.options as unknown;
|
||||||
if (!title || !Array.isArray(rawOptions)) {
|
if (!title || !Array.isArray(rawOptions)) {
|
||||||
@@ -637,6 +504,34 @@ async function deliverMessage(
|
|||||||
return platformMsgId;
|
return platformMsgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delivery action registry.
|
||||||
|
*
|
||||||
|
* Modules register handlers for system-kind outbound message actions via
|
||||||
|
* `registerDeliveryAction`. Core checks the registry first in
|
||||||
|
* `handleSystemAction` and falls through to the inline switch when no
|
||||||
|
* handler is registered. The switch will shrink as modules are extracted
|
||||||
|
* (scheduling, approvals, agent-to-agent) and eventually only its default
|
||||||
|
* branch remains.
|
||||||
|
*
|
||||||
|
* Default when no handler registered and the switch doesn't match: log
|
||||||
|
* "Unknown system action" and return.
|
||||||
|
*/
|
||||||
|
export type DeliveryActionHandler = (
|
||||||
|
content: Record<string, unknown>,
|
||||||
|
session: Session,
|
||||||
|
inDb: Database.Database,
|
||||||
|
) => Promise<void>;
|
||||||
|
|
||||||
|
const actionHandlers = new Map<string, DeliveryActionHandler>();
|
||||||
|
|
||||||
|
export function registerDeliveryAction(action: string, handler: DeliveryActionHandler): void {
|
||||||
|
if (actionHandlers.has(action)) {
|
||||||
|
log.warn('Delivery action handler overwritten', { action });
|
||||||
|
}
|
||||||
|
actionHandlers.set(action, handler);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle system actions from the container agent.
|
* Handle system actions from the container agent.
|
||||||
* These are written to messages_out because the container can't write to inbound.db.
|
* These are written to messages_out because the container can't write to inbound.db.
|
||||||
@@ -650,6 +545,12 @@ async function handleSystemAction(
|
|||||||
const action = content.action as string;
|
const action = content.action as string;
|
||||||
log.info('System action from agent', { sessionId: session.id, action });
|
log.info('System action from agent', { sessionId: session.id, action });
|
||||||
|
|
||||||
|
const registered = actionHandlers.get(action);
|
||||||
|
if (registered) {
|
||||||
|
await registered(content, session, inDb);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
switch (action) {
|
switch (action) {
|
||||||
case 'schedule_task': {
|
case 'schedule_task': {
|
||||||
const taskId = content.taskId as string;
|
const taskId = content.taskId as string;
|
||||||
|
|||||||
@@ -100,8 +100,14 @@ async function sweepSession(session: Session): Promise<void> {
|
|||||||
detectStaleContainers(inDb, outDb, session, agentGroup.id);
|
detectStaleContainers(inDb, outDb, session, agentGroup.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Handle recurrence for completed messages
|
// 4. Handle recurrence for completed messages.
|
||||||
|
// MODULE-HOOK:scheduling-recurrence:start
|
||||||
|
// When scheduling is extracted (PR #4), `handleRecurrence` moves to
|
||||||
|
// `src/modules/scheduling/` and the `/add-scheduling` skill replaces
|
||||||
|
// this block with a call to the module. Without scheduling
|
||||||
|
// installed, the block is empty and recurrence is a no-op.
|
||||||
await handleRecurrence(inDb, session);
|
await handleRecurrence(inDb, session);
|
||||||
|
// MODULE-HOOK:scheduling-recurrence:end
|
||||||
} finally {
|
} finally {
|
||||||
inDb.close();
|
inDb.close();
|
||||||
outDb?.close();
|
outDb?.close();
|
||||||
|
|||||||
58
src/index.ts
58
src/index.ts
@@ -33,10 +33,55 @@ import { writeSessionMessage } from './session-manager.js';
|
|||||||
import { wakeContainer, buildAgentGroupImage, killContainer } from './container-runner.js';
|
import { wakeContainer, buildAgentGroupImage, killContainer } from './container-runner.js';
|
||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Response handler registry.
|
||||||
|
*
|
||||||
|
* Button-click / question responses arrive via the channel adapter's
|
||||||
|
* `onAction` callback. Core iterates registered handlers in registration
|
||||||
|
* order; the first one that returns `true` claims the response.
|
||||||
|
* Unclaimed responses fall through to the inline `handleQuestionResponse`
|
||||||
|
* below (which handles OneCLI credential approvals, pending_approvals,
|
||||||
|
* and pending_questions). As those modules are extracted, the inline
|
||||||
|
* function will shrink and the registry will own the full dispatch.
|
||||||
|
*/
|
||||||
|
export interface ResponsePayload {
|
||||||
|
questionId: string;
|
||||||
|
value: string;
|
||||||
|
userId: string | null;
|
||||||
|
channelType: string;
|
||||||
|
platformId: string;
|
||||||
|
threadId: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type ResponseHandler = (payload: ResponsePayload) => Promise<boolean>;
|
||||||
|
|
||||||
|
const responseHandlers: ResponseHandler[] = [];
|
||||||
|
|
||||||
|
export function registerResponseHandler(handler: ResponseHandler): void {
|
||||||
|
responseHandlers.push(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function dispatchResponse(payload: ResponsePayload): Promise<void> {
|
||||||
|
for (const handler of responseHandlers) {
|
||||||
|
try {
|
||||||
|
const claimed = await handler(payload);
|
||||||
|
if (claimed) return;
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Response handler threw', { questionId: payload.questionId, err });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Unclaimed — fall through to inline handler.
|
||||||
|
await handleQuestionResponse(payload.questionId, payload.value, payload.userId ?? '');
|
||||||
|
}
|
||||||
|
|
||||||
// Channel barrel — each enabled channel self-registers on import.
|
// Channel barrel — each enabled channel self-registers on import.
|
||||||
// Channel skills uncomment lines in channels/index.ts to enable them.
|
// Channel skills uncomment lines in channels/index.ts to enable them.
|
||||||
import './channels/index.js';
|
import './channels/index.js';
|
||||||
|
|
||||||
|
// Modules barrel — default modules (typing, mount-security) ship here; skills
|
||||||
|
// append registry-based modules. Imported for side effects (registrations).
|
||||||
|
import './modules/index.js';
|
||||||
|
|
||||||
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
|
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
|
||||||
import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js';
|
import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js';
|
||||||
|
|
||||||
@@ -82,7 +127,18 @@ async function main(): Promise<void> {
|
|||||||
});
|
});
|
||||||
},
|
},
|
||||||
onAction(questionId, selectedOption, userId) {
|
onAction(questionId, selectedOption, userId) {
|
||||||
handleQuestionResponse(questionId, selectedOption, userId).catch((err) => {
|
dispatchResponse({
|
||||||
|
questionId,
|
||||||
|
value: selectedOption,
|
||||||
|
userId,
|
||||||
|
channelType: adapter.channelType,
|
||||||
|
// platformId/threadId aren't surfaced by the current onAction
|
||||||
|
// signature — the inline fallback looks them up from the
|
||||||
|
// pending_question / pending_approval row. Registered handlers
|
||||||
|
// typically do the same.
|
||||||
|
platformId: '',
|
||||||
|
threadId: null,
|
||||||
|
}).catch((err) => {
|
||||||
log.error('Failed to handle question response', { questionId, err });
|
log.error('Failed to handle question response', { questionId, err });
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|||||||
16
src/modules/index.ts
Normal file
16
src/modules/index.ts
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
/**
|
||||||
|
* Modules barrel.
|
||||||
|
*
|
||||||
|
* Each module self-registers at import time. This barrel is imported by
|
||||||
|
* src/index.ts for side effects (registry registrations, typing impl setup,
|
||||||
|
* etc.). Core runs with an empty barrel — the registries have inline
|
||||||
|
* fallbacks and `sqlite_master` guards.
|
||||||
|
*
|
||||||
|
* Default modules (ship with main, direct core import):
|
||||||
|
* - src/modules/typing/ → imported directly by router/delivery/container-runner
|
||||||
|
* - src/modules/mount-security/ → imported directly by container-runner
|
||||||
|
*
|
||||||
|
* Registry-based modules (installed via /add-<name> skills, pulled from the
|
||||||
|
* `modules` branch): append imports below.
|
||||||
|
*/
|
||||||
|
export {};
|
||||||
@@ -9,8 +9,8 @@
|
|||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import os from 'os';
|
import os from 'os';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
import { MOUNT_ALLOWLIST_PATH } from './config.js';
|
import { MOUNT_ALLOWLIST_PATH } from '../../config.js';
|
||||||
import { log } from './log.js';
|
import { log } from '../../log.js';
|
||||||
|
|
||||||
export interface AdditionalMount {
|
export interface AdditionalMount {
|
||||||
hostPath: string;
|
hostPath: string;
|
||||||
165
src/modules/typing/index.ts
Normal file
165
src/modules/typing/index.ts
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
/**
|
||||||
|
* Typing indicator refresh — default module.
|
||||||
|
*
|
||||||
|
* Most platforms expire a typing indicator after 5–10s, so a one-shot
|
||||||
|
* call on message arrival goes stale long before the agent finishes
|
||||||
|
* thinking. This module keeps it alive by re-firing `setTyping` on a
|
||||||
|
* short interval — but only while the agent is actually WORKING, gated
|
||||||
|
* on the heartbeat file's mtime after an initial grace period.
|
||||||
|
*
|
||||||
|
* After delivering a user-facing message, the refresh is paused for
|
||||||
|
* POST_DELIVERY_PAUSE_MS so the client-side indicator can visually
|
||||||
|
* clear.
|
||||||
|
*
|
||||||
|
* Default module status:
|
||||||
|
* - Lives in src/modules/ for signaling (not really core), but ships
|
||||||
|
* on main and is imported directly by core. No registry, no hook.
|
||||||
|
* - Removing requires editing src/router.ts, src/delivery.ts, and
|
||||||
|
* src/container-runner.ts to drop the calls.
|
||||||
|
*/
|
||||||
|
import fs from 'fs';
|
||||||
|
|
||||||
|
import { heartbeatPath } from '../../session-manager.js';
|
||||||
|
|
||||||
|
const TYPING_REFRESH_MS = 4000;
|
||||||
|
/**
|
||||||
|
* Grace window from startTypingRefresh: fire typing unconditionally
|
||||||
|
* for this long regardless of heartbeat state. Covers container
|
||||||
|
* spawn/wake latency (5–12s on cold start before first heartbeat).
|
||||||
|
*/
|
||||||
|
const TYPING_GRACE_MS = 15000;
|
||||||
|
/**
|
||||||
|
* After the grace window, a heartbeat must be mtimed within this
|
||||||
|
* many ms of now to count as "agent is working." Heartbeats land
|
||||||
|
* every few hundred ms during active work, so 6s is well above
|
||||||
|
* the working floor and small enough to stop typing quickly when
|
||||||
|
* the agent goes idle.
|
||||||
|
*/
|
||||||
|
const HEARTBEAT_FRESH_MS = 6000;
|
||||||
|
/**
|
||||||
|
* After we deliver a user-facing message, pause typing for this
|
||||||
|
* long so the client-side indicator has time to visually clear.
|
||||||
|
* Tuned for the longest common expiry (Discord ~10s). The interval
|
||||||
|
* stays running; ticks inside the pause just skip the setTyping call.
|
||||||
|
*/
|
||||||
|
const POST_DELIVERY_PAUSE_MS = 10000;
|
||||||
|
|
||||||
|
interface TypingAdapter {
|
||||||
|
setTyping?(channelType: string, platformId: string, threadId: string | null): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TypingTarget {
|
||||||
|
agentGroupId: string;
|
||||||
|
channelType: string;
|
||||||
|
platformId: string;
|
||||||
|
threadId: string | null;
|
||||||
|
interval: NodeJS.Timeout;
|
||||||
|
startedAt: number;
|
||||||
|
pausedUntil: number; // epoch ms; 0 = not paused
|
||||||
|
}
|
||||||
|
|
||||||
|
let adapter: TypingAdapter | null = null;
|
||||||
|
const typingRefreshers = new Map<string, TypingTarget>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bind the typing module to the channel delivery adapter so it can
|
||||||
|
* call `setTyping`. Called once by `src/delivery.ts` inside
|
||||||
|
* `setDeliveryAdapter`. Passing a fresh adapter replaces the prior
|
||||||
|
* binding and leaves active refreshers in place (they'll use the
|
||||||
|
* new adapter on their next tick).
|
||||||
|
*/
|
||||||
|
export function setTypingAdapter(a: TypingAdapter): void {
|
||||||
|
adapter = a;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function triggerTyping(channelType: string, platformId: string, threadId: string | null): Promise<void> {
|
||||||
|
try {
|
||||||
|
await adapter?.setTyping?.(channelType, platformId, threadId);
|
||||||
|
} catch {
|
||||||
|
// Typing is best-effort — don't let it fail delivery or routing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function isHeartbeatFresh(agentGroupId: string, sessionId: string): boolean {
|
||||||
|
const hbPath = heartbeatPath(agentGroupId, sessionId);
|
||||||
|
try {
|
||||||
|
const stat = fs.statSync(hbPath);
|
||||||
|
return Date.now() - stat.mtimeMs < HEARTBEAT_FRESH_MS;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function startTypingRefresh(
|
||||||
|
sessionId: string,
|
||||||
|
agentGroupId: string,
|
||||||
|
channelType: string,
|
||||||
|
platformId: string,
|
||||||
|
threadId: string | null,
|
||||||
|
): void {
|
||||||
|
const existing = typingRefreshers.get(sessionId);
|
||||||
|
if (existing) {
|
||||||
|
// Already refreshing. Fire an immediate tick for the new inbound
|
||||||
|
// event and reset the grace window — the new message restarts
|
||||||
|
// the container-wake latency budget. Also clear any lingering
|
||||||
|
// post-delivery pause: a new inbound means the user expects
|
||||||
|
// typing to show immediately.
|
||||||
|
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
||||||
|
existing.startedAt = Date.now();
|
||||||
|
existing.pausedUntil = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Immediate tick + periodic refresh.
|
||||||
|
triggerTyping(channelType, platformId, threadId).catch(() => {});
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
const entry = typingRefreshers.get(sessionId);
|
||||||
|
if (!entry) return; // stopped externally since this tick was scheduled
|
||||||
|
|
||||||
|
// Inside a post-delivery pause: skip setTyping but keep the
|
||||||
|
// interval running so we resume automatically once the pause
|
||||||
|
// expires.
|
||||||
|
if (entry.pausedUntil > Date.now()) return;
|
||||||
|
|
||||||
|
const withinGrace = Date.now() - entry.startedAt < TYPING_GRACE_MS;
|
||||||
|
if (withinGrace || isHeartbeatFresh(entry.agentGroupId, sessionId)) {
|
||||||
|
triggerTyping(entry.channelType, entry.platformId, entry.threadId).catch(() => {});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Out of grace AND heartbeat stale — agent is idle, stop refreshing.
|
||||||
|
clearInterval(entry.interval);
|
||||||
|
typingRefreshers.delete(sessionId);
|
||||||
|
}, TYPING_REFRESH_MS);
|
||||||
|
// unref so a stale refresher can't hold the event loop alive.
|
||||||
|
interval.unref();
|
||||||
|
typingRefreshers.set(sessionId, {
|
||||||
|
agentGroupId,
|
||||||
|
channelType,
|
||||||
|
platformId,
|
||||||
|
threadId,
|
||||||
|
interval,
|
||||||
|
startedAt,
|
||||||
|
pausedUntil: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pause the typing refresh for POST_DELIVERY_PAUSE_MS. Called after
|
||||||
|
* a user-facing message is delivered so the client-side indicator
|
||||||
|
* has a chance to visually clear before the agent's next SDK event
|
||||||
|
* pushes it back on. No-op if no refresh is active for this session.
|
||||||
|
*/
|
||||||
|
export function pauseTypingRefreshAfterDelivery(sessionId: string): void {
|
||||||
|
const entry = typingRefreshers.get(sessionId);
|
||||||
|
if (!entry) return;
|
||||||
|
entry.pausedUntil = Date.now() + POST_DELIVERY_PAUSE_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function stopTypingRefresh(sessionId: string): void {
|
||||||
|
const entry = typingRefreshers.get(sessionId);
|
||||||
|
if (!entry) return;
|
||||||
|
clearInterval(entry.interval);
|
||||||
|
typingRefreshers.delete(sessionId);
|
||||||
|
}
|
||||||
@@ -21,7 +21,7 @@ import { getChannelAdapter } from './channels/channel-registry.js';
|
|||||||
import { isMember } from './db/agent-group-members.js';
|
import { isMember } from './db/agent-group-members.js';
|
||||||
import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js';
|
import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js';
|
||||||
import { upsertUser, getUser } from './db/users.js';
|
import { upsertUser, getUser } from './db/users.js';
|
||||||
import { startTypingRefresh } from './delivery.js';
|
import { startTypingRefresh } from './modules/typing/index.js';
|
||||||
import { log } from './log.js';
|
import { log } from './log.js';
|
||||||
import { resolveSession, writeSessionMessage } from './session-manager.js';
|
import { resolveSession, writeSessionMessage } from './session-manager.js';
|
||||||
import { wakeContainer } from './container-runner.js';
|
import { wakeContainer } from './container-runner.js';
|
||||||
@@ -45,6 +45,34 @@ export interface InboundEvent {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inbound gate registry.
|
||||||
|
*
|
||||||
|
* A module (permissions, today) can register a single gate function that
|
||||||
|
* owns sender resolution + access decision. Without a registered gate,
|
||||||
|
* core falls back to the inline `extractAndUpsertUser` +
|
||||||
|
* `enforceAccess` + `handleUnknownSender` chain.
|
||||||
|
*
|
||||||
|
* Takes the raw event so the gate can read sender fields from
|
||||||
|
* `event.message.content`. Returns either allowed=true with a `userId`
|
||||||
|
* (null if unresolved) or allowed=false with a reason; core drops the
|
||||||
|
* message on refusal.
|
||||||
|
*/
|
||||||
|
export type InboundGateResult =
|
||||||
|
| { allowed: true; userId: string | null }
|
||||||
|
| { allowed: false; userId: string | null; reason: string };
|
||||||
|
|
||||||
|
export type InboundGateFn = (event: InboundEvent, mg: MessagingGroup, agentGroupId: string) => InboundGateResult;
|
||||||
|
|
||||||
|
let inboundGate: InboundGateFn | null = null;
|
||||||
|
|
||||||
|
export function setInboundGate(fn: InboundGateFn): void {
|
||||||
|
if (inboundGate) {
|
||||||
|
log.warn('Inbound gate overwritten');
|
||||||
|
}
|
||||||
|
inboundGate = fn;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Route an inbound message from a channel adapter to the correct session.
|
* Route an inbound message from a channel adapter to the correct session.
|
||||||
* Creates messaging group + session if they don't exist yet.
|
* Creates messaging group + session if they don't exist yet.
|
||||||
@@ -81,13 +109,8 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Resolve sender → user id. Upsert into users table on first sight so
|
// 2. Resolve agent groups wired to this messaging group. (The gate runs
|
||||||
// subsequent messages find an existing row. `userId` is null if the
|
// after this so it can decide based on the target agent group.)
|
||||||
// adapter didn't give us enough to identify a sender (the gate will
|
|
||||||
// then apply unknown_sender_policy).
|
|
||||||
const userId = extractAndUpsertUser(event);
|
|
||||||
|
|
||||||
// 3. Resolve agent groups wired to this messaging group
|
|
||||||
const agents = getMessagingGroupAgents(mg.id);
|
const agents = getMessagingGroupAgents(mg.id);
|
||||||
if (agents.length === 0) {
|
if (agents.length === 0) {
|
||||||
log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', {
|
log.warn('MESSAGE DROPPED — no agent groups wired to this channel. Run setup register step to configure.', {
|
||||||
@@ -128,7 +151,24 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Access gate. Public channels skip the gate entirely.
|
// 3. Inbound gate: sender resolution + access decision. If a module
|
||||||
|
// registered a gate, it owns the whole thing (it can upsert users,
|
||||||
|
// check roles, etc.). Otherwise fall back to the inline chain.
|
||||||
|
let userId: string | null;
|
||||||
|
if (inboundGate) {
|
||||||
|
const result = inboundGate(event, mg, match.agent_group_id);
|
||||||
|
userId = result.userId;
|
||||||
|
if (!result.allowed) {
|
||||||
|
log.info('MESSAGE DROPPED — inbound gate refused', {
|
||||||
|
messagingGroupId: mg.id,
|
||||||
|
agentGroupId: match.agent_group_id,
|
||||||
|
userId,
|
||||||
|
reason: result.reason,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
userId = extractAndUpsertUser(event);
|
||||||
if (mg.unknown_sender_policy !== 'public') {
|
if (mg.unknown_sender_policy !== 'public') {
|
||||||
const gate = enforceAccess(userId, match.agent_group_id);
|
const gate = enforceAccess(userId, match.agent_group_id);
|
||||||
if (!gate.allowed) {
|
if (!gate.allowed) {
|
||||||
@@ -136,6 +176,7 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 5. Resolve or create session.
|
// 5. Resolve or create session.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import path from 'path';
|
|||||||
import { DATA_DIR } from './config.js';
|
import { DATA_DIR } from './config.js';
|
||||||
import { getAgentGroup } from './db/agent-groups.js';
|
import { getAgentGroup } from './db/agent-groups.js';
|
||||||
import { getDestinations } from './db/agent-destinations.js';
|
import { getDestinations } from './db/agent-destinations.js';
|
||||||
|
import { getDb, hasTable } from './db/connection.js';
|
||||||
import { getMessagingGroup } from './db/messaging-groups.js';
|
import { getMessagingGroup } from './db/messaging-groups.js';
|
||||||
import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js';
|
import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js';
|
||||||
import {
|
import {
|
||||||
@@ -183,6 +184,11 @@ export function writeDestinations(agentGroupId: string, sessionId: string): void
|
|||||||
const dbPath = inboundDbPath(agentGroupId, sessionId);
|
const dbPath = inboundDbPath(agentGroupId, sessionId);
|
||||||
if (!fs.existsSync(dbPath)) return;
|
if (!fs.existsSync(dbPath)) return;
|
||||||
|
|
||||||
|
// Guarded: when the agent-to-agent module isn't installed, the
|
||||||
|
// `agent_destinations` table doesn't exist. Skip silently — core
|
||||||
|
// container spawn continues without projecting destinations.
|
||||||
|
if (!hasTable(getDb(), 'agent_destinations')) return;
|
||||||
|
|
||||||
const rows = getDestinations(agentGroupId);
|
const rows = getDestinations(agentGroupId);
|
||||||
const resolved: DestinationRow[] = [];
|
const resolved: DestinationRow[] = [];
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user