v2: SQLite state adapter, admin commands, compact feedback

- Replace in-memory Chat SDK state with SqliteStateAdapter — thread
  subscriptions now persist across restarts
- Add migration 002 for chat_sdk_kv, subscriptions, locks, lists tables
- Handle /clear in agent-runner (reset sessionId) — SDK has
  supportsNonInteractive:false for this command
- Pass /compact, /context, /cost, /files through to SDK as admin commands
- Skip admin commands in follow-up poll so they start fresh queries
- Emit compact_boundary events as user-visible feedback messages
- Pass NANOCLAW_ADMIN_USER_ID and NANOCLAW_ASSISTANT_NAME to containers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-09 03:58:35 +03:00
parent c31bb02c06
commit 8a06b01646
10 changed files with 283 additions and 47 deletions

View File

@@ -9,7 +9,7 @@ import type { MessageInRow } from './db/messages-in.js';
*/
export type CommandCategory = 'admin' | 'filtered' | 'passthrough' | 'none';
const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact']);
const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact', '/context', '/cost', '/files']);
const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config']);
export interface CommandInfo {

View File

@@ -81,7 +81,6 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
if (cmdInfo.category === 'admin') {
if (!adminUserId || cmdInfo.senderId !== adminUserId) {
// Not admin — send error, mark completed
log(`Admin command denied: ${cmdInfo.command} from ${cmdInfo.senderId} (msg: ${msg.id})`);
writeMessageOut({
id: generateId(),
@@ -94,7 +93,24 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
commandIds.push(msg.id);
continue;
}
// Admin user — format as system command
// Handle admin commands directly
if (cmdInfo.command === '/clear') {
log('Clearing session (resetting sessionId)');
sessionId = undefined;
resumeAt = undefined;
writeMessageOut({
id: generateId(),
kind: 'chat',
platform_id: routing.platformId,
channel_type: routing.channelType,
thread_id: routing.threadId,
content: JSON.stringify({ text: 'Session cleared.' }),
});
commandIds.push(msg.id);
continue;
}
// Other admin commands — pass through to agent
normalMessages.push(msg);
continue;
}
@@ -174,25 +190,16 @@ function formatMessagesWithCommands(messages: MessageInRow[]): string {
for (const msg of messages) {
if (msg.kind === 'chat' || msg.kind === 'chat-sdk') {
const cmdInfo = categorizeMessage(msg);
if (cmdInfo.category === 'passthrough') {
if (cmdInfo.category === 'passthrough' || cmdInfo.category === 'admin') {
// Flush normal batch first
if (normalBatch.length > 0) {
parts.push(formatMessages(normalBatch));
normalBatch.length = 0;
}
// Pass raw command text (no XML wrapping)
// Pass raw command text (no XML wrapping) — SDK handles it natively
parts.push(cmdInfo.text);
continue;
}
if (cmdInfo.category === 'admin') {
// Format admin command as a system command block
if (normalBatch.length > 0) {
parts.push(formatMessages(normalBatch));
normalBatch.length = 0;
}
parts.push(`[SYSTEM COMMAND: ${cmdInfo.command}]\n${cmdInfo.text}`);
continue;
}
}
normalBatch.push(msg);
}
@@ -218,8 +225,15 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config:
const pollHandle = setInterval(() => {
if (done) return;
// Skip system messages — they're responses for MCP tools (e.g., ask_user_question)
const newMessages = getPendingMessages().filter((m) => m.kind !== 'system');
// Skip system messages (MCP tool responses) and admin commands (need fresh query)
const newMessages = getPendingMessages().filter((m) => {
if (m.kind === 'system') return false;
if (m.kind === 'chat' || m.kind === 'chat-sdk') {
const cmd = categorizeMessage(m);
if (cmd.category === 'admin') return false;
}
return true;
});
if (newMessages.length > 0) {
const newIds = newMessages.map((m) => m.id);
markProcessing(newIds);

View File

@@ -212,6 +212,10 @@ export class ClaudeProvider implements AgentProvider {
yield { type: 'error', message: 'API retry', retryable: true };
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'rate_limit_event') {
yield { type: 'error', message: 'Rate limit', retryable: false, classification: 'quota' };
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'compact_boundary') {
const meta = (message as { compact_metadata?: { pre_tokens?: number } }).compact_metadata;
const detail = meta?.pre_tokens ? ` (${meta.pre_tokens.toLocaleString()} tokens compacted)` : '';
yield { type: 'result', text: `Context compacted${detail}.` };
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') {
const tn = message as { summary?: string };
yield { type: 'progress', message: tn.summary || 'Task notification' };

View File

@@ -6,10 +6,18 @@
*/
import http from 'http';
import { Chat, Card, CardText, Actions, Button, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage } from 'chat';
import { createMemoryState } from '@chat-adapter/state-memory';
import {
Chat,
Card,
CardText,
Actions,
Button,
type Adapter,
type ConcurrencyStrategy,
type Message as ChatMessage,
} from 'chat';
import { log } from '../log.js';
import { SqliteStateAdapter } from '../state-sqlite.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js';
/** Adapter with optional gateway support (e.g., Discord). */
@@ -32,7 +40,7 @@ export interface ChatSdkBridgeConfig {
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
const { adapter } = config;
let chat: Chat;
let state: ReturnType<typeof createMemoryState>;
let state: SqliteStateAdapter;
let setupConfig: ChannelSetup;
let conversations: Map<string, ConversationConfig>;
let gatewayAbort: AbortController | null = null;
@@ -62,7 +70,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
setupConfig = hostConfig;
conversations = buildConversationMap(hostConfig.conversations);
state = createMemoryState();
state = new SqliteStateAdapter();
chat = new Chat({
adapters: { [adapter.name]: adapter },
@@ -105,14 +113,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
await chat.initialize();
// Subscribe registered conversations (after initialize connects state)
for (const conv of hostConfig.conversations) {
if (conv.agentGroupId) {
const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never);
await state.subscribe(threadId);
}
}
// Start Gateway listener for adapters that support it (e.g., Discord)
if (adapter.startGatewayListener) {
gatewayAbort = new AbortController();
@@ -184,11 +184,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
title: '❓ Question',
children: [
CardText(content.question as string),
Actions(
options.map((opt) =>
Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }),
),
),
Actions(options.map((opt) => Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }))),
],
});
await adapter.postMessage(tid, { card, fallbackText: `${content.question}\nOptions: ${options.join(', ')}` });
@@ -229,13 +225,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
updateConversations(configs: ConversationConfig[]) {
conversations = buildConversationMap(configs);
// Subscribe new conversations
for (const conv of configs) {
if (conv.agentGroupId) {
const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never);
state.subscribe(threadId).catch(() => {});
}
}
},
};
}
@@ -246,7 +235,11 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
* sends ALL raw events (including INTERACTION_CREATE for button clicks)
* to the webhookUrl, which we handle here.
*/
function startLocalWebhookServer(adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise<string> {
function startLocalWebhookServer(
adapter: GatewayAdapter,
setupConfig: ChannelSetup,
botToken?: string,
): Promise<string> {
return new Promise((resolve) => {
const server = http.createServer((req, res) => {
const chunks: Buffer[] = [];
@@ -275,7 +268,12 @@ function startLocalWebhookServer(adapter: GatewayAdapter, setupConfig: ChannelSe
});
}
async function handleForwardedEvent(body: string, adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise<void> {
async function handleForwardedEvent(
body: string,
adapter: GatewayAdapter,
setupConfig: ChannelSetup,
botToken?: string,
): Promise<void> {
let event: { type: string; data: Record<string, unknown> };
try {
event = JSON.parse(body);
@@ -305,7 +303,8 @@ async function handleForwardedEvent(body: string, adapter: GatewayAdapter, setup
}
// Update the card to show the selected answer and remove buttons
const originalEmbeds = ((interaction.message as Record<string, unknown>)?.embeds as Array<Record<string, unknown>>) || [];
const originalEmbeds =
((interaction.message as Record<string, unknown>)?.embeds as Array<Record<string, unknown>>) || [];
const originalDescription = (originalEmbeds[0]?.description as string) || '';
try {
await fetch(`https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`, {

View File

@@ -12,6 +12,7 @@ import { OneCLI } from '@onecli-sh/sdk';
import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js';
import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js';
import { getAgentGroup } from './db/agent-groups.js';
import { getMessagingGroup } from './db/messaging-groups.js';
import { log } from './log.js';
import { validateAdditionalMounts } from './mount-security.js';
import {
@@ -227,6 +228,17 @@ async function buildContainerArgs(
args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`);
args.push('-e', `SESSION_DB_PATH=/workspace/session.db`);
// Pass admin user ID and assistant name from messaging group/agent group
if (session.messaging_group_id) {
const mg = getMessagingGroup(session.messaging_group_id);
if (mg?.admin_user_id) {
args.push('-e', `NANOCLAW_ADMIN_USER_ID=${mg.admin_user_id}`);
}
}
if (agentGroup.name) {
args.push('-e', `NANOCLAW_ASSISTANT_NAME=${agentGroup.name}`);
}
// OneCLI gateway
const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier });
if (onecliApplied) {

View File

@@ -62,7 +62,7 @@ describe('migrations', () => {
const db = initTestDb();
runMigrations(db);
const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number };
expect(row.v).toBe(1);
expect(row.v).toBe(2);
});
});

View File

@@ -0,0 +1,36 @@
import type Database from 'better-sqlite3';
import type { Migration } from './index.js';
export const migration002: Migration = {
version: 2,
name: 'chat-sdk-state',
up(db: Database.Database) {
db.exec(`
CREATE TABLE chat_sdk_kv (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
expires_at INTEGER
);
CREATE TABLE chat_sdk_subscriptions (
thread_id TEXT PRIMARY KEY,
subscribed_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE chat_sdk_locks (
thread_id TEXT PRIMARY KEY,
token TEXT NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE TABLE chat_sdk_lists (
key TEXT NOT NULL,
idx INTEGER NOT NULL,
value TEXT NOT NULL,
expires_at INTEGER,
PRIMARY KEY (key, idx)
);
`);
},
};

View File

@@ -2,6 +2,7 @@ import type Database from 'better-sqlite3';
import { log } from '../../log.js';
import { migration001 } from './001-initial.js';
import { migration002 } from './002-chat-sdk-state.js';
export interface Migration {
version: number;
@@ -9,7 +10,7 @@ export interface Migration {
up: (db: Database.Database) => void;
}
const migrations: Migration[] = [migration001];
const migrations: Migration[] = [migration001, migration002];
export function runMigrations(db: Database.Database): void {
db.exec(`

View File

@@ -142,7 +142,17 @@ async function sweepSession(session: Session): Promise<void> {
db.prepare(
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
).run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content);
).run(
newId,
nextSeq,
msg.kind,
nextRun,
msg.recurrence,
msg.platform_id,
msg.channel_type,
msg.thread_id,
msg.content,
);
// Remove recurrence from the completed message so it doesn't spawn again
db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id);

160
src/state-sqlite.ts Normal file
View File

@@ -0,0 +1,160 @@
/**
* Chat SDK StateAdapter backed by SQLite.
* Persists subscriptions, locks, KV, and lists across restarts.
*
* Ported from feat/chat-sdk-integration branch.
*/
import crypto from 'crypto';
import type Database from 'better-sqlite3';
import type { StateAdapter, QueueEntry } from 'chat';
import { getDb } from './db/connection.js';
interface Lock {
threadId: string;
token: string;
expiresAt: number;
}
export class SqliteStateAdapter implements StateAdapter {
private db!: Database.Database;
async connect(): Promise<void> {
this.db = getDb();
this.cleanup();
}
async disconnect(): Promise<void> {}
// --- Key-value ---
async get<T = unknown>(key: string): Promise<T | null> {
this.cleanup();
const row = this.db
.prepare('SELECT value, expires_at FROM chat_sdk_kv WHERE key = ?')
.get(key) as { value: string; expires_at: number | null } | undefined;
if (!row) return null;
if (row.expires_at && row.expires_at < Date.now()) {
this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
return null;
}
return JSON.parse(row.value) as T;
}
async set<T = unknown>(key: string, value: T, ttlMs?: number): Promise<void> {
const expiresAt = ttlMs ? Date.now() + ttlMs : null;
this.db.prepare('INSERT OR REPLACE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)').run(key, JSON.stringify(value), expiresAt);
}
async setIfNotExists(key: string, value: unknown, ttlMs?: number): Promise<boolean> {
const existing = this.db.prepare('SELECT expires_at FROM chat_sdk_kv WHERE key = ?').get(key) as { expires_at: number | null } | undefined;
if (existing?.expires_at && existing.expires_at < Date.now()) {
this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
}
const expiresAt = ttlMs ? Date.now() + ttlMs : null;
const result = this.db.prepare('INSERT OR IGNORE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)').run(key, JSON.stringify(value), expiresAt);
return result.changes > 0;
}
async delete(key: string): Promise<void> {
this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
}
// --- Subscriptions ---
async subscribe(threadId: string): Promise<void> {
this.db.prepare('INSERT OR REPLACE INTO chat_sdk_subscriptions (thread_id) VALUES (?)').run(threadId);
}
async unsubscribe(threadId: string): Promise<void> {
this.db.prepare('DELETE FROM chat_sdk_subscriptions WHERE thread_id = ?').run(threadId);
}
async isSubscribed(threadId: string): Promise<boolean> {
const row = this.db.prepare('SELECT 1 FROM chat_sdk_subscriptions WHERE thread_id = ? LIMIT 1').get(threadId);
return !!row;
}
// --- Locks ---
async acquireLock(threadId: string, ttlMs: number): Promise<Lock | null> {
const now = Date.now();
const token = crypto.randomUUID();
const expiresAt = now + ttlMs;
this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND expires_at < ?').run(threadId, now);
const result = this.db.prepare('INSERT OR IGNORE INTO chat_sdk_locks (thread_id, token, expires_at) VALUES (?, ?, ?)').run(threadId, token, expiresAt);
if (result.changes === 0) return null;
return { threadId, token, expiresAt };
}
async releaseLock(lock: Lock): Promise<void> {
this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND token = ?').run(lock.threadId, lock.token);
}
async extendLock(lock: Lock, ttlMs: number): Promise<boolean> {
const newExpiry = Date.now() + ttlMs;
const result = this.db.prepare('UPDATE chat_sdk_locks SET expires_at = ? WHERE thread_id = ? AND token = ?').run(newExpiry, lock.threadId, lock.token);
if (result.changes > 0) {
lock.expiresAt = newExpiry;
return true;
}
return false;
}
async forceReleaseLock(threadId: string): Promise<void> {
this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ?').run(threadId);
}
// --- Lists ---
async appendToList(key: string, value: unknown, options?: { maxLength?: number; ttlMs?: number }): Promise<void> {
const expiresAt = options?.ttlMs ? Date.now() + options.ttlMs : null;
const maxRow = this.db.prepare('SELECT MAX(idx) as maxIdx FROM chat_sdk_lists WHERE key = ?').get(key) as { maxIdx: number | null } | undefined;
const nextIdx = (maxRow?.maxIdx ?? -1) + 1;
this.db.prepare('INSERT INTO chat_sdk_lists (key, idx, value, expires_at) VALUES (?, ?, ?, ?)').run(key, nextIdx, JSON.stringify(value), expiresAt);
if (options?.maxLength) {
const cutoff = nextIdx - options.maxLength;
if (cutoff >= 0) {
this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx <= ?').run(key, cutoff);
}
}
}
async getList<T = unknown>(key: string): Promise<T[]> {
const now = Date.now();
const rows = this.db.prepare('SELECT value FROM chat_sdk_lists WHERE key = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY idx ASC').all(key, now) as { value: string }[];
return rows.map((r) => JSON.parse(r.value) as T);
}
// --- Queue ---
async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number> {
const key = `queue:${threadId}`;
await this.appendToList(key, entry, { maxLength: maxSize });
return await this.queueDepth(threadId);
}
async dequeue(threadId: string): Promise<QueueEntry | null> {
const key = `queue:${threadId}`;
const row = this.db.prepare('SELECT idx, value FROM chat_sdk_lists WHERE key = ? ORDER BY idx ASC LIMIT 1').get(key) as { idx: number; value: string } | undefined;
if (!row) return null;
this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx = ?').run(key, row.idx);
return JSON.parse(row.value) as QueueEntry;
}
async queueDepth(threadId: string): Promise<number> {
const key = `queue:${threadId}`;
const row = this.db.prepare('SELECT COUNT(*) as count FROM chat_sdk_lists WHERE key = ?').get(key) as { count: number };
return row.count;
}
// --- Cleanup ---
private cleanup(): void {
const now = Date.now();
this.db.prepare('DELETE FROM chat_sdk_kv WHERE expires_at IS NOT NULL AND expires_at < ?').run(now);
this.db.prepare('DELETE FROM chat_sdk_locks WHERE expires_at < ?').run(now);
this.db.prepare('DELETE FROM chat_sdk_lists WHERE expires_at IS NOT NULL AND expires_at < ?').run(now);
}
}