v2 phase 5: scheduling fixes, media handling, command processing

- Host sweep: fix DELETE journal mode, busy_timeout, seq in recurrence INSERT
- Outbound files: delivery reads from outbox dir, passes buffers to adapter,
  cleans up after delivery. Chat SDK bridge sends files via postMessage.
- Inbound attachments: formatter includes attachment info in prompts
- Commands: categorize /commands as admin, filtered, or passthrough.
  Admin commands check sender against NANOCLAW_ADMIN_USER_ID.
  Filtered commands silently dropped. Passthrough sent raw to agent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-09 02:59:33 +03:00
parent afbc20a6c4
commit c348fabf22
7 changed files with 266 additions and 43 deletions

View File

@@ -34,10 +34,17 @@ export interface InboundMessage {
timestamp: string;
}
/** A file attachment to deliver alongside a message. */
export interface OutboundFile {
filename: string;
data: Buffer;
}
/** Outbound message from host to adapter. */
export interface OutboundMessage {
kind: string;
content: unknown; // parsed JSON from messages_out
files?: OutboundFile[]; // file attachments from the session outbox
}
/** Discovered conversation info (from syncConversations). */

View File

@@ -104,31 +104,33 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
if (gatewayAbort?.signal.aborted) return;
// Capture the long-running listener promise via waitUntil
let listenerPromise: Promise<unknown> | undefined;
adapter
.startGatewayListener!(
{ waitUntil: (p: Promise<unknown>) => { listenerPromise = p; } },
24 * 60 * 60 * 1000,
gatewayAbort!.signal,
)
.then(() => {
// startGatewayListener resolves immediately with a Response;
// the actual work is in the listenerPromise passed to waitUntil
if (listenerPromise) {
listenerPromise
.then(() => {
if (!gatewayAbort?.signal.aborted) {
log.info('Gateway listener expired, restarting', { adapter: adapter.name });
startGateway();
}
})
.catch((err) => {
if (!gatewayAbort?.signal.aborted) {
log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
setTimeout(startGateway, 5000);
}
});
}
});
adapter.startGatewayListener!(
{
waitUntil: (p: Promise<unknown>) => {
listenerPromise = p;
},
},
24 * 60 * 60 * 1000,
gatewayAbort!.signal,
).then(() => {
// startGatewayListener resolves immediately with a Response;
// the actual work is in the listenerPromise passed to waitUntil
if (listenerPromise) {
listenerPromise
.then(() => {
if (!gatewayAbort?.signal.aborted) {
log.info('Gateway listener expired, restarting', { adapter: adapter.name });
startGateway();
}
})
.catch((err) => {
if (!gatewayAbort?.signal.aborted) {
log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
setTimeout(startGateway, 5000);
}
});
}
});
};
startGateway();
log.info('Gateway listener started', { adapter: adapter.name });
@@ -156,7 +158,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
// Normal message
const text = (content.markdown as string) || (content.text as string);
if (text) {
await adapter.postMessage(tid, { markdown: text });
// Attach files if present (FileUpload format: { data, filename })
const fileUploads = message.files?.map((f) => ({ data: f.data, filename: f.filename }));
if (fileUploads && fileUploads.length > 0) {
await adapter.postMessage(tid, { markdown: text, files: fileUploads });
} else {
await adapter.postMessage(tid, { markdown: text });
}
} else if (message.files && message.files.length > 0) {
// Files only, no text
const fileUploads = message.files.map((f) => ({ data: f.data, filename: f.filename }));
await adapter.postMessage(tid, { markdown: '', files: fileUploads });
}
},

View File

@@ -3,12 +3,15 @@
* Polls active session DBs for undelivered messages_out, delivers through channel adapters.
*/
import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { getRunningSessions, getActiveSessions } from './db/sessions.js';
import { getAgentGroup } from './db/agent-groups.js';
import { log } from './log.js';
import { openSessionDb, sessionDbPath } from './session-manager.js';
import { openSessionDb, sessionDir } from './session-manager.js';
import { resetContainerIdleTimer } from './container-runner-v2.js';
import type { OutboundFile } from './channels/adapter.js';
import type { Session } from './types-v2.js';
const ACTIVE_POLL_MS = 1000;
@@ -21,6 +24,7 @@ export interface ChannelDeliveryAdapter {
threadId: string | null,
kind: string,
content: string,
files?: OutboundFile[],
): Promise<void>;
setTyping?(channelType: string, platformId: string, threadId: string | null): Promise<void>;
}
@@ -159,8 +163,29 @@ async function deliverMessage(
return;
}
await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content);
log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id });
// Read file attachments from outbox if the content declares files
let files: OutboundFile[] | undefined;
const outboxDir = path.join(sessionDir(session.agent_group_id, session.id), 'outbox', msg.id);
if (Array.isArray(content.files) && content.files.length > 0 && fs.existsSync(outboxDir)) {
files = [];
for (const filename of content.files as string[]) {
const filePath = path.join(outboxDir, filename);
if (fs.existsSync(filePath)) {
files.push({ filename, data: fs.readFileSync(filePath) });
} else {
log.warn('Outbox file not found', { messageId: msg.id, filename });
}
}
if (files.length === 0) files = undefined;
}
await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content, files);
log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id, fileCount: files?.length });
// Clean up outbox directory after successful delivery
if (fs.existsSync(outboxDir)) {
fs.rmSync(outboxDir, { recursive: true, force: true });
}
}
export function stopDeliveryPolls(): void {

View File

@@ -58,7 +58,8 @@ async function sweepSession(session: Session): Promise<void> {
let db: Database.Database;
try {
db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('journal_mode = DELETE');
db.pragma('busy_timeout = 5000');
} catch {
return;
}
@@ -125,10 +126,23 @@ async function sweepSession(session: Session): Promise<void> {
const nextRun = interval.next().toISOString();
const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
// Compute next seq from both tables (same pattern as session-manager.ts)
const nextSeq = (
db
.prepare(
`SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM (
SELECT seq FROM messages_in WHERE seq IS NOT NULL
UNION ALL
SELECT seq FROM messages_out WHERE seq IS NOT NULL
)`,
)
.get() as { next: number }
).next;
db.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
).run(newId, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content);
`INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content)
VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`,
).run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content);
// Remove recurrence from the completed message so it doesn't spawn again
db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id);

View File

@@ -68,13 +68,13 @@ async function main(): Promise<void> {
// 4. Delivery adapter bridge — dispatches to channel adapters
setDeliveryAdapter({
async deliver(channelType, platformId, threadId, kind, content) {
async deliver(channelType, platformId, threadId, kind, content, files) {
const adapter = getChannelAdapter(channelType);
if (!adapter) {
log.warn('No adapter for channel type', { channelType });
return;
}
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content) });
await adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files });
},
async setTyping(channelType, platformId, threadId) {
const adapter = getChannelAdapter(channelType);