diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts new file mode 100644 index 0000000..9e71e58 --- /dev/null +++ b/container/agent-runner/src/db/connection.ts @@ -0,0 +1,55 @@ +import Database from 'better-sqlite3'; + +const SESSION_DB_PATH = '/workspace/session.db'; + +let _db: Database.Database | null = null; + +export function getSessionDb(): Database.Database { + if (!_db) { + _db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH); + _db.pragma('journal_mode = WAL'); + _db.pragma('foreign_keys = ON'); + } + return _db; +} + +/** For tests — opens an in-memory DB with session schema. */ +export function initTestSessionDb(): Database.Database { + _db = new Database(':memory:'); + _db.pragma('foreign_keys = ON'); + _db.exec(` + CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', + status_changed TEXT, + process_after TEXT, + recurrence TEXT, + tries INTEGER DEFAULT 0, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL + ); + CREATE TABLE messages_out ( + id TEXT PRIMARY KEY, + in_reply_to TEXT, + timestamp TEXT NOT NULL, + delivered INTEGER DEFAULT 0, + deliver_after TEXT, + recurrence TEXT, + kind TEXT NOT NULL, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL + ); + `); + return _db; +} + +export function closeSessionDb(): void { + _db?.close(); + _db = null; +} diff --git a/container/agent-runner/src/db/index.ts b/container/agent-runner/src/db/index.ts new file mode 100644 index 0000000..63c00d3 --- /dev/null +++ b/container/agent-runner/src/db/index.ts @@ -0,0 +1,5 @@ +export { getSessionDb, initTestSessionDb, closeSessionDb } from './connection.js'; +export { getPendingMessages, markProcessing, markCompleted, markFailed, getMessageIn, findQuestionResponse } from './messages-in.js'; +export type { MessageInRow } from './messages-in.js'; +export { writeMessageOut, getUndeliveredMessages, markDelivered } from './messages-out.js'; +export type { MessageOutRow, WriteMessageOut } from './messages-out.js'; diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts new file mode 100644 index 0000000..a68071b --- /dev/null +++ b/container/agent-runner/src/db/messages-in.ts @@ -0,0 +1,65 @@ +import { getSessionDb } from './connection.js'; + +export interface MessageInRow { + id: string; + kind: string; + timestamp: string; + status: string; + status_changed: string | null; + process_after: string | null; + recurrence: string | null; + tries: number; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; +} + +/** Fetch all pending messages that are due for processing. */ +export function getPendingMessages(): MessageInRow[] { + return getSessionDb() + .prepare( + `SELECT * FROM messages_in + WHERE status = 'pending' + AND (process_after IS NULL OR process_after <= datetime('now')) + ORDER BY timestamp ASC`, + ) + .all() as MessageInRow[]; +} + +/** Mark messages as processing. */ +export function markProcessing(ids: string[]): void { + if (ids.length === 0) return; + const db = getSessionDb(); + const stmt = db.prepare("UPDATE messages_in SET status = 'processing', status_changed = datetime('now'), tries = tries + 1 WHERE id = ?"); + db.transaction(() => { + for (const id of ids) stmt.run(id); + })(); +} + +/** Mark messages as completed. */ +export function markCompleted(ids: string[]): void { + if (ids.length === 0) return; + const db = getSessionDb(); + const stmt = db.prepare("UPDATE messages_in SET status = 'completed', status_changed = datetime('now') WHERE id = ?"); + db.transaction(() => { + for (const id of ids) stmt.run(id); + })(); +} + +/** Mark a single message as failed. */ +export function markFailed(id: string): void { + getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id); +} + +/** Get a message by ID. */ +export function getMessageIn(id: string): MessageInRow | undefined { + return getSessionDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; +} + +/** Find a pending response to a question (by questionId in content). */ +export function findQuestionResponse(questionId: string): MessageInRow | undefined { + return getSessionDb() + .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?") + .get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined; +} diff --git a/container/agent-runner/src/db/messages-out.ts b/container/agent-runner/src/db/messages-out.ts new file mode 100644 index 0000000..97db901 --- /dev/null +++ b/container/agent-runner/src/db/messages-out.ts @@ -0,0 +1,62 @@ +import { getSessionDb } from './connection.js'; + +export interface MessageOutRow { + id: string; + in_reply_to: string | null; + timestamp: string; + delivered: number; + deliver_after: string | null; + recurrence: string | null; + kind: string; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; +} + +export interface WriteMessageOut { + id: string; + in_reply_to?: string | null; + deliver_after?: string | null; + recurrence?: string | null; + kind: string; + platform_id?: string | null; + channel_type?: string | null; + thread_id?: string | null; + content: string; +} + +/** Write a new outbound message. */ +export function writeMessageOut(msg: WriteMessageOut): void { + getSessionDb() + .prepare( + `INSERT INTO messages_out (id, in_reply_to, timestamp, delivered, deliver_after, recurrence, kind, platform_id, channel_type, thread_id, content) + VALUES (@id, @in_reply_to, datetime('now'), 0, @deliver_after, @recurrence, @kind, @platform_id, @channel_type, @thread_id, @content)`, + ) + .run({ + in_reply_to: null, + deliver_after: null, + recurrence: null, + platform_id: null, + channel_type: null, + thread_id: null, + ...msg, + }); +} + +/** Get undelivered messages (for host polling). */ +export function getUndeliveredMessages(): MessageOutRow[] { + return getSessionDb() + .prepare( + `SELECT * FROM messages_out + WHERE delivered = 0 + AND (deliver_after IS NULL OR deliver_after <= datetime('now')) + ORDER BY timestamp ASC`, + ) + .all() as MessageOutRow[]; +} + +/** Mark a message as delivered. */ +export function markDelivered(id: string): void { + getSessionDb().prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(id); +} diff --git a/src/db/agent-groups.ts b/src/db/agent-groups.ts new file mode 100644 index 0000000..a306616 --- /dev/null +++ b/src/db/agent-groups.ts @@ -0,0 +1,51 @@ +import type { AgentGroup } from '../types-v2.js'; +import { getDb } from './connection.js'; + +export function createAgentGroup(group: AgentGroup): void { + getDb() + .prepare( + `INSERT INTO agent_groups (id, name, folder, is_admin, agent_provider, container_config, created_at) + VALUES (@id, @name, @folder, @is_admin, @agent_provider, @container_config, @created_at)`, + ) + .run(group); +} + +export function getAgentGroup(id: string): AgentGroup | undefined { + return getDb().prepare('SELECT * FROM agent_groups WHERE id = ?').get(id) as AgentGroup | undefined; +} + +export function getAgentGroupByFolder(folder: string): AgentGroup | undefined { + return getDb().prepare('SELECT * FROM agent_groups WHERE folder = ?').get(folder) as AgentGroup | undefined; +} + +export function getAllAgentGroups(): AgentGroup[] { + return getDb().prepare('SELECT * FROM agent_groups ORDER BY name').all() as AgentGroup[]; +} + +export function getAdminAgentGroup(): AgentGroup | undefined { + return getDb().prepare('SELECT * FROM agent_groups WHERE is_admin = 1 LIMIT 1').get() as AgentGroup | undefined; +} + +export function updateAgentGroup( + id: string, + updates: Partial>, +): void { + const fields: string[] = []; + const values: Record = { id }; + + for (const [key, value] of Object.entries(updates)) { + if (value !== undefined) { + fields.push(`${key} = @${key}`); + values[key] = value; + } + } + if (fields.length === 0) return; + + getDb() + .prepare(`UPDATE agent_groups SET ${fields.join(', ')} WHERE id = @id`) + .run(values); +} + +export function deleteAgentGroup(id: string): void { + getDb().prepare('DELETE FROM agent_groups WHERE id = ?').run(id); +} diff --git a/src/db/connection.ts b/src/db/connection.ts new file mode 100644 index 0000000..6d13774 --- /dev/null +++ b/src/db/connection.ts @@ -0,0 +1,33 @@ +import Database from 'better-sqlite3'; +import fs from 'fs'; +import path from 'path'; + +import { log } from '../log.js'; + +let _db: Database.Database | null = null; + +export function getDb(): Database.Database { + if (!_db) throw new Error('Database not initialized. Call initDb() first.'); + return _db; +} + +export function initDb(dbPath: string): Database.Database { + fs.mkdirSync(path.dirname(dbPath), { recursive: true }); + _db = new Database(dbPath); + _db.pragma('journal_mode = WAL'); + _db.pragma('foreign_keys = ON'); + log.info('Central DB initialized', { path: dbPath }); + return _db; +} + +/** For tests only — creates an in-memory DB and runs migrations. */ +export function initTestDb(): Database.Database { + _db = new Database(':memory:'); + _db.pragma('foreign_keys = ON'); + return _db; +} + +export function closeDb(): void { + _db?.close(); + _db = null; +} diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts new file mode 100644 index 0000000..daa9576 --- /dev/null +++ b/src/db/db-v2.test.ts @@ -0,0 +1,405 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import { + initTestDb, + closeDb, + runMigrations, + createAgentGroup, + getAgentGroup, + getAgentGroupByFolder, + getAllAgentGroups, + getAdminAgentGroup, + updateAgentGroup, + deleteAgentGroup, + createMessagingGroup, + getMessagingGroup, + getMessagingGroupByPlatform, + getAllMessagingGroups, + updateMessagingGroup, + deleteMessagingGroup, + createMessagingGroupAgent, + getMessagingGroupAgents, + getMessagingGroupAgent, + updateMessagingGroupAgent, + deleteMessagingGroupAgent, + createSession, + getSession, + findSession, + getSessionsByAgentGroup, + getActiveSessions, + getRunningSessions, + updateSession, + deleteSession, + createPendingQuestion, + getPendingQuestion, + deletePendingQuestion, +} from './index.js'; + +function now() { + return new Date().toISOString(); +} + +beforeEach(() => { + const db = initTestDb(); + runMigrations(db); +}); + +afterEach(() => { + closeDb(); +}); + +// ── Migrations ── + +describe('migrations', () => { + it('should be idempotent', () => { + const db = initTestDb(); + runMigrations(db); + // Running again should not throw + runMigrations(db); + }); + + it('should track schema version', () => { + const db = initTestDb(); + runMigrations(db); + const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number }; + expect(row.v).toBe(1); + }); +}); + +// ── Agent Groups ── + +describe('agent groups', () => { + const ag = () => ({ + id: 'ag-1', + name: 'Test Agent', + folder: 'test-agent', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + + it('should create and retrieve', () => { + createAgentGroup(ag()); + const result = getAgentGroup('ag-1'); + expect(result).toBeDefined(); + expect(result!.name).toBe('Test Agent'); + expect(result!.folder).toBe('test-agent'); + }); + + it('should find by folder', () => { + createAgentGroup(ag()); + const result = getAgentGroupByFolder('test-agent'); + expect(result).toBeDefined(); + expect(result!.id).toBe('ag-1'); + }); + + it('should list all', () => { + createAgentGroup(ag()); + createAgentGroup({ ...ag(), id: 'ag-2', name: 'Another', folder: 'another' }); + expect(getAllAgentGroups()).toHaveLength(2); + }); + + it('should find admin group', () => { + createAgentGroup(ag()); + createAgentGroup({ ...ag(), id: 'ag-admin', name: 'Admin', folder: 'admin', is_admin: 1 }); + const admin = getAdminAgentGroup(); + expect(admin).toBeDefined(); + expect(admin!.id).toBe('ag-admin'); + }); + + it('should update', () => { + createAgentGroup(ag()); + updateAgentGroup('ag-1', { name: 'Updated' }); + expect(getAgentGroup('ag-1')!.name).toBe('Updated'); + }); + + it('should delete', () => { + createAgentGroup(ag()); + deleteAgentGroup('ag-1'); + expect(getAgentGroup('ag-1')).toBeUndefined(); + }); + + it('should enforce unique folder', () => { + createAgentGroup(ag()); + expect(() => createAgentGroup({ ...ag(), id: 'ag-dup' })).toThrow(); + }); +}); + +// ── Messaging Groups ── + +describe('messaging groups', () => { + const mg = () => ({ + id: 'mg-1', + channel_type: 'discord', + platform_id: 'chan-123', + name: 'General', + is_group: 1, + admin_user_id: 'user-1', + created_at: now(), + }); + + it('should create and retrieve', () => { + createMessagingGroup(mg()); + const result = getMessagingGroup('mg-1'); + expect(result).toBeDefined(); + expect(result!.channel_type).toBe('discord'); + }); + + it('should find by platform', () => { + createMessagingGroup(mg()); + const result = getMessagingGroupByPlatform('discord', 'chan-123'); + expect(result).toBeDefined(); + expect(result!.id).toBe('mg-1'); + }); + + it('should enforce unique channel_type + platform_id', () => { + createMessagingGroup(mg()); + expect(() => createMessagingGroup({ ...mg(), id: 'mg-dup' })).toThrow(); + }); + + it('should update', () => { + createMessagingGroup(mg()); + updateMessagingGroup('mg-1', { name: 'Updated' }); + expect(getMessagingGroup('mg-1')!.name).toBe('Updated'); + }); + + it('should delete', () => { + createMessagingGroup(mg()); + deleteMessagingGroup('mg-1'); + expect(getMessagingGroup('mg-1')).toBeUndefined(); + }); +}); + +// ── Messaging Group Agents ── + +describe('messaging group agents', () => { + beforeEach(() => { + createAgentGroup({ + id: 'ag-1', + name: 'Agent', + folder: 'agent', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + createMessagingGroup({ + id: 'mg-1', + channel_type: 'discord', + platform_id: 'chan-1', + name: 'Gen', + is_group: 1, + admin_user_id: null, + created_at: now(), + }); + }); + + const mga = () => ({ + id: 'mga-1', + messaging_group_id: 'mg-1', + agent_group_id: 'ag-1', + trigger_rules: null, + response_scope: 'all' as const, + session_mode: 'shared' as const, + priority: 0, + created_at: now(), + }); + + it('should create and list by messaging group', () => { + createMessagingGroupAgent(mga()); + const results = getMessagingGroupAgents('mg-1'); + expect(results).toHaveLength(1); + expect(results[0].agent_group_id).toBe('ag-1'); + }); + + it('should order by priority descending', () => { + createMessagingGroupAgent(mga()); + createAgentGroup({ + id: 'ag-2', + name: 'Agent2', + folder: 'agent2', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + createMessagingGroupAgent({ ...mga(), id: 'mga-2', agent_group_id: 'ag-2', priority: 10 }); + const results = getMessagingGroupAgents('mg-1'); + expect(results[0].agent_group_id).toBe('ag-2'); + expect(results[1].agent_group_id).toBe('ag-1'); + }); + + it('should enforce unique messaging_group + agent_group', () => { + createMessagingGroupAgent(mga()); + expect(() => createMessagingGroupAgent({ ...mga(), id: 'mga-dup' })).toThrow(); + }); + + it('should update', () => { + createMessagingGroupAgent(mga()); + updateMessagingGroupAgent('mga-1', { priority: 5 }); + expect(getMessagingGroupAgent('mga-1')!.priority).toBe(5); + }); + + it('should delete', () => { + createMessagingGroupAgent(mga()); + deleteMessagingGroupAgent('mga-1'); + expect(getMessagingGroupAgents('mg-1')).toHaveLength(0); + }); + + it('should enforce foreign key on agent_group_id', () => { + expect(() => createMessagingGroupAgent({ ...mga(), agent_group_id: 'nonexistent' })).toThrow(); + }); +}); + +// ── Sessions ── + +describe('sessions', () => { + beforeEach(() => { + createAgentGroup({ + id: 'ag-1', + name: 'Agent', + folder: 'agent', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + createMessagingGroup({ + id: 'mg-1', + channel_type: 'discord', + platform_id: 'chan-1', + name: 'Gen', + is_group: 1, + admin_user_id: null, + created_at: now(), + }); + }); + + const sess = () => ({ + id: 'sess-1', + agent_group_id: 'ag-1', + messaging_group_id: 'mg-1', + thread_id: null, + agent_provider: null, + status: 'active' as const, + container_status: 'stopped' as const, + last_active: null, + created_at: now(), + }); + + it('should create and retrieve', () => { + createSession(sess()); + const result = getSession('sess-1'); + expect(result).toBeDefined(); + expect(result!.agent_group_id).toBe('ag-1'); + }); + + it('should find by messaging group (shared, no thread)', () => { + createSession(sess()); + const result = findSession('mg-1', null); + expect(result).toBeDefined(); + expect(result!.id).toBe('sess-1'); + }); + + it('should find by messaging group + thread', () => { + createSession({ ...sess(), thread_id: 'thread-1' }); + expect(findSession('mg-1', 'thread-1')).toBeDefined(); + expect(findSession('mg-1', 'thread-2')).toBeUndefined(); + expect(findSession('mg-1', null)).toBeUndefined(); + }); + + it('should only find active sessions', () => { + createSession({ ...sess(), status: 'closed' }); + expect(findSession('mg-1', null)).toBeUndefined(); + }); + + it('should list by agent group', () => { + createSession(sess()); + createSession({ ...sess(), id: 'sess-2', thread_id: 'thread-1' }); + expect(getSessionsByAgentGroup('ag-1')).toHaveLength(2); + }); + + it('should list active sessions', () => { + createSession(sess()); + createSession({ ...sess(), id: 'sess-closed', status: 'closed', thread_id: 'thread-x' }); + expect(getActiveSessions()).toHaveLength(1); + }); + + it('should list running sessions', () => { + createSession({ ...sess(), container_status: 'running' }); + createSession({ ...sess(), id: 'sess-idle', container_status: 'idle', thread_id: 'thread-1' }); + createSession({ ...sess(), id: 'sess-stopped', container_status: 'stopped', thread_id: 'thread-2' }); + expect(getRunningSessions()).toHaveLength(2); + }); + + it('should update', () => { + createSession(sess()); + updateSession('sess-1', { container_status: 'running', last_active: now() }); + const result = getSession('sess-1')!; + expect(result.container_status).toBe('running'); + expect(result.last_active).not.toBeNull(); + }); + + it('should delete', () => { + createSession(sess()); + deleteSession('sess-1'); + expect(getSession('sess-1')).toBeUndefined(); + }); +}); + +// ── Pending Questions ── + +describe('pending questions', () => { + beforeEach(() => { + createAgentGroup({ + id: 'ag-1', + name: 'Agent', + folder: 'agent', + is_admin: 0, + agent_provider: null, + container_config: null, + created_at: now(), + }); + createSession({ + id: 'sess-1', + agent_group_id: 'ag-1', + messaging_group_id: null, + thread_id: null, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: now(), + }); + }); + + it('should create and retrieve', () => { + createPendingQuestion({ + question_id: 'q-1', + session_id: 'sess-1', + message_out_id: 'msg-out-1', + platform_id: 'chan-1', + channel_type: 'discord', + thread_id: null, + created_at: now(), + }); + const result = getPendingQuestion('q-1'); + expect(result).toBeDefined(); + expect(result!.session_id).toBe('sess-1'); + }); + + it('should delete', () => { + createPendingQuestion({ + question_id: 'q-1', + session_id: 'sess-1', + message_out_id: 'msg-out-1', + platform_id: null, + channel_type: null, + thread_id: null, + created_at: now(), + }); + deletePendingQuestion('q-1'); + expect(getPendingQuestion('q-1')).toBeUndefined(); + }); +}); diff --git a/src/db/index.ts b/src/db/index.ts new file mode 100644 index 0000000..35645cb --- /dev/null +++ b/src/db/index.ts @@ -0,0 +1,37 @@ +export { initDb, initTestDb, getDb, closeDb } from './connection.js'; +export { runMigrations } from './migrations/index.js'; +export { + createAgentGroup, + getAgentGroup, + getAgentGroupByFolder, + getAllAgentGroups, + getAdminAgentGroup, + updateAgentGroup, + deleteAgentGroup, +} from './agent-groups.js'; +export { + createMessagingGroup, + getMessagingGroup, + getMessagingGroupByPlatform, + getAllMessagingGroups, + updateMessagingGroup, + deleteMessagingGroup, + createMessagingGroupAgent, + getMessagingGroupAgents, + getMessagingGroupAgent, + updateMessagingGroupAgent, + deleteMessagingGroupAgent, +} from './messaging-groups.js'; +export { + createSession, + getSession, + findSession, + getSessionsByAgentGroup, + getActiveSessions, + getRunningSessions, + updateSession, + deleteSession, + createPendingQuestion, + getPendingQuestion, + deletePendingQuestion, +} from './sessions.js'; diff --git a/src/db/messaging-groups.ts b/src/db/messaging-groups.ts new file mode 100644 index 0000000..40a9702 --- /dev/null +++ b/src/db/messaging-groups.ts @@ -0,0 +1,98 @@ +import type { MessagingGroup, MessagingGroupAgent } from '../types-v2.js'; +import { getDb } from './connection.js'; + +// ── Messaging Groups ── + +export function createMessagingGroup(group: MessagingGroup): void { + getDb() + .prepare( + `INSERT INTO messaging_groups (id, channel_type, platform_id, name, is_group, admin_user_id, created_at) + VALUES (@id, @channel_type, @platform_id, @name, @is_group, @admin_user_id, @created_at)`, + ) + .run(group); +} + +export function getMessagingGroup(id: string): MessagingGroup | undefined { + return getDb().prepare('SELECT * FROM messaging_groups WHERE id = ?').get(id) as MessagingGroup | undefined; +} + +export function getMessagingGroupByPlatform(channelType: string, platformId: string): MessagingGroup | undefined { + return getDb() + .prepare('SELECT * FROM messaging_groups WHERE channel_type = ? AND platform_id = ?') + .get(channelType, platformId) as MessagingGroup | undefined; +} + +export function getAllMessagingGroups(): MessagingGroup[] { + return getDb().prepare('SELECT * FROM messaging_groups ORDER BY name').all() as MessagingGroup[]; +} + +export function updateMessagingGroup( + id: string, + updates: Partial>, +): void { + const fields: string[] = []; + const values: Record = { id }; + + for (const [key, value] of Object.entries(updates)) { + if (value !== undefined) { + fields.push(`${key} = @${key}`); + values[key] = value; + } + } + if (fields.length === 0) return; + + getDb() + .prepare(`UPDATE messaging_groups SET ${fields.join(', ')} WHERE id = @id`) + .run(values); +} + +export function deleteMessagingGroup(id: string): void { + getDb().prepare('DELETE FROM messaging_groups WHERE id = ?').run(id); +} + +// ── Messaging Group Agents ── + +export function createMessagingGroupAgent(mga: MessagingGroupAgent): void { + getDb() + .prepare( + `INSERT INTO messaging_group_agents (id, messaging_group_id, agent_group_id, trigger_rules, response_scope, session_mode, priority, created_at) + VALUES (@id, @messaging_group_id, @agent_group_id, @trigger_rules, @response_scope, @session_mode, @priority, @created_at)`, + ) + .run(mga); +} + +export function getMessagingGroupAgents(messagingGroupId: string): MessagingGroupAgent[] { + return getDb() + .prepare('SELECT * FROM messaging_group_agents WHERE messaging_group_id = ? ORDER BY priority DESC') + .all(messagingGroupId) as MessagingGroupAgent[]; +} + +export function getMessagingGroupAgent(id: string): MessagingGroupAgent | undefined { + return getDb().prepare('SELECT * FROM messaging_group_agents WHERE id = ?').get(id) as + | MessagingGroupAgent + | undefined; +} + +export function updateMessagingGroupAgent( + id: string, + updates: Partial>, +): void { + const fields: string[] = []; + const values: Record = { id }; + + for (const [key, value] of Object.entries(updates)) { + if (value !== undefined) { + fields.push(`${key} = @${key}`); + values[key] = value; + } + } + if (fields.length === 0) return; + + getDb() + .prepare(`UPDATE messaging_group_agents SET ${fields.join(', ')} WHERE id = @id`) + .run(values); +} + +export function deleteMessagingGroupAgent(id: string): void { + getDb().prepare('DELETE FROM messaging_group_agents WHERE id = ?').run(id); +} diff --git a/src/db/migrations/001-initial.ts b/src/db/migrations/001-initial.ts new file mode 100644 index 0000000..d32b3c2 --- /dev/null +++ b/src/db/migrations/001-initial.ts @@ -0,0 +1,68 @@ +import type Database from 'better-sqlite3'; + +import type { Migration } from './index.js'; + +export const migration001: Migration = { + version: 1, + name: 'initial-v2-schema', + up(db: Database.Database) { + db.exec(` + CREATE TABLE agent_groups ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + folder TEXT NOT NULL UNIQUE, + is_admin INTEGER DEFAULT 0, + agent_provider TEXT, + container_config TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE messaging_groups ( + id TEXT PRIMARY KEY, + channel_type TEXT NOT NULL, + platform_id TEXT NOT NULL, + name TEXT, + is_group INTEGER DEFAULT 0, + admin_user_id TEXT, + created_at TEXT NOT NULL, + UNIQUE(channel_type, platform_id) + ); + + CREATE TABLE messaging_group_agents ( + id TEXT PRIMARY KEY, + messaging_group_id TEXT NOT NULL REFERENCES messaging_groups(id), + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + trigger_rules TEXT, + response_scope TEXT DEFAULT 'all', + session_mode TEXT DEFAULT 'shared', + priority INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + UNIQUE(messaging_group_id, agent_group_id) + ); + + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + messaging_group_id TEXT REFERENCES messaging_groups(id), + thread_id TEXT, + agent_provider TEXT, + status TEXT DEFAULT 'active', + container_status TEXT DEFAULT 'stopped', + last_active TEXT, + created_at TEXT NOT NULL + ); + CREATE INDEX idx_sessions_agent_group ON sessions(agent_group_id); + CREATE INDEX idx_sessions_lookup ON sessions(messaging_group_id, thread_id); + + CREATE TABLE pending_questions ( + question_id TEXT PRIMARY KEY, + session_id TEXT NOT NULL REFERENCES sessions(id), + message_out_id TEXT NOT NULL, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + created_at TEXT NOT NULL + ); + `); + }, +}; diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts new file mode 100644 index 0000000..54e848c --- /dev/null +++ b/src/db/migrations/index.ts @@ -0,0 +1,46 @@ +import type Database from 'better-sqlite3'; + +import { log } from '../../log.js'; +import { migration001 } from './001-initial.js'; + +export interface Migration { + version: number; + name: string; + up: (db: Database.Database) => void; +} + +const migrations: Migration[] = [migration001]; + +export function runMigrations(db: Database.Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied TEXT NOT NULL + ); + `); + + const currentVersion = + (db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number | null })?.v ?? 0; + + const pending = migrations.filter((m) => m.version > currentVersion); + if (pending.length === 0) return; + + log.info('Running migrations', { + from: currentVersion, + to: pending[pending.length - 1].version, + count: pending.length, + }); + + for (const m of pending) { + db.transaction(() => { + m.up(db); + db.prepare('INSERT INTO schema_version (version, name, applied) VALUES (?, ?, ?)').run( + m.version, + m.name, + new Date().toISOString(), + ); + })(); + log.info('Migration applied', { version: m.version, name: m.name }); + } +} diff --git a/src/db/schema.ts b/src/db/schema.ts new file mode 100644 index 0000000..2d50d18 --- /dev/null +++ b/src/db/schema.ts @@ -0,0 +1,103 @@ +/** + * Reference copy of the current v2 schema. + * Read this to understand the DB structure. + * Actual creation is done by migrations — do not use this at runtime. + */ + +export const SCHEMA = ` +-- Agent workspaces: folder, skills, CLAUDE.md, container config +CREATE TABLE agent_groups ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + folder TEXT NOT NULL UNIQUE, + is_admin INTEGER DEFAULT 0, + agent_provider TEXT, + container_config TEXT, + created_at TEXT NOT NULL +); + +-- Platform groups/channels +CREATE TABLE messaging_groups ( + id TEXT PRIMARY KEY, + channel_type TEXT NOT NULL, + platform_id TEXT NOT NULL, + name TEXT, + is_group INTEGER DEFAULT 0, + admin_user_id TEXT, + created_at TEXT NOT NULL, + UNIQUE(channel_type, platform_id) +); + +-- Which agent groups handle which messaging groups +CREATE TABLE messaging_group_agents ( + id TEXT PRIMARY KEY, + messaging_group_id TEXT NOT NULL REFERENCES messaging_groups(id), + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + trigger_rules TEXT, + response_scope TEXT DEFAULT 'all', + session_mode TEXT DEFAULT 'shared', + priority INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + UNIQUE(messaging_group_id, agent_group_id) +); + +-- Sessions: one folder = one session = one container when running +CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + agent_group_id TEXT NOT NULL REFERENCES agent_groups(id), + messaging_group_id TEXT REFERENCES messaging_groups(id), + thread_id TEXT, + agent_provider TEXT, + status TEXT DEFAULT 'active', + container_status TEXT DEFAULT 'stopped', + last_active TEXT, + created_at TEXT NOT NULL +); +CREATE INDEX idx_sessions_agent_group ON sessions(agent_group_id); +CREATE INDEX idx_sessions_lookup ON sessions(messaging_group_id, thread_id); + +-- Pending interactive questions +CREATE TABLE pending_questions ( + question_id TEXT PRIMARY KEY, + session_id TEXT NOT NULL REFERENCES sessions(id), + message_out_id TEXT NOT NULL, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + created_at TEXT NOT NULL +); +`; + +/** + * Session DB schema — created fresh by the host for each session. + */ +export const SESSION_SCHEMA = ` +CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', + status_changed TEXT, + process_after TEXT, + recurrence TEXT, + tries INTEGER DEFAULT 0, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL +); + +CREATE TABLE messages_out ( + id TEXT PRIMARY KEY, + in_reply_to TEXT, + timestamp TEXT NOT NULL, + delivered INTEGER DEFAULT 0, + deliver_after TEXT, + recurrence TEXT, + kind TEXT NOT NULL, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL +); +`; diff --git a/src/db/sessions.ts b/src/db/sessions.ts new file mode 100644 index 0000000..57f00b9 --- /dev/null +++ b/src/db/sessions.ts @@ -0,0 +1,85 @@ +import type { PendingQuestion, Session } from '../types-v2.js'; +import { getDb } from './connection.js'; + +// ── Sessions ── + +export function createSession(session: Session): void { + getDb() + .prepare( + `INSERT INTO sessions (id, agent_group_id, messaging_group_id, thread_id, agent_provider, status, container_status, last_active, created_at) + VALUES (@id, @agent_group_id, @messaging_group_id, @thread_id, @agent_provider, @status, @container_status, @last_active, @created_at)`, + ) + .run(session); +} + +export function getSession(id: string): Session | undefined { + return getDb().prepare('SELECT * FROM sessions WHERE id = ?').get(id) as Session | undefined; +} + +export function findSession(messagingGroupId: string, threadId: string | null): Session | undefined { + if (threadId) { + return getDb() + .prepare('SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id = ? AND status = ?') + .get(messagingGroupId, threadId, 'active') as Session | undefined; + } + return getDb() + .prepare('SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id IS NULL AND status = ?') + .get(messagingGroupId, 'active') as Session | undefined; +} + +export function getSessionsByAgentGroup(agentGroupId: string): Session[] { + return getDb().prepare('SELECT * FROM sessions WHERE agent_group_id = ?').all(agentGroupId) as Session[]; +} + +export function getActiveSessions(): Session[] { + return getDb().prepare("SELECT * FROM sessions WHERE status = 'active'").all() as Session[]; +} + +export function getRunningSessions(): Session[] { + return getDb().prepare("SELECT * FROM sessions WHERE container_status IN ('running', 'idle')").all() as Session[]; +} + +export function updateSession( + id: string, + updates: Partial>, +): void { + const fields: string[] = []; + const values: Record = { id }; + + for (const [key, value] of Object.entries(updates)) { + if (value !== undefined) { + fields.push(`${key} = @${key}`); + values[key] = value; + } + } + if (fields.length === 0) return; + + getDb() + .prepare(`UPDATE sessions SET ${fields.join(', ')} WHERE id = @id`) + .run(values); +} + +export function deleteSession(id: string): void { + getDb().prepare('DELETE FROM sessions WHERE id = ?').run(id); +} + +// ── Pending Questions ── + +export function createPendingQuestion(pq: PendingQuestion): void { + getDb() + .prepare( + `INSERT INTO pending_questions (question_id, session_id, message_out_id, platform_id, channel_type, thread_id, created_at) + VALUES (@question_id, @session_id, @message_out_id, @platform_id, @channel_type, @thread_id, @created_at)`, + ) + .run(pq); +} + +export function getPendingQuestion(questionId: string): PendingQuestion | undefined { + return getDb().prepare('SELECT * FROM pending_questions WHERE question_id = ?').get(questionId) as + | PendingQuestion + | undefined; +} + +export function deletePendingQuestion(questionId: string): void { + getDb().prepare('DELETE FROM pending_questions WHERE question_id = ?').run(questionId); +} diff --git a/src/log.ts b/src/log.ts new file mode 100644 index 0000000..d1e820c --- /dev/null +++ b/src/log.ts @@ -0,0 +1,64 @@ +const LEVELS = { debug: 20, info: 30, warn: 40, error: 50, fatal: 60 } as const; +type Level = keyof typeof LEVELS; + +const COLORS: Record = { + debug: '\x1b[34m', + info: '\x1b[32m', + warn: '\x1b[33m', + error: '\x1b[31m', + fatal: '\x1b[41m\x1b[37m', +}; +const KEY_COLOR = '\x1b[35m'; +const MSG_COLOR = '\x1b[36m'; +const RESET = '\x1b[39m'; +const FULL_RESET = '\x1b[0m'; + +const threshold = LEVELS[(process.env.LOG_LEVEL as Level) || 'info'] ?? LEVELS.info; + +function formatErr(err: unknown): string { + if (err instanceof Error) { + return `{ type: "${err.constructor.name}", message: "${err.message}", stack: ${err.stack} }`; + } + return JSON.stringify(err); +} + +function formatData(data: Record): string { + const parts: string[] = []; + for (const [k, v] of Object.entries(data)) { + parts.push(`${KEY_COLOR}${k}${RESET}=${k === 'err' ? formatErr(v) : JSON.stringify(v)}`); + } + return parts.length ? ' ' + parts.join(' ') : ''; +} + +function ts(): string { + const d = new Date(); + const hh = String(d.getHours()).padStart(2, '0'); + const mm = String(d.getMinutes()).padStart(2, '0'); + const ss = String(d.getSeconds()).padStart(2, '0'); + const ms = String(d.getMilliseconds()).padStart(3, '0'); + return `${hh}:${mm}:${ss}.${ms}`; +} + +function emit(level: Level, msg: string, data?: Record): void { + if (LEVELS[level] < threshold) return; + const tag = `${COLORS[level]}${level.toUpperCase()}${level === 'fatal' ? FULL_RESET : RESET}`; + const stream = LEVELS[level] >= LEVELS.warn ? process.stderr : process.stdout; + stream.write(`[${ts()}] ${tag} ${MSG_COLOR}${msg}${RESET}${data ? formatData(data) : ''}\n`); +} + +export const log = { + debug: (msg: string, data?: Record) => emit('debug', msg, data), + info: (msg: string, data?: Record) => emit('info', msg, data), + warn: (msg: string, data?: Record) => emit('warn', msg, data), + error: (msg: string, data?: Record) => emit('error', msg, data), + fatal: (msg: string, data?: Record) => emit('fatal', msg, data), +}; + +process.on('uncaughtException', (err) => { + log.fatal('Uncaught exception', { err }); + process.exit(1); +}); + +process.on('unhandledRejection', (reason) => { + log.error('Unhandled rejection', { err: reason }); +}); diff --git a/src/types-v2.ts b/src/types-v2.ts new file mode 100644 index 0000000..7b202bb --- /dev/null +++ b/src/types-v2.ts @@ -0,0 +1,90 @@ +// ── Central DB entities ── + +export interface AgentGroup { + id: string; + name: string; + folder: string; + is_admin: number; // 0 | 1 + agent_provider: string | null; + container_config: string | null; // JSON: { additionalMounts, timeout } + created_at: string; +} + +export interface MessagingGroup { + id: string; + channel_type: string; + platform_id: string; + name: string | null; + is_group: number; // 0 | 1 + admin_user_id: string | null; + created_at: string; +} + +export interface MessagingGroupAgent { + id: string; + messaging_group_id: string; + agent_group_id: string; + trigger_rules: string | null; // JSON: { pattern, mentionOnly, excludeSenders, includeSenders } + response_scope: 'all' | 'triggered' | 'allowlisted'; + session_mode: 'shared' | 'per-thread'; + priority: number; + created_at: string; +} + +export interface Session { + id: string; + agent_group_id: string; + messaging_group_id: string | null; + thread_id: string | null; + agent_provider: string | null; + status: 'active' | 'closed'; + container_status: 'running' | 'idle' | 'stopped'; + last_active: string | null; + created_at: string; +} + +// ── Session DB entities ── + +export type MessageInKind = 'chat' | 'chat-sdk' | 'task' | 'webhook' | 'system'; +export type MessageInStatus = 'pending' | 'processing' | 'completed' | 'failed'; + +export interface MessageIn { + id: string; + kind: MessageInKind; + timestamp: string; + status: MessageInStatus; + status_changed: string | null; + process_after: string | null; + recurrence: string | null; + tries: number; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; // JSON blob +} + +export interface MessageOut { + id: string; + in_reply_to: string | null; + timestamp: string; + delivered: number; // 0 | 1 + deliver_after: string | null; + recurrence: string | null; + kind: string; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; // JSON blob +} + +// ── Pending questions (central DB) ── + +export interface PendingQuestion { + question_id: string; + session_id: string; + message_out_id: string; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + created_at: string; +}