From b63dd186dfb1bf30f48fa8a2420171065d946d01 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Mon, 13 Apr 2026 10:25:29 +0300 Subject: [PATCH] refactor(agent-runner): decouple provider interface from Claude specifics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reshape AgentProvider so provider-specific assumptions stop leaking into the generic layer. No change to what reaches sdkQuery() — same values, different plumbing. - QueryInput: opaque `continuation` replaces `sessionId` + `resumeAt` (`resumeAt` was never populated by the poll-loop, so the `resumeSessionAt` option it fed was always undefined and is no longer passed); `systemContext.instructions` replaces ambiguous `systemPrompt`; `mcpServers`, `env`, `additionalDirectories` move to `ProviderOptions` at construction time. - AgentProvider gains `isSessionInvalid(err)` and `supportsNativeSlashCommands` so the poll-loop stops regex-matching Claude error strings and gates passthrough slash commands per provider. - ClaudeProvider owns `CLAUDE_CODE_AUTO_COMPACT_WINDOW` and the stale-session regex internally. - ProviderEvent.activity kept and documented as the liveness signal (fires on every SDK message so the idle timer stays honest during long tool runs); init carries `continuation` instead of `sessionId`. - poll-loop drops mcpServers/env/additionalDirectories from its config and takes `systemContext` in place of `systemPrompt`; admin user id now passed explicitly as `adminUserId`.
Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/index.ts | 28 +++--- .../agent-runner/src/integration.test.ts | 8 +- container/agent-runner/src/poll-loop.test.ts | 12 +-- container/agent-runner/src/poll-loop.ts | 89 +++++++++---------- .../agent-runner/src/providers/claude.ts | 54 ++++++++--- .../agent-runner/src/providers/factory.ts | 8 +- container/agent-runner/src/providers/mock.ts | 12 ++- container/agent-runner/src/providers/types.ts | 59 ++++++++---- 8 files changed, 156 insertions(+), 114 deletions(-) diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index 6692d33..c0e431c 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -40,19 +40,18 @@ const GLOBAL_CLAUDE_MD = '/workspace/global/CLAUDE.md'; async function main(): Promise { const providerName = (process.env.AGENT_PROVIDER || 'claude') as ProviderName; const assistantName = process.env.NANOCLAW_ASSISTANT_NAME; + const adminUserId = process.env.NANOCLAW_ADMIN_USER_ID; log(`Starting v2 agent-runner (provider: ${providerName})`); - const provider = createProvider(providerName, { assistantName }); - // Load global CLAUDE.md as additional system context, then append destinations addendum - let systemPrompt: string | undefined; + let instructions: string | undefined; if (fs.existsSync(GLOBAL_CLAUDE_MD)) { - systemPrompt = fs.readFileSync(GLOBAL_CLAUDE_MD, 'utf-8'); + instructions = fs.readFileSync(GLOBAL_CLAUDE_MD, 'utf-8'); log('Loaded global CLAUDE.md'); } const addendum = buildSystemPromptAddendum(); - systemPrompt = systemPrompt ? `${systemPrompt}\n\n${addendum}` : addendum; + instructions = instructions ? 
`${instructions}\n\n${addendum}` : addendum; // Discover additional directories mounted at /workspace/extra/* const additionalDirectories: string[] = []; @@ -73,12 +72,6 @@ async function main(): Promise { const __dirname = path.dirname(fileURLToPath(import.meta.url)); const mcpServerPath = path.join(__dirname, 'mcp-tools', 'index.js'); - // SDK env - const env: Record = { - ...process.env, - CLAUDE_CODE_AUTO_COMPACT_WINDOW: '165000', - }; - // Build MCP servers config: nanoclaw built-in + any additional from host const mcpServers: Record }> = { nanoclaw: { @@ -105,13 +98,18 @@ async function main(): Promise { } } + const provider = createProvider(providerName, { + assistantName, + mcpServers, + env: { ...process.env }, + additionalDirectories: additionalDirectories.length > 0 ? additionalDirectories : undefined, + }); + await runPollLoop({ provider, cwd: CWD, - mcpServers, - systemPrompt, - env, - additionalDirectories: additionalDirectories.length > 0 ? additionalDirectories : undefined, + systemContext: { instructions }, + adminUserId, }); } diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts index d30f324..7b2dc36 100644 --- a/container/agent-runner/src/integration.test.ts +++ b/container/agent-runner/src/integration.test.ts @@ -34,7 +34,7 @@ describe('poll loop integration', () => { it('should pick up a message, process it, and write a response', async () => { insertMessage('m1', { sender: 'Alice', text: 'What is the meaning of life?' 
}, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-1' }); - const provider = new MockProvider(() => '42'); + const provider = new MockProvider({}, () => '42'); const controller = new AbortController(); const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); @@ -60,7 +60,7 @@ describe('poll loop integration', () => { insertMessage('m1', { sender: 'Alice', text: 'Hello' }); insertMessage('m2', { sender: 'Bob', text: 'World' }); - const provider = new MockProvider(() => 'Got both messages'); + const provider = new MockProvider({}, () => 'Got both messages'); const controller = new AbortController(); const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); @@ -75,7 +75,7 @@ describe('poll loop integration', () => { }); it('should process messages arriving after loop starts', async () => { - const provider = new MockProvider(() => 'Processed'); + const provider = new MockProvider({}, () => 'Processed'); const controller = new AbortController(); const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 3000); @@ -99,8 +99,6 @@ async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSigna runPollLoop({ provider, cwd: '/tmp', - mcpServers: {}, - env: {}, }), new Promise((_, reject) => { signal.addEventListener('abort', () => reject(new Error('aborted'))); diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index 718be53..79e9e77 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -104,12 +104,10 @@ describe('routing', () => { describe('mock provider', () => { it('should produce init + result events', async () => { - const provider = new MockProvider((prompt) => `Echo: ${prompt}`); + const provider = new MockProvider({}, (prompt) => `Echo: ${prompt}`); const query = provider.query({ prompt: 'Hello', cwd: '/tmp', - mcpServers: {}, - env: {}, }); const events: Array<{ 
type: string }> = []; @@ -127,12 +125,10 @@ describe('mock provider', () => { }); it('should handle push() during active query', async () => { - const provider = new MockProvider((prompt) => `Re: ${prompt}`); + const provider = new MockProvider({}, (prompt) => `Re: ${prompt}`); const query = provider.query({ prompt: 'First', cwd: '/tmp', - mcpServers: {}, - env: {}, }); const events: Array<{ type: string; text?: string }> = []; @@ -164,12 +160,10 @@ describe('end-to-end with mock provider', () => { const prompt = formatMessages(messages); // Create mock provider and run query - const provider = new MockProvider(() => 'The answer is 4'); + const provider = new MockProvider({}, () => 'The answer is 4'); const query = provider.query({ prompt, cwd: '/tmp', - mcpServers: {}, - env: {}, }); // Process events — simulate what poll-loop does diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 2e401af..c5717f8 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -4,7 +4,7 @@ import { writeMessageOut } from './db/messages-out.js'; import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js'; import { formatMessages, extractRouting, categorizeMessage, type RoutingContext } from './formatter.js'; -import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; +import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; const ACTIVE_POLL_INTERVAL_MS = 500; @@ -21,10 +21,11 @@ function generateId(): string { export interface PollLoopConfig { provider: AgentProvider; cwd: string; - mcpServers: Record; - systemPrompt?: string; - env: Record; - additionalDirectories?: string[]; + systemContext?: { + instructions?: string; + }; + /** Admin user ID for permission checks on 
admin commands (e.g. /clear). */ + adminUserId?: string; } /** @@ -38,15 +39,14 @@ export interface PollLoopConfig { * 6. Loop */ export async function runPollLoop(config: PollLoopConfig): Promise { - // Resume the SDK session from a prior container run if one was persisted. - // The SDK's .jsonl transcripts live in the shared ~/.claude mount, so the - // conversation history is already on disk — we just need the session ID - // to tell the SDK which one to continue. - let sessionId: string | undefined = getStoredSessionId(); - let resumeAt: string | undefined; + // Resume the agent's prior session from a previous container run if one + // was persisted. The continuation is opaque to the poll-loop — the + // provider decides how to use it (Claude resumes a .jsonl transcript, + // other providers may reload a thread ID, etc.). + let continuation: string | undefined = getStoredSessionId(); - if (sessionId) { - log(`Resuming SDK session ${sessionId}`); + if (continuation) { + log(`Resuming agent session ${continuation}`); } // Clear leftover 'processing' acks from a previous crashed container. 
@@ -75,7 +75,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise { const routing = extractRouting(messages); // Handle commands: categorize chat messages - const adminUserId = config.env.NANOCLAW_ADMIN_USER_ID; + const adminUserId = config.adminUserId; const normalMessages = []; const commandIds: string[] = []; @@ -110,9 +110,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise { } // Handle admin commands directly if (cmdInfo.command === '/clear') { - log('Clearing session (resetting sessionId)'); - sessionId = undefined; - resumeAt = undefined; + log('Clearing session (resetting continuation)'); + continuation = undefined; clearStoredSessionId(); writeMessageOut({ id: generateId(), @@ -149,43 +148,37 @@ export async function runPollLoop(config: PollLoopConfig): Promise { continue; } - // Format messages: passthrough commands get raw text, others get XML - const prompt = formatMessagesWithCommands(normalMessages); + // Format messages: passthrough commands get raw text (only if the + // provider natively handles slash commands), others get XML. 
+ const prompt = formatMessagesWithCommands(normalMessages, config.provider.supportsNativeSlashCommands); log(`Processing ${normalMessages.length} message(s), kinds: ${[...new Set(normalMessages.map((m) => m.kind))].join(',')}`); const query = config.provider.query({ prompt, - sessionId, - resumeAt, + continuation, cwd: config.cwd, - mcpServers: config.mcpServers, - systemPrompt: config.systemPrompt, - env: config.env, - additionalDirectories: config.additionalDirectories, + systemContext: config.systemContext, }); // Process the query while concurrently polling for new messages const processingIds = ids.filter((id) => !commandIds.includes(id)); try { const result = await processQuery(query, routing, config, processingIds); - if (result.sessionId && result.sessionId !== sessionId) { - sessionId = result.sessionId; - setStoredSessionId(sessionId); + if (result.continuation && result.continuation !== continuation) { + continuation = result.continuation; + setStoredSessionId(continuation); } - if (result.resumeAt) resumeAt = result.resumeAt; } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); log(`Query error: ${errMsg}`); - // Stale/corrupt session recovery: if the SDK can't find the session - // we asked it to resume, clear the stored ID so the next attempt - // starts fresh. The transcript .jsonl can go missing after a crash - // mid-write, manual deletion, or disk-full. - if (sessionId && /no conversation found|ENOENT.*\.jsonl|session.*not found/i.test(errMsg)) { - log(`Stale session detected (${sessionId}) — clearing for next retry`); - sessionId = undefined; - resumeAt = undefined; + // Stale/corrupt continuation recovery: ask the provider whether + // this error means the stored continuation is unusable, and clear + // it so the next attempt starts fresh. 
+ if (continuation && config.provider.isSessionInvalid(err)) { + log(`Stale session detected (${continuation}) — clearing for next retry`); + continuation = undefined; clearStoredSessionId(); } @@ -207,17 +200,16 @@ export async function runPollLoop(config: PollLoopConfig): Promise { /** * Format messages, handling passthrough commands differently. - * Passthrough commands (e.g., /foo) are sent raw (no XML wrapping). - * Admin commands from authorized users are formatted as system commands. - * Normal messages get standard XML formatting. + * When the provider handles slash commands natively (Claude Code), + * passthrough commands are sent raw (no XML wrapping) so the SDK can + * dispatch them. Otherwise they fall through to standard XML formatting. */ -function formatMessagesWithCommands(messages: MessageInRow[]): string { - // Check if any message is a passthrough command +function formatMessagesWithCommands(messages: MessageInRow[], nativeSlashCommands: boolean): string { const parts: string[] = []; const normalBatch: MessageInRow[] = []; for (const msg of messages) { - if (msg.kind === 'chat' || msg.kind === 'chat-sdk') { + if (nativeSlashCommands && (msg.kind === 'chat' || msg.kind === 'chat-sdk')) { const cmdInfo = categorizeMessage(msg); if (cmdInfo.category === 'passthrough' || cmdInfo.category === 'admin') { // Flush normal batch first @@ -241,12 +233,11 @@ function formatMessagesWithCommands(messages: MessageInRow[]): string { } interface QueryResult { - sessionId?: string; - resumeAt?: string; + continuation?: string; } async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig, processingIds: string[]): Promise { - let querySessionId: string | undefined; + let queryContinuation: string | undefined; let done = false; let lastEventTime = Date.now(); @@ -289,7 +280,7 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config: touchHeartbeat(); if (event.type === 'init') { - querySessionId = 
event.sessionId; + queryContinuation = event.continuation; } else if (event.type === 'result' && event.text) { dispatchResultText(event.text, routing); } @@ -299,13 +290,13 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config: clearInterval(pollHandle); } - return { sessionId: querySessionId }; + return { continuation: queryContinuation }; } function handleEvent(event: ProviderEvent, _routing: RoutingContext): void { switch (event.type) { case 'init': - log(`Session: ${event.sessionId}`); + log(`Session: ${event.continuation}`); break; case 'result': log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`); diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts index adfd0e2..93976b6 100644 --- a/container/agent-runner/src/providers/claude.ts +++ b/container/agent-runner/src/providers/claude.ts @@ -3,7 +3,7 @@ import path from 'path'; import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from '@anthropic-ai/claude-agent-sdk'; -import type { AgentProvider, AgentQuery, ProviderEvent, QueryInput } from './types.js'; +import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; function log(msg: string): void { console.error(`[claude-provider] ${msg}`); @@ -161,31 +161,61 @@ function createPreCompactHook(assistantName?: string): HookCallback { // ── Provider ── -export class ClaudeProvider implements AgentProvider { - private assistantName?: string; +/** + * Claude Code auto-compacts context at this window (tokens). Kept here so + * the generic bootstrap doesn't need to know about Claude-specific env vars. + */ +const CLAUDE_CODE_AUTO_COMPACT_WINDOW = '165000'; - constructor(opts?: { assistantName?: string }) { - this.assistantName = opts?.assistantName; +/** + * Stale-session detection. 
Matches Claude Code's error text when a + * resumed session can't be found — missing transcript .jsonl, unknown + * session ID, etc. + */ +const STALE_SESSION_RE = /no conversation found|ENOENT.*\.jsonl|session.*not found/i; + +export class ClaudeProvider implements AgentProvider { + readonly supportsNativeSlashCommands = true; + + private assistantName?: string; + private mcpServers: Record; + private env: Record; + private additionalDirectories?: string[]; + + constructor(options: ProviderOptions = {}) { + this.assistantName = options.assistantName; + this.mcpServers = options.mcpServers ?? {}; + this.additionalDirectories = options.additionalDirectories; + this.env = { + ...(options.env ?? {}), + CLAUDE_CODE_AUTO_COMPACT_WINDOW, + }; + } + + isSessionInvalid(err: unknown): boolean { + const msg = err instanceof Error ? err.message : String(err); + return STALE_SESSION_RE.test(msg); } query(input: QueryInput): AgentQuery { const stream = new MessageStream(); stream.push(input.prompt); + const instructions = input.systemContext?.instructions; + const sdkResult = sdkQuery({ prompt: stream, options: { cwd: input.cwd, - additionalDirectories: input.additionalDirectories, - resume: input.sessionId, - resumeSessionAt: input.resumeAt, - systemPrompt: input.systemPrompt ? { type: 'preset' as const, preset: 'claude_code' as const, append: input.systemPrompt } : undefined, + additionalDirectories: this.additionalDirectories, + resume: input.continuation, + systemPrompt: instructions ? 
{ type: 'preset' as const, preset: 'claude_code' as const, append: instructions } : undefined, allowedTools: TOOL_ALLOWLIST, - env: input.env, + env: this.env, permissionMode: 'bypassPermissions', allowDangerouslySkipPermissions: true, settingSources: ['project', 'user'], - mcpServers: input.mcpServers, + mcpServers: this.mcpServers, hooks: { PreCompact: [{ hooks: [createPreCompactHook(this.assistantName)] }], }, @@ -204,7 +234,7 @@ export class ClaudeProvider implements AgentProvider { yield { type: 'activity' }; if (message.type === 'system' && message.subtype === 'init') { - yield { type: 'init', sessionId: message.session_id }; + yield { type: 'init', continuation: message.session_id }; } else if (message.type === 'result') { const text = 'result' in message ? (message as { result?: string }).result ?? null : null; yield { type: 'result', text }; diff --git a/container/agent-runner/src/providers/factory.ts b/container/agent-runner/src/providers/factory.ts index 077fd08..cf20b45 100644 --- a/container/agent-runner/src/providers/factory.ts +++ b/container/agent-runner/src/providers/factory.ts @@ -1,15 +1,15 @@ -import type { AgentProvider } from './types.js'; +import type { AgentProvider, ProviderOptions } from './types.js'; import { ClaudeProvider } from './claude.js'; import { MockProvider } from './mock.js'; export type ProviderName = 'claude' | 'mock'; -export function createProvider(name: ProviderName, opts?: { assistantName?: string }): AgentProvider { +export function createProvider(name: ProviderName, options: ProviderOptions = {}): AgentProvider { switch (name) { case 'claude': - return new ClaudeProvider(opts); + return new ClaudeProvider(options); case 'mock': - return new MockProvider(); + return new MockProvider(options); default: throw new Error(`Unknown provider: ${name}`); } diff --git a/container/agent-runner/src/providers/mock.ts b/container/agent-runner/src/providers/mock.ts index 0794557..d283957 100644 --- 
a/container/agent-runner/src/providers/mock.ts +++ b/container/agent-runner/src/providers/mock.ts @@ -1,16 +1,22 @@ -import type { AgentProvider, AgentQuery, ProviderEvent, QueryInput } from './types.js'; +import type { AgentProvider, AgentQuery, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; /** * Mock provider for testing. Returns canned responses. * Supports push() — queued messages produce additional results. */ export class MockProvider implements AgentProvider { + readonly supportsNativeSlashCommands = false; + private responseFactory: (prompt: string) => string; - constructor(responseFactory?: (prompt: string) => string) { + constructor(_options: ProviderOptions = {}, responseFactory?: (prompt: string) => string) { this.responseFactory = responseFactory ?? ((prompt) => `Mock response to: ${prompt.slice(0, 100)}`); } + isSessionInvalid(_err: unknown): boolean { + return false; + } + query(input: QueryInput): AgentQuery { const pending: string[] = []; let waiting: (() => void) | null = null; @@ -21,7 +27,7 @@ export class MockProvider implements AgentProvider { const events: AsyncIterable = { async *[Symbol.asyncIterator]() { yield { type: 'activity' }; - yield { type: 'init', sessionId: `mock-session-${Date.now()}` }; + yield { type: 'init', continuation: `mock-session-${Date.now()}` }; // Process initial prompt yield { type: 'activity' }; diff --git a/container/agent-runner/src/providers/types.ts b/container/agent-runner/src/providers/types.ts index b0ad4da..55ab919 100644 --- a/container/agent-runner/src/providers/types.ts +++ b/container/agent-runner/src/providers/types.ts @@ -1,32 +1,52 @@ export interface AgentProvider { + /** + * True if the provider's underlying SDK handles slash commands natively and + * wants them passed through as raw text. When false, the poll-loop formats + * slash commands like any other chat message. + */ + readonly supportsNativeSlashCommands: boolean; + /** Start a new query. 
Returns a handle for streaming input and output. */ query(input: QueryInput): AgentQuery; + + /** + * True if the given error indicates the stored continuation is invalid + * (missing transcript, unknown session, etc.) and should be cleared. + */ + isSessionInvalid(err: unknown): boolean; +} + +/** + * Options passed to provider constructors. Fields are common to most + * providers; individual providers may ignore any they don't need. + */ +export interface ProviderOptions { + assistantName?: string; + mcpServers?: Record; + env?: Record; + additionalDirectories?: string[]; } export interface QueryInput { /** Initial prompt (already formatted by agent-runner). */ prompt: string; - /** Session ID to resume, if any. */ - sessionId?: string; - - /** Resume from a specific point in the session (provider-specific). */ - resumeAt?: string; + /** + * Opaque continuation token from a previous query. The provider decides + * what this means (session ID, thread ID, nothing at all). + */ + continuation?: string; /** Working directory inside the container. */ cwd: string; - /** MCP server configurations. */ - mcpServers: Record; - - /** System prompt / developer instructions. */ - systemPrompt?: string; - - /** Environment variables for the SDK process. */ - env: Record; - - /** Additional directories the agent can access. */ - additionalDirectories?: string[]; + /** + * System context to inject. Providers translate this into whatever their + * SDK expects (preset append, full system prompt, per-turn injection…). + */ + systemContext?: { + instructions?: string; + }; } export interface McpServerConfig { @@ -50,8 +70,13 @@ export interface AgentQuery { } export type ProviderEvent = - | { type: 'init'; sessionId: string } + | { type: 'init'; continuation: string } | { type: 'result'; text: string | null } | { type: 'error'; message: string; retryable: boolean; classification?: string } | { type: 'progress'; message: string } + /** + * Liveness signal. 
Providers MUST yield this on every underlying SDK + * event (tool call, thinking, partial message, anything) so the + * poll-loop's idle timer stays honest during long tool runs. + */ | { type: 'activity' };