From 201758968330515a07c2aea5847309e90c7e9890 Mon Sep 17 00:00:00 2001 From: Koshkoshinsk Date: Mon, 13 Apr 2026 12:27:02 +0000 Subject: [PATCH] feat(telegram): self-contained pairing for chat ownership verification BotFather issues bot tokens with no user binding, so anyone who guesses the bot's username can DM it and get registered as a channel. Pairing closes that gap: setup issues a one-time 4-digit code, the operator echoes it back from the chat they want to register, and the inbound interceptor binds admin_user_id before the message reaches the router. - src/channels/telegram-pairing.ts: JSON-backed store with createPairing, tryConsume, getStatus, waitForPairing (fs.watch + poll fallback) - src/channels/telegram.ts: wraps bridge.setup with an onInbound interceptor that consumes pairing codes and upserts messaging_groups - setup/pair-telegram.ts: CLI step issues a code and waits up to 5 min for the operator to echo it back, emitting PLATFORM_ID/IS_GROUP/ADMIN_USER_ID - Skill docs: /setup reorders mounts -> service -> wire (pairing needs a live polling adapter); /manage-channels and /add-telegram-v2 use pairing instead of asking the user to discover chat IDs All other channels still bind admin via install-time identity (OAuth/QR/token); pairing is Telegram-only. The bridge, router, and other adapters are untouched. --- .claude/skills/add-telegram-v2/SKILL.md | 2 +- .claude/skills/manage-channels/SKILL.md | 10 +- .claude/skills/setup/SKILL.md | 26 +-- setup/index.ts | 1 + setup/pair-telegram.ts | 97 +++++++++ src/channels/telegram-pairing.test.ts | 166 ++++++++++++++ src/channels/telegram-pairing.ts | 276 ++++++++++++++++++++++++ src/channels/telegram.ts | 124 ++++++++++- 8 files changed, 679 insertions(+), 23 deletions(-) create mode 100644 setup/pair-telegram.ts create mode 100644 src/channels/telegram-pairing.test.ts create mode 100644 src/channels/telegram-pairing.ts diff --git a/.claude/skills/add-telegram-v2/SKILL.md b/.claude/skills/add-telegram-v2/SKILL.md index b767e55..fc18cc5 100644 --- a/.claude/skills/add-telegram-v2/SKILL.md +++ b/.claude/skills/add-telegram-v2/SKILL.md @@ -68,7 +68,7 @@ Otherwise, run `/manage-channels` to wire this channel to an agent group. - **type**: `telegram` - **terminology**: Telegram calls them "groups" and "chats." A "group" has multiple members; a "chat" is a 1:1 conversation with the bot. -- **how-to-find-id**: Send a message in the group/chat, then visit `https://api.telegram.org/bot/getUpdates` — the `chat.id` field is the platform ID. Group IDs are negative numbers. +- **how-to-find-id**: Do NOT ask the user for a chat ID. Telegram registration uses pairing — run `npx tsx setup/index.ts --step pair-telegram -- --intent `, show the user the 4-digit `CODE` from the `PAIR_TELEGRAM_ISSUED` block, and tell them to send `@ CODE` from the chat they want to register (DM the bot for `main`, post in the group otherwise). The step waits up to 5 minutes and emits a `PAIR_TELEGRAM` block with `PLATFORM_ID`, `IS_GROUP`, and `ADMIN_USER_ID` once the user echoes the code. The service must be running for this to work (the polling adapter is what observes the code). - **supports-threads**: no - **typical-use**: Interactive chat — direct messages or small groups - **default-isolation**: Same agent group if you're the only participant across multiple chats. Separate agent group if different people are in different groups. diff --git a/.claude/skills/manage-channels/SKILL.md b/.claude/skills/manage-channels/SKILL.md index ee68656..2e656ea 100644 --- a/.claude/skills/manage-channels/SKILL.md +++ b/.claude/skills/manage-channels/SKILL.md @@ -17,8 +17,8 @@ Categorize channels as: **wired** (has DB entities), **configured but unwired** 1. Ask the assistant name (default: project name or "Andy") 2. Ask which channel is the primary/admin channel -3. Ask for the platform ID — read the channel's SKILL.md `## Channel Info` > `how-to-find-id` to guide them -4. Register: +3. **Telegram special case:** if the chosen channel is `telegram`, do not ask for an ID. Run `npx tsx setup/index.ts --step pair-telegram -- --intent main`, show the user the 4-digit CODE from the `PAIR_TELEGRAM_ISSUED` block, and tell them to DM the bot with `@ CODE` from the chat they want as their main. Wait for the `PAIR_TELEGRAM` block — `PLATFORM_ID`, `IS_GROUP`, `ADMIN_USER_ID` come back from there. Skip step 4 of this list (the messaging group is already created with admin binding); instead run only the agent-group + wiring portion via `setup --step register` with the returned `PLATFORM_ID`. +4. Otherwise (non-Telegram), ask for the platform ID — read the channel's SKILL.md `## Channel Info` > `how-to-find-id` to guide them, then register: ```bash npx tsx setup/index.ts --step register -- \ @@ -64,10 +64,8 @@ For separate agents, also ask for a folder name and optionally a different assis When adding another group/chat on an already-configured platform (e.g. a second Telegram group): -1. Read the channel's SKILL.md `## Channel Info` for terminology and how-to-find-id -2. Ask for the new group/chat ID -3. Ask the isolation question -4. Register — no package or credential changes needed +1. **Telegram:** ask the isolation question first to determine intent (`wire-to:` for an existing agent, `new-agent:` for a fresh one). Run `npx tsx setup/index.ts --step pair-telegram -- --intent `, show the CODE, and tell the user to post `@ CODE` in the target group (or DM the bot for a private chat). Wait for the `PAIR_TELEGRAM` block, then run `setup --step register` with the returned `PLATFORM_ID` and the chosen folder/session-mode. The messaging group row is already created with `admin_user_id` set — `register` only needs to add the wiring. +2. **Other channels:** read the channel's SKILL.md `## Channel Info` for terminology and how-to-find-id. Ask for the new group/chat ID, ask the isolation question, then register. No package or credential changes needed. ## Change Wiring diff --git a/.claude/skills/setup/SKILL.md b/.claude/skills/setup/SKILL.md index 205b806..0543e59 100644 --- a/.claude/skills/setup/SKILL.md +++ b/.claude/skills/setup/SKILL.md @@ -288,17 +288,6 @@ npm install && npm run build If the build fails, read the error output and fix it (usually a missing dependency). Then continue to step 5a. -## 5a. Wire Channels to Agent Groups - -Invoke `/manage-channels` to wire the installed channels to agent groups. This step: -1. Creates the agent group(s) and assigns a name to the assistant -2. Asks for each channel's platform-specific ID (guided by channel-specific instructions) -3. Decides the isolation level — whether channels share an agent, session, or are fully separate - -The `/manage-channels` skill reads each channel's `## Channel Info` section from its SKILL.md for platform-specific guidance (terminology, how to find IDs, recommended isolation). - -**This step is required.** Without it, channels are installed but not wired — messages will be silently dropped because the router has no agent group to route to. - ## 6. Mount Allowlist AskUserQuestion: Agent access to external directories? @@ -336,6 +325,19 @@ Replace `USERNAME` with the actual username (from `whoami`). Run the two `sudo` - Linux: check `systemctl --user status nanoclaw`. - Re-run the service step after fixing. +## 7a. Wire Channels to Agent Groups + +The service is now running, so polling-based adapters (Telegram) can observe inbound messages — required for pairing. + +Invoke `/manage-channels` to wire the installed channels to agent groups. This step: +1. Creates the agent group(s) and assigns a name to the assistant +2. Resolves each channel's platform-specific ID (Telegram via pairing code; other channels via the platform's own ID lookup) +3. Decides the isolation level — whether channels share an agent, session, or are fully separate + +The `/manage-channels` skill reads each channel's `## Channel Info` section from its SKILL.md for platform-specific guidance (terminology, how to find IDs, recommended isolation). + +**This step is required.** Without it, channels are installed but not wired — messages will be silently dropped because the router has no agent group to route to. + ## 8. Verify Run `npx tsx setup/index.ts --step verify` and parse the status block. @@ -345,7 +347,7 @@ Run `npx tsx setup/index.ts --step verify` and parse the status block. - SERVICE=not_found → re-run step 7 - CREDENTIALS=missing → re-run step 4 (Docker: check `onecli secrets list`; Apple Container: check `.env` for credentials) - CHANNEL_AUTH shows `not_found` for any channel → re-invoke that channel's skill (e.g. `/add-telegram`) -- REGISTERED_GROUPS=0 → re-invoke `/manage-channels` from step 5a +- REGISTERED_GROUPS=0 → re-invoke `/manage-channels` from step 7a - MOUNT_ALLOWLIST=missing → `npx tsx setup/index.ts --step mounts -- --empty` Tell user to test: send a message in their registered chat. Show: `tail -f logs/nanoclaw.log` diff --git a/setup/index.ts b/setup/index.ts index 9975022..0e0db7b 100644 --- a/setup/index.ts +++ b/setup/index.ts @@ -14,6 +14,7 @@ const STEPS: Record< container: () => import('./container.js'), groups: () => import('./groups.js'), register: () => import('./register.js'), + 'pair-telegram': () => import('./pair-telegram.js'), mounts: () => import('./mounts.js'), service: () => import('./service.js'), verify: () => import('./verify.js'), diff --git a/setup/pair-telegram.ts b/setup/pair-telegram.ts new file mode 100644 index 0000000..21c8467 --- /dev/null +++ b/setup/pair-telegram.ts @@ -0,0 +1,97 @@ +/** + * Step: pair-telegram — issue a one-time pairing code and wait for the + * operator to send `@botname CODE` from the chat they want to register. + * + * On success, prints platformId / isGroup / adminUserId / intent. The caller + * (skill) then runs `setup --step register` with those values. + * + * The service must already be running so the telegram adapter is polling. + */ +import { initDb } from '../src/db/connection.js'; +import { runMigrations } from '../src/db/migrations/index.js'; +import { DATA_DIR } from '../src/config.js'; +import path from 'path'; + +import { + createPairing, + waitForPairing, + type PairingIntent, +} from '../src/channels/telegram-pairing.js'; +import { emitStatus } from './status.js'; + +interface Args { + intent: PairingIntent; + ttlMs: number; +} + +function parseArgs(args: string[]): Args { + let intent: PairingIntent = 'main'; + let ttlMs = 5 * 60 * 1000; + for (let i = 0; i < args.length; i++) { + switch (args[i]) { + case '--intent': { + const raw = args[++i] || 'main'; + if (raw === 'main') { + intent = 'main'; + } else if (raw.startsWith('wire-to:')) { + intent = { kind: 'wire-to', folder: raw.slice('wire-to:'.length) }; + } else if (raw.startsWith('new-agent:')) { + intent = { kind: 'new-agent', folder: raw.slice('new-agent:'.length) }; + } else { + throw new Error(`Unknown intent: ${raw}`); + } + break; + } + case '--ttl-ms': + ttlMs = parseInt(args[++i] || '300000', 10); + break; + } + } + return { intent, ttlMs }; +} + +function intentToString(intent: PairingIntent): string { + if (intent === 'main') return 'main'; + return `${intent.kind}:${intent.folder}`; +} + +export async function run(args: string[]): Promise { + const { intent, ttlMs } = parseArgs(args); + + // Pairing reads/writes its JSON store under DATA_DIR; the DB isn't strictly + // required for the pairing primitive itself, but the inbound interceptor + // (running in the live service) needs it. Touch it here so a fresh install + // doesn't blow up on the first match. + const db = initDb(path.join(DATA_DIR, 'v2.db')); + runMigrations(db); + + const record = await createPairing(intent, { ttlMs }); + + // Tell the user what to do. The skill prints this as user-facing text. + emitStatus('PAIR_TELEGRAM_ISSUED', { + CODE: record.code, + INTENT: intentToString(intent), + EXPIRES_AT: record.expiresAt, + INSTRUCTIONS: `Send "@ ${record.code}" from the Telegram chat you want to register.`, + }); + + try { + const consumed = await waitForPairing(record.code, { timeoutMs: ttlMs }); + emitStatus('PAIR_TELEGRAM', { + STATUS: 'success', + CODE: record.code, + INTENT: intentToString(consumed.intent), + PLATFORM_ID: consumed.consumed!.platformId, + IS_GROUP: consumed.consumed!.isGroup, + ADMIN_USER_ID: consumed.consumed!.adminUserId ?? '', + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + emitStatus('PAIR_TELEGRAM', { + STATUS: 'failed', + CODE: record.code, + ERROR: message, + }); + process.exit(2); + } +} diff --git a/src/channels/telegram-pairing.test.ts b/src/channels/telegram-pairing.test.ts new file mode 100644 index 0000000..0af26b0 --- /dev/null +++ b/src/channels/telegram-pairing.test.ts @@ -0,0 +1,166 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import fs from 'fs'; +import path from 'path'; +import os from 'os'; + +vi.mock('../log.js', () => ({ log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } })); + +import { + createPairing, + tryConsume, + getStatus, + waitForPairing, + extractCode, + extractAddressedText, + _setStorePathForTest, + _resetForTest, +} from './telegram-pairing.js'; + +let tmpDir: string; + +beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tg-pair-')); + _setStorePathForTest(path.join(tmpDir, 'pairings.json')); +}); + +afterEach(() => { + _resetForTest(); + _setStorePathForTest(null); + fs.rmSync(tmpDir, { recursive: true, force: true }); +}); + +describe('extractAddressedText', () => { + it('strips @botname prefix', () => { + expect(extractAddressedText('@nanobot 1234', 'nanobot')).toBe('1234'); + }); + it('is case-insensitive', () => { + expect(extractAddressedText('@NanoBot hello', 'nanobot')).toBe('hello'); + }); + it('returns null when not addressed', () => { + expect(extractAddressedText('hello 1234', 'nanobot')).toBeNull(); + }); + it('returns null when address is mid-text', () => { + expect(extractAddressedText('hi @nanobot 1234', 'nanobot')).toBeNull(); + }); +}); + +describe('extractCode', () => { + it('finds 4-digit code after @botname', () => { + expect(extractCode('@nanobot 0042', 'nanobot')).toBe('0042'); + }); + it('rejects non-4-digit numbers', () => { + expect(extractCode('@nanobot 12345', 'nanobot')).toBeNull(); + expect(extractCode('@nanobot 12', 'nanobot')).toBeNull(); + }); + it('returns null without addressing', () => { + expect(extractCode('1234', 'nanobot')).toBeNull(); + }); +}); + +describe('createPairing', () => { + it('generates a 4-digit code with TTL', async () => { + const r = await createPairing('main', { ttlMs: 60_000 }); + expect(r.code).toMatch(/^\d{4}$/); + expect(r.status).toBe('pending'); + expect(Date.parse(r.expiresAt)).toBeGreaterThan(Date.now()); + }); + + it('does not collide with active codes', async () => { + const codes = new Set(); + for (let i = 0; i < 20; i++) { + const r = await createPairing('main'); + expect(codes.has(r.code)).toBe(false); + codes.add(r.code); + } + }); +}); + +describe('tryConsume', () => { + it('matches and marks consumed', async () => { + const r = await createPairing('main'); + const consumed = await tryConsume({ + text: `@nanobot ${r.code}`, + botUsername: 'nanobot', + platformId: 'telegram:123', + isGroup: false, + adminUserId: 'u1', + }); + expect(consumed).not.toBeNull(); + expect(consumed!.status).toBe('consumed'); + expect(consumed!.consumed?.platformId).toBe('telegram:123'); + expect(consumed!.consumed?.adminUserId).toBe('u1'); + expect(getStatus(r.code)).toBe('consumed'); + }); + + it('returns null on no match (silent drop)', async () => { + await createPairing('main'); + const out = await tryConsume({ + text: '@nanobot 9999', + botUsername: 'nanobot', + platformId: 'x', + isGroup: false, + }); + expect(out).toBeNull(); + }); + + it('returns null without @botname addressing', async () => { + const r = await createPairing('main'); + const out = await tryConsume({ + text: r.code, + botUsername: 'nanobot', + platformId: 'x', + isGroup: false, + }); + expect(out).toBeNull(); + }); + + it('cannot be consumed twice', async () => { + const r = await createPairing('main'); + await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false }); + const second = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false }); + expect(second).toBeNull(); + }); + + it('cannot consume an expired pairing', async () => { + const r = await createPairing('main', { ttlMs: 1 }); + await new Promise((res) => setTimeout(res, 10)); + const out = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false }); + expect(out).toBeNull(); + expect(getStatus(r.code)).toBe('expired'); + }); +}); + +describe('getStatus', () => { + it('returns unknown for missing codes', () => { + expect(getStatus('0000')).toBe('unknown'); + }); +}); + +describe('waitForPairing', () => { + it('resolves when consumed', async () => { + const r = await createPairing('main', { ttlMs: 5000 }); + const p = waitForPairing(r.code, { pollMs: 50 }); + setTimeout(() => { + tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'tg:1', isGroup: true, name: 'Group' }); + }, 100); + const consumed = await p; + expect(consumed.status).toBe('consumed'); + expect(consumed.consumed?.name).toBe('Group'); + }); + + it('rejects on expiry', async () => { + const r = await createPairing('main', { ttlMs: 100 }); + await expect(waitForPairing(r.code, { pollMs: 30 })).rejects.toThrow(/expired/); + }); +}); + +describe('intent passthrough', () => { + it('preserves wire-to and new-agent intents', async () => { + const a = await createPairing({ kind: 'wire-to', folder: 'work' }); + const b = await createPairing({ kind: 'new-agent', folder: 'side' }); + const ca = await tryConsume({ text: `@b ${a.code}`, botUsername: 'b', platformId: 'p1', isGroup: true }); + const cb = await tryConsume({ text: `@b ${b.code}`, botUsername: 'b', platformId: 'p2', isGroup: true }); + expect(ca!.intent).toEqual({ kind: 'wire-to', folder: 'work' }); + expect(cb!.intent).toEqual({ kind: 'new-agent', folder: 'side' }); + }); +}); diff --git a/src/channels/telegram-pairing.ts b/src/channels/telegram-pairing.ts new file mode 100644 index 0000000..7c3e194 --- /dev/null +++ b/src/channels/telegram-pairing.ts @@ -0,0 +1,276 @@ +/** + * Telegram pairing — proves the operator owns the chat they're registering. + * + * BotFather hands out tokens with no user binding, so anyone who guesses the + * bot's username can DM it. Pairing closes that gap: setup creates a one-time + * 4-digit code and the operator echoes it back as `@botname CODE` from the + * chat they want to register. The inbound interceptor in telegram.ts matches + * the code and records the chat (with admin_user_id) before it ever reaches + * the router. + * + * Storage is a JSON file at data/telegram-pairings.json — single-process, + * read-modify-write under an in-process mutex. + */ +import fs from 'fs'; +import path from 'path'; + +import { DATA_DIR } from '../config.js'; +import { log } from '../log.js'; + +export type PairingIntent = 'main' | { kind: 'wire-to'; folder: string } | { kind: 'new-agent'; folder: string }; +export type PairingStatus = 'pending' | 'consumed' | 'expired' | 'unknown'; + +export interface ConsumedDetails { + platformId: string; + isGroup: boolean; + name: string | null; + adminUserId: string | null; + consumedAt: string; +} + +export interface PairingRecord { + code: string; + intent: PairingIntent; + createdAt: string; + expiresAt: string; + status: Exclude; + consumed?: ConsumedDetails; +} + +interface Store { + pairings: PairingRecord[]; +} + +const DEFAULT_TTL_MS = 5 * 60 * 1000; +const FILE_NAME = 'telegram-pairings.json'; + +let storePathOverride: string | null = null; +export function _setStorePathForTest(p: string | null): void { + storePathOverride = p; +} + +function storePath(): string { + return storePathOverride ?? path.join(DATA_DIR, FILE_NAME); +} + +let mutex: Promise = Promise.resolve(); +function withLock(fn: () => Promise | T): Promise { + const next = mutex.then(() => fn()); + mutex = next.catch(() => {}); + return next; +} + +function readStore(): Store { + try { + const raw = fs.readFileSync(storePath(), 'utf8'); + const parsed = JSON.parse(raw) as Store; + if (!Array.isArray(parsed.pairings)) return { pairings: [] }; + return parsed; + } catch { + return { pairings: [] }; + } +} + +function writeStore(store: Store): void { + const p = storePath(); + fs.mkdirSync(path.dirname(p), { recursive: true }); + const tmp = `${p}.tmp`; + fs.writeFileSync(tmp, JSON.stringify(store, null, 2)); + fs.renameSync(tmp, p); +} + +function sweep(store: Store, now: number): boolean { + let changed = false; + for (const r of store.pairings) { + if (r.status === 'pending' && Date.parse(r.expiresAt) <= now) { + r.status = 'expired'; + changed = true; + } + } + return changed; +} + +function generateCode(active: Set): string { + // 4-digit numeric, zero-padded. 10k space, fine for one-at-a-time intents. + for (let i = 0; i < 50; i++) { + const code = Math.floor(Math.random() * 10000) + .toString() + .padStart(4, '0'); + if (!active.has(code)) return code; + } + throw new Error('Could not allocate a free pairing code (too many active).'); +} + +export interface CreatePairingOptions { + ttlMs?: number; +} + +export async function createPairing(intent: PairingIntent, opts: CreatePairingOptions = {}): Promise { + const ttl = opts.ttlMs ?? DEFAULT_TTL_MS; + return withLock(() => { + const store = readStore(); + sweep(store, Date.now()); + const active = new Set(store.pairings.filter((r) => r.status === 'pending').map((r) => r.code)); + const now = new Date(); + const record: PairingRecord = { + code: generateCode(active), + intent, + createdAt: now.toISOString(), + expiresAt: new Date(now.getTime() + ttl).toISOString(), + status: 'pending', + }; + store.pairings.push(record); + writeStore(store); + log.info('Pairing created', { code: record.code, intent, expiresAt: record.expiresAt }); + return record; + }); +} + +export interface ConsumeInput { + text: string; + botUsername: string; + platformId: string; + isGroup: boolean; + name?: string | null; + adminUserId?: string | null; +} + +/** Strip leading @botname and return the trimmed remainder, or null if not addressed. */ +export function extractAddressedText(text: string, botUsername: string): string | null { + const trimmed = text.trim(); + const re = new RegExp(`^@${botUsername.replace(/[.*+?^${}()|[\\]\\\\]/g, '\\$&')}\\b`, 'i'); + const m = trimmed.match(re); + if (!m) return null; + return trimmed.slice(m[0].length).trim(); +} + +/** Find a 4-digit code in `@botname CODE`-style text. Returns null if none. */ +export function extractCode(text: string, botUsername: string): string | null { + const remainder = extractAddressedText(text, botUsername); + if (remainder === null) return null; + const m = remainder.match(/\b(\d{4})\b/); + return m ? m[1] : null; +} + +/** + * Try to match an inbound message against a pending pairing. On match, + * marks the pairing consumed atomically and returns the record. Returns + * null on no match or expiry (silent drop). + */ +export async function tryConsume(input: ConsumeInput): Promise { + const code = extractCode(input.text, input.botUsername); + if (!code) return null; + return withLock(() => { + const store = readStore(); + const now = Date.now(); + sweep(store, now); + const record = store.pairings.find((r) => r.code === code && r.status === 'pending'); + if (!record) { + writeStore(store); + return null; + } + record.status = 'consumed'; + record.consumed = { + platformId: input.platformId, + isGroup: input.isGroup, + name: input.name ?? null, + adminUserId: input.adminUserId ?? null, + consumedAt: new Date(now).toISOString(), + }; + writeStore(store); + log.info('Pairing consumed', { code, platformId: input.platformId, intent: record.intent }); + return record; + }); +} + +export function getStatus(code: string): PairingStatus { + const store = readStore(); + sweep(store, Date.now()); + const r = store.pairings.find((p) => p.code === code); + if (!r) return 'unknown'; + return r.status; +} + +export function getPairing(code: string): PairingRecord | null { + const store = readStore(); + sweep(store, Date.now()); + return store.pairings.find((p) => p.code === code) ?? null; +} + +export interface WaitForPairingOptions { + /** Total time to wait. Defaults to the pairing's own TTL (read on each tick). */ + timeoutMs?: number; + /** Polling interval as a fallback when fs.watch misses an event. */ + pollMs?: number; +} + +/** + * Resolve when the pairing is consumed; reject when it expires or the timeout + * elapses. Uses fs.watch as the primary signal with a slow poll fallback — + * fs.watch is unreliable across rename-replace on some filesystems. + */ +export async function waitForPairing(code: string, opts: WaitForPairingOptions = {}): Promise { + const pollMs = opts.pollMs ?? 1000; + const start = Date.now(); + const initial = getPairing(code); + if (!initial) throw new Error(`Unknown pairing code: ${code}`); + const deadline = start + (opts.timeoutMs ?? Math.max(0, Date.parse(initial.expiresAt) - start)); + + return new Promise((resolve, reject) => { + let watcher: fs.FSWatcher | null = null; + let interval: NodeJS.Timeout | null = null; + let settled = false; + + const cleanup = () => { + settled = true; + if (watcher) + try { + watcher.close(); + } catch { + /* ignore */ + } + if (interval) clearInterval(interval); + }; + + const check = () => { + if (settled) return; + const r = getPairing(code); + if (!r) { + cleanup(); + reject(new Error(`Pairing ${code} disappeared`)); + return; + } + if (r.status === 'consumed') { + cleanup(); + resolve(r); + return; + } + if (r.status === 'expired' || Date.now() >= deadline) { + cleanup(); + reject(new Error(`Pairing ${code} expired`)); + return; + } + }; + + try { + const dir = path.dirname(storePath()); + fs.mkdirSync(dir, { recursive: true }); + watcher = fs.watch(dir, (_event, fname) => { + if (!fname || fname.toString().startsWith(path.basename(storePath()))) check(); + }); + } catch { + // fs.watch unsupported — poll-only is fine + } + interval = setInterval(check, pollMs); + check(); + }); +} + +/** Test helper — wipe the store. */ +export function _resetForTest(): void { + try { + fs.unlinkSync(storePath()); + } catch { + // ignore + } +} diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index 31bb197..eb99f8a 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -1,12 +1,17 @@ /** - * Telegram channel adapter (v2) — uses Chat SDK bridge. - * Self-registers on import. + * Telegram channel adapter (v2) — uses Chat SDK bridge, with a pairing + * interceptor wrapped around onInbound to verify chat ownership before + * registration. See telegram-pairing.ts for the why. */ import { createTelegramAdapter } from '@chat-adapter/telegram'; import { readEnvFile } from '../env.js'; +import { log } from '../log.js'; +import { createMessagingGroup, getMessagingGroupByPlatform, updateMessagingGroup } from '../db/messaging-groups.js'; import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js'; import { registerChannelAdapter } from './channel-registry.js'; +import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js'; +import { tryConsume } from './telegram-pairing.js'; // eslint-disable-next-line @typescript-eslint/no-explicit-any function extractReplyContext(raw: Record): ReplyContext | null { @@ -18,19 +23,130 @@ function extractReplyContext(raw: Record): ReplyContext | null { }; } +/** Look up the bot username via Telegram getMe. Cached after first call. */ +async function fetchBotUsername(token: string): Promise { + try { + const res = await fetch(`https://api.telegram.org/bot${token}/getMe`); + const json = (await res.json()) as { ok: boolean; result?: { username?: string } }; + return json.ok ? (json.result?.username ?? null) : null; + } catch (err) { + log.warn('Telegram getMe failed', { err }); + return null; + } +} + +function isGroupPlatformId(platformId: string): boolean { + // platformId is "telegram:". Negative chat IDs are groups/channels. + const id = platformId.split(':').pop() ?? ''; + return id.startsWith('-'); +} + +interface InboundFields { + text: string; + authorUserId: string | null; +} + +function readInboundFields(message: InboundMessage): InboundFields { + if (message.kind !== 'chat-sdk' || !message.content || typeof message.content !== 'object') { + return { text: '', authorUserId: null }; + } + const c = message.content as { text?: string; author?: { userId?: string } }; + return { text: c.text ?? '', authorUserId: c.author?.userId ?? null }; +} + +/** + * Build an onInbound interceptor that consumes pairing codes before they + * reach the router. On match: upserts messaging_groups with admin_user_id + * and short-circuits. On miss: forwards to the host. + */ +function createPairingInterceptor( + botUsernamePromise: Promise, + hostOnInbound: ChannelSetup['onInbound'], +): ChannelSetup['onInbound'] { + return (platformId, threadId, message) => { + void (async () => { + const botUsername = await botUsernamePromise; + if (!botUsername) { + hostOnInbound(platformId, threadId, message); + return; + } + const { text, authorUserId } = readInboundFields(message); + if (!text) { + hostOnInbound(platformId, threadId, message); + return; + } + const consumed = await tryConsume({ + text, + botUsername, + platformId, + isGroup: isGroupPlatformId(platformId), + adminUserId: authorUserId, + }); + if (!consumed) { + hostOnInbound(platformId, threadId, message); + return; + } + // Pairing matched — upsert the messaging_group with admin binding and + // short-circuit. Skip the router entirely so this code-bearing message + // never reaches an agent. + const existing = getMessagingGroupByPlatform('telegram', platformId); + if (existing) { + updateMessagingGroup(existing.id, { + admin_user_id: consumed.consumed!.adminUserId, + is_group: consumed.consumed!.isGroup ? 1 : 0, + }); + } else { + createMessagingGroup({ + id: `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + channel_type: 'telegram', + platform_id: platformId, + name: consumed.consumed!.name, + is_group: consumed.consumed!.isGroup ? 1 : 0, + admin_user_id: consumed.consumed!.adminUserId, + created_at: new Date().toISOString(), + }); + } + log.info('Telegram pairing accepted — chat registered', { + platformId, + adminUserId: consumed.consumed!.adminUserId, + intent: consumed.intent, + }); + })().catch((err) => { + log.error('Telegram pairing interceptor error', { err }); + // Fail open: pass through so a pairing bug doesn't break normal traffic. + hostOnInbound(platformId, threadId, message); + }); + }; +} + registerChannelAdapter('telegram', { factory: () => { const env = readEnvFile(['TELEGRAM_BOT_TOKEN']); if (!env.TELEGRAM_BOT_TOKEN) return null; + const token = env.TELEGRAM_BOT_TOKEN; const telegramAdapter = createTelegramAdapter({ - botToken: env.TELEGRAM_BOT_TOKEN, + botToken: token, mode: 'polling', }); - return createChatSdkBridge({ + const bridge = createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent', extractReplyContext, supportsThreads: false, }); + + const botUsernamePromise = fetchBotUsername(token); + + const wrapped: ChannelAdapter = { + ...bridge, + async setup(hostConfig: ChannelSetup) { + const intercepted: ChannelSetup = { + ...hostConfig, + onInbound: createPairingInterceptor(botUsernamePromise, hostConfig.onInbound), + }; + return bridge.setup(intercepted); + }, + }; + return wrapped; }, });