refactor: relocate outbox I/O to session-manager + dead-code sweep
## Outbox extraction (delivery.ts → session-manager.ts) File I/O for outbound attachments now lives in session-manager.ts alongside the symmetric inbound extractAttachmentFiles. delivery.ts no longer touches the filesystem — it hands buffers to the adapter and calls clearOutbox on success. - New `readOutboxFiles(agentGroupId, sessionId, messageId, filenames)` and `clearOutbox(agentGroupId, sessionId, messageId)` in session-manager.ts. - deliverMessage in delivery.ts loses ~35 lines of fs/path code and its `fs`/`path` imports. ## Dead-code sweep TypeScript's --noUnusedLocals surfaced several cruft imports. Fixed: - src/container-runner.ts: drop unused `markContainerIdle` import; drop unused `session` parameter from `buildContainerArgs` signature. - src/delivery.ts: drop unused `getSession`, `writeSessionMessage`, `wakeContainer` imports. - src/host-sweep.ts: drop unused `updateSession`, `outboundDbPath` imports. - container/agent-runner/src/poll-loop.ts: drop unused `config`, `processingIds` params from `processQuery`. - Test files: drop unused imports in channel-registry.test, db-v2.test, host-core.test. Skipped: `conversations` state in chat-sdk-bridge.ts (never read but tangled with public `updateConversations` method; cleaning it risks a merge conflict with the channels branch at the next sync). ## Validation - `pnpm run build` clean - `pnpm test` — 137 host tests pass - `bun test` in container/agent-runner — 17 tests pass - Service boots (`NanoClaw running`, `OneCLI approval handler started`) and shuts down cleanly on SIGTERM Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -192,7 +192,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
const skippedSet = new Set(skipped);
|
||||
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
||||
try {
|
||||
const result = await processQuery(query, routing, config, processingIds);
|
||||
const result = await processQuery(query, routing);
|
||||
if (result.continuation && result.continuation !== continuation) {
|
||||
continuation = result.continuation;
|
||||
setStoredSessionId(continuation);
|
||||
@@ -264,7 +264,7 @@ interface QueryResult {
|
||||
continuation?: string;
|
||||
}
|
||||
|
||||
async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig, processingIds: string[]): Promise<QueryResult> {
|
||||
async function processQuery(query: AgentQuery, routing: RoutingContext): Promise<QueryResult> {
|
||||
let queryContinuation: string | undefined;
|
||||
let done = false;
|
||||
let lastEventTime = Date.now();
|
||||
|
||||
@@ -206,7 +206,7 @@ describe('channel + router integration', () => {
|
||||
factory: () => mockAdapter,
|
||||
});
|
||||
|
||||
await initChannelAdapters((adapter) => ({
|
||||
await initChannelAdapters(() => ({
|
||||
conversations: [],
|
||||
onInbound: () => {},
|
||||
onMetadata: () => {},
|
||||
|
||||
@@ -27,7 +27,6 @@ import {
|
||||
type VolumeMount,
|
||||
} from './providers/provider-container-registry.js';
|
||||
import {
|
||||
markContainerIdle,
|
||||
markContainerRunning,
|
||||
markContainerStopped,
|
||||
sessionDir,
|
||||
@@ -107,15 +106,7 @@ async function spawnContainer(session: Session): Promise<void> {
|
||||
// OneCLI agent identifier is always the agent group id — stable across
|
||||
// sessions and reversible via getAgentGroup() for approval routing.
|
||||
const agentIdentifier = agentGroup.id;
|
||||
const args = await buildContainerArgs(
|
||||
mounts,
|
||||
containerName,
|
||||
session,
|
||||
agentGroup,
|
||||
provider,
|
||||
contribution,
|
||||
agentIdentifier,
|
||||
);
|
||||
const args = await buildContainerArgs(mounts, containerName, agentGroup, provider, contribution, agentIdentifier);
|
||||
|
||||
log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName });
|
||||
|
||||
@@ -257,7 +248,6 @@ function buildMounts(
|
||||
async function buildContainerArgs(
|
||||
mounts: VolumeMount[],
|
||||
containerName: string,
|
||||
session: Session,
|
||||
agentGroup: AgentGroup,
|
||||
provider: string,
|
||||
providerContribution: ProviderContainerContribution,
|
||||
|
||||
@@ -13,7 +13,6 @@ import {
|
||||
createMessagingGroup,
|
||||
getMessagingGroup,
|
||||
getMessagingGroupByPlatform,
|
||||
getAllMessagingGroups,
|
||||
updateMessagingGroup,
|
||||
deleteMessagingGroup,
|
||||
createMessagingGroupAgent,
|
||||
|
||||
@@ -8,10 +8,8 @@
|
||||
* - Never writes to outbound.db — preserves single-writer-per-file invariant
|
||||
*/
|
||||
import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { getRunningSessions, getActiveSessions, createPendingQuestion, getSession } from './db/sessions.js';
|
||||
import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import { getDb, hasTable } from './db/connection.js';
|
||||
import { getMessagingGroupByPlatform } from './db/messaging-groups.js';
|
||||
@@ -24,8 +22,8 @@ import {
|
||||
} from './db/session-db.js';
|
||||
import { log } from './log.js';
|
||||
import { normalizeOptions } from './channels/ask-question.js';
|
||||
import { openInboundDb, openOutboundDb, sessionDir, writeSessionMessage } from './session-manager.js';
|
||||
import { resetContainerIdleTimer, wakeContainer } from './container-runner.js';
|
||||
import { clearOutbox, openInboundDb, openOutboundDb, readOutboxFiles } from './session-manager.js';
|
||||
import { resetContainerIdleTimer } from './container-runner.js';
|
||||
import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js';
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import type { Session } from './types.js';
|
||||
@@ -346,21 +344,13 @@ async function deliverMessage(
|
||||
return;
|
||||
}
|
||||
|
||||
// Read file attachments from outbox if the content declares files
|
||||
let files: OutboundFile[] | undefined;
|
||||
const outboxDir = path.join(sessionDir(session.agent_group_id, session.id), 'outbox', msg.id);
|
||||
if (Array.isArray(content.files) && content.files.length > 0 && fs.existsSync(outboxDir)) {
|
||||
files = [];
|
||||
for (const filename of content.files as string[]) {
|
||||
const filePath = path.join(outboxDir, filename);
|
||||
if (fs.existsSync(filePath)) {
|
||||
files.push({ filename, data: fs.readFileSync(filePath) });
|
||||
} else {
|
||||
log.warn('Outbox file not found', { messageId: msg.id, filename });
|
||||
}
|
||||
}
|
||||
if (files.length === 0) files = undefined;
|
||||
}
|
||||
// Read file attachments from outbox if the content declares files.
|
||||
// File I/O lives in session-manager.ts (symmetric with inbound
|
||||
// extractAttachmentFiles) — delivery just hands buffers to the adapter.
|
||||
const files =
|
||||
Array.isArray(content.files) && content.files.length > 0
|
||||
? readOutboxFiles(session.agent_group_id, session.id, msg.id, content.files as string[])
|
||||
: undefined;
|
||||
|
||||
const platformMsgId = await deliveryAdapter.deliver(
|
||||
msg.channel_type,
|
||||
@@ -378,17 +368,7 @@ async function deliverMessage(
|
||||
fileCount: files?.length,
|
||||
});
|
||||
|
||||
// Clean up outbox best-effort — the message is already on the user's
|
||||
// screen, so a cleanup failure must NOT propagate. If it did, the
|
||||
// caller would treat the whole delivery as failed, retry on the next
|
||||
// poll, and the user would see the message twice.
|
||||
if (fs.existsSync(outboxDir)) {
|
||||
try {
|
||||
fs.rmSync(outboxDir, { recursive: true, force: true });
|
||||
} catch (err) {
|
||||
log.warn('Outbox cleanup failed (message already delivered)', { messageId: msg.id, err });
|
||||
}
|
||||
}
|
||||
clearOutbox(session.agent_group_id, session.id, msg.id);
|
||||
|
||||
return platformMsgId;
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import {
|
||||
sessionDir,
|
||||
inboundDbPath,
|
||||
outboundDbPath,
|
||||
sessionsBaseDir,
|
||||
} from './session-manager.js';
|
||||
import { getSession, findSession } from './db/sessions.js';
|
||||
import type { InboundEvent } from './router.js';
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
|
||||
import { getActiveSessions, updateSession } from './db/sessions.js';
|
||||
import { getActiveSessions } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import {
|
||||
countDueMessages,
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
retryWithBackoff,
|
||||
} from './db/session-db.js';
|
||||
import { log } from './log.js';
|
||||
import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
|
||||
import { wakeContainer, isContainerRunning } from './container-runner.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import type Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import type { OutboundFile } from './channels/adapter.js';
|
||||
import { DATA_DIR } from './config.js';
|
||||
import { getMessagingGroup } from './db/messaging-groups.js';
|
||||
import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js';
|
||||
@@ -289,6 +290,53 @@ export function writeSystemResponse(
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Load outbox attachments for a delivered message.
|
||||
*
|
||||
* Symmetric with `extractAttachmentFiles` on the inbound side: the container
|
||||
* writes files into the session's `outbox/<messageId>/` directory alongside
|
||||
* its `messages_out` row, and the host reads them back at delivery time.
|
||||
*
|
||||
* Returns undefined when the outbox dir is missing or no declared file was
|
||||
* actually on disk — delivery continues without attachments rather than
|
||||
* failing the whole message.
|
||||
*/
|
||||
export function readOutboxFiles(
|
||||
agentGroupId: string,
|
||||
sessionId: string,
|
||||
messageId: string,
|
||||
filenames: string[],
|
||||
): OutboundFile[] | undefined {
|
||||
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
|
||||
if (!fs.existsSync(outboxDir)) return undefined;
|
||||
const files: OutboundFile[] = [];
|
||||
for (const filename of filenames) {
|
||||
const filePath = path.join(outboxDir, filename);
|
||||
if (fs.existsSync(filePath)) {
|
||||
files.push({ filename, data: fs.readFileSync(filePath) });
|
||||
} else {
|
||||
log.warn('Outbox file not found', { messageId, filename });
|
||||
}
|
||||
}
|
||||
return files.length > 0 ? files : undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a message's outbox directory after successful delivery. Best-effort:
|
||||
* failures log and swallow. A cleanup failure must NOT propagate to the
|
||||
* delivery caller — the message is already on the user's screen, and a
|
||||
* thrown error would trigger the delivery retry path and deliver twice.
|
||||
*/
|
||||
export function clearOutbox(agentGroupId: string, sessionId: string, messageId: string): void {
|
||||
const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId);
|
||||
if (!fs.existsSync(outboxDir)) return;
|
||||
try {
|
||||
fs.rmSync(outboxDir, { recursive: true, force: true });
|
||||
} catch (err) {
|
||||
log.warn('Outbox cleanup failed (message already delivered)', { messageId, err });
|
||||
}
|
||||
}
|
||||
|
||||
/** Mark a container as running for a session. */
|
||||
export function markContainerRunning(sessionId: string): void {
|
||||
updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() });
|
||||
|
||||
Reference in New Issue
Block a user