750 lines
28 KiB
Markdown
750 lines
28 KiB
Markdown
# NanoClaw Agent-Runner 详解
|
||
|
||
容器内 agent-runner(代理运行器)的实现级细节。高层设计请参见 [architecture.md](architecture.md)。
|
||
|
||
## 关注点分离
|
||
|
||
agent-runner 分为两层:
|
||
|
||
1. **Agent-runner 核心** — 负责 poll loop(轮询循环)、消息格式化、数据库读写、MCP tool(MCP 工具)实现、路由、状态管理、媒体处理。此部分是 NanoClaw 专有的,跨所有 provider(提供器)共享。
|
||
|
||
2. **Agent provider(代理提供器)** — 负责 SDK 交互。接收格式化后的提示词,将其推送到 SDK,并回传事件。主干代码内置 `claude` provider;其他 provider(OpenCode、Codex 等)通过 `/add-<provider>` 技能从 `providers` 分支安装。
|
||
|
||
边界划分:agent-runner 决定**发送什么**以及**如何处理结果**。provider 决定**如何与 SDK 通信**。
|
||
|
||
## AgentProvider 接口
|
||
|
||
```typescript
|
||
interface AgentProvider {
|
||
/** 启动一个新的查询。返回一个用于流式输入和输出的句柄。 */
|
||
query(input: QueryInput): AgentQuery;
|
||
}
|
||
|
||
interface QueryInput {
|
||
/** 初始提示词(已由 agent-runner 格式化)。
|
||
* 纯文本时使用 String。多模态(图片、PDF、音频)时使用 ContentBlock[]。 */
|
||
prompt: string | ContentBlock[];
|
||
|
||
/** 要恢复的会话 ID(如有) */
|
||
sessionId?: string;
|
||
|
||
/** 从会话的特定位置恢复(provider 特定字段,可能被忽略) */
|
||
resumeAt?: string;
|
||
|
||
/** 容器内的工作目录 */
|
||
cwd: string;
|
||
|
||
/** MCP server(MCP 服务器)配置(标准化格式 — provider 负责转换) */
|
||
mcpServers: Record<string, McpServerConfig>;
|
||
|
||
/** 系统提示词 / 开发者指令 */
|
||
systemPrompt?: string;
|
||
|
||
/** SDK 进程的环境变量 */
|
||
env: Record<string, string | undefined>;
|
||
|
||
/** agent 可访问的额外目录 */
|
||
additionalDirectories?: string[];
|
||
}
|
||
|
||
interface McpServerConfig {
|
||
command: string;
|
||
args: string[];
|
||
env: Record<string, string>;
|
||
}
|
||
|
||
interface AgentQuery {
|
||
/** 将一条后续消息推送到活动查询中 */
|
||
push(message: string): void;
|
||
|
||
/** 表示不再发送更多输入 */
|
||
end(): void;
|
||
|
||
/** 输出事件流 */
|
||
events: AsyncIterable<ProviderEvent>;
|
||
|
||
/** 强制停止查询(例如容器正在关闭) */
|
||
abort(): void;
|
||
}
|
||
|
||
type ProviderEvent =
|
||
| { type: 'init'; sessionId: string }
|
||
| { type: 'result'; text: string | null }
|
||
| { type: 'error'; message: string; retryable: boolean; classification?: string }
|
||
| { type: 'progress'; message: string };
|
||
```
|
||
|
||
### 接口不包含的内容
|
||
|
||
- **消息格式化** — agent-runner 在传递给 provider 之前格式化消息。provider 接收的是可直接发送的提示词字符串。
|
||
- **Hooks(钩子)** — Claude 特有功能。Claude provider 在内部注册 hooks(PreCompact、PreToolUse 等)。其他 provider 不需要。
|
||
- **工具许可列表** — Claude 使用 `allowedTools`。Codex 使用 `approvalPolicy`。OpenCode 使用 `permission`。各 provider 基于相同的意图("允许一切,无需提示")在内部自行配置。
|
||
- **会话持久化** — Claude 自动将会话持久化到磁盘。Codex 和 OpenCode 管理各自的 session 状态。agent-runner 不控制这一点 — 它只传递 `sessionId` 和 `resumeAt`。
|
||
- **沙箱配置** — provider 特定。各 provider 在内部配置自己的沙箱。
|
||
|
||
### Provider 事件语义
|
||
|
||
- **`init`** — 每次查询当 provider 建立或恢复 session 时发送一次。agent-runner 捕获 `sessionId` 用于后续恢复。
|
||
- **`result`** — 当 agent 生成完整响应时发送。每次查询可发送多次(例如 Claude 的多轮子 agent 交互)。agent-runner 将每个 result 写入 `messages_out`。
|
||
- **`error`** — 失败时发送。`retryable` 指示 agent-runner 是否应重试。`classification` 是可选的详细分类(如 `'quota'`、`'auth'`、`'transport'`)。
|
||
- **`progress`** — 可选,用于日志记录。agent-runner 记录这些事件但不据此作出行动。
|
||
|
||
## Provider 实现
|
||
|
||
主干代码仅内置 `claude` provider。下文的 Codex 和 OpenCode 章节记录了 provider 接口以供参考,以及供安装额外 provider 的技能使用 — 它们并非核心镜像的内置部分。
|
||
|
||
### Claude Provider
|
||
|
||
封装 `@anthropic-ai/claude-agent-sdk` 的 `query()`。
|
||
|
||
```typescript
|
||
class ClaudeProvider implements AgentProvider {
|
||
query(input: QueryInput): AgentQuery {
|
||
const stream = new MessageStream(); // AsyncIterable<SDKUserMessage>
|
||
stream.push(input.prompt);
|
||
|
||
const sdkQuery = query({
|
||
prompt: stream,
|
||
options: {
|
||
cwd: input.cwd,
|
||
resume: input.sessionId,
|
||
resumeSessionAt: input.resumeAt,
|
||
systemPrompt: input.systemPrompt
|
||
? { type: 'preset', preset: 'claude_code', append: input.systemPrompt }
|
||
: undefined,
|
||
mcpServers: input.mcpServers, // 已是正确格式
|
||
additionalDirectories: input.additionalDirectories,
|
||
env: input.env,
|
||
allowedTools: NANOCLAW_TOOL_ALLOWLIST,
|
||
permissionMode: 'bypassPermissions',
|
||
allowDangerouslySkipPermissions: true,
|
||
hooks: {
|
||
PreCompact: [{ hooks: [preCompactHook] }],
|
||
PreToolUse: [{ matcher: 'Bash', hooks: [sanitizeBashHook] }],
|
||
},
|
||
},
|
||
});
|
||
|
||
return {
|
||
push: (msg) => stream.push(msg),
|
||
end: () => stream.end(),
|
||
abort: () => sdkQuery.close(),
|
||
events: translateClaudeEvents(sdkQuery),
|
||
};
|
||
}
|
||
}
|
||
```
|
||
|
||
`translateClaudeEvents` 是一个异步生成器,将 SDK 消息映射为 `ProviderEvent`:
|
||
- `message.type === 'system' && message.subtype === 'init'` → `{ type: 'init', sessionId }`
|
||
- `message.type === 'result'` → `{ type: 'result', text }`
|
||
- `message.type === 'system' && message.subtype === 'api_retry'` → `{ type: 'error', retryable: true }`
|
||
- `message.type === 'system' && message.subtype === 'rate_limit_event'` → `{ type: 'error', retryable: false, classification: 'quota' }`
|
||
- `message.type === 'system' && message.subtype === 'task_notification'` → `{ type: 'progress', message }`
|
||
- 其他一切 → 仅记录日志,不发送事件
|
||
|
||
**在 provider 内部保留的 Claude 特有功能:**
|
||
- `MessageStream` 用于异步可迭代输入(基于推送模式)
|
||
- `resumeSessionAt` 用于在特定消息 UUID 处恢复
|
||
- PreCompact hook 用于对话转录归档
|
||
- PreToolUse hook 用于清理 bash 环境变量
|
||
- 完整的工具许可列表
|
||
- `additionalDirectories` 用于多目录访问
|
||
|
||
### Codex Provider
|
||
|
||
封装 `@openai/codex-sdk`。
|
||
|
||
```typescript
|
||
class CodexProvider implements AgentProvider {
|
||
query(input: QueryInput): AgentQuery {
|
||
const codex = new Codex(this.buildOptions(input));
|
||
const thread = input.sessionId
|
||
? codex.resumeThread(input.sessionId, this.threadOptions(input))
|
||
: codex.startThread(this.threadOptions(input));
|
||
|
||
const abortController = new AbortController();
|
||
let pendingFollowUp: string | null = null;
|
||
|
||
return {
|
||
push: (msg) => {
|
||
// Codex 不支持流式输入。
|
||
// 存储后续消息并中止当前轮次。
|
||
pendingFollowUp = msg;
|
||
abortController.abort();
|
||
},
|
||
end: () => { /* 无操作 — Codex 轮次自然结束 */ },
|
||
abort: () => abortController.abort(),
|
||
events: this.run(thread, input.prompt, abortController, () => pendingFollowUp),
|
||
};
|
||
}
|
||
|
||
private async *run(thread, prompt, abortController, getPendingFollowUp): AsyncIterable<ProviderEvent> {
|
||
let currentPrompt = prompt;
|
||
|
||
while (true) {
|
||
try {
|
||
const streamed = await thread.runStreamed(currentPrompt, {
|
||
signal: abortController.signal,
|
||
});
|
||
|
||
let sessionId: string | undefined;
|
||
let resultText = '';
|
||
|
||
for await (const event of streamed.events) {
|
||
if (event.type === 'thread.started') {
|
||
sessionId = event.thread_id;
|
||
yield { type: 'init', sessionId };
|
||
}
|
||
if (event.type === 'item.completed' && event.item.type === 'agent_message') {
|
||
resultText = event.item.text || resultText;
|
||
}
|
||
if (event.type === 'turn.failed') {
|
||
yield { type: 'error', message: event.error.message, retryable: false };
|
||
return;
|
||
}
|
||
}
|
||
|
||
yield { type: 'result', text: resultText || null };
|
||
|
||
// 检查本轮次中是否有后续消息排入队列
|
||
const followUp = getPendingFollowUp();
|
||
if (followUp) {
|
||
currentPrompt = followUp;
|
||
// 为下一次迭代重置
|
||
continue;
|
||
}
|
||
|
||
return;
|
||
} catch (err) {
|
||
if (abortController.signal.aborted && getPendingFollowUp()) {
|
||
// 因后续消息被中止 — 使用新提示词重启
|
||
currentPrompt = getPendingFollowUp();
|
||
abortController = new AbortController();
|
||
continue;
|
||
}
|
||
throw err;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**在 provider 内部保留的 Codex 特有行为:**
|
||
- `developer_instructions` 用于系统提示词(从 CLAUDE.md 加载)
|
||
- 工作区中执行 `git init`(Codex 需要 git 仓库)
|
||
- 中止+重启模式处理后续消息
|
||
- 从环境变量中读取 `sandboxMode`、`approvalPolicy`、`networkAccessEnabled`
|
||
- 对话归档(Codex 没有 PreCompact)
|
||
|
||
### OpenCode Provider
|
||
|
||
封装 `@opencode-ai/sdk`。
|
||
|
||
```typescript
|
||
class OpenCodeProvider implements AgentProvider {
|
||
query(input: QueryInput): AgentQuery {
|
||
// OpenCode 运行本地服务器 — 创建一次,跨查询复用
|
||
const { client, server } = await createOpencode({ config: this.buildConfig(input) });
|
||
const { stream } = await client.event.subscribe();
|
||
|
||
let aborted = false;
|
||
let pendingFollowUp: string | null = null;
|
||
|
||
return {
|
||
push: (msg) => {
|
||
pendingFollowUp = msg;
|
||
server.close(); // 中断当前查询
|
||
},
|
||
end: () => { /* 无操作 */ },
|
||
abort: () => { aborted = true; server.close(); },
|
||
events: this.run(client, server, stream, input, () => pendingFollowUp),
|
||
};
|
||
}
|
||
|
||
private async *run(client, server, stream, input, getPendingFollowUp): AsyncIterable<ProviderEvent> {
|
||
const session = await client.session.create();
|
||
yield { type: 'init', sessionId: session.data.id };
|
||
|
||
await client.session.promptAsync({
|
||
path: { id: session.data.id },
|
||
body: { parts: [{ type: 'text', text: input.prompt }] },
|
||
});
|
||
|
||
for await (const event of stream) {
|
||
if (event.type === 'session.idle') {
|
||
// 从累积的消息部分中收集结果文本
|
||
const resultText = this.extractResult(event);
|
||
yield { type: 'result', text: resultText };
|
||
|
||
const followUp = getPendingFollowUp();
|
||
if (followUp) {
|
||
await client.session.promptAsync({
|
||
path: { id: session.data.id },
|
||
body: { parts: [{ type: 'text', text: followUp }] },
|
||
});
|
||
continue;
|
||
}
|
||
|
||
return;
|
||
}
|
||
|
||
if (event.type === 'session.error') {
|
||
yield { type: 'error', message: event.properties?.error?.data?.message, retryable: false };
|
||
return;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**在 provider 内部保留的 OpenCode 特有行为:**
|
||
- 本地 gRPC/HTTP 服务器生命周期(`server.close()`)
|
||
- SSE 事件流用于输出
|
||
- 通过配置选择 provider/model(`OPENCODE_PROVIDER`、`OPENCODE_MODEL`)
|
||
- MCP config 格式转换(`type: 'local'`、`command: [cmd, ...args]`、`environment`)
|
||
- 通过提示词文本中以 `<system>` 前缀注入系统提示词
|
||
- 不支持恢复(session 始终是新建的或按 ID 复用的)
|
||
|
||
## Agent-Runner 核心
|
||
|
||
以下所有内容均由 agent-runner 处理,而非 provider。
|
||
|
||
### 轮询循环
|
||
|
||
```
|
||
┌─────────────────────────────────────────┐
|
||
│ │
|
||
│ 1. 查询 messages_in 中待处理的行 │
|
||
│ WHERE status = 'pending' │
|
||
│ AND (process_after IS NULL │
|
||
│ OR process_after <= now()) │
|
||
│ │
|
||
│ 2. 如果找到行: │
|
||
│ a. 设置 status = 'processing' │
|
||
│ b. 按 kind 格式化消息 │
|
||
│ c. 剥离路由字段 │
|
||
│ d. 调用 provider.query(prompt) │
|
||
│ e. 处理 provider 事件 │
|
||
│ f. 将结果写入 messages_out │
|
||
│ g. 设置 status = 'completed' │
|
||
│ │
|
||
│ 3. 当查询处于活动状态时: │
|
||
│ - 继续轮询 messages_in │
|
||
│ - 新消息 → provider.push() │
|
||
│ │
|
||
│ 4. 查询结束时: │
|
||
│ - 回到步骤 1 │
|
||
│ - 若无消息,休眠并重新轮询 │
|
||
│ │
|
||
└─────────────────────────────────────────┘
|
||
```
|
||
|
||
**活动查询期间的并发轮询:** 当 provider 正在运行查询时,agent-runner 以短间隔(约 500ms)持续轮询 `messages_in`。新的待处理消息被格式化并通过 `provider.push()` 推送到活动查询中。这使得在 agent 处理过程中后续消息可以到达 — Claude 原生支持此方式,Codex/OpenCode 通过内部的中止+重启来处理。
|
||
|
||
**空闲行为:** 当没有待处理消息且没有活动查询时,agent-runner 短暂休眠(1s)并重新轮询。容器保持运行状态直到 host(主机)将其终止(空闲超时)。
|
||
|
||
**空闲检测的例外情况:** 在以下情况下容器**不应**被视为空闲:
|
||
- 一个 `ask_user_question` 工具调用正在等待(等待用户在 `messages_in` 中的响应)
|
||
- agent 正在活跃工作中(工具调用进行中、子 agent 运行中)
|
||
|
||
agent-runner 向 host 发送"忙碌"状态信号。具体机制因 provider 而异 — 对于 Claude,查询 AsyncGenerator 仍在产出事件。对于其他 provider,agent-runner 可以将心跳或状态指示器写入 session DB(会话数据库),host 在终止前会检查该状态。
|
||
|
||
### 消息格式化
|
||
|
||
agent-runner 将 `messages_in` 行转换为提示词字符串。provider 接收的是可直接发送的字符串 — 它不知道消息的种类或路由信息。
|
||
|
||
**路由字段剥离:** `platform_id`、`channel_type`、`thread_id` 永远不会包含在提示词中。它们作为上下文存储,用于写入 `messages_out`。
|
||
|
||
**按 kind 分类的单条消息格式化:**
|
||
|
||
- **`chat`** — 格式化为消息 XML:
|
||
```xml
|
||
<message sender="John" time="2024-01-01 10:00">
|
||
Check this PR
|
||
</message>
|
||
```
|
||
|
||
- **`chat-sdk`** — 从序列化的 Chat SDK 消息中提取字段:
|
||
```xml
|
||
<message sender="John (john@slack)" time="2024-01-01 10:00">
|
||
Check this PR
|
||
[image: screenshot.png — https://signed-url...]
|
||
</message>
|
||
```
|
||
附件以内联方式列出。Claude 原生支持的图片/PDF 以 content block(内容块)方式传递(参见下文的媒体处理部分)。
|
||
|
||
- **`task`** — 任务提示词,可选择附带脚本输出:
|
||
```
|
||
[SCHEDULED TASK]
|
||
|
||
Script output:
|
||
{"data": ...}
|
||
|
||
Instructions:
|
||
Review open PRs
|
||
```
|
||
|
||
- **`webhook`** — webhook 负载:
|
||
```
|
||
[WEBHOOK: github/pull_request]
|
||
|
||
{"action": "opened", "pull_request": {...}}
|
||
```
|
||
|
||
- **`system`** — host 操作结果(对先前系统请求的响应):
|
||
```
|
||
[SYSTEM RESPONSE]
|
||
|
||
Action: register_agent_group
|
||
Status: success
|
||
Result: {"agent_group_id": "ag-456"}
|
||
```
|
||
|
||
**批量格式化:** 多条待处理消息合并为一个提示词:
|
||
|
||
```xml
|
||
<context timezone="America/Los_Angeles">
|
||
<messages>
|
||
<message sender="John" time="10:00">Check this PR</message>
|
||
<message sender="Jane" time="10:01">Already on it</message>
|
||
</messages>
|
||
```
|
||
|
||
混合 kind(例如一条 chat 消息 + 一条 system 响应)用清晰的分隔符组合。每个部分按 kind 标注。
|
||
|
||
**命令检测:** 以 `/` 开头的消息会与命令列表进行匹配。识别到的命令会绕过格式化,原样传递给 provider(用于 Claude 的斜杠命令处理),或被 agent-runner 拦截(用于会话重置等 NanoClaw 级别的命令)。
|
||
|
||
### 路由
|
||
|
||
当 agent-runner 拾取 `messages_in` 行时,它会从批次中捕获路由字段:
|
||
|
||
```typescript
|
||
interface RoutingContext {
|
||
platformId: string | null;
|
||
channelType: string | null;
|
||
threadId: string | null;
|
||
inReplyTo: string | null; // 触发消息的 messages_in.id
|
||
}
|
||
```
|
||
|
||
写入 `messages_out`(无论是来自 provider 结果还是 MCP tool 调用)时,agent-runner 默认会复制此路由上下文。agent 永远不会看到路由字段 — 它只生成文本。路由是隐式的:"回复发送消息的人。"
|
||
|
||
目标为其他目的地的 MCP tool(例如带显式 channel 参数的 `send_to_agent`、`send_message`)会覆盖该特定 `messages_out` 行的路由上下文。
|
||
|
||
### 状态管理
|
||
|
||
agent-runner 管理 `messages_in` 上的 `status` 和 `status_changed` 字段:
|
||
|
||
```
|
||
pending → processing → completed
|
||
→ failed (若 provider 返回错误且已达到最大重试次数)
|
||
```
|
||
|
||
- **拾取:** `UPDATE messages_in SET status = 'processing', status_changed = now(), tries = tries + 1 WHERE id IN (...)`
|
||
- **完成:** `UPDATE messages_in SET status = 'completed', status_changed = now() WHERE id IN (...)`
|
||
- **错误:** agent-runner 不设置 `failed` — 它将消息保持为 `processing`。host 通过 `status_changed` 检测过时的 processing 状态并处理重试逻辑(重置为 pending,带退避策略)。这样将重试策略保留在 host 端。
|
||
|
||
### MCP 工具
|
||
|
||
agent-runner 运行一个 MCP server(MCP 服务器),向 agent 暴露 NanoClaw 工具。所有工具都写入 session DB。
|
||
|
||
**数据库路径:** MCP server 通过环境变量接收 session 数据库路径。它打开第二个连接到同一个 SQLite 文件(WAL 模式允许并发访问)。
|
||
|
||
#### send_message
|
||
|
||
向当前对话(或指定 destination(目的地))发送聊天消息。
|
||
|
||
```typescript
|
||
{
|
||
name: 'send_message',
|
||
params: {
|
||
text: string, // 消息内容
|
||
channel?: string, // 可选:目标 channel 类型(默认:回复来源)
|
||
platformId?: string, // 可选:目标 platform ID
|
||
threadId?: string, // 可选:目标 thread ID
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,`kind: 'chat'`。如果提供了 channel/platformId/threadId,则使用这些作为路由。否则,从当前路由上下文中复制。
|
||
|
||
#### send_file
|
||
|
||
向当前对话发送文件。
|
||
|
||
```typescript
|
||
{
|
||
name: 'send_file',
|
||
params: {
|
||
path: string, // 文件路径(相对于 /workspace/agent/ 或绝对路径)
|
||
text?: string, // 可选:附带消息
|
||
filename?: string, // 显示名称(默认:path 的 basename)
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:
|
||
1. 生成消息 ID
|
||
2. 创建 `outbox/{messageId}/` 目录
|
||
3. 将文件复制到 outbox 目录中
|
||
4. 写入一个 `messages_out` 行,内容中包含 `files: [filename]`
|
||
|
||
#### send_card
|
||
|
||
发送结构化卡片(交互式或仅展示)。
|
||
|
||
```typescript
|
||
{
|
||
name: 'send_card',
|
||
params: {
|
||
card: CardElement, // 卡片结构(title、children、actions)
|
||
fallbackText?: string, // 不支持卡片的平台使用的文本回退
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,`kind: 'chat-sdk'`,内容中包含卡片结构。
|
||
|
||
#### ask_user_question
|
||
|
||
发送一个交互式问题并等待用户响应。这是一个**阻塞式工具调用** — 直到用户响应后工具才会返回。
|
||
|
||
```typescript
|
||
{
|
||
name: 'ask_user_question',
|
||
params: {
|
||
title: string, // 简短卡片标题,例如 "Confirm deletion"
|
||
question: string,
|
||
options: (string | { label: string; selectedLabel?: string; value?: string })[],
|
||
timeout?: number, // 秒(默认:300)
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:
|
||
1. 生成 `questionId`
|
||
2. 写入一个 `messages_out` 行,包含 `operation: 'ask_question'`、问题、选项和 questionId
|
||
3. 轮询 `messages_in` 中是否有内容匹配的 `questionId` 的行
|
||
4. 找到后,将 `selectedOption` 作为工具结果返回
|
||
5. 如果超时到期,将超时错误作为工具结果返回
|
||
|
||
agent 的执行在此工具调用处暂停。provider 的查询继续运行(Claude 保持工具调用打开)。agent-runner 在单独循环中轮询响应。
|
||
|
||
#### edit_message
|
||
|
||
编辑先前发送的消息。
|
||
|
||
```typescript
|
||
{
|
||
name: 'edit_message',
|
||
params: {
|
||
messageId: string, // 显示给 agent 的整数 ID
|
||
text: string, // 新内容
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,包含 `operation: 'edit'`、消息 ID 和新文本。
|
||
|
||
#### add_reaction
|
||
|
||
向消息添加表情反应。
|
||
|
||
```typescript
|
||
{
|
||
name: 'add_reaction',
|
||
params: {
|
||
messageId: string, // 显示给 agent 的整数 ID
|
||
emoji: string, // 表情名称(如 'thumbs_up')
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,包含 `operation: 'reaction'`。
|
||
|
||
#### send_to_agent
|
||
|
||
向另一个 agent group(代理组)发送消息。
|
||
|
||
```typescript
|
||
{
|
||
name: 'send_to_agent',
|
||
params: {
|
||
agentGroupId: string, // 目标 agent group
|
||
text: string, // 消息内容
|
||
sessionId?: string, // 可选:目标特定 session
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,包含 `channel_type: 'agent'`、`platform_id: agentGroupId`、`thread_id: sessionId`。
|
||
|
||
#### schedule_task
|
||
|
||
安排一次性或 recurring task(周期性任务)。
|
||
|
||
```typescript
|
||
{
|
||
name: 'schedule_task',
|
||
params: {
|
||
prompt: string, // 任务提示词
|
||
processAfter: string, // 首次运行的 ISO 时间戳
|
||
recurrence?: string, // cron 表达式(可选)
|
||
script?: string, // 前置脚本(可选)
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_in` 行(发给自己),包含 `kind: 'task'`、`process_after` 和可选的 `recurrence`。host sweep 在到期时拾取。
|
||
|
||
#### list_tasks
|
||
|
||
列出活跃的 scheduled/recurring 任务。
|
||
|
||
```typescript
|
||
{
|
||
name: 'list_tasks',
|
||
params: {}
|
||
}
|
||
```
|
||
|
||
实现:查询 `messages_in WHERE recurrence IS NOT NULL AND status != 'failed'`。
|
||
|
||
#### cancel_task / pause_task / resume_task / update_task
|
||
|
||
修改已安排的任务。
|
||
|
||
```typescript
|
||
{
|
||
name: 'cancel_task',
|
||
params: { taskId: string }
|
||
}
|
||
// pause_task: 设置 status = 'paused'(周期性任务的新状态值)
|
||
// resume_task: 设置 status = 'pending'
|
||
// update_task: 将 { prompt?, recurrence?, processAfter?, script? } 合并到活动行中
|
||
```
|
||
|
||
实现:cancel/pause/resume 直接更新活动行。update_task 作为 system action 发送 — host 读取当前内容,合并提供的字段,并写回。所有四个操作均通过 `(id = ? OR series_id = ?) AND kind='task' AND status IN ('pending','paused')` 匹配,因此即使 agent 传入原始(现已完成的)id,也能触及周期性任务的下一个活动实例。
|
||
|
||
#### register_agent_group
|
||
|
||
注册一个新的 agent group(仅限 admin)。
|
||
|
||
```typescript
|
||
{
|
||
name: 'register_agent_group',
|
||
params: {
|
||
name: string,
|
||
folder: string,
|
||
platformId: string, // 要连接的 messaging group
|
||
channelType: string,
|
||
triggerRules?: object,
|
||
sessionMode?: 'shared' | 'per-thread',
|
||
}
|
||
}
|
||
```
|
||
|
||
实现:写入一个 `messages_out` 行,包含 `kind: 'system'`、`action: 'register_agent_group'`。host 读取、验证 admin 权限、在 central DB 中创建实体行,并写入一个 `system` 类型的 `messages_in` 响应。
|
||
|
||
### 媒体处理
|
||
|
||
#### 入站(messages_in → agent 提示词)
|
||
|
||
agent-runner 检查 chat/chat-sdk 消息中的附件(attachment),并根据类型和 provider 能力进行处理:
|
||
|
||
**Provider 原生的 content block:**
|
||
|
||
| 类型 | Claude | Codex / OpenCode |
|
||
|------|--------|------------------|
|
||
| 图片(JPEG、PNG、GIF、WebP) | 原生图片 content block | 保存到磁盘 |
|
||
| PDF | 原生文档 content block | 保存到磁盘 |
|
||
| 音频 | 原生音频 content block | 保存到磁盘 |
|
||
| 其他文件(代码、数据、视频、存档) | 保存到磁盘 | 保存到磁盘 |
|
||
|
||
**"保存到磁盘"** 的含义是:下载到 `/workspace/downloads/{messageId}/`,在提示词文本中引用:
|
||
|
||
```
|
||
<message sender="John" time="10:00">
|
||
Check this spreadsheet
|
||
[file available at: /workspace/downloads/msg-123/data.xlsx]
|
||
</message>
|
||
```
|
||
|
||
agent 可以使用工具(Read、Bash)访问保存的文件。
|
||
|
||
对于无法直接下载的 channel(例如 WhatsApp 缓冲流),channel adapter 通过本地 URL 提供媒体内容。agent-runner 从该 URL 下载。
|
||
|
||
**Content block 构建(Claude):** agent-runner 构建多部分 `MessageParam` 内容:`[{ type: 'image', source: { type: 'base64', media_type, data } }, { type: 'text', text: '...' }]`。此时传递给 provider 的提示词不是纯字符串 — `QueryInput.prompt` 字段需要支持 Claude 的结构化内容。provider 的 `query()` 方法处理特定格式的构建。
|
||
|
||
**Content block 构建(Codex/OpenCode):** 一切皆为文本。文件引用以内联方式出现在提示词字符串中。provider 接收的是纯字符串提示词。
|
||
|
||
#### 出站(agent → messages_out)
|
||
|
||
通过 `send_file` MCP tool 处理(参见上文)。agent 显式决定发送文件 — agent-runner 不会扫描输出中的文件引用。
|
||
|
||
### 任务前置脚本
|
||
|
||
对于 `kind` 为 `task` 且内容中包含 `script` 字段的消息:
|
||
|
||
1. agent-runner 将脚本写入临时文件
|
||
2. 使用 `bash` 执行(30s 超时)
|
||
3. 将 stdout 最后一行解析为 JSON:`{ wakeAgent: boolean, data?: unknown }`
|
||
4. 如果 `wakeAgent === false`:将消息标记为已完成,不调用 provider
|
||
5. 如果 `wakeAgent === true`:用脚本输出丰富提示词,然后调用 provider
|
||
|
||
### 对话转录归档
|
||
|
||
agent-runner 在上下文压缩前归档对话转录。对于 Claude,这通过 PreCompact hook 处理(provider 内部)。对于其他没有 hooks 的 provider,agent-runner 在每次查询完成后基于 provider 的输出进行归档。
|
||
|
||
归档位置:`/workspace/agent/conversations/{date}-{summary}.md`
|
||
|
||
### 会话恢复
|
||
|
||
agent-runner 在查询之间跟踪 `sessionId` 和 `resumeAt`:
|
||
|
||
- `sessionId` — 从 `ProviderEvent { type: 'init' }` 中捕获。在下一次查询时传回 `QueryInput.sessionId`。
|
||
- `resumeAt` — Claude 特有(最后一条 assistant 消息的 UUID)。由 agent-runner 存储,传递给 `QueryInput.resumeAt`。不支持的 provider 会忽略它。
|
||
|
||
这些是容器生命周期的临时数据。当容器被终止并重启时,host 会传递从 central DB sessions 表中存储的 `sessionId`。`resumeAt` 在容器重启时丢失(provider 从 session 末尾恢复)。
|
||
|
||
### 容器启动
|
||
|
||
agent-runner 通过以下方式接收配置:
|
||
|
||
- **环境变量:** `AGENT_PROVIDER`(claude/codex/opencode)、`NANOCLAW_ADMIN_USER_ID`、provider 特定变量(API 密钥、模型覆盖)、`TZ`
|
||
- **固定挂载路径:** Session DB 位于 `/workspace/session.db`。Agent group 文件夹位于 `/workspace/agent/`。系统提示词来自 `/workspace/agent/CLAUDE.md` 和 `/workspace/global/CLAUDE.md`。
|
||
- **可选启动配置:** 部分配置可能以 JSON 文件形式传递到固定路径(例如 `/workspace/config.json`),用于诸如要恢复的 session ID、assistant 名称和 admin user ID 等内容。这避免了对环境变量的过度使用。
|
||
|
||
agent-runner 读取配置,创建 provider,并进入轮询循环。无标准输入,无初始提示词 — 消息已经存在于 session DB 中。
|
||
|
||
### Provider 工厂
|
||
|
||
```typescript
|
||
type ProviderName = 'claude' | string;
|
||
|
||
function createProvider(name: ProviderName, config: ProviderConfig): AgentProvider {
|
||
// 主干代码注册 'claude';其他 provider 在通过技能安装时自行注册。
|
||
const factory = providerRegistry.get(name);
|
||
if (!factory) throw new Error(`Unknown provider: ${name}`);
|
||
return factory(config);
|
||
}
|
||
```
|
||
|
||
provider 名称来自容器的环境变量(`AGENT_PROVIDER` env var),由 host 根据 `agent_groups.agent_provider` 或 `sessions.agent_provider` 设置。
|
||
|
||
`ProviderConfig` 包含 provider 特定设置(API 密钥、模型覆盖等),通过环境变量传递 — 而非通过接口。每个 provider 根据自身需要从 `env` 中读取。
|
||
|
||
## Agent-Runner 属性
|
||
|
||
- MCP server 是由 provider(通过 `mcpServers` 配置)启动的独立 Node 进程
|
||
- MCP server 二进制文件跨 provider 共享 — 相同的工具、相同的数据库访问
|
||
- CLAUDE.md 加载(全局 + 每组) — agent-runner 读取并作为 `systemPrompt` 传递
|
||
- 额外目录发现(`/workspace/extra/*`)
|
||
- 通过 stderr 进行日志记录(`[agent-runner] ...`)
|
||
|
||
## 相关文档
|
||
|
||
- **[architecture.md](architecture.md)** — 高层架构(session DB schema、central DB、channel adapter、消息流)
|
||
- **[api-details.md](api-details.md)** — Channel adapter 接口、消息内容示例、host delivery 逻辑
|