Merge branch 'main' into fix/register-channel-wiring
This commit is contained in:
100
container/agent-runner/src/db/session-state.test.ts
Normal file
100
container/agent-runner/src/db/session-state.test.ts
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
import { beforeEach, describe, expect, test } from 'bun:test';
|
||||||
|
|
||||||
|
import { getOutboundDb, initTestSessionDb } from './connection.js';
|
||||||
|
import {
|
||||||
|
clearContinuation,
|
||||||
|
getContinuation,
|
||||||
|
migrateLegacyContinuation,
|
||||||
|
setContinuation,
|
||||||
|
} from './session-state.js';
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
initTestSessionDb();
|
||||||
|
});
|
||||||
|
|
||||||
|
function seedLegacy(value: string): void {
|
||||||
|
getOutboundDb()
|
||||||
|
.prepare('INSERT INTO session_state (key, value, updated_at) VALUES (?, ?, ?)')
|
||||||
|
.run('sdk_session_id', value, new Date().toISOString());
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('session-state — per-provider continuations', () => {
|
||||||
|
test('set/get round-trip, case-insensitive provider key', () => {
|
||||||
|
setContinuation('claude', 'claude-conv-1');
|
||||||
|
expect(getContinuation('claude')).toBe('claude-conv-1');
|
||||||
|
expect(getContinuation('Claude')).toBe('claude-conv-1');
|
||||||
|
expect(getContinuation('CLAUDE')).toBe('claude-conv-1');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('providers are isolated — switching reads the right slot', () => {
|
||||||
|
setContinuation('claude', 'claude-conv-1');
|
||||||
|
setContinuation('codex', 'codex-thread-xyz');
|
||||||
|
|
||||||
|
expect(getContinuation('claude')).toBe('claude-conv-1');
|
||||||
|
expect(getContinuation('codex')).toBe('codex-thread-xyz');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('clearContinuation only affects the specified provider', () => {
|
||||||
|
setContinuation('claude', 'keep-me');
|
||||||
|
setContinuation('codex', 'drop-me');
|
||||||
|
|
||||||
|
clearContinuation('codex');
|
||||||
|
|
||||||
|
expect(getContinuation('claude')).toBe('keep-me');
|
||||||
|
expect(getContinuation('codex')).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('unknown provider returns undefined', () => {
|
||||||
|
expect(getContinuation('never-used')).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('session-state — legacy migration', () => {
|
||||||
|
test('adopts legacy value into current provider when current is empty', () => {
|
||||||
|
seedLegacy('old-session-id');
|
||||||
|
|
||||||
|
const adopted = migrateLegacyContinuation('claude');
|
||||||
|
|
||||||
|
expect(adopted).toBe('old-session-id');
|
||||||
|
expect(getContinuation('claude')).toBe('old-session-id');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('always deletes legacy row regardless of migration outcome', () => {
|
||||||
|
seedLegacy('old-session-id');
|
||||||
|
setContinuation('claude', 'existing');
|
||||||
|
|
||||||
|
migrateLegacyContinuation('claude');
|
||||||
|
|
||||||
|
// After migration the legacy key must be gone, whether or not it was adopted.
|
||||||
|
// A subsequent migration for a different provider must not see it.
|
||||||
|
const resultAfterSecondCall = migrateLegacyContinuation('codex');
|
||||||
|
expect(resultAfterSecondCall).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('prefers existing current-provider slot over legacy', () => {
|
||||||
|
seedLegacy('legacy-value');
|
||||||
|
setContinuation('claude', 'claude-value');
|
||||||
|
|
||||||
|
const result = migrateLegacyContinuation('claude');
|
||||||
|
|
||||||
|
expect(result).toBe('claude-value');
|
||||||
|
expect(getContinuation('claude')).toBe('claude-value');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('no legacy row — returns current provider value (possibly undefined)', () => {
|
||||||
|
expect(migrateLegacyContinuation('claude')).toBeUndefined();
|
||||||
|
|
||||||
|
setContinuation('codex', 'codex-value');
|
||||||
|
expect(migrateLegacyContinuation('codex')).toBe('codex-value');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('migration is idempotent on a second call (legacy already gone)', () => {
|
||||||
|
seedLegacy('once');
|
||||||
|
|
||||||
|
const first = migrateLegacyContinuation('claude');
|
||||||
|
expect(first).toBe('once');
|
||||||
|
|
||||||
|
const second = migrateLegacyContinuation('claude');
|
||||||
|
expect(second).toBe('once');
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -2,12 +2,20 @@
|
|||||||
* Persistent key/value state for the container. Lives in outbound.db
|
* Persistent key/value state for the container. Lives in outbound.db
|
||||||
* (container-owned, already scoped per channel/thread).
|
* (container-owned, already scoped per channel/thread).
|
||||||
*
|
*
|
||||||
* Primary use: remember the SDK session ID so the agent's conversation
|
* Primary use: remember each provider's opaque continuation id so the
|
||||||
* resumes across container restarts. Cleared by /clear.
|
* agent's conversation resumes across container restarts. Keyed per
|
||||||
|
* provider because continuations are provider-private — a Claude
|
||||||
|
* conversation id means nothing to Codex and vice versa. Switching
|
||||||
|
* providers is therefore lossless: each provider's last thread stays
|
||||||
|
* on file and resumes cleanly if the user flips back.
|
||||||
*/
|
*/
|
||||||
import { getOutboundDb } from './connection.js';
|
import { getOutboundDb } from './connection.js';
|
||||||
|
|
||||||
const SDK_SESSION_KEY = 'sdk_session_id';
|
const LEGACY_KEY = 'sdk_session_id';
|
||||||
|
|
||||||
|
function continuationKey(providerName: string): string {
|
||||||
|
return `continuation:${providerName.toLowerCase()}`;
|
||||||
|
}
|
||||||
|
|
||||||
function getValue(key: string): string | undefined {
|
function getValue(key: string): string | undefined {
|
||||||
const row = getOutboundDb()
|
const row = getOutboundDb()
|
||||||
@@ -18,9 +26,7 @@ function getValue(key: string): string | undefined {
|
|||||||
|
|
||||||
function setValue(key: string, value: string): void {
|
function setValue(key: string, value: string): void {
|
||||||
getOutboundDb()
|
getOutboundDb()
|
||||||
.prepare(
|
.prepare('INSERT OR REPLACE INTO session_state (key, value, updated_at) VALUES (?, ?, ?)')
|
||||||
'INSERT OR REPLACE INTO session_state (key, value, updated_at) VALUES (?, ?, ?)',
|
|
||||||
)
|
|
||||||
.run(key, value, new Date().toISOString());
|
.run(key, value, new Date().toISOString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,14 +34,46 @@ function deleteValue(key: string): void {
|
|||||||
getOutboundDb().prepare('DELETE FROM session_state WHERE key = ?').run(key);
|
getOutboundDb().prepare('DELETE FROM session_state WHERE key = ?').run(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getStoredSessionId(): string | undefined {
|
/**
|
||||||
return getValue(SDK_SESSION_KEY);
|
* One-time migration of the pre-per-provider continuation row.
|
||||||
|
*
|
||||||
|
* Before this was keyed per provider, continuations lived under the
|
||||||
|
* single key `sdk_session_id`. On container start, if that legacy row
|
||||||
|
* exists and the current provider has no continuation of its own, adopt
|
||||||
|
* the legacy value into the current provider's slot (best-guess — the
|
||||||
|
* legacy row was written by whatever provider ran last). The legacy row
|
||||||
|
* is always deleted so future provider flips never re-read a stale id
|
||||||
|
* through the wrong lens.
|
||||||
|
*
|
||||||
|
* Returns the continuation the caller should use at startup (either the
|
||||||
|
* current provider's existing value, the adopted legacy value, or
|
||||||
|
* undefined).
|
||||||
|
*/
|
||||||
|
export function migrateLegacyContinuation(providerName: string): string | undefined {
|
||||||
|
const legacy = getValue(LEGACY_KEY);
|
||||||
|
const currentKey = continuationKey(providerName);
|
||||||
|
const current = getValue(currentKey);
|
||||||
|
|
||||||
|
if (legacy === undefined) return current;
|
||||||
|
|
||||||
|
// Always drop the legacy row so no future provider reads it.
|
||||||
|
deleteValue(LEGACY_KEY);
|
||||||
|
|
||||||
|
// Prefer the current provider's own slot if one already exists.
|
||||||
|
if (current !== undefined) return current;
|
||||||
|
|
||||||
|
setValue(currentKey, legacy);
|
||||||
|
return legacy;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function setStoredSessionId(sessionId: string): void {
|
export function getContinuation(providerName: string): string | undefined {
|
||||||
setValue(SDK_SESSION_KEY, sessionId);
|
return getValue(continuationKey(providerName));
|
||||||
}
|
}
|
||||||
|
|
||||||
export function clearStoredSessionId(): void {
|
export function setContinuation(providerName: string, id: string): void {
|
||||||
deleteValue(SDK_SESSION_KEY);
|
setValue(continuationKey(providerName), id);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function clearContinuation(providerName: string): void {
|
||||||
|
deleteValue(continuationKey(providerName));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -95,6 +95,7 @@ async function main(): Promise<void> {
|
|||||||
|
|
||||||
await runPollLoop({
|
await runPollLoop({
|
||||||
provider,
|
provider,
|
||||||
|
providerName,
|
||||||
cwd: CWD,
|
cwd: CWD,
|
||||||
systemContext: { instructions },
|
systemContext: { instructions },
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -98,6 +98,7 @@ async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSigna
|
|||||||
return Promise.race([
|
return Promise.race([
|
||||||
runPollLoop({
|
runPollLoop({
|
||||||
provider,
|
provider,
|
||||||
|
providerName: 'mock',
|
||||||
cwd: '/tmp',
|
cwd: '/tmp',
|
||||||
}),
|
}),
|
||||||
new Promise<void>((_, reject) => {
|
new Promise<void>((_, reject) => {
|
||||||
|
|||||||
@@ -2,7 +2,11 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina
|
|||||||
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
||||||
import { writeMessageOut } from './db/messages-out.js';
|
import { writeMessageOut } from './db/messages-out.js';
|
||||||
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||||
import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js';
|
import {
|
||||||
|
clearContinuation,
|
||||||
|
migrateLegacyContinuation,
|
||||||
|
setContinuation,
|
||||||
|
} from './db/session-state.js';
|
||||||
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, stripInternalTags, type RoutingContext } from './formatter.js';
|
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, stripInternalTags, type RoutingContext } from './formatter.js';
|
||||||
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
|
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
|
||||||
|
|
||||||
@@ -19,6 +23,12 @@ function generateId(): string {
|
|||||||
|
|
||||||
export interface PollLoopConfig {
|
export interface PollLoopConfig {
|
||||||
provider: AgentProvider;
|
provider: AgentProvider;
|
||||||
|
/**
|
||||||
|
* Name of the provider (e.g. "claude", "codex", "opencode"). Used to key
|
||||||
|
* the stored continuation per-provider so flipping providers doesn't
|
||||||
|
* resurrect a stale id from a different backend.
|
||||||
|
*/
|
||||||
|
providerName: string;
|
||||||
cwd: string;
|
cwd: string;
|
||||||
systemContext?: {
|
systemContext?: {
|
||||||
instructions?: string;
|
instructions?: string;
|
||||||
@@ -39,8 +49,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
// Resume the agent's prior session from a previous container run if one
|
// Resume the agent's prior session from a previous container run if one
|
||||||
// was persisted. The continuation is opaque to the poll-loop — the
|
// was persisted. The continuation is opaque to the poll-loop — the
|
||||||
// provider decides how to use it (Claude resumes a .jsonl transcript,
|
// provider decides how to use it (Claude resumes a .jsonl transcript,
|
||||||
// other providers may reload a thread ID, etc.).
|
// other providers may reload a thread ID, etc.). Keyed per-provider so
|
||||||
let continuation: string | undefined = getStoredSessionId();
|
// a Codex thread id never gets handed to Claude or vice versa.
|
||||||
|
let continuation: string | undefined = migrateLegacyContinuation(config.providerName);
|
||||||
|
|
||||||
if (continuation) {
|
if (continuation) {
|
||||||
log(`Resuming agent session ${continuation}`);
|
log(`Resuming agent session ${continuation}`);
|
||||||
@@ -94,7 +105,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
if ((msg.kind === 'chat' || msg.kind === 'chat-sdk') && isClearCommand(msg)) {
|
if ((msg.kind === 'chat' || msg.kind === 'chat-sdk') && isClearCommand(msg)) {
|
||||||
log('Clearing session (resetting continuation)');
|
log('Clearing session (resetting continuation)');
|
||||||
continuation = undefined;
|
continuation = undefined;
|
||||||
clearStoredSessionId();
|
clearContinuation(config.providerName);
|
||||||
writeMessageOut({
|
writeMessageOut({
|
||||||
id: generateId(),
|
id: generateId(),
|
||||||
kind: 'chat',
|
kind: 'chat',
|
||||||
@@ -160,10 +171,10 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
const skippedSet = new Set(skipped);
|
const skippedSet = new Set(skipped);
|
||||||
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
||||||
try {
|
try {
|
||||||
const result = await processQuery(query, routing, processingIds);
|
const result = await processQuery(query, routing, processingIds, config.providerName);
|
||||||
if (result.continuation && result.continuation !== continuation) {
|
if (result.continuation && result.continuation !== continuation) {
|
||||||
continuation = result.continuation;
|
continuation = result.continuation;
|
||||||
setStoredSessionId(continuation);
|
setContinuation(config.providerName, continuation);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const errMsg = err instanceof Error ? err.message : String(err);
|
const errMsg = err instanceof Error ? err.message : String(err);
|
||||||
@@ -175,7 +186,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
|||||||
if (continuation && config.provider.isSessionInvalid(err)) {
|
if (continuation && config.provider.isSessionInvalid(err)) {
|
||||||
log(`Stale session detected (${continuation}) — clearing for next retry`);
|
log(`Stale session detected (${continuation}) — clearing for next retry`);
|
||||||
continuation = undefined;
|
continuation = undefined;
|
||||||
clearStoredSessionId();
|
clearContinuation(config.providerName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write error response so the user knows something went wrong
|
// Write error response so the user knows something went wrong
|
||||||
@@ -238,6 +249,7 @@ async function processQuery(
|
|||||||
query: AgentQuery,
|
query: AgentQuery,
|
||||||
routing: RoutingContext,
|
routing: RoutingContext,
|
||||||
initialBatchIds: string[],
|
initialBatchIds: string[],
|
||||||
|
providerName: string,
|
||||||
): Promise<QueryResult> {
|
): Promise<QueryResult> {
|
||||||
let queryContinuation: string | undefined;
|
let queryContinuation: string | undefined;
|
||||||
let done = false;
|
let done = false;
|
||||||
@@ -288,7 +300,7 @@ async function processQuery(
|
|||||||
// container died between `init` and `result`, the SDK session was
|
// container died between `init` and `result`, the SDK session was
|
||||||
// effectively orphaned and the next message started a blank
|
// effectively orphaned and the next message started a blank
|
||||||
// Claude session with no prior context.
|
// Claude session with no prior context.
|
||||||
setStoredSessionId(event.continuation);
|
setContinuation(providerName, event.continuation);
|
||||||
} else if (event.type === 'result') {
|
} else if (event.type === 'result') {
|
||||||
// A result — with or without text — means the turn is done. Mark
|
// A result — with or without text — means the turn is done. Mark
|
||||||
// the initial batch completed now so the host sweep doesn't see
|
// the initial batch completed now so the host sweep doesn't see
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "nanoclaw",
|
"name": "nanoclaw",
|
||||||
"version": "2.0.11",
|
"version": "2.0.12",
|
||||||
"description": "Personal Claude assistant. Lightweight, secure, customizable.",
|
"description": "Personal Claude assistant. Lightweight, secure, customizable.",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"packageManager": "pnpm@10.33.0",
|
"packageManager": "pnpm@10.33.0",
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="130k tokens, 65% of context window">
|
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="132k tokens, 66% of context window">
|
||||||
<title>130k tokens, 65% of context window</title>
|
<title>132k tokens, 66% of context window</title>
|
||||||
<linearGradient id="s" x2="0" y2="100%">
|
<linearGradient id="s" x2="0" y2="100%">
|
||||||
<stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
|
<stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
|
||||||
<stop offset="1" stop-opacity=".1"/>
|
<stop offset="1" stop-opacity=".1"/>
|
||||||
@@ -15,8 +15,8 @@
|
|||||||
<g fill="#fff" text-anchor="middle" font-family="Verdana,Geneva,DejaVu Sans,sans-serif" font-size="11">
|
<g fill="#fff" text-anchor="middle" font-family="Verdana,Geneva,DejaVu Sans,sans-serif" font-size="11">
|
||||||
<text aria-hidden="true" x="26" y="15" fill="#010101" fill-opacity=".3">tokens</text>
|
<text aria-hidden="true" x="26" y="15" fill="#010101" fill-opacity=".3">tokens</text>
|
||||||
<text x="26" y="14">tokens</text>
|
<text x="26" y="14">tokens</text>
|
||||||
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">130k</text>
|
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">132k</text>
|
||||||
<text x="71" y="14">130k</text>
|
<text x="71" y="14">132k</text>
|
||||||
</g>
|
</g>
|
||||||
</g>
|
</g>
|
||||||
</a>
|
</a>
|
||||||
|
|||||||
|
Before Width: | Height: | Size: 1.1 KiB After Width: | Height: | Size: 1.1 KiB |
46
src/modules/agent-to-agent/agent-route.test.ts
Normal file
46
src/modules/agent-to-agent/agent-route.test.ts
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
import { describe, expect, it } from 'vitest';
|
||||||
|
|
||||||
|
import { isSafeAttachmentName } from './agent-route.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* `forwardAttachedFiles` has a filesystem side that's awkward to unit-test
|
||||||
|
* without mocking DATA_DIR. The guarantee worth pinning is that the
|
||||||
|
* filename validator rejects everything that could escape the inbox dir —
|
||||||
|
* `forwardAttachedFiles` runs this guard before any I/O, so traversal is
|
||||||
|
* impossible as long as this matrix holds.
|
||||||
|
*/
|
||||||
|
describe('isSafeAttachmentName', () => {
|
||||||
|
it('accepts plain filenames', () => {
|
||||||
|
expect(isSafeAttachmentName('baby-duck.png')).toBe(true);
|
||||||
|
expect(isSafeAttachmentName('file with spaces.pdf')).toBe(true);
|
||||||
|
expect(isSafeAttachmentName('report.v2.docx')).toBe(true);
|
||||||
|
expect(isSafeAttachmentName('.hidden')).toBe(true); // leading dot is fine, just not `.` / `..`
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects empty / sentinel values', () => {
|
||||||
|
expect(isSafeAttachmentName('')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('.')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('..')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects path separators', () => {
|
||||||
|
expect(isSafeAttachmentName('../evil.png')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('/etc/passwd')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('nested/file.txt')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('windows\\path.exe')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects NUL bytes', () => {
|
||||||
|
expect(isSafeAttachmentName('clean\0.png')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects anything path.basename would strip', () => {
|
||||||
|
expect(isSafeAttachmentName('a/b')).toBe(false);
|
||||||
|
expect(isSafeAttachmentName('./thing')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects non-string input', () => {
|
||||||
|
expect(isSafeAttachmentName(null as unknown as string)).toBe(false);
|
||||||
|
expect(isSafeAttachmentName(undefined as unknown as string)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -3,9 +3,13 @@
|
|||||||
*
|
*
|
||||||
* Outbound messages with `channel_type === 'agent'` target another agent
|
* Outbound messages with `channel_type === 'agent'` target another agent
|
||||||
* group rather than a channel. Permission is enforced via `agent_destinations` —
|
* group rather than a channel. Permission is enforced via `agent_destinations` —
|
||||||
* the source agent must have a row for the target. Content is copied verbatim;
|
* the source agent must have a row for the target. Content is copied into the
|
||||||
* the target's formatter looks up the source agent in its own local map to
|
* target's inbound DB; if the source message had `files` (from `send_file`),
|
||||||
* display a name.
|
* the actual bytes are copied from the source's outbox into the target's
|
||||||
|
* `inbox/<a2a-msg-id>/` directory and surfaced to the target agent as
|
||||||
|
* `attachments` (existing formatter convention — see formatter.ts:230).
|
||||||
|
* The target agent can then forward the file onward via its own `send_file`
|
||||||
|
* call using the absolute `/workspace/inbox/<a2a-msg-id>/<filename>` path.
|
||||||
*
|
*
|
||||||
* Self-messages are always allowed (used for system notes injected back into
|
* Self-messages are always allowed (used for system notes injected back into
|
||||||
* an agent's own session, e.g. post-approval follow-up prompts).
|
* an agent's own session, e.g. post-approval follow-up prompts).
|
||||||
@@ -14,14 +18,102 @@
|
|||||||
* `channel_type === 'agent'` check. When the module is absent the check in
|
* `channel_type === 'agent'` check. When the module is absent the check in
|
||||||
* core throws with a "module not installed" message so retry → mark failed.
|
* core throws with a "module not installed" message so retry → mark failed.
|
||||||
*/
|
*/
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
import { getAgentGroup } from '../../db/agent-groups.js';
|
import { getAgentGroup } from '../../db/agent-groups.js';
|
||||||
import { getSession } from '../../db/sessions.js';
|
import { getSession } from '../../db/sessions.js';
|
||||||
import { wakeContainer } from '../../container-runner.js';
|
import { wakeContainer } from '../../container-runner.js';
|
||||||
import { log } from '../../log.js';
|
import { log } from '../../log.js';
|
||||||
import { resolveSession, writeSessionMessage } from '../../session-manager.js';
|
import { resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
|
||||||
import type { Session } from '../../types.js';
|
import type { Session } from '../../types.js';
|
||||||
import { hasDestination } from './db/agent-destinations.js';
|
import { hasDestination } from './db/agent-destinations.js';
|
||||||
|
|
||||||
|
export interface ForwardedAttachment {
|
||||||
|
name: string;
|
||||||
|
filename: string;
|
||||||
|
type: 'file';
|
||||||
|
localPath: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is `name` safe to use as the last segment of a path inside the target
|
||||||
|
* agent's inbox directory? Filenames arrive in messages_out content from
|
||||||
|
* the source agent — under a multi-agent setup with heterogenous providers
|
||||||
|
* (or a compromised / hallucinating sub-agent) they can't be trusted.
|
||||||
|
*
|
||||||
|
* Rejects:
|
||||||
|
* - empty string
|
||||||
|
* - `.` / `..` (traversal sentinels that path.basename returns as-is)
|
||||||
|
* - anything containing a path separator (`/` or `\`) or NUL
|
||||||
|
* - any value where `path.basename(name) !== name`, catching OS-specific
|
||||||
|
* separators and covering drives/prefixes on Windows runtimes
|
||||||
|
*/
|
||||||
|
export function isSafeAttachmentName(name: string): boolean {
|
||||||
|
if (typeof name !== 'string' || name.length === 0) return false;
|
||||||
|
if (name === '.' || name === '..') return false;
|
||||||
|
if (/[\\/\0]/.test(name)) return false;
|
||||||
|
return path.basename(name) === name;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy file attachments from the source agent's outbox into the target
|
||||||
|
* agent's inbox. Returns attachments using the formatter's existing
|
||||||
|
* `{name, type, localPath}` convention — target agent reads `localPath`
|
||||||
|
* as relative to `/workspace/`, matching how channel-inbound attachments
|
||||||
|
* are surfaced today.
|
||||||
|
*
|
||||||
|
* Missing source files and unsafe (path-traversal) filenames are skipped
|
||||||
|
* with a warning rather than failing the whole route — a bad filename
|
||||||
|
* reference shouldn't kill the accompanying text.
|
||||||
|
*/
|
||||||
|
export function forwardAttachedFiles(
|
||||||
|
source: { agentGroupId: string; sessionId: string; messageId: string; filenames: string[] },
|
||||||
|
target: { agentGroupId: string; sessionId: string; messageId: string },
|
||||||
|
): ForwardedAttachment[] {
|
||||||
|
if (source.filenames.length === 0) return [];
|
||||||
|
|
||||||
|
const sourceDir = path.join(sessionDir(source.agentGroupId, source.sessionId), 'outbox', source.messageId);
|
||||||
|
if (!fs.existsSync(sourceDir)) {
|
||||||
|
log.warn('agent-route: source outbox dir missing, no files forwarded', {
|
||||||
|
sourceMsgId: source.messageId,
|
||||||
|
sourceDir,
|
||||||
|
});
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const targetInboxDir = path.join(sessionDir(target.agentGroupId, target.sessionId), 'inbox', target.messageId);
|
||||||
|
fs.mkdirSync(targetInboxDir, { recursive: true });
|
||||||
|
|
||||||
|
const attachments: ForwardedAttachment[] = [];
|
||||||
|
for (const filename of source.filenames) {
|
||||||
|
if (!isSafeAttachmentName(filename)) {
|
||||||
|
log.warn('agent-route: rejecting unsafe attachment filename (path traversal attempt?)', {
|
||||||
|
sourceMsgId: source.messageId,
|
||||||
|
filename,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const src = path.join(sourceDir, filename);
|
||||||
|
if (!fs.existsSync(src)) {
|
||||||
|
log.warn('agent-route: referenced file missing in source outbox, skipped', {
|
||||||
|
sourceMsgId: source.messageId,
|
||||||
|
filename,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const dst = path.join(targetInboxDir, filename);
|
||||||
|
fs.copyFileSync(src, dst);
|
||||||
|
attachments.push({
|
||||||
|
name: filename,
|
||||||
|
filename,
|
||||||
|
type: 'file',
|
||||||
|
localPath: `inbox/${target.messageId}/${filename}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return attachments;
|
||||||
|
}
|
||||||
|
|
||||||
export interface RoutableAgentMessage {
|
export interface RoutableAgentMessage {
|
||||||
id: string;
|
id: string;
|
||||||
platform_id: string | null;
|
platform_id: string | null;
|
||||||
@@ -45,20 +137,87 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
|
|||||||
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
|
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
|
||||||
}
|
}
|
||||||
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
|
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
|
||||||
|
const a2aMsgId = `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||||
|
|
||||||
|
// If the source message references files (via `send_file`), forward the
|
||||||
|
// bytes from the source's outbox into the target's inbox so the target
|
||||||
|
// agent can actually see and re-send them. Without this, agent-to-agent
|
||||||
|
// file attachments look like they arrive but the target has no way to
|
||||||
|
// read the bytes — they live in a session dir it doesn't mount.
|
||||||
|
const forwardedContent = forwardFileAttachments(msg, a2aMsgId, session, targetAgentGroupId, targetSession.id);
|
||||||
|
|
||||||
writeSessionMessage(targetAgentGroupId, targetSession.id, {
|
writeSessionMessage(targetAgentGroupId, targetSession.id, {
|
||||||
id: `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
id: a2aMsgId,
|
||||||
kind: 'chat',
|
kind: 'chat',
|
||||||
timestamp: new Date().toISOString(),
|
timestamp: new Date().toISOString(),
|
||||||
platformId: session.agent_group_id,
|
platformId: session.agent_group_id,
|
||||||
channelType: 'agent',
|
channelType: 'agent',
|
||||||
threadId: null,
|
threadId: null,
|
||||||
content: msg.content,
|
content: forwardedContent,
|
||||||
});
|
});
|
||||||
log.info('Agent message routed', {
|
log.info('Agent message routed', {
|
||||||
from: session.agent_group_id,
|
from: session.agent_group_id,
|
||||||
to: targetAgentGroupId,
|
to: targetAgentGroupId,
|
||||||
targetSession: targetSession.id,
|
targetSession: targetSession.id,
|
||||||
|
a2aMsgId,
|
||||||
|
forwardedFileCount: countForwardedFiles(forwardedContent),
|
||||||
});
|
});
|
||||||
const fresh = getSession(targetSession.id);
|
const fresh = getSession(targetSession.id);
|
||||||
if (fresh) await wakeContainer(fresh);
|
if (fresh) await wakeContainer(fresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse source content, copy any referenced `files` from source outbox to
|
||||||
|
* target inbox, and return a JSON string with an `attachments` array added
|
||||||
|
* (formatter.ts:223 already knows how to render this shape).
|
||||||
|
*
|
||||||
|
* If the source content isn't JSON or has no files, returns the original
|
||||||
|
* content string unchanged — this is safe to call on every route.
|
||||||
|
*/
|
||||||
|
function forwardFileAttachments(
|
||||||
|
msg: RoutableAgentMessage,
|
||||||
|
a2aMsgId: string,
|
||||||
|
sourceSession: Session,
|
||||||
|
targetAgentGroupId: string,
|
||||||
|
targetSessionId: string,
|
||||||
|
): string {
|
||||||
|
let parsed: Record<string, unknown>;
|
||||||
|
try {
|
||||||
|
parsed = JSON.parse(msg.content);
|
||||||
|
} catch {
|
||||||
|
return msg.content;
|
||||||
|
}
|
||||||
|
const files = parsed.files as unknown;
|
||||||
|
if (!Array.isArray(files) || files.length === 0) return msg.content;
|
||||||
|
const filenames = files.filter((f): f is string => typeof f === 'string');
|
||||||
|
if (filenames.length === 0) return msg.content;
|
||||||
|
|
||||||
|
const attachments = forwardAttachedFiles(
|
||||||
|
{
|
||||||
|
agentGroupId: sourceSession.agent_group_id,
|
||||||
|
sessionId: sourceSession.id,
|
||||||
|
messageId: msg.id,
|
||||||
|
filenames,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
agentGroupId: targetAgentGroupId,
|
||||||
|
sessionId: targetSessionId,
|
||||||
|
messageId: a2aMsgId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// Merge into any existing `attachments` (unlikely in a2a context but safe).
|
||||||
|
const existing = Array.isArray(parsed.attachments) ? (parsed.attachments as Record<string, unknown>[]) : [];
|
||||||
|
parsed.attachments = [...existing, ...attachments];
|
||||||
|
|
||||||
|
return JSON.stringify(parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
function countForwardedFiles(contentStr: string): number {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(contentStr);
|
||||||
|
return Array.isArray(parsed.attachments) ? parsed.attachments.length : 0;
|
||||||
|
} catch {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user