/** * Dashboard pusher — collects NanoClaw state and POSTs a JSON * snapshot to the dashboard's /api/ingest endpoint every interval. */ import fs from 'fs'; import path from 'path'; import http from 'http'; import Database from 'better-sqlite3'; import { getAllAgentGroups, getAgentGroup } from './db/agent-groups.js'; import { getSessionsByAgentGroup } from './db/sessions.js'; import { getAllMessagingGroups, getMessagingGroupAgents } from './db/messaging-groups.js'; import { getDestinations } from './db/agent-destinations.js'; import { getMembers } from './db/agent-group-members.js'; import { getAllUsers, getUser } from './db/users.js'; import { getUserRoles, getAdminsOfAgentGroup } from './db/user-roles.js'; import { getUserDmsForUser } from './db/user-dms.js'; import { getActiveAdapters, getRegisteredChannelNames } from './channels/channel-registry.js'; import { DATA_DIR, ASSISTANT_NAME } from './config.js'; import { getDb } from './db/connection.js'; import { log } from './log.js'; interface PusherConfig { port: number; secret: string; intervalMs?: number; } let timer: ReturnType | null = null; let logTimer: ReturnType | null = null; let logOffset = 0; export function startDashboardPusher(config: PusherConfig): void { const interval = config.intervalMs || 60000; // Push immediately on start, then on interval push(config).catch((err) => log.error('Dashboard push failed', { err })); timer = setInterval(() => { push(config).catch((err) => log.error('Dashboard push failed', { err })); }, interval); // Start log file tailing startLogTail(config); log.info('Dashboard pusher started', { intervalMs: interval }); } export function stopDashboardPusher(): void { if (timer) { clearInterval(timer); timer = null; } if (logTimer) { clearInterval(logTimer); logTimer = null; } } /** Fire-and-forget POST to the dashboard. */ function postJson(config: PusherConfig, urlPath: string, data: unknown): void { const body = JSON.stringify(data); const req = http.request({ hostname: '127.0.0.1', port: config.port, path: urlPath, method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body), Authorization: `Bearer ${config.secret}`, }, }); req.on('error', () => {}); req.write(body); req.end(); } const ANSI_RE = /\x1b\[[0-9;]*m/g; function startLogTail(config: PusherConfig): void { const logFile = path.resolve(process.cwd(), 'logs', 'nanoclaw.log'); if (!fs.existsSync(logFile)) return; // Send last 200 lines as backfill try { const allLines = fs.readFileSync(logFile, 'utf-8').split('\n').filter((l) => l.trim()); logOffset = fs.statSync(logFile).size; const tail = allLines.slice(-200).map((l) => l.replace(ANSI_RE, '')); if (tail.length > 0) postJson(config, '/api/logs/push', { lines: tail }); } catch { return; } // Poll every 2s for new lines logTimer = setInterval(() => { try { const stat = fs.statSync(logFile); if (stat.size <= logOffset) { logOffset = stat.size; return; } const buf = Buffer.alloc(stat.size - logOffset); const fd = fs.openSync(logFile, 'r'); fs.readSync(fd, buf, 0, buf.length, logOffset); fs.closeSync(fd); logOffset = stat.size; const lines = buf.toString().split('\n').filter((l) => l.trim()).map((l) => l.replace(ANSI_RE, '')); if (lines.length > 0) postJson(config, '/api/logs/push', { lines }); } catch { /* ignore */ } }, 2000); } async function push(config: PusherConfig): Promise { const snapshot = collectSnapshot(); postJson(config, '/api/ingest', snapshot); log.debug('Dashboard snapshot pushed'); } function collectSnapshot(): Record { return { timestamp: new Date().toISOString(), assistant_name: ASSISTANT_NAME, uptime: Math.floor(process.uptime()), agent_groups: collectAgentGroups(), sessions: collectSessions(), channels: collectChannels(), users: collectUsers(), tokens: collectTokens(), context_windows: collectContextWindows(), activity: collectActivity(), messages: collectMessages(), }; } function collectAgentGroups() { return getAllAgentGroups().map((g) => { const sessions = getSessionsByAgentGroup(g.id); const running = sessions.filter((s) => s.container_status === 'running' || s.container_status === 'idle'); const destinations = getDestinations(g.id); const members = getMembers(g.id).map((m) => { const user = getUser(m.user_id); return { ...m, display_name: user?.display_name ?? null }; }); const admins = getAdminsOfAgentGroup(g.id).map((a) => { const user = getUser(a.user_id); return { ...a, display_name: user?.display_name ?? null }; }); // Wirings const db = getDb(); const wirings = db .prepare( `SELECT mga.*, mg.channel_type, mg.platform_id, mg.name as mg_name, mg.is_group, mg.unknown_sender_policy FROM messaging_group_agents mga JOIN messaging_groups mg ON mg.id = mga.messaging_group_id WHERE mga.agent_group_id = ?`, ) .all(g.id) as Array>; return { id: g.id, name: g.name, folder: g.folder, agent_provider: g.agent_provider, container_config: g.container_config ? JSON.parse(g.container_config) : null, sessionCount: sessions.length, runningSessions: running.length, wirings, destinations, members, admins, created_at: g.created_at, }; }); } function collectSessions() { const db = getDb(); return db .prepare( `SELECT s.*, ag.name as agent_group_name, ag.folder as agent_group_folder, mg.channel_type, mg.platform_id, mg.name as messaging_group_name FROM sessions s LEFT JOIN agent_groups ag ON ag.id = s.agent_group_id LEFT JOIN messaging_groups mg ON mg.id = s.messaging_group_id ORDER BY s.last_active DESC NULLS LAST`, ) .all() as Array>; } function collectChannels() { const messagingGroups = getAllMessagingGroups(); const liveAdapters = getActiveAdapters().map((a) => a.channelType); const registeredChannels = getRegisteredChannelNames(); const byType: Record = {}; for (const mg of messagingGroups) { if (!byType[mg.channel_type]) { byType[mg.channel_type] = { channelType: mg.channel_type, isLive: liveAdapters.includes(mg.channel_type), isRegistered: registeredChannels.includes(mg.channel_type), groups: [], }; } const agents = getMessagingGroupAgents(mg.id).map((a) => { const group = getAgentGroup(a.agent_group_id); return { agent_group_id: a.agent_group_id, agent_group_name: group?.name ?? null, priority: a.priority }; }); byType[mg.channel_type].groups.push({ messagingGroup: { id: mg.id, platform_id: mg.platform_id, name: mg.name, is_group: mg.is_group, unknown_sender_policy: (mg as unknown as Record).unknown_sender_policy ?? 'strict', }, agents, }); } // Include live adapters with no messaging groups for (const ct of liveAdapters) { if (!byType[ct]) { byType[ct] = { channelType: ct, isLive: true, isRegistered: true, groups: [] }; } } return Object.values(byType).sort((a, b) => a.channelType.localeCompare(b.channelType)); } function collectUsers() { return getAllUsers().map((u) => { const roles = getUserRoles(u.id); const dms = getUserDmsForUser(u.id); const db = getDb(); const memberships = db .prepare( `SELECT agm.agent_group_id, ag.name as agent_group_name FROM agent_group_members agm JOIN agent_groups ag ON ag.id = agm.agent_group_id WHERE agm.user_id = ?`, ) .all(u.id) as Array>; let privilege = 'none'; if (roles.some((r) => r.role === 'owner')) privilege = 'owner'; else if (roles.some((r) => r.role === 'admin' && !r.agent_group_id)) privilege = 'global_admin'; else if (roles.some((r) => r.role === 'admin')) privilege = 'admin'; else if (memberships.length > 0) privilege = 'member'; return { id: u.id, kind: u.kind, display_name: u.display_name, privilege, roles, memberships, dmChannels: dms.map((d) => ({ channel_type: d.channel_type })), created_at: u.created_at, }; }); } function collectTokens() { const sessionsDir = path.join(DATA_DIR, 'v2-sessions'); const allEntries: Array<{ model: string; inputTokens: number; outputTokens: number; cacheReadTokens: number; cacheCreationTokens: number; agentGroupId: string }> = []; const agentGroups = getAllAgentGroups(); const nameMap = new Map(agentGroups.map((g) => [g.id, g.name])); if (fs.existsSync(sessionsDir)) { for (const agDir of fs.readdirSync(sessionsDir).filter((d) => d.startsWith('ag-'))) { const entries = scanJsonlTokens(path.join(sessionsDir, agDir)); allEntries.push(...entries.map((e) => ({ ...e, agentGroupId: agDir }))); } } const byModel: Record = {}; const byGroup: Record = {}; const totals = { requests: 0, inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheCreationTokens: 0 }; for (const e of allEntries) { if (!byModel[e.model]) byModel[e.model] = { requests: 0, inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheCreationTokens: 0 }; byModel[e.model].requests++; byModel[e.model].inputTokens += e.inputTokens; byModel[e.model].outputTokens += e.outputTokens; byModel[e.model].cacheReadTokens += e.cacheReadTokens; byModel[e.model].cacheCreationTokens += e.cacheCreationTokens; if (!byGroup[e.agentGroupId]) byGroup[e.agentGroupId] = { requests: 0, inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheCreationTokens: 0, name: nameMap.get(e.agentGroupId) || e.agentGroupId }; byGroup[e.agentGroupId].requests++; byGroup[e.agentGroupId].inputTokens += e.inputTokens; byGroup[e.agentGroupId].outputTokens += e.outputTokens; byGroup[e.agentGroupId].cacheReadTokens += e.cacheReadTokens; byGroup[e.agentGroupId].cacheCreationTokens += e.cacheCreationTokens; totals.requests++; totals.inputTokens += e.inputTokens; totals.outputTokens += e.outputTokens; totals.cacheReadTokens += e.cacheReadTokens; totals.cacheCreationTokens += e.cacheCreationTokens; } return { totals, byModel, byGroup }; } function scanJsonlTokens(agentDir: string) { const claudeDir = path.join(agentDir, '.claude-shared', 'projects'); if (!fs.existsSync(claudeDir)) return []; const entries: Array<{ model: string; inputTokens: number; outputTokens: number; cacheReadTokens: number; cacheCreationTokens: number }> = []; const walk = (dir: string): void => { try { for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { const full = path.join(dir, entry.name); if (entry.isDirectory()) walk(full); else if (entry.name.endsWith('.jsonl')) { try { for (const line of fs.readFileSync(full, 'utf-8').split('\n')) { if (!line.trim()) continue; try { const r = JSON.parse(line); if (r.type === 'assistant' && r.message?.usage) { const u = r.message.usage; entries.push({ model: r.message.model || 'unknown', inputTokens: u.input_tokens || 0, outputTokens: u.output_tokens || 0, cacheReadTokens: u.cache_read_input_tokens || 0, cacheCreationTokens: u.cache_creation_input_tokens || 0, }); } } catch { /* skip line */ } } } catch { /* skip file */ } } } } catch { /* skip dir */ } }; walk(claudeDir); return entries; } function collectContextWindows() { const sessionsDir = path.join(DATA_DIR, 'v2-sessions'); if (!fs.existsSync(sessionsDir)) return []; const results: unknown[] = []; const agentGroups = getAllAgentGroups(); const nameMap = new Map(agentGroups.map((g) => [g.id, g.name])); for (const agDir of fs.readdirSync(sessionsDir).filter((d) => d.startsWith('ag-'))) { const claudeDir = path.join(sessionsDir, agDir, '.claude-shared', 'projects'); if (!fs.existsSync(claudeDir)) continue; // Find most recent JSONL const jsonlFiles: string[] = []; const walk = (dir: string): void => { try { for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { const full = path.join(dir, entry.name); if (entry.isDirectory()) walk(full); else if (entry.name.endsWith('.jsonl')) jsonlFiles.push(full); } } catch { /* skip */ } }; walk(claudeDir); if (jsonlFiles.length === 0) continue; jsonlFiles.sort((a, b) => { try { return fs.statSync(b).mtimeMs - fs.statSync(a).mtimeMs; } catch { return 0; } }); // Read last assistant turn from newest file const content = fs.readFileSync(jsonlFiles[0], 'utf-8'); const lines = content.split('\n'); for (let i = lines.length - 1; i >= 0; i--) { if (!lines[i].trim()) continue; try { const r = JSON.parse(lines[i]); if (r.type === 'assistant' && r.message?.usage) { const u = r.message.usage; const model = r.message.model || 'unknown'; const ctx = (u.input_tokens || 0) + (u.cache_read_input_tokens || 0) + (u.cache_creation_input_tokens || 0); const max = 200000; results.push({ agentGroupId: agDir, agentGroupName: nameMap.get(agDir), sessionId: path.basename(jsonlFiles[0], '.jsonl'), model, contextTokens: ctx, outputTokens: u.output_tokens || 0, cacheReadTokens: u.cache_read_input_tokens || 0, cacheCreationTokens: u.cache_creation_input_tokens || 0, maxContext: max, usagePercent: max > 0 ? Math.round((ctx / max) * 100) : 0, timestamp: r.timestamp || '', }); break; } } catch { /* skip */ } } } return results; } function collectActivity() { const now = Date.now(); const buckets: Record = {}; for (let i = 0; i < 24; i++) { const key = new Date(now - i * 3600000).toISOString().slice(0, 13); buckets[key] = { inbound: 0, outbound: 0 }; } const sessionsDir = path.join(DATA_DIR, 'v2-sessions'); if (!fs.existsSync(sessionsDir)) return toBucketArray(buckets); const cutoff = new Date(now - 86400000).toISOString(); try { for (const agDir of fs.readdirSync(sessionsDir).filter((d) => d.startsWith('ag-'))) { const agPath = path.join(sessionsDir, agDir); for (const sessDir of fs.readdirSync(agPath).filter((d) => d.startsWith('sess-'))) { for (const [dbName, direction] of [['outbound.db', 'outbound'], ['inbound.db', 'inbound']] as const) { const dbPath = path.join(agPath, sessDir, dbName); if (!fs.existsSync(dbPath)) continue; try { const db = new Database(dbPath, { readonly: true }); const table = direction === 'outbound' ? 'messages_out' : 'messages_in'; const rows = db.prepare(`SELECT timestamp FROM ${table} WHERE timestamp > ?`).all(cutoff) as { timestamp: string }[]; for (const row of rows) { const key = row.timestamp.slice(0, 13); if (buckets[key]) buckets[key][direction]++; } db.close(); } catch { /* skip */ } } } } } catch { /* skip */ } return toBucketArray(buckets); } function toBucketArray(buckets: Record) { return Object.entries(buckets) .map(([hour, counts]) => ({ hour, ...counts })) .sort((a, b) => a.hour.localeCompare(b.hour)); } function collectMessages() { const sessionsDir = path.join(DATA_DIR, 'v2-sessions'); if (!fs.existsSync(sessionsDir)) return []; const results: Array<{ agentGroupId: string; sessionId: string; inbound: unknown[]; outbound: unknown[] }> = []; const limit = 50; try { for (const agDir of fs.readdirSync(sessionsDir).filter((d) => d.startsWith('ag-'))) { const agPath = path.join(sessionsDir, agDir); for (const sessDir of fs.readdirSync(agPath).filter((d) => d.startsWith('sess-'))) { const inbound: unknown[] = []; const outbound: unknown[] = []; const inDbPath = path.join(agPath, sessDir, 'inbound.db'); if (fs.existsSync(inDbPath)) { try { const db = new Database(inDbPath, { readonly: true }); const rows = db.prepare('SELECT * FROM messages_in ORDER BY seq DESC LIMIT ?').all(limit); inbound.push(...(rows as unknown[]).reverse()); db.close(); } catch { /* skip */ } } const outDbPath = path.join(agPath, sessDir, 'outbound.db'); if (fs.existsSync(outDbPath)) { try { const db = new Database(outDbPath, { readonly: true }); const rows = db.prepare('SELECT * FROM messages_out ORDER BY seq DESC LIMIT ?').all(limit); outbound.push(...(rows as unknown[]).reverse()); db.close(); } catch { /* skip */ } } if (inbound.length > 0 || outbound.length > 0) { results.push({ agentGroupId: agDir, sessionId: sessDir, inbound, outbound }); } } } } catch { /* skip */ } return results; }