510 lines
20 KiB
TypeScript
510 lines
20 KiB
TypeScript
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js';
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import {
  clearContinuation,
  migrateLegacyContinuation,
  setContinuation,
} from './db/session-state.js';
import {
  formatMessages,
  extractRouting,
  categorizeMessage,
  isClearCommand,
  isRunnerCommand,
  stripInternalTags,
  type RoutingContext,
} from './formatter.js';
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
|
|
|
|
/** Sleep (ms) between outer-loop polls when there is nothing to wake the agent for. */
const POLL_INTERVAL_MS = 1_000;

/** Period (ms) of the follow-up poller that runs while a provider query is active. */
const ACTIVE_POLL_INTERVAL_MS = 500;
|
|
|
|
/**
 * Write a diagnostic line to stderr, tagged with the `[poll-loop]` prefix.
 *
 * @param text Message to log.
 */
function log(text: string): void {
  const line = `[poll-loop] ${text}`;
  console.error(line);
}
|
|
|
|
/**
 * Build an outbound message id: `msg-<epoch ms>-<up to 6 base-36 chars>`.
 * Uniqueness relies on the millisecond timestamp plus a random suffix.
 */
function generateId(): string {
  const stamp = Date.now();
  const suffix = Math.random().toString(36).slice(2, 8);
  return `msg-${stamp}-${suffix}`;
}
|
|
|
|
/**
 * Settings consumed by {@link runPollLoop}.
 */
export interface PollLoopConfig {
  /** Backend whose `query()` runs each agent turn. */
  provider: AgentProvider;
  /**
   * Provider identifier (e.g. "claude", "codex", "opencode"). The persisted
   * continuation is stored under this key, so switching providers can never
   * resume an id that was issued by a different backend.
   */
  providerName: string;
  /** Working directory forwarded to every `provider.query()` call. */
  cwd: string;
  /** Optional extra context forwarded unchanged to every `provider.query()` call. */
  systemContext?: {
    /** Additional instruction text for the provider. */
    instructions?: string;
  };
}
|
|
|
|
/**
 * Main poll loop. Runs indefinitely until the process is killed.
 *
 * Each iteration:
 * 1. Poll messages_in for pending, non-system rows.
 * 2. If the batch holds no trigger=1 row, sleep and leave the rows pending.
 * 3. Handle /clear directly: reset the continuation, reply, complete the row.
 * 4. Run pre-task scripts (scheduling module hook) to gate task rows.
 * 5. Format the surviving rows into a prompt and call provider.query(); while
 *    the query is active, processQuery pushes follow-ups via query.push().
 * 6. Mark the processed rows completed and loop.
 *
 * @param config Provider, provider name (continuation key), working directory
 *   and optional system context used for every query.
 */
export async function runPollLoop(config: PollLoopConfig): Promise<void> {
  // Resume the agent's prior session from a previous container run if one
  // was persisted. The continuation is opaque to the poll-loop — the
  // provider decides how to use it (Claude resumes a .jsonl transcript,
  // other providers may reload a thread ID, etc.). Keyed per-provider so
  // a Codex thread id never gets handed to Claude or vice versa.
  let continuation: string | undefined = migrateLegacyContinuation(config.providerName);
  if (continuation) {
    log(`Resuming agent session ${continuation}`);
  }

  // Clear leftover 'processing' acks from a previous crashed container.
  // This lets the new container re-process those messages.
  clearStaleProcessingAcks();

  let pollCount = 0;
  while (true) {
    // Skip system messages — they're responses for MCP tools (e.g., ask_user_question)
    const messages = getPendingMessages().filter((m) => m.kind !== 'system');
    pollCount++;

    // Periodic heartbeat log so we know the loop is alive
    if (pollCount % 30 === 0) {
      log(`Poll heartbeat (${pollCount} iterations, ${messages.length} pending)`);
    }

    // Idle / accumulate gate. `some` is false for an empty batch, so this one
    // check covers both "nothing pending" and "only trigger=0 rows".
    // trigger=0 rows are context-only (router-stored under
    // ignored_message_policy='accumulate'): don't wake the agent, leave them
    // `pending` so they ride along the next time a real trigger=1 message
    // lands via this same getPendingMessages query. Without this gate a warm
    // container would process (and possibly respond to) every
    // accumulate-only batch, defeating the "store as context, don't engage"
    // contract. Host-side countDueMessages gates the same way for
    // wake-from-cold (see src/db/session-db.ts).
    if (!messages.some((m) => m.trigger === 1)) {
      await sleep(POLL_INTERVAL_MS);
      continue;
    }

    const ids = messages.map((m) => m.id);
    markProcessing(ids);

    const routing = extractRouting(messages);

    // Command handling: the host router gates filtered and unauthorized
    // admin commands before they reach the container. The only command
    // the runner handles directly is /clear (session reset).
    const normalMessages: MessageInRow[] = [];
    const commandIds: string[] = [];

    for (const msg of messages) {
      if ((msg.kind === 'chat' || msg.kind === 'chat-sdk') && isClearCommand(msg)) {
        log('Clearing session (resetting continuation)');
        continuation = undefined;
        clearContinuation(config.providerName);
        writeMessageOut({
          id: generateId(),
          kind: 'chat',
          platform_id: routing.platformId,
          channel_type: routing.channelType,
          thread_id: routing.threadId,
          content: JSON.stringify({ text: 'Session cleared.' }),
        });
        commandIds.push(msg.id);
        continue;
      }
      normalMessages.push(msg);
    }

    if (commandIds.length > 0) {
      markCompleted(commandIds);
    }

    if (normalMessages.length === 0) {
      // Every row in the batch was a /clear command and was completed above.
      log(`All ${messages.length} message(s) were commands, skipping query`);
      continue;
    }

    // Pre-task scripts: for any task rows with a `script`, run it before the
    // provider call. Scripts returning wakeAgent=false (or erroring) gate
    // their own task row only — surviving messages still go to the agent.
    // Without the scheduling module, the marker block is empty, `keep`
    // falls back to `normalMessages`, and no gating happens.
    let keep: MessageInRow[] = normalMessages;
    let skipped: string[] = [];
    // MODULE-HOOK:scheduling-pre-task:start
    const { applyPreTaskScripts } = await import('./scheduling/task-script.js');
    const preTask = await applyPreTaskScripts(normalMessages);
    keep = preTask.keep;
    skipped = preTask.skipped;
    if (skipped.length > 0) {
      markCompleted(skipped);
      log(`Pre-task script skipped ${skipped.length} task(s): ${skipped.join(', ')}`);
    }
    // MODULE-HOOK:scheduling-pre-task:end

    if (keep.length === 0) {
      log(`All ${normalMessages.length} non-command message(s) gated by script, skipping query`);
      continue;
    }

    // Format messages: passthrough commands get raw text (only if the
    // provider natively handles slash commands), others get XML.
    const prompt = formatMessagesWithCommands(keep, config.provider.supportsNativeSlashCommands);

    log(`Processing ${keep.length} message(s), kinds: ${[...new Set(keep.map((m) => m.kind))].join(',')}`);

    const query = config.provider.query({
      prompt,
      continuation,
      cwd: config.cwd,
      systemContext: config.systemContext,
    });

    // Rows already settled earlier in this iteration (completed /clear
    // commands and script-skipped tasks) are excluded. A Set keeps this
    // filter linear in the batch size.
    const settledIds = new Set<string>([...commandIds, ...skipped]);
    const processingIds = ids.filter((id) => !settledIds.has(id));

    // Process the query while concurrently polling for new messages
    try {
      const result = await processQuery(query, routing, processingIds, config.providerName);
      if (result.continuation && result.continuation !== continuation) {
        continuation = result.continuation;
        setContinuation(config.providerName, continuation);
      }
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log(`Query error: ${errMsg}`);

      // Stale/corrupt continuation recovery: ask the provider whether
      // this error means the stored continuation is unusable, and clear
      // it so the next attempt starts fresh.
      if (continuation && config.provider.isSessionInvalid(err)) {
        log(`Stale session detected (${continuation}) — clearing for next retry`);
        continuation = undefined;
        clearContinuation(config.providerName);
      }

      // Write error response so the user knows something went wrong
      writeMessageOut({
        id: generateId(),
        kind: 'chat',
        platform_id: routing.platformId,
        channel_type: routing.channelType,
        thread_id: routing.threadId,
        content: JSON.stringify({ text: `Error: ${errMsg}` }),
      });
    }

    // Ensure completed even if processQuery ended without a result event
    // (e.g. stream closed unexpectedly).
    markCompleted(processingIds);
    // Report the rows completed here, not the whole polled batch: commands
    // and script-skipped rows were already completed (and logged) above.
    log(`Completed ${processingIds.length} message(s)`);
  }
}
|
|
|
|
/**
 * Turn a batch of inbound rows into a single prompt string.
 *
 * When the provider dispatches slash commands itself (`nativeSlashCommands`),
 * chat rows that `categorizeMessage` classifies as 'passthrough' or 'admin'
 * are emitted as their raw command text. This lets the SDK recognise and run
 * them. Every other row is formatted via `formatMessages`, in contiguous runs
 * so relative order is preserved. Segments are joined with a blank line.
 *
 * @param messages Rows to format, in delivery order.
 * @param nativeSlashCommands Whether the provider handles slash commands natively.
 * @returns The assembled prompt.
 */
function formatMessagesWithCommands(messages: MessageInRow[], nativeSlashCommands: boolean): string {
  const segments: string[] = [];
  let run: MessageInRow[] = [];

  // Emit the current run of ordinary rows (if any) as one formatted segment.
  const flushRun = (): void => {
    if (run.length === 0) return;
    segments.push(formatMessages(run));
    run = [];
  };

  for (const msg of messages) {
    const isChat = msg.kind === 'chat' || msg.kind === 'chat-sdk';
    if (nativeSlashCommands && isChat) {
      const cmdInfo = categorizeMessage(msg);
      const isRawCommand = cmdInfo.category === 'passthrough' || cmdInfo.category === 'admin';
      if (isRawCommand) {
        // Keep ordering: everything queued before the command goes first.
        flushRun();
        // No XML wrapping — the SDK dispatches the command text itself.
        segments.push(cmdInfo.text);
        continue;
      }
    }
    run.push(msg);
  }

  flushRun();
  return segments.join('\n\n');
}
|
|
|
|
/** Outcome of {@link processQuery}. */
interface QueryResult {
  /** Continuation reported by the query's `init` event, when one arrived. */
  continuation?: string;
}
|
|
|
|
/**
 * Drive one provider query until its event stream ends, feeding it follow-up
 * input in the meantime.
 *
 * Every event is logged via `handleEvent` and touches the heartbeat. Events
 * are then handled by type:
 * - `init`: the continuation is persisted immediately.
 * - `result`: `initialBatchIds` are marked completed and any result text is
 *   dispatched.
 * - `compacted`: a destination reminder is pushed when more than one
 *   destination exists.
 *
 * In parallel, an interval timer polls messages_in. It either pushes new rows
 * into the live query, or ends the query when a runner slash command is
 * pending, so the outer loop can handle that command.
 *
 * @param query Active provider query.
 * @param routing Routing context of the initial batch.
 * @param initialBatchIds Ids of the rows that started this query.
 * @param providerName Key under which the continuation is persisted.
 * @returns The continuation from the `init` event, if one was seen.
 */
async function processQuery(
  query: AgentQuery,
  routing: RoutingContext,
  initialBatchIds: string[],
  providerName: string,
): Promise<QueryResult> {
  let latestContinuation: string | undefined;
  let finished = false;
  // `tickRunning` keeps poll ticks from overlapping while a pre-task script
  // is awaited. `stoppedForCommand` halts polling once the stream was ended
  // for a slash command.
  let tickRunning = false;
  let stoppedForCommand = false;

  // After SDK auto-compaction the model often drops the learned
  // `<message to="…">` wrapping discipline (the destinations are still in the
  // system prompt, but the behavioral pattern is summarized away). Push a
  // reminder so the next turn re-anchors on the destination model. Only when
  // there's >1 destination — single-destination groups have a fallback that
  // works without wrapping. See qwibitai/nanoclaw#2325.
  const remindDestinationsAfterCompaction = (): void => {
    const destinations = getAllDestinations();
    if (destinations.length <= 1) return;
    const names = destinations.map((d) => d.name).join(', ');
    query.push(
      `[system] Context was just compacted. Reminder: you have ${destinations.length} destinations (${names}). ` +
        `Use <message to="name"> blocks to address them. Bare text goes to the scratchpad fallback only.`,
    );
  };

  // One follow-up polling pass. Never rejects: without the catch, a rejection
  // would escape the fire-and-forget call below, and Node would terminate the
  // container on unhandled-rejection. The initial-batch path is covered by
  // runPollLoop's try/catch; this path is not.
  const followUpTick = async (): Promise<void> => {
    try {
      const pending = getPendingMessages();

      // Slash commands need a fresh query: /clear resets the SDK's
      // resume id (fixed at sdkQuery() time); admin/passthrough commands
      // (/compact, /cost, …) only dispatch when they're the first input
      // of a query — pushed mid-stream they arrive as plain text and
      // the SDK never runs them. End the stream and leave the rows
      // pending; the outer loop handles them on next iteration via the
      // canonical command path + formatMessagesWithCommands.
      if (pending.some((m) => isRunnerCommand(m))) {
        log('Pending slash command — ending stream so outer loop can process');
        stoppedForCommand = true;
        query.end();
        return;
      }

      // Skip system messages (MCP tool responses). No thread_id filter:
      // thread routing is the router's concern. Per-thread sessions already
      // isolate threads into separate containers, and shared sessions
      // intentionally merge everything. Filtering here deadlocked when the
      // initial batch and follow-ups had mismatched thread_ids (e.g. a
      // host-generated welcome trigger with null thread vs a Discord DM reply).
      const newMessages = pending.filter((m) => m.kind !== 'system');
      if (newMessages.length === 0) return;

      markProcessing(newMessages.map((m) => m.id));

      // Run pre-task scripts on follow-ups too — otherwise a task arriving
      // during an active query (e.g. a */10 monitoring cron) would bypass its
      // script gate and always wake the agent. Mirrors the initial-batch hook.
      let keep = newMessages;
      let skipped: string[] = [];
      // MODULE-HOOK:scheduling-pre-task-followup:start
      const { applyPreTaskScripts } = await import('./scheduling/task-script.js');
      const preTask = await applyPreTaskScripts(newMessages);
      keep = preTask.keep;
      skipped = preTask.skipped;
      if (skipped.length > 0) {
        markCompleted(skipped);
        log(`Pre-task script skipped ${skipped.length} follow-up task(s): ${skipped.join(', ')}`);
      }
      // MODULE-HOOK:scheduling-pre-task-followup:end

      // Nothing survived the gate, or the query finished while the script
      // was awaited. Pushing into a closed stream is wasted work; the claimed
      // rows get released by the host's processing-claim sweep.
      if (keep.length === 0 || finished) return;

      const keptIds = keep.map((m) => m.id);
      const prompt = formatMessages(keep);
      log(`Pushing ${keep.length} follow-up message(s) into active query`);
      query.push(prompt);
      markCompleted(keptIds);
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log(`Follow-up poll error: ${errMsg}`);
    } finally {
      tickRunning = false;
    }
  };

  // Concurrent polling: push follow-ups into the active query as they arrive.
  // The stream is NOT force-ended on silence. Keeping it open avoids
  // re-spawning the SDK subprocess (~few seconds) and re-loading the .jsonl
  // transcript on every turn. The Anthropic prompt cache is server-side with
  // a 5-min TTL keyed on prefix hash, so stream lifecycle does not affect
  // cache lifetime. Stream liveness is decided host-side via the heartbeat
  // file + processing claim age (see src/host-sweep.ts). If something is
  // truly stuck, the host kills the container and messages go back to pending.
  const pollHandle = setInterval(() => {
    if (finished || tickRunning || stoppedForCommand) return;
    tickRunning = true;
    void followUpTick();
  }, ACTIVE_POLL_INTERVAL_MS);

  try {
    for await (const event of query.events) {
      handleEvent(event, routing);
      touchHeartbeat();

      switch (event.type) {
        case 'init':
          latestContinuation = event.continuation;
          // Persist right away so a crash between `init` and `result` still
          // lets the next wake resume this conversation instead of starting
          // a blank session.
          setContinuation(providerName, event.continuation);
          break;
        case 'result':
          // Any result (with or without text) means the turn is done. Mark
          // the initial batch completed now, so the host sweep doesn't see
          // stale 'processing' claims while the query stays open for
          // follow-ups. The agent may already have replied via MCP mid-turn.
          markCompleted(initialBatchIds);
          if (event.text) {
            dispatchResultText(event.text, routing);
          }
          break;
        case 'compacted':
          remindDestinationsAfterCompaction();
          break;
      }
    }
  } finally {
    finished = true;
    clearInterval(pollHandle);
  }

  return { continuation: latestContinuation };
}
|
|
|
|
/**
 * Emit a one-line log summary for a provider event. Event types other than
 * init/result/error/progress/compacted are ignored.
 *
 * @param event Event from the provider's stream.
 * @param _routing Unused; kept so the signature stays stable for callers.
 */
function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
  if (event.type === 'init') {
    log(`Session: ${event.continuation}`);
  } else if (event.type === 'result') {
    // Preview only the first 200 characters of the result text.
    log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
  } else if (event.type === 'error') {
    log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
  } else if (event.type === 'progress') {
    log(`Progress: ${event.message}`);
  } else if (event.type === 'compacted') {
    log(`Compacted: ${event.text}`);
  }
}
|
|
|
|
/**
 * Route the agent's final result text to its destinations.
 *
 * Each `<message to="name">…</message>` block is sent to the destination that
 * `findByName` resolves for `name` (surrounding whitespace in the name is
 * ignored). Everything outside those blocks is scratchpad. After
 * `stripInternalTags` is applied, it is logged (truncated to 500 chars) but
 * never sent. The agent must wrap every outbound message in a block, even
 * with a single destination.
 *
 * Blocks naming an unknown destination are moved into the scratchpad. Blocks
 * whose body is empty after trimming are skipped rather than sent as blank
 * chat messages.
 *
 * @param text Final result text from the provider.
 * @param routing Routing context of the batch, used as a fallback for in_reply_to.
 */
function dispatchResultText(text: string, routing: RoutingContext): void {
  // Local regex: the /g flag makes `exec` stateful via lastIndex, so a fresh
  // instance per call avoids leaking state between calls.
  const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g;

  const scratchpadParts: string[] = [];
  let match: RegExpExecArray | null;
  let blockCount = 0;
  let sent = 0;
  let lastIndex = 0;

  while ((match = MESSAGE_RE.exec(text)) !== null) {
    // Text between the previous block and this one is scratchpad.
    if (match.index > lastIndex) {
      scratchpadParts.push(text.slice(lastIndex, match.index));
    }
    lastIndex = MESSAGE_RE.lastIndex;
    blockCount++;

    const toName = match[1].trim();
    const body = match[2].trim();

    const dest = findByName(toName);
    if (!dest) {
      log(`Unknown destination in <message to="${toName}">, dropping block`);
      scratchpadParts.push(`[dropped: unknown destination "${toName}"] ${body}`);
      continue;
    }
    if (!body) {
      // A blank chat message is never useful to the recipient; skip it.
      log(`Empty <message to="${toName}"> block, nothing to send`);
      continue;
    }
    sendToDestination(dest, body, routing);
    sent++;
  }

  if (lastIndex < text.length) {
    scratchpadParts.push(text.slice(lastIndex));
  }

  const scratchpad = stripInternalTags(scratchpadParts.join(''));
  if (scratchpad) {
    log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
  }

  if (sent === 0 && text.trim()) {
    if (blockCount === 0) {
      log(`WARNING: agent output had no <message to="..."> blocks — nothing was sent`);
    } else {
      log(`WARNING: none of the ${blockCount} <message> block(s) could be delivered — nothing was sent`);
    }
  }
}
|
|
|
|
/**
 * Write one outbound chat row addressed to `dest`.
 *
 * Channel destinations are addressed by their platformId/channelType. Any
 * other destination is addressed by its agentGroupId, with channel_type
 * 'agent'. thread_id and in_reply_to come from the latest inbound row from
 * that same channel+platform. In agent-shared sessions, different
 * destinations have different thread contexts, so reusing `routing.threadId`
 * would stamp one channel's thread onto another. When no such row exists,
 * thread_id is null and in_reply_to falls back to `routing.inReplyTo`.
 *
 * @param dest Resolved destination.
 * @param body Message text to send.
 * @param routing Routing context of the current batch.
 */
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
  let platformId: string;
  let channelType: string;
  if (dest.type === 'channel') {
    platformId = dest.platformId!;
    channelType = dest.channelType!;
  } else {
    platformId = dest.agentGroupId!;
    channelType = 'agent';
  }

  const latestInbound = resolveDestinationThread(channelType, platformId);

  writeMessageOut({
    id: generateId(),
    in_reply_to: latestInbound?.inReplyTo ?? routing.inReplyTo,
    kind: 'chat',
    platform_id: platformId,
    channel_type: channelType,
    thread_id: latestInbound?.threadId ?? null,
    content: JSON.stringify({ text: body }),
  });
}
|
|
|
|
/**
 * Look up the most recent inbound row (highest `seq`) in messages_in for the
 * given channel type and platform id.
 *
 * @param channelType Channel type to match.
 * @param platformId Platform id to match.
 * @returns That row's thread_id and message id (as `inReplyTo`), or null when
 *   no row matches or the query fails (the failure is logged).
 */
function resolveDestinationThread(
  channelType: string,
  platformId: string,
): { threadId: string | null; inReplyTo: string | null } | null {
  type LatestInboundRow = { thread_id: string | null; id: string };

  let row: LatestInboundRow | undefined;
  try {
    row = getInboundDb()
      .prepare(
        `SELECT thread_id, id FROM messages_in
         WHERE channel_type = ? AND platform_id = ?
         ORDER BY seq DESC LIMIT 1`,
      )
      .get(channelType, platformId) as LatestInboundRow | undefined;
  } catch (err) {
    log(`resolveDestinationThread error: ${err instanceof Error ? err.message : String(err)}`);
    return null;
  }

  return row ? { threadId: row.thread_id, inReplyTo: row.id } : null;
}
|
|
|
|
/**
 * Return a promise that resolves (with no value) after roughly `ms` milliseconds.
 *
 * @param ms Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
|