nanoclaw/container/agent-runner/src/db/messages-in.ts
gavrielc e92b245399 feat(v2): OneCLI 0.3.1 — approvals, credential collection, threaded routing
Three features built on top of @onecli-sh/sdk 0.3.1, landed together because
they share wiring surfaces (session DB schema, delivery dispatcher, Chat SDK
bridge, channel adapter contract).

## OneCLI manual-approval handler

* `src/onecli-approvals.ts` — long-polls OneCLI via the SDK's
  `configureManualApproval`; on each request, delivers an `ask_question` card
  to the admin agent group's first messaging group, persists a
  `pending_approvals` row, and waits on an in-memory Promise resolved by the
  admin's button click or an expiry timer. Expired cards are edited to
  "Expired (...)" and a startup sweep flushes any rows left over from a
  previous process.
* Short 11-byte approval id (`oa-<8 base36>`) instead of the SDK's UUID so the
  Telegram 64-byte `callback_data` limit is respected; the OneCLI UUID stays
  in the persisted payload for audit.
* Migration 003 consolidated: `pending_approvals` now has the OneCLI-aware
  columns from the start (`agent_group_id`, `channel_type`, `platform_id`,
  `platform_message_id`, `expires_at`, `status`), and `session_id` is relaxed
  to nullable so cross-session approvals fit.
* `handleQuestionResponse` in `src/index.ts` now routes OneCLI approvals
  through `resolveOneCLIApproval` before falling back to the
  session-bound approval path.
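
The wait-for-click-or-expiry mechanism described above can be sketched roughly
as follows. This is a minimal illustration, not the code in
`src/onecli-approvals.ts`: `makeShortId`, `waitForApproval`, `resolveApproval`,
and the outcome names are hypothetical, and `Math.random` is only a stand-in
for whatever random source the real id generator uses.

```typescript
// Hypothetical names throughout; a sketch of the pattern, not the real handler.
type ApprovalOutcome = 'approved' | 'denied' | 'expired';

interface Waiter {
  resolve: (outcome: ApprovalOutcome) => void;
  timer: ReturnType<typeof setTimeout>;
}

const waiters = new Map<string, Waiter>();

/** Build an `oa-` id with 8 base36 characters: 11 ASCII bytes in total. */
function makeShortId(rand: () => number = Math.random): string {
  let suffix = '';
  for (let i = 0; i < 8; i++) {
    // Math.floor(rand() * 36) is in 0..35, so toString(36) yields one character.
    suffix += Math.floor(rand() * 36).toString(36);
  }
  return `oa-${suffix}`;
}

/** Register a pending approval and wait for a click or for the expiry timer. */
function waitForApproval(approvalId: string, ttlMs: number): Promise<ApprovalOutcome> {
  return new Promise<ApprovalOutcome>((resolve) => {
    const timer = setTimeout(() => {
      // Expiry path: drop the waiter first so a late click becomes a no-op.
      waiters.delete(approvalId);
      resolve('expired');
    }, ttlMs);
    waiters.set(approvalId, { resolve, timer });
  });
}

/** Called from the button-click handler. Returns false if nothing was waiting. */
function resolveApproval(approvalId: string, outcome: 'approved' | 'denied'): boolean {
  const waiter = waiters.get(approvalId);
  if (!waiter) return false;
  clearTimeout(waiter.timer);
  waiters.delete(approvalId);
  waiter.resolve(outcome);
  return true;
}
```

Because the waiters live only in memory, a restart loses them, which is
presumably why the persisted `pending_approvals` rows need the startup sweep.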

## Credential collection from chat

New `trigger_credential_collection` MCP tool — the agent researches a
third-party API, calls the tool with `{name, hostPattern, headerName,
valueFormat, description}`, and blocks until the host reports saved, rejected,
or failed. The credential value never enters the agent's context: the user
submits it into a Chat SDK Modal on the host side, the host writes it to
OneCLI via a thin facade (`src/onecli-secrets.ts`, which shells out to
`onecli secrets create` and whose shape mirrors the SDK we expect upstream),
and only the status string flows back to the container via a system message.

* `src/credentials.ts` — host-side handler: delivers the card to the
  conversation's own channel (not the admin channel — credential collection
  is a user-facing flow, distinct from admin approval), persists a
  `pending_credentials` row, drives the submit → `createSecret` → notify
  pipeline. Falls back gracefully when the channel doesn't support modals.
* `src/db/credentials.ts` + migration 005: `pending_credentials` table.
* `src/channels/chat-sdk-bridge.ts`: renders a `credential_request` card,
  handles the `nccr:` action prefix by opening a Modal with a TextInput,
  registers an `onModalSubmit` handler for the `nccm:` callback prefix.
* `container/agent-runner/src/mcp-tools/credentials.ts`: the blocking MCP
  tool, mirroring the `ask_user_question` polling pattern.
* `container/agent-runner/src/db/messages-in.ts`: `findCredentialResponse`
  helper to pick up the system message the host writes back.
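
The container-side blocking behaviour can be pictured as a polling loop around
a finder such as `findCredentialResponse`. The sketch below is illustrative
only: the JSON shape of `content`, the `waitForCredentialStatus` name, the poll
interval, and the timeout are all assumptions, not details taken from
`mcp-tools/credentials.ts`.

```typescript
// Hypothetical helper; only the saved / rejected / failed outcomes and the idea
// of polling for a system message come from the commit message.
type CredentialStatus = 'saved' | 'rejected' | 'failed';

interface ResponseRow {
  id: string;
  content: string; // assumed JSON such as {"credentialId":"...","status":"saved"}
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function waitForCredentialStatus(
  credentialId: string,
  find: (credentialId: string) => ResponseRow | undefined,
  intervalMs = 1000,
  timeoutMs = 300000,
): Promise<CredentialStatus | 'timeout'> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const row = find(credentialId);
    if (row) {
      // Assumes the host always writes valid JSON into `content`.
      const parsed = JSON.parse(row.content) as { credentialId?: string; status?: CredentialStatus };
      // Re-check the id after parsing: a LIKE match is only a coarse prefilter.
      if (parsed.credentialId === credentialId && parsed.status) return parsed.status;
    }
    await sleep(intervalMs);
  }
  return 'timeout';
}
```

Only the status string is returned to the caller, which matches the intent
that the secret value itself never reaches the agent.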

## Threaded adapter routing

The destination layer previously didn't carry thread context, so agent replies
to Discord always landed in the root channel regardless of which thread the
inbound message came from.

* `ChannelAdapter.supportsThreads: boolean` — declared by every channel skill
  at `createChatSdkBridge`. Threaded: Discord, Slack, Teams, Google Chat,
  Linear, GitHub, Webex. Non-threaded: Telegram, WhatsApp Cloud, Matrix,
  Resend, iMessage.
* `src/router.ts`: non-threaded adapters strip `threadId` at ingest (threads
  collapse to channel-level sessions). Threaded adapters override the
  wiring's `session_mode` to `'per-thread'` so each thread = a session
  (except `agent-shared`, which is preserved as a cross-channel intent the
  adapter can't know about).
* `session_routing` table in `inbound.db` — single-row default reply routing
  written by the host on every container wake from
  `session.messaging_group_id` + `session.thread_id`. Forward-compat
  `CREATE TABLE IF NOT EXISTS` handles older session DBs lazily.
* `container/agent-runner/src/db/session-routing.ts` — container-side reader.
* `send_message` / `send_file` / `ask_user_question` / `send_card` /
  scheduling tools all default their routing (channel, platform, **and**
  thread) from the session when no explicit `to` is given. Explicit `to`
  uses the destination's channel with `thread_id = null` (cross-destination
  sends start a new conversation elsewhere).
* `poll-loop.ts::sendToDestination` (the final-text single-destination
  shortcut) now inherits `thread_id` from `RoutingContext` too — this was
  the root cause of Discord replies landing in the root channel even after
  `send_message` was wired correctly.
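
Taken together, the routing rules in this section amount to three small
decisions. The sketch below restates them as pure functions. The `Route` type
and all function names are hypothetical, and `'shared'` in the usage example
stands in for any session mode other than the two named in this message.

```typescript
// Hypothetical types and names; only the rules themselves come from this commit.
interface Route {
  channelType: string;
  platformId: string;
  threadId: string | null;
}

/** Ingest: non-threaded adapters drop the thread id. */
function normalizeInboundThread(supportsThreads: boolean, threadId: string | null): string | null {
  return supportsThreads ? threadId : null;
}

/** Ingest: threaded adapters force per-thread sessions, except for agent-shared wirings. */
function effectiveSessionMode(supportsThreads: boolean, wiringMode: string): string {
  if (!supportsThreads) return wiringMode;
  return wiringMode === 'agent-shared' ? wiringMode : 'per-thread';
}

/** Outbound: an explicit destination starts a new conversation; otherwise reuse the session route. */
function resolveOutboundRoute(sessionRoute: Route, explicitTo?: Omit<Route, 'threadId'>): Route {
  if (explicitTo) return { ...explicitTo, threadId: null };
  return sessionRoute;
}

// Usage: a reply with no explicit `to` stays in the thread the session came from.
const sessionRoute: Route = { channelType: 'discord', platformId: 'guild:chan', threadId: 't1' };
const replyRoute = resolveOutboundRoute(sessionRoute);
const crossRoute = resolveOutboundRoute(sessionRoute, { channelType: 'slack', platformId: 'C123' });
```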

## Related cleanups

* `src/container-runner.ts`: OneCLI agent identifier switched from the lossy
  folder-derived string to `agent_group.id`, making `getAgentGroup(externalId)`
  a trivial reverse lookup for per-agent scoping.
* `wakeContainer` race fix via an in-flight promise map: concurrent wakes
  during the async `buildContainerArgs` / OneCLI `applyContainerConfig` window
  no longer double-spawn containers against the same session directory.
* `src/db/db-v2.test.ts`: dropped the brittle `expect(row.v).toBe(N)` schema
  version assertion — it had to be bumped on every migration addition.
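
The in-flight promise map used for the `wakeContainer` fix is a common
de-duplication pattern. A minimal sketch, assuming a hypothetical `wakeOnce`
wrapper and a `spawn` callback rather than the real `wakeContainer` signature:

```typescript
// Hypothetical wrapper; the real wakeContainer signature is not shown in this commit.
const inFlight = new Map<string, Promise<void>>();

function wakeOnce(sessionId: string, spawn: (sessionId: string) => Promise<void>): Promise<void> {
  // A second caller arriving during the async setup window reuses the same promise.
  const existing = inFlight.get(sessionId);
  if (existing) return existing;

  const started = spawn(sessionId).finally(() => {
    // Clear the entry on success or failure so a later wake can spawn again.
    inFlight.delete(sessionId);
  });
  inFlight.set(sessionId, started);
  return started;
}
```

The map entry is set synchronously, before any `await`, so two calls made in
the same tick cannot both reach `spawn`.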

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 17:18:21 +03:00

/**
 * Inbound message operations (container side).
 *
 * Reads from inbound.db (host-owned, opened read-only).
 * Writes processing status to processing_ack in outbound.db (container-owned).
 *
 * The container never writes to inbound.db; all status tracking goes through
 * processing_ack. The host reads processing_ack to sync message lifecycle.
 */
import { getInboundDb, getOutboundDb } from './connection.js';

export interface MessageInRow {
  id: string;
  seq: number | null;
  kind: string;
  timestamp: string;
  status: string;
  process_after: string | null;
  recurrence: string | null;
  tries: number;
  platform_id: string | null;
  channel_type: string | null;
  thread_id: string | null;
  content: string;
}

/**
 * Fetch pending messages that are due for processing.
 * Reads from inbound.db (read-only), filters against processing_ack in outbound.db
 * to skip messages already picked up by this or a previous container run.
 */
export function getPendingMessages(): MessageInRow[] {
  const inbound = getInboundDb();
  const outbound = getOutboundDb();

  const pending = inbound
    .prepare(
      `SELECT * FROM messages_in
       WHERE status = 'pending'
         AND (process_after IS NULL OR process_after <= datetime('now'))
       ORDER BY timestamp ASC`,
    )
    .all() as MessageInRow[];

  if (pending.length === 0) return [];

  // Filter out messages already acknowledged in outbound.db
  const ackedIds = new Set(
    (outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map(
      (r) => r.message_id,
    ),
  );

  return pending.filter((m) => !ackedIds.has(m.id));
}

/** Mark messages as processing: writes to processing_ack in outbound.db. */
export function markProcessing(ids: string[]): void {
  if (ids.length === 0) return;
  const db = getOutboundDb();
  const stmt = db.prepare(
    "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'processing', datetime('now'))",
  );
  db.transaction(() => {
    for (const id of ids) stmt.run(id);
  })();
}

/** Mark messages as completed: updates processing_ack in outbound.db. */
export function markCompleted(ids: string[]): void {
  if (ids.length === 0) return;
  const db = getOutboundDb();
  const stmt = db.prepare(
    "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'completed', datetime('now'))",
  );
  db.transaction(() => {
    for (const id of ids) stmt.run(id);
  })();
}

/** Mark a single message as failed: writes to processing_ack in outbound.db. */
export function markFailed(id: string): void {
  getOutboundDb()
    .prepare(
      "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'failed', datetime('now'))",
    )
    .run(id);
}

/** Get a message by ID (read from inbound.db). */
export function getMessageIn(id: string): MessageInRow | undefined {
  return getInboundDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined;
}

/**
 * Find a pending response to a question (by questionId in content).
 * Reads from inbound.db, checks processing_ack to skip already-handled responses.
 */
export function findQuestionResponse(questionId: string): MessageInRow | undefined {
  const inbound = getInboundDb();
  const outbound = getOutboundDb();

  const response = inbound
    .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?")
    .get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined;

  if (!response) return undefined;

  // Check it hasn't been acked already
  const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
  if (acked) return undefined;

  return response;
}

/** Find a pending credential_response system message for a given credential id. */
export function findCredentialResponse(credentialId: string): MessageInRow | undefined {
  const inbound = getInboundDb();
  const outbound = getOutboundDb();

  const response = inbound
    .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND kind = 'system' AND content LIKE ?")
    .get(`%"credentialId":"${credentialId}"%`) as MessageInRow | undefined;

  if (!response) return undefined;

  const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
  if (acked) return undefined;

  return response;
}