Files
nanoclaw/src/index.ts
gavrielc c38e5b11a8 fix(channels): wire accumulate mode through the bridge
The router + session DB were already fully plumbed for
ignored_message_policy='accumulate' — fan-out in routeInbound calls
deliverToAgent(wake=false) for non-engaging agents on accumulate wirings,
writeSessionMessage writes trigger=0, countDueMessages filters trigger=1,
container formatter includes all messages regardless of trigger. But the
Chat SDK bridge dropped non-engaging messages before the router ever saw
them, so accumulate was dead on arrival for every adapter that goes
through the bridge.

Expose ignored_message_policy on ConversationConfig, project it in
buildConversationConfigs, and widen shouldEngage's "forward" decision to
"engage OR accumulate" with the union taken across all wirings on a
conversation. stickySubscribe stays gated on a real engage — subscribing
a thread we'd only silently accumulate on would misrepresent the bot's
presence.

shouldEngage return shape is now { forward, stickySubscribe } — engage
was an internal concept the caller never needed, and conflating it with
forward was the source of this bug.

7 new tests cover: non-engaging messages forwarding under accumulate,
mixed drop/accumulate wirings taking the union, accumulate not
triggering sticky subscribe, unknown-conversation drop precedence over
accumulate, and drop policy preserving existing behavior on engaging
messages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 11:18:43 +03:00

198 lines
6.5 KiB
TypeScript

/**
* NanoClaw — main entry point.
*
* Thin orchestrator: init DB, run migrations, start channel adapters,
* start delivery polls, start sweep, handle shutdown.
*/
import path from 'path';
import { DATA_DIR } from './config.js';
import { initDb } from './db/connection.js';
import { runMigrations } from './db/migrations/index.js';
import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messaging-groups.js';
import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js';
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
import { startHostSweep, stopHostSweep } from './host-sweep.js';
import { routeInbound } from './router.js';
import { log } from './log.js';
// Response + shutdown registries live in response-registry.ts to break the
// circular import cycle: src/index.ts imports src/modules/index.js for side
// effects, and the modules call registerResponseHandler/onShutdown at top
// level — which would hit a TDZ error if the arrays lived here. Re-exported
// here so existing callers see the same surface.
import {
registerResponseHandler,
getResponseHandlers,
onShutdown,
getShutdownCallbacks,
type ResponsePayload,
type ResponseHandler,
} from './response-registry.js';
export { registerResponseHandler, onShutdown };
export type { ResponsePayload, ResponseHandler };
/**
 * Offer a response payload to the registered response handlers, one at a time.
 *
 * Handlers are tried in the order returned by `getResponseHandlers()`. The
 * first handler that resolves to a truthy value claims the payload and ends
 * dispatch. A handler that throws is logged and does not stop the remaining
 * handlers from being tried. If nobody claims the payload, a warning is logged.
 *
 * @param payload The response to hand to the handlers.
 */
async function dispatchResponse(payload: ResponsePayload): Promise<void> {
  const handlers = getResponseHandlers();

  for (const handler of handlers) {
    try {
      if (await handler(payload)) {
        // Claimed — nothing further to do.
        return;
      }
    } catch (err) {
      // One faulty handler must not block the others from seeing the payload.
      log.error('Response handler threw', { questionId: payload.questionId, err });
    }
  }

  log.warn('Unclaimed response', { questionId: payload.questionId, value: payload.value });
}
// Channel barrel — each enabled channel self-registers on import.
// Channel skills uncomment lines in channels/index.ts to enable them.
import './channels/index.js';
// Modules barrel — default modules (typing, mount-security) ship here; skills
// append registry-based modules. Imported for side effects (registrations).
import './modules/index.js';
import type { ChannelAdapter, ChannelSetup, ConversationConfig } from './channels/adapter.js';
import { initChannelAdapters, teardownChannelAdapters, getChannelAdapter } from './channels/channel-registry.js';
/**
 * Process entry point: brings the host up in a fixed order.
 *
 * Sequence: central DB + migrations, container runtime, channel adapters,
 * delivery adapter bridge, delivery polls, host sweep.
 */
async function main(): Promise<void> {
  log.info('NanoClaw starting');

  // Step 1 — central DB: open it, then apply migrations.
  const centralDbPath = path.join(DATA_DIR, 'v2.db');
  const centralDb = initDb(centralDbPath);
  runMigrations(centralDb);
  log.info('Central DB ready', { path: centralDbPath });

  // Step 2 — container runtime.
  ensureContainerRuntimeRunning();
  cleanupOrphans();

  // Step 3 — channel adapters. This factory is handed to initChannelAdapters
  // and produces, per adapter, its conversation configs plus the callbacks
  // that connect adapter events to the router and the response handlers.
  const setupFor = (adapter: ChannelAdapter): ChannelSetup => ({
    conversations: buildConversationConfigs(adapter.channelType),

    onInbound(platformId, threadId, message) {
      // Fire-and-forget: routing failures are logged, never thrown back
      // into the adapter.
      routeInbound({
        channelType: adapter.channelType,
        platformId,
        threadId,
        message: {
          id: message.id,
          kind: message.kind,
          content: JSON.stringify(message.content),
          timestamp: message.timestamp,
        },
      }).catch((err) => {
        log.error('Failed to route inbound message', { channelType: adapter.channelType, err });
      });
    },

    onMetadata(platformId, name, isGroup) {
      log.info('Channel metadata discovered', {
        channelType: adapter.channelType,
        platformId,
        name,
        isGroup,
      });
    },

    onAction(questionId, selectedOption, userId) {
      const response: ResponsePayload = {
        questionId,
        value: selectedOption,
        userId,
        channelType: adapter.channelType,
        // platformId/threadId aren't surfaced by the current onAction
        // signature — registered handlers look them up from the
        // pending_question / pending_approval row.
        platformId: '',
        threadId: null,
      };
      dispatchResponse(response).catch((err) => {
        log.error('Failed to handle question response', { questionId, err });
      });
    },
  });
  await initChannelAdapters(setupFor);

  // Step 4 — delivery adapter bridge: resolves the channel adapter by type
  // and forwards the call to it.
  const bridge = {
    async deliver(
      channelType: string,
      platformId: string,
      threadId: string | null,
      kind: string,
      content: string,
      files?: import('./channels/adapter.js').OutboundFile[],
    ): Promise<string | undefined> {
      const target = getChannelAdapter(channelType);
      if (target) {
        // Content arrives as a JSON string; the adapter receives the parsed value.
        return target.deliver(platformId, threadId, { kind, content: JSON.parse(content), files });
      }
      log.warn('No adapter for channel type', { channelType });
      return undefined;
    },

    async setTyping(channelType: string, platformId: string, threadId: string | null): Promise<void> {
      // No-op when the adapter is missing or does not implement setTyping.
      const target = getChannelAdapter(channelType);
      await target?.setTyping?.(platformId, threadId);
    },
  };
  setDeliveryAdapter(bridge);

  // Step 5 — delivery polls.
  startActiveDeliveryPoll();
  startSweepDeliveryPoll();
  log.info('Delivery polls started');

  // Step 6 — host sweep.
  startHostSweep();
  log.info('Host sweep started');

  log.info('NanoClaw running');
}
/**
 * Collect the conversation configs for one channel type from the central DB.
 *
 * Emits one entry per (messaging group, agent wiring) pair, in the order the
 * DB helpers return them.
 *
 * @param channelType Channel type whose messaging groups are looked up.
 * @returns A flat list of configs; empty when no group or wiring exists.
 */
function buildConversationConfigs(channelType: string): ConversationConfig[] {
  const result: ConversationConfig[] = [];

  for (const group of getMessagingGroupsByChannel(channelType)) {
    for (const wiring of getMessagingGroupAgents(group.id)) {
      // Project the snake_case DB columns onto the camelCase config shape.
      const config: ConversationConfig = {
        platformId: group.platform_id,
        agentGroupId: wiring.agent_group_id,
        engageMode: wiring.engage_mode,
        engagePattern: wiring.engage_pattern,
        ignoredMessagePolicy: wiring.ignored_message_policy,
        sessionMode: wiring.session_mode,
      };
      result.push(config);
    }
  }

  return result;
}
/**
 * Graceful shutdown.
 *
 * Runs every registered shutdown callback, then stops the delivery polls,
 * stops the host sweep, tears down the channel adapters, and exits.
 *
 * Every step is isolated: a failure is logged and the remaining steps still
 * run, so the function always reaches `process.exit`. Without that isolation
 * a throwing stop/teardown call would skip the rest of the cleanup and reject
 * the promise returned to the signal handler, which nobody awaits.
 *
 * @param signal Name of the signal that triggered the shutdown (logged only).
 * @returns Never resolves in practice — the process exits. Exit code is 0 on a
 *   clean teardown and 1 when a stop/teardown step threw.
 */
async function shutdown(signal: string): Promise<void> {
  log.info('Shutdown signal received', { signal });

  for (const cb of getShutdownCallbacks()) {
    try {
      await cb();
    } catch (err) {
      log.error('Shutdown callback threw', { err });
    }
  }

  // Core teardown, in the original order. Each entry is guarded separately so
  // one failing step cannot prevent the later ones from running.
  const steps: ReadonlyArray<readonly [string, () => unknown]> = [
    ['stopDeliveryPolls', stopDeliveryPolls],
    ['stopHostSweep', stopHostSweep],
    ['teardownChannelAdapters', teardownChannelAdapters],
  ];

  let teardownFailed = false;
  for (const [step, run] of steps) {
    try {
      await run();
    } catch (err) {
      teardownFailed = true;
      log.error('Shutdown step threw', { step, err });
    }
  }

  // A failed teardown still exits, but with a non-zero code so supervisors
  // can tell it apart from a clean stop.
  process.exit(teardownFailed ? 1 : 0);
}
// Both termination signals funnel into the same shutdown routine.
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, () => shutdown(signal));
}

// Kick off startup; a failure during boot is logged as fatal and the process
// exits with code 1.
main().catch((err) => {
  log.fatal('Startup failed', { err });
  process.exit(1);
});