From 9dda75bb21b1cd3b32f8bc894dfed5a639f19b19 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Sun, 12 Apr 2026 00:21:12 +0300 Subject: [PATCH] docs(v2): cross-mount invariants + diagrams; inline a2a routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - session-manager.ts: shrink the cross-mount invariant header from 31 lines to 12, keeping each invariant's cause and consequence inline. - agent-runner/db/connection.ts: parallel cross-mount comment for the container-side reader (inbound.db must be journal_mode=DELETE). - agent-runner/db/messages-out.ts: document that even/odd seq parity is load-bearing — seq is the agent-facing message ID returned by send_message and consumed by edit_message / add_reaction, looked up across both tables. - v2-checklist.md: record the cross-mount invariants and seq parity under Core Architecture so future "simplifications" don't regress them. - scripts/sanity-live-poll.ts: empirical validation harness for the three cross-mount invariants — flips each one and observes silent message loss / corruption. - delivery.ts: inline routeAgentMessage at its single callsite (-17 net lines). The wrapper added more boilerplate than it factored. - docs/v2-architecture-diagram.{md,html}: rendered Mermaid diagrams of the v2 system, message flow, named destinations, entity model, and the two-DB split. - channels/adapter.ts, chat-sdk-bridge.ts, credentials.ts, db/sessions.ts, db/db-v2.test.ts: prettier format pass. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/db/connection.ts | 8 + container/agent-runner/src/db/messages-out.ts | 9 +- docs/v2-architecture-diagram.html | 406 ++++++++++++++++++ docs/v2-architecture-diagram.md | 200 +++++++++ docs/v2-checklist.md | 3 + scripts/sanity-live-poll.ts | 93 ++++ src/channels/adapter.ts | 4 +- src/channels/chat-sdk-bridge.ts | 5 +- src/credentials.ts | 18 +- src/db/db-v2.test.ts | 1 - src/db/sessions.ts | 5 +- src/delivery.ts | 91 ++-- src/session-manager.ts | 31 +- 13 files changed, 788 insertions(+), 86 deletions(-) create mode 100644 docs/v2-architecture-diagram.html create mode 100644 docs/v2-architecture-diagram.md create mode 100644 scripts/sanity-live-poll.ts diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 4036232..dcc5ce7 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -8,6 +8,14 @@ * outbound.db — container writes responses + acks here; host opens read-only * * Each file has exactly one writer, so no cross-process lock contention. + * + * ⚠ Cross-mount visibility: inbound.db MUST be journal_mode=DELETE (set by + * the host when the file is created). WAL's `-shm` is memory-mapped and + * VirtioFS does not propagate mmap coherency from host to guest, so a + * WAL-mode inbound.db would leave this reader frozen on an early snapshot + * and it would silently never see new host messages. See + * src/session-manager.ts for the full set of cross-mount invariants and + * scripts/sanity-live-poll.ts for the empirical validation. 
*/ import Database from 'better-sqlite3'; import fs from 'fs'; diff --git a/container/agent-runner/src/db/messages-out.ts b/container/agent-runner/src/db/messages-out.ts index 2d03b37..743b2b8 100644 --- a/container/agent-runner/src/db/messages-out.ts +++ b/container/agent-runner/src/db/messages-out.ts @@ -34,8 +34,13 @@ export interface WriteMessageOut { /** * Write a new outbound message, auto-assigning an odd seq number. - * Container uses odd seq (1, 3, 5...), host uses even (2, 4, 6...) — - * this prevents seq collisions without cross-DB coordination. + * Container uses odd seq (1, 3, 5...), host uses even (2, 4, 6...). + * + * The disjoint namespace is load-bearing, not just collision avoidance: + * seq is the agent-facing message ID returned by send_message and accepted + * by edit_message / add_reaction, and getMessageIdBySeq() below looks up + * by seq across BOTH tables. If inbound and outbound could share a seq, + * the agent's "edit message #5" could resolve to the wrong row. */ export function writeMessageOut(msg: WriteMessageOut): number { const outbound = getOutboundDb(); diff --git a/docs/v2-architecture-diagram.html b/docs/v2-architecture-diagram.html new file mode 100644 index 0000000..652f933 --- /dev/null +++ b/docs/v2-architecture-diagram.html @@ -0,0 +1,406 @@ + + + + + + NanoClaw v2 Architecture + + + + +
+

NanoClaw v2 Architecture

+
Session-DB messaging model · Chat SDK bridge · OneCLI credential gateway · per-session containers
+ +
+ +
+
+

1System Overview

+

+ Inbound messages land at the Chat SDK bridge, which hands off to the + router. The router resolves the messaging group → agent group → session + and writes to the session's inbound.db. The container runner + spawns a per-session container (auth via OneCLI), and the agent-runner + polls its DB, calls Claude, and writes responses to outbound.db. + Delivery polls the outbound DB, re-validates destinations, and ships + messages back through the same bridge. +

+
+
+flowchart TB
+  subgraph Platforms["Messaging Platforms"]
+    P1[Discord]
+    P2[Telegram]
+    P3[Slack]
+    P4[GitHub / Linear]
+    P5[WhatsApp / iMessage / Teams / GChat / Matrix / Webex / Email]
+  end
+
+  subgraph Host["Host Process (Node)"]
+    direction TB
+    Bridge["Chat SDK Bridge
src/channels/chat-sdk-bridge.ts"] + Router["Router
src/router.ts
platformId + threadId → session"] + SessMgr["Session Manager
src/session-manager.ts"] + Runner["Container Runner
src/container-runner.ts
OneCLI ensureAgent + spawn"] + Delivery["Delivery Poller
src/delivery.ts
1s active / 60s sweep"] + Sweep["Host Sweep
src/host-sweep.ts"] + Central[("Central DB · data/v2.db
agent_groups · messaging_groups
messaging_group_agents · sessions
pending_approvals")] + end + + subgraph OneCLI["OneCLI Gateway (0.3.1)"] + Vault["Agent Vault
secrets + OAuth"] + Approvals["configureManualApproval"] + SecretsFacade["onecli-secrets.ts
credential collection"] + end + + subgraph Session["Per-Session Container"] + direction TB + PollLoop["Poll Loop
container/agent-runner"] + Provider["Claude Agent SDK
(codex / opencode planned)"] + MCP["MCP Tools
send_message · send_file · edit_message
send_card · ask_user_question · schedule_task
create_agent · install_packages · add_mcp_server
request_rebuild · trigger_credential_collection"] + InDB[("inbound.db
host writes · even seq")] + OutDB[("outbound.db
container writes · odd seq")] + end + + Folder["Agent Group FS
groups/*
CLAUDE.md · memory · skills"] + + P1 & P2 & P3 & P4 & P5 --> Bridge + Bridge --> Router + Router --> Central + Router --> SessMgr + SessMgr --> InDB + SessMgr --> Runner + Runner --> OneCLI + Runner --> PollLoop + PollLoop --> InDB + PollLoop --> Provider + Provider --> MCP + MCP --> OutDB + OutDB --> Delivery + Delivery --> Central + Delivery --> Bridge + Bridge --> P1 & P2 & P3 & P4 & P5 + Sweep --> InDB + Sweep --> OutDB + Sweep --> Central + Runner -.mounts.-> Folder + MCP -.approval.-> Approvals + Approvals --> Central + MCP -.credential req.-> SecretsFacade + SecretsFacade --> Vault + Provider -.API calls.-> Vault +
+
+
+ +
+

2Message Flow

+

+ End-to-end path of a single message. The host and container never write + to the same SQLite file — the split between inbound and outbound DBs is + what makes this lock-free under concurrent activity. +

+
+
+sequenceDiagram
+  participant P as Platform (Telegram)
+  participant B as Chat SDK Bridge
+  participant R as Router
+  participant SM as Session Manager
+  participant IDB as inbound.db
+  participant C as Container (agent-runner)
+  participant ODB as outbound.db
+  participant D as Delivery Poller
+
+  P->>B: new message
+  B->>R: routeInbound(platformId, threadId, msg)
+  R->>R: resolve messaging_group → agent_group → session
(agent-shared · shared · per-thread) + R->>SM: ensure session + DBs exist + R->>IDB: INSERT messages_in (even seq) + R->>C: wake container (spawn or signal) + C->>IDB: poll messages_in + C->>C: format xml → Claude SDK stream + C->>ODB: INSERT messages_out (odd seq)
parse <message to='name'> blocks + D->>ODB: 1s active poll / 60s sweep + D->>D: hasDestination() re-validate + D->>B: deliver via adapter + B->>P: send · edit · react · file · card +
+
+
+ +
+

3Named Destinations & Agent-to-Agent

+

+ Agents address outputs by local name. The host looks up each name against + the agent's destinations table at delivery time — dropping anything + unauthorized. The same table routes agent-to-agent messages to a sibling + agent's inbound.db with bidirectional permission rows. +

+
+
+flowchart LR
+  subgraph AgentA["Agent Group A (main)"]
+    A_out["<message to='slack'>...</message>
<message to='browser-agent'>...</message>
<internal>scratchpad</internal>"] + end + + subgraph Dests["inbound.db.destinations (per agent)"] + D1["slack → messaging_group 42"] + D2["browser-agent → agent_group 7
(bidirectional)"] + D3["github → messaging_group 13"] + end + + subgraph AgentB["Agent Group B (browser sub-agent)"] + B_session["own inbound.db / outbound.db
inherited destination back to A"] + end + + Slack[Slack] + GitHub[GitHub PR] + + A_out -->|parse + lookup| Dests + D1 -->|deliver| Slack + D2 -->|write to B's inbound.db| B_session + D3 -->|deliver| GitHub + B_session -.reply via 'parent'.-> Dests +
+
+
+ +
+

4Entity Model

+

+ Messaging groups and agent groups are many-to-many, joined via + messaging_group_agents. The session_mode + column selects one of three isolation levels. +

+
+
+erDiagram
+  agent_groups ||--o{ messaging_group_agents : wired
+  messaging_groups ||--o{ messaging_group_agents : wired
+  agent_groups ||--o{ sessions : runs
+  messaging_groups ||--o{ sessions : context
+  agent_groups ||--o{ agent_destinations : owns
+  agent_groups ||--o{ pending_approvals : requests
+
+  agent_groups {
+    int id
+    string name
+    string folder
+    bool is_admin
+    string agent_provider
+    json container_config
+  }
+  messaging_groups {
+    int id
+    string channel_type
+    string platform_id
+    string name
+    bool is_group
+    string admin_user_id
+  }
+  messaging_group_agents {
+    int messaging_group_id
+    int agent_group_id
+    string session_mode
+    json trigger_rules
+    int priority
+  }
+  sessions {
+    int id
+    int agent_group_id
+    int messaging_group_id
+    string sdk_session_id
+    string status
+  }
+
+
+ + + + + + + + + +
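<tr><th>Level</th><th>session_mode</th><th>Shared</th><th>Example</th></tr>
<tr><td>1 · Shared session</td><td>agent-shared</td><td>Workspace + memory + conversation</td><td>Slack + GitHub webhooks in one thread</td></tr>
<tr><td>2 · Same agent, separate sessions</td><td>shared / per-thread</td><td>Workspace + memory only</td><td>One agent across 3 Telegram chats</td></tr>
<tr><td>3 · Separate agent groups</td><td>— (different agent_group_id)</td><td>Nothing</td><td>Personal vs work channels</td></tr>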
+
+ +
+

5Two-DB Split

+

+ Each SQLite file has exactly one writer. The container touches a + heartbeat file instead of UPDATE-ing a liveness row, so host + sweep can detect staleness via stat(mtime) without opening the + DB. Host uses even seq numbers, container uses odd — collision-free. +

+
+
+flowchart LR
+  subgraph Mount["/workspace (volume mount)"]
+    In[("inbound.db")]
+    Out[("outbound.db")]
+    HB["/.heartbeat (file touch)"]
+  end
+
+  Host[Host process] -->|writes · even seq| In
+  Host -->|reads| Out
+  Container[agent-runner] -->|reads| In
+  Container -->|writes · odd seq| Out
+  Container -->|touch every poll| HB
+  HostSweep[Host sweep] -->|stat mtime| HB
+  HostSweep -->|reads processing_ack| Out
+
+
+
+ +
NanoClaw v2 · branch v2 · generated from docs/v2-checklist.md, v2-architecture-draft.md, v2-isolation-model.md, v2-setup-wiring.md
+
+ + + + diff --git a/docs/v2-architecture-diagram.md b/docs/v2-architecture-diagram.md new file mode 100644 index 0000000..846a612 --- /dev/null +++ b/docs/v2-architecture-diagram.md @@ -0,0 +1,200 @@ +# NanoClaw v2 Architecture Diagram + +## System Overview + +```mermaid +flowchart TB + subgraph Platforms["Messaging Platforms"] + P1[Discord] + P2[Telegram] + P3[Slack] + P4[GitHub / Linear] + P5[WhatsApp / iMessage / Teams / GChat / Matrix / Webex / Email] + end + + subgraph Host["Host Process (Node)"] + direction TB + Bridge["Chat SDK Bridge
(src/channels/chat-sdk-bridge.ts)"] + Router["Router
(src/router.ts)
platformId + threadId -> messaging_group -> agent_group -> session"] + SessMgr["Session Manager
(src/session-manager.ts)
creates inbound.db + outbound.db"] + Runner["Container Runner
(src/container-runner.ts)
OneCLI ensureAgent + spawn"] + Delivery["Delivery Poller
(src/delivery.ts)
1s active / 60s sweep"] + Sweep["Host Sweep
(src/host-sweep.ts)
heartbeat, retry, recurrence"] + Central[("Central DB
data/v2.db
agent_groups
messaging_groups
messaging_group_agents
sessions
pending_approvals")] + end + + subgraph OneCLI["OneCLI Gateway (0.3.1)"] + Vault["Agent Vault
secrets + OAuth"] + Approvals["configureManualApproval
-> pending_approvals"] + SecretsFacade["src/onecli-secrets.ts
credential collection"] + end + + subgraph Session["Per-Session Container (Docker / Apple Container)"] + direction TB + PollLoop["Poll Loop
(container/agent-runner)"] + Provider["Claude Agent SDK
(providers: claude, mock, todo: codex/opencode)"] + MCP["MCP Tools
send_message, send_file, edit_message,
add_reaction, send_card, ask_user_question,
schedule_task, create_agent,
install_packages, add_mcp_server, request_rebuild,
trigger_credential_collection"] + Skills["Container Skills
(container/skills/)"] + InDB[("inbound.db
host writes
even seq
messages_in
destinations")] + OutDB[("outbound.db
container writes
odd seq
messages_out
processing_ack
heartbeat file")] + end + + subgraph Groups["Agent Group Filesystem (groups/*)"] + Folder["CLAUDE.md
memory
per-group skills
container_config"] + end + + P1 & P2 & P3 & P4 & P5 --> Bridge + Bridge --> Router + Router --> Central + Router --> SessMgr + SessMgr --> InDB + SessMgr --> Runner + Runner --> OneCLI + Runner --> PollLoop + PollLoop --> InDB + PollLoop --> Provider + Provider --> MCP + Provider --> Skills + MCP --> OutDB + OutDB --> Delivery + Delivery --> Central + Delivery --> Bridge + Bridge --> P1 & P2 & P3 & P4 & P5 + Sweep --> InDB + Sweep --> OutDB + Sweep --> Central + Runner -.mounts.-> Folder + MCP -.approval.-> Approvals + Approvals --> Central + MCP -.credential req.-> SecretsFacade + SecretsFacade --> Vault + Provider -.API calls.-> Vault +``` + +## Message Flow (inbound -> agent -> outbound) + +```mermaid +sequenceDiagram + participant P as Platform (e.g. Telegram) + participant B as Chat SDK Bridge + participant R as Router + participant SM as Session Manager + participant IDB as inbound.db + participant C as Container (agent-runner) + participant ODB as outbound.db + participant D as Delivery Poller + + P->>B: new message + B->>R: routeInbound(platformId, threadId, msg) + R->>R: resolve messaging_group -> agent_group -> session
(agent-shared | shared | per-thread) + R->>SM: ensure session + DBs exist + R->>IDB: INSERT messages_in (even seq) + R->>C: wake container (docker run / already running) + C->>IDB: poll messages_in + C->>C: format xml, stream to Claude SDK + C->>ODB: INSERT messages_out (odd seq)
parse &lt;message to='name'&gt; blocks + D->>ODB: 1s poll (active) / 60s (sweep) + D->>D: hasDestination() re-validate + D->>B: deliver via adapter + B->>P: send message / edit / react / file / card +``` + +## Named Destinations + Agent-to-Agent + +```mermaid +flowchart LR + subgraph AgentA["Agent Group A (main)"] + A_out["output:
<message to='slack'>...</message>
<message to='browser-agent'>...</message>
<internal>scratchpad</internal>"] + end + + subgraph Dests["inbound.db.destinations (per agent)"] + D1["slack -> messaging_group 42"] + D2["browser-agent -> agent_group 7
(bidirectional row)"] + D3["github -> messaging_group 13"] + end + + subgraph AgentB["Agent Group B (browser sub-agent)"] + B_session["own inbound.db / outbound.db
inherited destination back to A"] + end + + Slack[Slack channel] + GitHub[GitHub PR thread] + + A_out -->|parse + lookup| Dests + D1 -->|deliver| Slack + D2 -->|write to B's inbound.db| B_session + D3 -->|deliver| GitHub + B_session -.reply via 'parent'.-> Dests +``` + +## Entity Model + Isolation Levels + +```mermaid +erDiagram + agent_groups ||--o{ messaging_group_agents : wired + messaging_groups ||--o{ messaging_group_agents : wired + agent_groups ||--o{ sessions : runs + messaging_groups ||--o{ sessions : context + agent_groups ||--o{ agent_destinations : owns + agent_groups ||--o{ pending_approvals : requests + + agent_groups { + int id + string name + string folder + bool is_admin + string agent_provider + json container_config + } + messaging_groups { + int id + string channel_type + string platform_id + string name + bool is_group + string admin_user_id + } + messaging_group_agents { + int messaging_group_id + int agent_group_id + string session_mode "agent-shared | shared | per-thread" + json trigger_rules + int priority + } + sessions { + int id + int agent_group_id + int messaging_group_id + string sdk_session_id + string status + } +``` + +### Isolation Level Cheatsheet + +| Level | `session_mode` | What's shared | Example | +|---|---|---|---| +| 1. Shared session | `agent-shared` | Workspace + memory + conversation | Slack + GitHub webhooks in one thread | +| 2. Same agent, separate sessions | `shared` / `per-thread` | Workspace + memory only | One agent across 3 Telegram chats | +| 3. Separate agent groups | (different `agent_group_id`) | Nothing | Personal vs work channels | + +## Two-DB Split (why) + +```mermaid +flowchart LR + subgraph Mount["/workspace (volume mounted into container)"] + In[("inbound.db")] + Out[("outbound.db")] + HB["/.heartbeat (file touch)"] + end + + Host[Host process] -->|"writes only
(even seq)"| In + Host -->|reads| Out + Container[agent-runner] -->|reads| In + Container -->|"writes only
(odd seq)"| Out + Container -->|touch every poll| HB + HostSweep[Host sweep] -->|stat mtime| HB + HostSweep -->|reads processing_ack| Out + + note1["Each file has exactly ONE writer.
Eliminates SQLite cross-process write contention.
Collision-free seq numbering."] +``` diff --git a/docs/v2-checklist.md b/docs/v2-checklist.md index d0aea55..86c8b45 100644 --- a/docs/v2-checklist.md +++ b/docs/v2-checklist.md @@ -8,6 +8,8 @@ Status: [x] done, [~] partial, [ ] not started - [x] Session DB replaces IPC (messages_in / messages_out as sole IO) - [x] Two-DB split: inbound.db (host-owned) + outbound.db (container-owned) — zero cross-process write contention + - **Cross-mount invariants (empirically validated, see `scripts/sanity-live-poll.ts`):** (1) `journal_mode=DELETE` on every session DB — WAL's `-shm` is memory-mapped and VirtioFS does not propagate mmap coherency host→guest, so WAL leaves the container's poll loop frozen on an early snapshot with no error; (2) host opens-writes-closes per operation — the close is what invalidates the container's VirtioFS page cache; (3) one writer per file — DELETE-mode with two writers corrupts because journal-unlink doesn't propagate atomically. Each invariant was individually confirmed by flipping it and observing silent message loss or corruption. Do not "simplify" by unifying the DBs, switching to WAL, or keeping a long-lived host connection. + - **Seq parity is load-bearing, not cleanup:** host writes even seqs, container writes odd seqs. The seq is the agent-facing message ID returned by `send_message` and consumed by `edit_message` / `add_reaction`, and `getMessageIdBySeq()` looks up by seq across both tables. Removing parity would let a single ID resolve to the wrong row. 
- [x] Central DB (agent groups, messaging groups, sessions, routing) - [x] Host sweep (stale detection via heartbeat file, retry with backoff, recurrence scheduling) - [x] Active delivery polling (1s for running sessions) @@ -166,6 +168,7 @@ Status: [x] done, [~] partial, [ ] not started - [~] Credential collection from chat — `trigger_credential_collection` MCP tool; agent researches API config, card → modal → `onecli secrets create` via internal facade (`src/onecli-secrets.ts`); credential value never enters agent context - [ ] Replace `src/onecli-secrets.ts` shell facade with SDK-native secret management when `@onecli-sh/sdk` adds it - [ ] Per-agent-group secret scoping via OneCLI `agentId` (facade passes it today; CLI ignores it until upstream supports) + - [ ] **Attach newly created secrets to the calling agent** — `trigger_credential_collection` today runs `onecli secrets create` but leaves the secret unassigned, so the agent that requested the credential still gets zero injections. Fix options: (a) follow-up `onecli agents set-secrets` call in `src/onecli-secrets.ts` after create, (b) set the agent to `mode=all`, or (c) upstream ask — `onecli secrets create --assign-to-agent-ids ` so it's a one-shot and orphaned secrets are impossible. Prefer (c); use (a) as the interim. - [ ] **Chat SDK input support beyond Slack (upstream ask)** — today only Slack's Modal surface works for secure input. The platforms themselves support it, but Chat SDK doesn't expose it: - [ ] **Discord** — native modal (`InteractionResponseType.Modal` with `ActionRow([TextInput])`). Map `event.openModal(Modal(...))` to the Discord REST callback. - [ ] **Microsoft Teams** — Adaptive Card with `Input.Text`, delivered as a regular message (inline, no modal-trigger needed). 
diff --git a/scripts/sanity-live-poll.ts b/scripts/sanity-live-poll.ts new file mode 100644 index 0000000..dad1d4e --- /dev/null +++ b/scripts/sanity-live-poll.ts @@ -0,0 +1,93 @@ +/** + * Cross-mount visibility regression test for the two-DB session architecture. + * + * What this catches: any change that breaks host→container write propagation + * across the Docker bind mount. The v2 session DB design relies on three + * invariants working together: + * + * 1. journal_mode = DELETE on every session DB (not WAL) + * 2. Host opens-writes-closes the DB file on every write + * 3. One writer per file (inbound = host, outbound = container) + * + * This script exercises a long-lived container-side reader polling a DB + * while the host writes. If visibility is working, the reader sees each + * write within one poll period. If any of the invariants regresses, the + * reader either sees nothing, sees only the first write, or sees updates + * only after the host closes its connection for good. + * + * Expected passing output (DELETE mode, close-per-write): + * reader sees each seq within ~1s of it being written. + * Anything else is a regression — investigate BEFORE assuming it's flaky. + * + * Keep this around. It ran for ~20 minutes once to map the failure modes + * and it takes about 60s to run — cheap insurance. + * + * Requires: Docker Desktop running, nanoclaw-agent:latest image built. 
+ */ + +import { spawn, spawnSync } from "node:child_process"; +import { join } from "node:path"; +import { mkdirSync, rmSync } from "node:fs"; +import Database from "better-sqlite3"; + +const dbDir = join("/tmp", `nanoclaw-live-${Date.now()}`); +mkdirSync(dbDir, { recursive: true }); +spawnSync("chmod", ["777", dbDir]); +const dbPath = join(dbDir, "live.db"); + +for (const journalMode of ["DELETE", "WAL"]) { + console.log(`\n=== ${journalMode} ===`); + rmSync(dbPath, { force: true }); + rmSync(dbPath + "-wal", { force: true }); + rmSync(dbPath + "-shm", { force: true }); + rmSync(dbPath + "-journal", { force: true }); + + const db = new Database(dbPath); + db.pragma(`journal_mode = ${journalMode}`); + db.pragma("synchronous = FULL"); + db.exec("CREATE TABLE msgs (seq INTEGER PRIMARY KEY, content TEXT)"); + db.close(); + + // Start container poller in background + const contProc = spawn("docker", [ + "run", "--rm", "-w", "/app", + "-v", `${dbDir}:/workspace`, + "--entrypoint", "node", + "nanoclaw-agent:latest", + "-e", + `const Database = require('better-sqlite3'); + const db = new Database('/workspace/live.db', { readonly: true }); + db.pragma('busy_timeout = 2000'); + const stmt = db.prepare('SELECT COUNT(*) as n, MAX(seq) as hi FROM msgs'); + let count = 0; + const timer = setInterval(() => { + const r = stmt.get(); + console.log('poll t=' + (Date.now() % 100000) + ' count=' + r.n + ' max=' + r.hi); + if (++count >= 10) { clearInterval(timer); db.close(); } + }, 1000);`, + ], { stdio: ["ignore", "pipe", "pipe"] }); + + contProc.stdout.on("data", (d) => process.stdout.write(` [cont] ${d}`)); + contProc.stderr.on("data", (d) => process.stderr.write(` [cont-err] ${d}`)); + + // Give container a moment to start + const waitUntil = Date.now() + 2000; + while (Date.now() < waitUntil) {} + + // Host opens, writes, CLOSES each time (matches production session-manager pattern) + for (let i = 1; i <= 8; i++) { + const h = new Database(dbPath); + h.pragma(`journal_mode = 
${journalMode}`); + h.pragma("synchronous = FULL"); + h.prepare("INSERT INTO msgs (seq, content) VALUES (?, ?)").run(i, `msg-${i}`); + h.close(); + console.log(` [host] wrote+closed seq=${i} t=${Date.now() % 100000}`); + const sleepUntil = Date.now() + 1000; + while (Date.now() < sleepUntil) {} + } + + // Wait for container to finish + await new Promise((res) => contProc.once("exit", () => res())); +} + +rmSync(dbDir, { recursive: true, force: true }); diff --git a/src/channels/adapter.ts b/src/channels/adapter.ts index 00e942d..4d18a0e 100644 --- a/src/channels/adapter.ts +++ b/src/channels/adapter.ts @@ -29,7 +29,9 @@ export interface ChannelSetup { onAction(questionId: string, selectedOption: string, userId: string): void; /** Credential collection hooks — used by chat-sdk-bridge to route the modal flow. */ - getCredentialForModal?(credentialId: string): { name: string; description: string | null; hostPattern: string } | null; + getCredentialForModal?( + credentialId: string, + ): { name: string; description: string | null; hostPattern: string } | null; onCredentialReject?(credentialId: string): void; onCredentialSubmit?(credentialId: string, value: string): void; onCredentialChannelUnsupported?(credentialId: string): void; diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index ab49adf..5cbf0c6 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -188,10 +188,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter } try { const modalChildren = [ - CardText( - pending.description ?? - `Enter the value for ${pending.name} (host: ${pending.hostPattern}).`, - ), + CardText(pending.description ?? 
`Enter the value for ${pending.name} (host: ${pending.hostPattern}).`), TextInput({ id: 'value', label: pending.name, diff --git a/src/credentials.ts b/src/credentials.ts index f4955c2..831ef3d 100644 --- a/src/credentials.ts +++ b/src/credentials.ts @@ -34,10 +34,7 @@ export function setCredentialDeliveryAdapter(adapter: ChannelDeliveryAdapter): v } /** Handle a `request_credential` system action from a container. */ -export async function handleCredentialRequest( - content: Record, - session: Session, -): Promise { +export async function handleCredentialRequest(content: Record, session: Session): Promise { if (!adapterRef) { notifyAgentCredentialResult(session, content.credentialId as string, 'failed', 'delivery adapter not ready'); return; @@ -53,12 +50,7 @@ export async function handleCredentialRequest( const description = (content.description as string) || null; if (!credentialId || !name || !hostPattern) { - notifyAgentCredentialResult( - session, - credentialId, - 'failed', - 'name and hostPattern are required', - ); + notifyAgentCredentialResult(session, credentialId, 'failed', 'name and hostPattern are required'); return; } @@ -299,11 +291,7 @@ function buildCardText(opts: { valueFormat: string | null; description: string | null; }): string { - const lines = [ - `🔑 Credential request: ${opts.name}`, - '', - `Host: \`${opts.hostPattern}\``, - ]; + const lines = [`🔑 Credential request: ${opts.name}`, '', `Host: \`${opts.hostPattern}\``]; if (opts.headerName) lines.push(`Header: \`${opts.headerName}\``); if (opts.valueFormat) lines.push(`Format: \`${opts.valueFormat}\``); if (opts.description) lines.push('', opts.description); diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts index 095e4be..8e7f05d 100644 --- a/src/db/db-v2.test.ts +++ b/src/db/db-v2.test.ts @@ -57,7 +57,6 @@ describe('migrations', () => { // Running again should not throw runMigrations(db); }); - }); // ── Agent Groups ── diff --git a/src/db/sessions.ts b/src/db/sessions.ts index 
e3338d0..75aacd2 100644 --- a/src/db/sessions.ts +++ b/src/db/sessions.ts @@ -93,7 +93,10 @@ export function deletePendingQuestion(questionId: string): void { // ── Pending Approvals ── -export function createPendingApproval(pa: Partial & Pick): void { +export function createPendingApproval( + pa: Partial & + Pick, +): void { getDb() .prepare( `INSERT INTO pending_approvals diff --git a/src/delivery.ts b/src/delivery.ts index fdcf054..368b2f9 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -312,9 +312,44 @@ async function deliverMessage( return; } - // Agent-to-agent — route to target session (with permission check) + // Agent-to-agent — route to target session (with permission check). + // Permission is enforced via agent_destinations — the source agent must have + // a row for the target. Content is copied verbatim; the target's formatter + // will look up the source agent in its own local map to display a name. if (msg.channel_type === 'agent') { - await routeAgentMessage(msg, session); + const targetAgentGroupId = msg.platform_id; + if (!targetAgentGroupId) { + log.warn('Agent message missing target agent group ID', { id: msg.id }); + return; + } + if (!hasDestination(session.agent_group_id, 'agent', targetAgentGroupId)) { + log.warn('Unauthorized agent-to-agent message — dropping', { + source: session.agent_group_id, + target: targetAgentGroupId, + }); + return; + } + if (!getAgentGroup(targetAgentGroupId)) { + log.warn('Target agent group not found', { id: msg.id, targetAgentGroupId }); + return; + } + const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared'); + writeSessionMessage(targetAgentGroupId, targetSession.id, { + id: `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'chat', + timestamp: new Date().toISOString(), + platformId: session.agent_group_id, + channelType: 'agent', + threadId: null, + content: msg.content, + }); + log.info('Agent message routed', { + from: 
session.agent_group_id, + to: targetAgentGroupId, + targetSession: targetSession.id, + }); + const fresh = getSession(targetSession.id); + if (fresh) await wakeContainer(fresh); return; } @@ -393,58 +428,6 @@ async function deliverMessage( return platformMsgId; } -/** - * Route an agent-to-agent message to the target agent's session. - * - * Permission is enforced via agent_destinations — the source agent must have - * a row for the target. Content is copied verbatim; the target's formatter - * will look up the source agent in its own local map to display a name. - */ -async function routeAgentMessage( - msg: { id: string; platform_id: string | null; content: string }, - sourceSession: Session, -): Promise { - const targetAgentGroupId = msg.platform_id; - if (!targetAgentGroupId) { - log.warn('Agent message missing target agent group ID', { id: msg.id }); - return; - } - - if (!hasDestination(sourceSession.agent_group_id, 'agent', targetAgentGroupId)) { - log.warn('Unauthorized agent-to-agent message — dropping', { - source: sourceSession.agent_group_id, - target: targetAgentGroupId, - }); - return; - } - - if (!getAgentGroup(targetAgentGroupId)) { - log.warn('Target agent group not found', { id: msg.id, targetAgentGroupId }); - return; - } - - const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared'); - - writeSessionMessage(targetAgentGroupId, targetSession.id, { - id: `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, - kind: 'chat', - timestamp: new Date().toISOString(), - platformId: sourceSession.agent_group_id, - channelType: 'agent', - threadId: null, - content: msg.content, - }); - - log.info('Agent message routed', { - from: sourceSession.agent_group_id, - to: targetAgentGroupId, - targetSession: targetSession.id, - }); - - const fresh = getSession(targetSession.id); - if (fresh) await wakeContainer(fresh); -} - /** Ensure the delivered table has new columns (migration for existing sessions). 
*/ function migrateDeliveredTable(db: Database.Database): void { const cols = new Set( diff --git a/src/session-manager.ts b/src/session-manager.ts index 1f1d227..aad0717 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -1,10 +1,14 @@ /** - * Session lifecycle management. - * Creates session folders + DBs, writes messages, manages container status. + * Session lifecycle: folders, DBs, messages, container status. * - * Two-DB architecture: each session has inbound.db (host-owned) and outbound.db - * (container-owned). This eliminates SQLite write contention across the - * host-container mount boundary — each file has exactly one writer. + * Two-DB split — inbound.db (host writes) + outbound.db (container writes). + * Three cross-mount invariants are load-bearing: + * 1. journal_mode=DELETE — WAL's mmapped -shm doesn't refresh host→guest; + * the container would silently miss every new message. + * 2. Host opens-writes-CLOSES per op — close invalidates the container's + * page cache; a long-lived connection freezes its view at first read. + * 3. One writer per file — DELETE-mode journal-unlink isn't atomic across + * the mount; concurrent writers corrupt the DB. */ import Database from 'better-sqlite3'; import fs from 'fs'; @@ -260,7 +264,13 @@ export function writeDestinations(agentGroupId: string, sessionId: string): void log.debug('Destination map written', { sessionId, count: resolved.length }); } -/** Write a message to a session's inbound DB (messages_in). Host-only. */ +/** + * Write a message to a session's inbound DB (messages_in). Host-only. + * + * ⚠ Opens and closes the DB on every call. Do not refactor to reuse a + * long-lived connection — see cross-mount invariant 2 ("Host opens-writes-CLOSES + * per op") in the header comment at the top of this file. 
+ */ export function writeSessionMessage( agentGroupId: string, sessionId: string, @@ -285,8 +295,13 @@ export function writeSessionMessage( db.pragma('busy_timeout = 5000'); try { - // Host uses even seq numbers, container uses odd — prevents collisions - // across the two-DB boundary without cross-DB coordination. + // Host uses even seq, container uses odd. This is not just collision + // avoidance between the two DB files — the seq is the agent-facing + // message ID returned by send_message and accepted by edit_message / + // add_reaction, and those tools look up by seq across BOTH tables + // (see container/agent-runner/src/db/messages-out.ts:getMessageIdBySeq). + // So the {messages_in.seq, messages_out.seq} namespace MUST be disjoint, + // or the agent's "edit message #5" could resolve to the wrong row. const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); // next even