From 7169c25e70fb6f72c0fac6ac17274a1810b899c8 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Sat, 18 Apr 2026 21:06:30 +0300 Subject: [PATCH] refactor: relocate outbox I/O to session-manager + dead-code sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Outbox extraction (delivery.ts → session-manager.ts) File I/O for outbound attachments now lives in session-manager.ts alongside the symmetric inbound extractAttachmentFiles. delivery.ts no longer touches the filesystem — it hands buffers to the adapter and calls clearOutbox on success. - New `readOutboxFiles(agentGroupId, sessionId, messageId, filenames)` and `clearOutbox(agentGroupId, sessionId, messageId)` in session-manager.ts. - deliverMessage in delivery.ts loses ~35 lines of fs/path code and its `fs`/`path` imports. ## Dead-code sweep TypeScript's --noUnusedLocals surfaced several cruft imports. Fixed: - src/container-runner.ts: drop unused `markContainerIdle` import; drop unused `session` parameter from `buildContainerArgs` signature. - src/delivery.ts: drop unused `getSession`, `writeSessionMessage`, `wakeContainer` imports. - src/host-sweep.ts: drop unused `updateSession`, `outboundDbPath` imports. - container/agent-runner/src/poll-loop.ts: drop unused `config`, `processingIds` params from `processQuery`. - Test files: drop unused imports in channel-registry.test, db-v2.test, host-core.test. Skipped: `conversations` state in chat-sdk-bridge.ts (never read but tangled with public `updateConversations` method; cleaning it risks a merge conflict with the channels branch at the next sync). ## Validation - `pnpm run build` clean - `pnpm test` — 137 host tests pass - `bun test` in container/agent-runner — 17 tests pass - Service boots (`NanoClaw running`, `OneCLI approval handler started`) and shuts down cleanly on SIGTERM Co-Authored-By: Claude Opus 4.7 (1M context) --- container/agent-runner/src/poll-loop.ts | 4 +-- src/channels/channel-registry.test.ts | 2 +- src/container-runner.ts | 12 +------ src/db/db-v2.test.ts | 1 - src/delivery.ts | 42 ++++++---------------- src/host-core.test.ts | 1 - src/host-sweep.ts | 4 +-- src/session-manager.ts | 48 +++++++++++++++++++++++++ 8 files changed, 65 insertions(+), 49 deletions(-) diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index e5ad1a5..cc26286 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -192,7 +192,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise { const skippedSet = new Set(skipped); const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id)); try { - const result = await processQuery(query, routing, config, processingIds); + const result = await processQuery(query, routing); if (result.continuation && result.continuation !== continuation) { continuation = result.continuation; setStoredSessionId(continuation); @@ -264,7 +264,7 @@ interface QueryResult { continuation?: string; } -async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig, processingIds: string[]): Promise { +async function processQuery(query: AgentQuery, routing: RoutingContext): Promise { let queryContinuation: string | undefined; let done = false; let lastEventTime = Date.now(); diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index fb9dfbf..0abbf9d 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -206,7 +206,7 @@ describe('channel + router integration', () => { factory: () => mockAdapter, }); - await initChannelAdapters((adapter) => ({ + await initChannelAdapters(() => ({ conversations: [], onInbound: () => {}, onMetadata: () => {}, diff --git a/src/container-runner.ts b/src/container-runner.ts index a786816..c3fb24f 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -27,7 +27,6 @@ import { type VolumeMount, } from './providers/provider-container-registry.js'; import { - markContainerIdle, markContainerRunning, markContainerStopped, sessionDir, @@ -107,15 +106,7 @@ async function spawnContainer(session: Session): Promise { // OneCLI agent identifier is always the agent group id — stable across // sessions and reversible via getAgentGroup() for approval routing. const agentIdentifier = agentGroup.id; - const args = await buildContainerArgs( - mounts, - containerName, - session, - agentGroup, - provider, - contribution, - agentIdentifier, - ); + const args = await buildContainerArgs(mounts, containerName, agentGroup, provider, contribution, agentIdentifier); log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName }); @@ -257,7 +248,6 @@ function buildMounts( async function buildContainerArgs( mounts: VolumeMount[], containerName: string, - session: Session, agentGroup: AgentGroup, provider: string, providerContribution: ProviderContainerContribution, diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts index 87d8161..f8689eb 100644 --- a/src/db/db-v2.test.ts +++ b/src/db/db-v2.test.ts @@ -13,7 +13,6 @@ import { createMessagingGroup, getMessagingGroup, getMessagingGroupByPlatform, - getAllMessagingGroups, updateMessagingGroup, deleteMessagingGroup, createMessagingGroupAgent, diff --git a/src/delivery.ts b/src/delivery.ts index b924d2f..7b1ee7d 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -8,10 +8,8 @@ * - Never writes to outbound.db — preserves single-writer-per-file invariant */ import type Database from 'better-sqlite3'; -import fs from 'fs'; -import path from 'path'; -import { getRunningSessions, getActiveSessions, createPendingQuestion, getSession } from './db/sessions.js'; +import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { getDb, hasTable } from './db/connection.js'; import { getMessagingGroupByPlatform } from './db/messaging-groups.js'; @@ -24,8 +22,8 @@ import { } from './db/session-db.js'; import { log } from './log.js'; import { normalizeOptions } from './channels/ask-question.js'; -import { openInboundDb, openOutboundDb, sessionDir, writeSessionMessage } from './session-manager.js'; -import { resetContainerIdleTimer, wakeContainer } from './container-runner.js'; +import { clearOutbox, openInboundDb, openOutboundDb, readOutboxFiles } from './session-manager.js'; +import { resetContainerIdleTimer } from './container-runner.js'; import { pauseTypingRefreshAfterDelivery, setTypingAdapter } from './modules/typing/index.js'; import type { OutboundFile } from './channels/adapter.js'; import type { Session } from './types.js'; @@ -346,21 +344,13 @@ async function deliverMessage( return; } - // Read file attachments from outbox if the content declares files - let files: OutboundFile[] | undefined; - const outboxDir = path.join(sessionDir(session.agent_group_id, session.id), 'outbox', msg.id); - if (Array.isArray(content.files) && content.files.length > 0 && fs.existsSync(outboxDir)) { - files = []; - for (const filename of content.files as string[]) { - const filePath = path.join(outboxDir, filename); - if (fs.existsSync(filePath)) { - files.push({ filename, data: fs.readFileSync(filePath) }); - } else { - log.warn('Outbox file not found', { messageId: msg.id, filename }); - } - } - if (files.length === 0) files = undefined; - } + // Read file attachments from outbox if the content declares files. + // File I/O lives in session-manager.ts (symmetric with inbound + // extractAttachmentFiles) — delivery just hands buffers to the adapter. + const files = + Array.isArray(content.files) && content.files.length > 0 + ? readOutboxFiles(session.agent_group_id, session.id, msg.id, content.files as string[]) + : undefined; const platformMsgId = await deliveryAdapter.deliver( msg.channel_type, @@ -378,17 +368,7 @@ async function deliverMessage( fileCount: files?.length, }); - // Clean up outbox best-effort — the message is already on the user's - // screen, so a cleanup failure must NOT propagate. If it did, the - // caller would treat the whole delivery as failed, retry on the next - // poll, and the user would see the message twice. - if (fs.existsSync(outboxDir)) { - try { - fs.rmSync(outboxDir, { recursive: true, force: true }); - } catch (err) { - log.warn('Outbox cleanup failed (message already delivered)', { messageId: msg.id, err }); - } - } + clearOutbox(session.agent_group_id, session.id, msg.id); return platformMsgId; } diff --git a/src/host-core.test.ts b/src/host-core.test.ts index f7a0539..a8b4684 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -23,7 +23,6 @@ import { sessionDir, inboundDbPath, outboundDbPath, - sessionsBaseDir, } from './session-manager.js'; import { getSession, findSession } from './db/sessions.js'; import type { InboundEvent } from './router.js'; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 9bbd4bc..7a7688f 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -10,7 +10,7 @@ import type Database from 'better-sqlite3'; import fs from 'fs'; -import { getActiveSessions, updateSession } from './db/sessions.js'; +import { getActiveSessions } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { countDueMessages, @@ -21,7 +21,7 @@ import { retryWithBackoff, } from './db/session-db.js'; import { log } from './log.js'; -import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js'; +import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js'; import { wakeContainer, isContainerRunning } from './container-runner.js'; import type { Session } from './types.js'; diff --git a/src/session-manager.ts b/src/session-manager.ts index 2183056..7aaef24 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -14,6 +14,7 @@ import type Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; +import type { OutboundFile } from './channels/adapter.js'; import { DATA_DIR } from './config.js'; import { getMessagingGroup } from './db/messaging-groups.js'; import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js'; @@ -289,6 +290,53 @@ export function writeSystemResponse( }); } +/** + * Load outbox attachments for a delivered message. + * + * Symmetric with `extractAttachmentFiles` on the inbound side: the container + * writes files into the session's `outbox//` directory alongside + * its `messages_out` row, and the host reads them back at delivery time. + * + * Returns undefined when the outbox dir is missing or no declared file was + * actually on disk — delivery continues without attachments rather than + * failing the whole message. + */ +export function readOutboxFiles( + agentGroupId: string, + sessionId: string, + messageId: string, + filenames: string[], +): OutboundFile[] | undefined { + const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId); + if (!fs.existsSync(outboxDir)) return undefined; + const files: OutboundFile[] = []; + for (const filename of filenames) { + const filePath = path.join(outboxDir, filename); + if (fs.existsSync(filePath)) { + files.push({ filename, data: fs.readFileSync(filePath) }); + } else { + log.warn('Outbox file not found', { messageId, filename }); + } + } + return files.length > 0 ? files : undefined; +} + +/** + * Remove a message's outbox directory after successful delivery. Best-effort: + * failures log and swallow. A cleanup failure must NOT propagate to the + * delivery caller — the message is already on the user's screen, and a + * thrown error would trigger the delivery retry path and deliver twice. + */ +export function clearOutbox(agentGroupId: string, sessionId: string, messageId: string): void { + const outboxDir = path.join(sessionDir(agentGroupId, sessionId), 'outbox', messageId); + if (!fs.existsSync(outboxDir)) return; + try { + fs.rmSync(outboxDir, { recursive: true, force: true }); + } catch (err) { + log.warn('Outbox cleanup failed (message already delivered)', { messageId, err }); + } +} + /** Mark a container as running for a session. */ export function markContainerRunning(sessionId: string): void { updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() });