11 KiB
Q13-Q15: 出站投递与系统动作
Q13: Agent 回复消息后,delivery.ts 怎么知道用哪个 channel adapter 发送?重试和失败怎么处理?
答案
Delivery 系统使用两层轮询:active poll(每 1s)扫描有运行容器的 session,sweep poll(每 60s)扫描所有 active session。从 outbound.db(container-owned)读取,在 inbound.db 的 delivered 表中跟踪投递状态。Channel adapter 是 boot 时设置的单个全局 ChannelDeliveryAdapter。
Adapter 的选择
messages_out的每行带有channel_type和platform_id字段(container 的writeMessageOut()填入)delivery.ts:356-363:deliveryAdapter.deliver(channelType, platformId, threadId, kind, content, files)被调用。Adapter 收到channelType+platformId,负责路由到正确的平台- Adapter 通过
setDeliveryAdapter()设置一次(line 95),是一个包装了所有 channel adapter 的ChannelDeliveryAdapter
完整投递流程
1. Poll 触发: pollActive()(1s,line 121-133)→ getRunningSessions();pollSweep()(60s,line 136-149)→ getActiveSessions()
2. 防竞态(line 151-162):inflightDeliveries 是 Set<string> — 如果 active poll 和 sweep poll 竞态同个 session,第二个调用者跳过,防止重复投递
3. Drain session(drainSession(),line 164-232):
- 只读打开
outbound.db,读写打开inbound.db getDueOutboundMessages(outDb)读messages_out(deliver_after <= now)getDeliveredIds(inDb)做去重比对(line 183)- 对每条未投递消息调用
deliverMessage()(line 192)
4. 消息路由(deliverMessage(),line 234-375):
- System actions(line 255-258):
msg.kind === 'system'→handleSystemAction()→ 查找actionHandlersMap。模块通过registerDeliveryAction()注册处理器 - Agent-to-agent(line 264-271):
msg.channel_type === 'agent'→routeAgentMessage() - Channel delivery(line 289-375):
- 权限检查(line 289-311):验证源 agent 是否有权发到目标 channel——要么目标是自己 session 的 origin(
session.messaging_group_id匹配),要么agent_destinations中有显式行 - Pending question 跟踪(line 317-340):
ask_question类型创建pending_questions行 - 文件附件(line 348-354):从 session 的
outbox/<messageId>/读文件 - Adapter call(line 356-363):实际发送
- 清理(line 372):
clearOutbox()删除 outbox 目录
- 权限检查(line 289-311):验证源 agent 是否有权发到目标 channel——要么目标是自己 session 的 origin(
重试和失败处理
- 投递尝试在
deliveryAttemptsMap 中以消息 ID 为 key 在内存中跟踪(进程重启时重置,给失败消息全新机会) - 在
MAX_DELIVERY_ATTEMPTS = 3次失败后,标记为 permanent failed(line 206-225) - 重试是惰性的:消息留在
messages_out表中未投递,下次 poll 迭代重新捡起
Q14: Agent 发起 install_packages 或 add_mcp_server 的完整审批-执行链路是什么?
答案
自我修改采用 fire-and-forget 模式 + admin 审批门控。Agent 调 MCP tool,tool 写 system message 到 outbound.db。Host delivery loop 拾起、验证/净化请求、排队审批,admin 批准后应用修改、重建镜像(如需要)、kill 容器、写 on_wake 消息给新容器。
完整链路(逐步)
Phase 1: Agent 请求(container 端)
-
Agent 调
install_packages或add_mcp_serverMCP tool(container/agent-runner/src/mcp-tools/self-mod.ts)install_packages(line 53-78):用正则验证包名(APT_RE、NPM_RE,最多 20 个),写kind: 'system'+action: 'install_packages'的 outbound messageadd_mcp_server(line 97-117):验证 name 和 command 存在,写action: 'add_mcp_server'
-
写
outbound.db:用writeMessageOut(),写入奇数 seq
Phase 2: Host delivery 拾起
delivery.ts→deliverMessage()看到kind === 'system'→handleSystemAction()handleSystemAction()(line 410-425):查actionHandlersMap。self-mod 模块注册了 handler:handleInstallPackages(self-mod/request.ts:20-64)handleAddMcpServer(self-mod/request.ts:66-91)
Phase 3: 请求验证 + 审批排队
-
Host 端验证(深度防御第二层):对 package 名称再次验证(同一正则),失败时调
notifyAgent()告知 agent,不创建审批 -
审批请求(
approvals/primitive.ts:164-220):pickApprover(session.agent_group_id)→ scoped admins → global admins → ownerspickApprovalDelivery:找到可 DM 的审批者,优先同 channel 类型- 用
deliveryAdapter.deliver()发ask_question卡片到 admin DM - 创建
pending_approvals行,包含action、payload(JSON)、approval_id、session_id
Phase 4: Admin 响应
approvals/response-handler.ts:24-43→handleApprovalsResponse():- Reject(line 72-77):调
notify()告知 agent - Approve(line 80-105):查
getApprovalHandler(approval.action)注册的 handler,传入{ session, payload, userId, notify }
- Reject(line 72-77):调
Phase 5: 应用修改
-
install_packageshandler —self-mod/apply.ts:22-83:- 去重后追加新 apt/npm 包到 DB 中已有列表(line 37-49)
buildAgentGroupImage()(line 57):构建 per-agent-group Docker 镜像(container-runner.ts:468-515),用docker build -t nanoclaw-agent:<agentGroupId>拉 900s 超时- 写
on_wake: 1消息告知 agent - Kill 容器 with
onExitcallback(line 72-75):killContainer(sessionId, 'rebuild applied', () => { wakeContainer(s) })—— 保证旧容器退出后新容器才 spawn - 重建失败(line 77-82):通知 admin,不 kill 容器
-
add_mcp_serverhandler —self-mod/apply.ts:85-125:- 添加 MCP server 到 DB 的
mcp_serversJSON(line 99-105) - 写
on_wake: 1消息(line 107-120) - Kill 容器 with
onExit→wakeContainercallback(line 121-124) - 不需要重建镜像 —— Bun 直接运行 TS,纯 MCP wiring 变动不需要 Dоcker 构建
- 添加 MCP server 到 DB 的
Phase 6: 新容器启动
onExitcallback 触发 →wakeContainer()→spawnContainer():- 从 DB 物化新
container.json(line 127) composeGroupClaudeMd()重新生成 CLAUDE.md(line 261)clearStaleProcessingAcks()清掉旧 processing ack(connection.ts:175-177)getPendingMessages(isFirstPoll=true)捡起on_wake: 1消息 —— 仅第一轮 poll 可见
- 从 DB 物化新
为什么 on_wake 是防竞态的
messages_in 表 on_wake 列 + getPendingMessages() 中 isFirstPoll 门控:第一轮 poll 包含 on_wake = 1 行,后续轮排除(它们已 completed)。结合 killContainer 的 onExit callback,旧容器绝无可能先于新容器偷走 on_wake 消息。
Q15: 定时任务(cron)怎么实现?
答案
定时任务实现为 messages_in 表中 kind='task' 的行,piggyback 在核心 schema 上,没有专用表。Agent 通过 MCP tool 创建任务,host 把它们写入 inbound.db,recurrence 由 host sweep hook 驱动:克隆已完成的周期性任务为新 pending 行。
创建任务
-
Agent 通过
schedule_taskMCP tool(container/agent-runner/src/mcp-tools/scheduling.ts)写kind: 'system'+action: 'schedule_task'的 outbound message,包含taskId、prompt、processAfter(首次运行 ISO 时间戳)、可选recurrence(cron 表达式) -
Host delivery 拾起 →
handleSystemAction()→ 注册的action: 'schedule_task'handler(scheduling/actions.ts:19-40):- 调
insertTask()(scheduling/db.ts:17-36):插入messages_in行,kind = 'task'、status = 'pending'、process_after = <首次运行时间>、recurrence = <cron-expr>、series_id = <taskId> - 内容存储为 JSON
{ prompt, script }
- 调
-
Agent 也可以创建非周期性调度消息:
schedule_message工具同理,但kind匹配原始消息类型
触发:Host sweep + countDueMessages
-
Host sweep 唤醒容器(
host-sweep.ts:180-186):countDueMessages(inDb)计数status = 'pending' AND process_after <= now的行dueCount > 0 && !isContainerRunning→wakeContainer(session)
-
Container 处理任务:
getPendingMessages()读 pending 行,包括 task 行,格式化后给 provider
Recurrence:host sweep + handleRecurrence
- Recurrence fanout(
scheduling/recurrence.ts:21-53),每 60s sweep 周期调用(host-sweep.ts:205-206):getCompletedRecurring(inDb)(scheduling/db.ts:122-126):找status = 'completed' AND recurrence IS NOT NULL的行- 对每个完成的行:
- 在用户时区(非 UTC)解析 cron 表达式
- 计算
nextRun = interval.next().toISOString() insertRecurrence()(scheduling/db.ts:128-149):复制原行,设置process_after = nextRun、status = 'pending'clearRecurrence()(line 151-153):设原行recurrence = NULL,防止下次周期重复克隆
其他任务生命周期操作
- Cancel/Pause/Resume(
actions.ts:42-70):cancelTask()(line 38-42):通过id OR series_id匹配,设所有pending/paused行为completedpauseTask()(line 44-48)、resumeTask()(line 50-54):同理- 用
series_id匹配意味着 agent 可以引用任意一次执行取消整个系列
Host sweep + recurrence 协作顺序
Host sweep(每 60s):
Step 1: syncProcessingAcks()
Step 2: countDueMessages() → 如果到期 + 容器不在运行 → wakeContainer()
Step 3: enforceRunningContainerSla() ← heartbeat/claim-stuck 检查
Step 4: resetStuckProcessingRows() ← 崩溃容器清理
Step 5: handleRecurrence() ← 扫描已完成周期性任务,克隆下次执行
关键顺序: Step 2(唤醒到期消息)在 Step 4(崩溃容器清理)之前运行,确保新容器有机会在启动时清自己的孤儿 processing_ack。
边界情况
- 没有专用表:任务是
messages_in行——核心messages_inschema 中kind字段足以区分 series_id:周期性任务的每次执行共享同一series_id。Cancel/pause/resume 用id OR series_id匹配,影响整个系列- 时区:Cron 表达式以用户配置的
TIMEZONE解析(来自.env),非 UTC - Pre-task scripts gating:Task 行可带
script字段。applyPreTaskScripts()hook(poll-loop.ts:149,323)先跑 script。如果返回wakeAgent: false,任务标记完成但不唤醒 agent——实现 "仅用户活跃时运行" 等条件 - Agent 不能写 inbound.db:Container 把 task 写成
kind: 'system'的 outbound message;host 的 delivery action handler 才是实际插入messages_in的组件——保持单 writer 不变式