feat(telegram): self-contained pairing for chat ownership verification

BotFather issues bot tokens with no user binding, so anyone who guesses the
bot's username can DM it and get registered as a channel. Pairing closes that
gap: setup issues a one-time 4-digit code, the operator echoes it back from
the chat they want to register, and the inbound interceptor binds
admin_user_id before the message reaches the router.

- src/channels/telegram-pairing.ts: JSON-backed store with createPairing,
  tryConsume, getStatus, waitForPairing (fs.watch + poll fallback)
- src/channels/telegram.ts: wraps bridge.setup with an onInbound interceptor
  that consumes pairing codes and upserts messaging_groups
- setup/pair-telegram.ts: CLI step issues a code and waits up to 5 min for
  the operator to echo it back, emitting PLATFORM_ID/IS_GROUP/ADMIN_USER_ID
- Skill docs: /setup reorders mounts -> service -> wire (pairing needs a
  live polling adapter); /manage-channels and /add-telegram-v2 use pairing
  instead of asking the user to discover chat IDs

All other channels still bind admin via install-time identity (OAuth/QR/token);
pairing is Telegram-only. The bridge, router, and other adapters are untouched.
This commit is contained in:
Koshkoshinsk
2026-04-13 12:27:02 +00:00
parent af13c23a5a
commit 2017589683
8 changed files with 679 additions and 23 deletions

View File

@@ -0,0 +1,166 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import fs from 'fs';
import path from 'path';
import os from 'os';
vi.mock('../log.js', () => ({ log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } }));
import {
createPairing,
tryConsume,
getStatus,
waitForPairing,
extractCode,
extractAddressedText,
_setStorePathForTest,
_resetForTest,
} from './telegram-pairing.js';
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tg-pair-'));
_setStorePathForTest(path.join(tmpDir, 'pairings.json'));
});
afterEach(() => {
_resetForTest();
_setStorePathForTest(null);
fs.rmSync(tmpDir, { recursive: true, force: true });
});
describe('extractAddressedText', () => {
it('strips @botname prefix', () => {
expect(extractAddressedText('@nanobot 1234', 'nanobot')).toBe('1234');
});
it('is case-insensitive', () => {
expect(extractAddressedText('@NanoBot hello', 'nanobot')).toBe('hello');
});
it('returns null when not addressed', () => {
expect(extractAddressedText('hello 1234', 'nanobot')).toBeNull();
});
it('returns null when address is mid-text', () => {
expect(extractAddressedText('hi @nanobot 1234', 'nanobot')).toBeNull();
});
});
describe('extractCode', () => {
it('finds 4-digit code after @botname', () => {
expect(extractCode('@nanobot 0042', 'nanobot')).toBe('0042');
});
it('rejects non-4-digit numbers', () => {
expect(extractCode('@nanobot 12345', 'nanobot')).toBeNull();
expect(extractCode('@nanobot 12', 'nanobot')).toBeNull();
});
it('returns null without addressing', () => {
expect(extractCode('1234', 'nanobot')).toBeNull();
});
});
describe('createPairing', () => {
it('generates a 4-digit code with TTL', async () => {
const r = await createPairing('main', { ttlMs: 60_000 });
expect(r.code).toMatch(/^\d{4}$/);
expect(r.status).toBe('pending');
expect(Date.parse(r.expiresAt)).toBeGreaterThan(Date.now());
});
it('does not collide with active codes', async () => {
const codes = new Set<string>();
for (let i = 0; i < 20; i++) {
const r = await createPairing('main');
expect(codes.has(r.code)).toBe(false);
codes.add(r.code);
}
});
});
describe('tryConsume', () => {
it('matches and marks consumed', async () => {
const r = await createPairing('main');
const consumed = await tryConsume({
text: `@nanobot ${r.code}`,
botUsername: 'nanobot',
platformId: 'telegram:123',
isGroup: false,
adminUserId: 'u1',
});
expect(consumed).not.toBeNull();
expect(consumed!.status).toBe('consumed');
expect(consumed!.consumed?.platformId).toBe('telegram:123');
expect(consumed!.consumed?.adminUserId).toBe('u1');
expect(getStatus(r.code)).toBe('consumed');
});
it('returns null on no match (silent drop)', async () => {
await createPairing('main');
const out = await tryConsume({
text: '@nanobot 9999',
botUsername: 'nanobot',
platformId: 'x',
isGroup: false,
});
expect(out).toBeNull();
});
it('returns null without @botname addressing', async () => {
const r = await createPairing('main');
const out = await tryConsume({
text: r.code,
botUsername: 'nanobot',
platformId: 'x',
isGroup: false,
});
expect(out).toBeNull();
});
it('cannot be consumed twice', async () => {
const r = await createPairing('main');
await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
const second = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
expect(second).toBeNull();
});
it('cannot consume an expired pairing', async () => {
const r = await createPairing('main', { ttlMs: 1 });
await new Promise((res) => setTimeout(res, 10));
const out = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
expect(out).toBeNull();
expect(getStatus(r.code)).toBe('expired');
});
});
describe('getStatus', () => {
it('returns unknown for missing codes', () => {
expect(getStatus('0000')).toBe('unknown');
});
});
describe('waitForPairing', () => {
it('resolves when consumed', async () => {
const r = await createPairing('main', { ttlMs: 5000 });
const p = waitForPairing(r.code, { pollMs: 50 });
setTimeout(() => {
tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'tg:1', isGroup: true, name: 'Group' });
}, 100);
const consumed = await p;
expect(consumed.status).toBe('consumed');
expect(consumed.consumed?.name).toBe('Group');
});
it('rejects on expiry', async () => {
const r = await createPairing('main', { ttlMs: 100 });
await expect(waitForPairing(r.code, { pollMs: 30 })).rejects.toThrow(/expired/);
});
});
describe('intent passthrough', () => {
it('preserves wire-to and new-agent intents', async () => {
const a = await createPairing({ kind: 'wire-to', folder: 'work' });
const b = await createPairing({ kind: 'new-agent', folder: 'side' });
const ca = await tryConsume({ text: `@b ${a.code}`, botUsername: 'b', platformId: 'p1', isGroup: true });
const cb = await tryConsume({ text: `@b ${b.code}`, botUsername: 'b', platformId: 'p2', isGroup: true });
expect(ca!.intent).toEqual({ kind: 'wire-to', folder: 'work' });
expect(cb!.intent).toEqual({ kind: 'new-agent', folder: 'side' });
});
});

View File

@@ -0,0 +1,276 @@
/**
* Telegram pairing — proves the operator owns the chat they're registering.
*
* BotFather hands out tokens with no user binding, so anyone who guesses the
* bot's username can DM it. Pairing closes that gap: setup creates a one-time
* 4-digit code and the operator echoes it back as `@botname CODE` from the
* chat they want to register. The inbound interceptor in telegram.ts matches
* the code and records the chat (with admin_user_id) before it ever reaches
* the router.
*
* Storage is a JSON file at data/telegram-pairings.json — single-process,
* read-modify-write under an in-process mutex.
*/
import fs from 'fs';
import path from 'path';
import { DATA_DIR } from '../config.js';
import { log } from '../log.js';
export type PairingIntent = 'main' | { kind: 'wire-to'; folder: string } | { kind: 'new-agent'; folder: string };
export type PairingStatus = 'pending' | 'consumed' | 'expired' | 'unknown';
export interface ConsumedDetails {
platformId: string;
isGroup: boolean;
name: string | null;
adminUserId: string | null;
consumedAt: string;
}
export interface PairingRecord {
code: string;
intent: PairingIntent;
createdAt: string;
expiresAt: string;
status: Exclude<PairingStatus, 'unknown'>;
consumed?: ConsumedDetails;
}
interface Store {
pairings: PairingRecord[];
}
const DEFAULT_TTL_MS = 5 * 60 * 1000;
const FILE_NAME = 'telegram-pairings.json';
let storePathOverride: string | null = null;
export function _setStorePathForTest(p: string | null): void {
storePathOverride = p;
}
function storePath(): string {
return storePathOverride ?? path.join(DATA_DIR, FILE_NAME);
}
let mutex: Promise<unknown> = Promise.resolve();
function withLock<T>(fn: () => Promise<T> | T): Promise<T> {
const next = mutex.then(() => fn());
mutex = next.catch(() => {});
return next;
}
function readStore(): Store {
try {
const raw = fs.readFileSync(storePath(), 'utf8');
const parsed = JSON.parse(raw) as Store;
if (!Array.isArray(parsed.pairings)) return { pairings: [] };
return parsed;
} catch {
return { pairings: [] };
}
}
function writeStore(store: Store): void {
const p = storePath();
fs.mkdirSync(path.dirname(p), { recursive: true });
const tmp = `${p}.tmp`;
fs.writeFileSync(tmp, JSON.stringify(store, null, 2));
fs.renameSync(tmp, p);
}
function sweep(store: Store, now: number): boolean {
let changed = false;
for (const r of store.pairings) {
if (r.status === 'pending' && Date.parse(r.expiresAt) <= now) {
r.status = 'expired';
changed = true;
}
}
return changed;
}
function generateCode(active: Set<string>): string {
// 4-digit numeric, zero-padded. 10k space, fine for one-at-a-time intents.
for (let i = 0; i < 50; i++) {
const code = Math.floor(Math.random() * 10000)
.toString()
.padStart(4, '0');
if (!active.has(code)) return code;
}
throw new Error('Could not allocate a free pairing code (too many active).');
}
export interface CreatePairingOptions {
ttlMs?: number;
}
export async function createPairing(intent: PairingIntent, opts: CreatePairingOptions = {}): Promise<PairingRecord> {
const ttl = opts.ttlMs ?? DEFAULT_TTL_MS;
return withLock(() => {
const store = readStore();
sweep(store, Date.now());
const active = new Set(store.pairings.filter((r) => r.status === 'pending').map((r) => r.code));
const now = new Date();
const record: PairingRecord = {
code: generateCode(active),
intent,
createdAt: now.toISOString(),
expiresAt: new Date(now.getTime() + ttl).toISOString(),
status: 'pending',
};
store.pairings.push(record);
writeStore(store);
log.info('Pairing created', { code: record.code, intent, expiresAt: record.expiresAt });
return record;
});
}
export interface ConsumeInput {
text: string;
botUsername: string;
platformId: string;
isGroup: boolean;
name?: string | null;
adminUserId?: string | null;
}
/** Strip leading @botname and return the trimmed remainder, or null if not addressed. */
export function extractAddressedText(text: string, botUsername: string): string | null {
const trimmed = text.trim();
const re = new RegExp(`^@${botUsername.replace(/[.*+?^${}()|[\\]\\\\]/g, '\\$&')}\\b`, 'i');
const m = trimmed.match(re);
if (!m) return null;
return trimmed.slice(m[0].length).trim();
}
/** Find a 4-digit code in `@botname CODE`-style text. Returns null if none. */
export function extractCode(text: string, botUsername: string): string | null {
const remainder = extractAddressedText(text, botUsername);
if (remainder === null) return null;
const m = remainder.match(/\b(\d{4})\b/);
return m ? m[1] : null;
}
/**
* Try to match an inbound message against a pending pairing. On match,
* marks the pairing consumed atomically and returns the record. Returns
* null on no match or expiry (silent drop).
*/
export async function tryConsume(input: ConsumeInput): Promise<PairingRecord | null> {
const code = extractCode(input.text, input.botUsername);
if (!code) return null;
return withLock(() => {
const store = readStore();
const now = Date.now();
sweep(store, now);
const record = store.pairings.find((r) => r.code === code && r.status === 'pending');
if (!record) {
writeStore(store);
return null;
}
record.status = 'consumed';
record.consumed = {
platformId: input.platformId,
isGroup: input.isGroup,
name: input.name ?? null,
adminUserId: input.adminUserId ?? null,
consumedAt: new Date(now).toISOString(),
};
writeStore(store);
log.info('Pairing consumed', { code, platformId: input.platformId, intent: record.intent });
return record;
});
}
export function getStatus(code: string): PairingStatus {
const store = readStore();
sweep(store, Date.now());
const r = store.pairings.find((p) => p.code === code);
if (!r) return 'unknown';
return r.status;
}
export function getPairing(code: string): PairingRecord | null {
const store = readStore();
sweep(store, Date.now());
return store.pairings.find((p) => p.code === code) ?? null;
}
export interface WaitForPairingOptions {
/** Total time to wait. Defaults to the pairing's own TTL (read on each tick). */
timeoutMs?: number;
/** Polling interval as a fallback when fs.watch misses an event. */
pollMs?: number;
}
/**
* Resolve when the pairing is consumed; reject when it expires or the timeout
* elapses. Uses fs.watch as the primary signal with a slow poll fallback —
* fs.watch is unreliable across rename-replace on some filesystems.
*/
export async function waitForPairing(code: string, opts: WaitForPairingOptions = {}): Promise<PairingRecord> {
const pollMs = opts.pollMs ?? 1000;
const start = Date.now();
const initial = getPairing(code);
if (!initial) throw new Error(`Unknown pairing code: ${code}`);
const deadline = start + (opts.timeoutMs ?? Math.max(0, Date.parse(initial.expiresAt) - start));
return new Promise<PairingRecord>((resolve, reject) => {
let watcher: fs.FSWatcher | null = null;
let interval: NodeJS.Timeout | null = null;
let settled = false;
const cleanup = () => {
settled = true;
if (watcher)
try {
watcher.close();
} catch {
/* ignore */
}
if (interval) clearInterval(interval);
};
const check = () => {
if (settled) return;
const r = getPairing(code);
if (!r) {
cleanup();
reject(new Error(`Pairing ${code} disappeared`));
return;
}
if (r.status === 'consumed') {
cleanup();
resolve(r);
return;
}
if (r.status === 'expired' || Date.now() >= deadline) {
cleanup();
reject(new Error(`Pairing ${code} expired`));
return;
}
};
try {
const dir = path.dirname(storePath());
fs.mkdirSync(dir, { recursive: true });
watcher = fs.watch(dir, (_event, fname) => {
if (!fname || fname.toString().startsWith(path.basename(storePath()))) check();
});
} catch {
// fs.watch unsupported — poll-only is fine
}
interval = setInterval(check, pollMs);
check();
});
}
/** Test helper — wipe the store. */
export function _resetForTest(): void {
try {
fs.unlinkSync(storePath());
} catch {
// ignore
}
}

View File

@@ -1,12 +1,17 @@
/**
* Telegram channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
* Telegram channel adapter (v2) — uses Chat SDK bridge, with a pairing
* interceptor wrapped around onInbound to verify chat ownership before
* registration. See telegram-pairing.ts for the why.
*/
import { createTelegramAdapter } from '@chat-adapter/telegram';
import { readEnvFile } from '../env.js';
import { log } from '../log.js';
import { createMessagingGroup, getMessagingGroupByPlatform, updateMessagingGroup } from '../db/messaging-groups.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
import { tryConsume } from './telegram-pairing.js';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
@@ -18,19 +23,130 @@ function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
};
}
/** Look up the bot username via Telegram getMe. Cached after first call. */
async function fetchBotUsername(token: string): Promise<string | null> {
try {
const res = await fetch(`https://api.telegram.org/bot${token}/getMe`);
const json = (await res.json()) as { ok: boolean; result?: { username?: string } };
return json.ok ? (json.result?.username ?? null) : null;
} catch (err) {
log.warn('Telegram getMe failed', { err });
return null;
}
}
function isGroupPlatformId(platformId: string): boolean {
// platformId is "telegram:<chatId>". Negative chat IDs are groups/channels.
const id = platformId.split(':').pop() ?? '';
return id.startsWith('-');
}
interface InboundFields {
text: string;
authorUserId: string | null;
}
function readInboundFields(message: InboundMessage): InboundFields {
if (message.kind !== 'chat-sdk' || !message.content || typeof message.content !== 'object') {
return { text: '', authorUserId: null };
}
const c = message.content as { text?: string; author?: { userId?: string } };
return { text: c.text ?? '', authorUserId: c.author?.userId ?? null };
}
/**
* Build an onInbound interceptor that consumes pairing codes before they
* reach the router. On match: upserts messaging_groups with admin_user_id
* and short-circuits. On miss: forwards to the host.
*/
function createPairingInterceptor(
botUsernamePromise: Promise<string | null>,
hostOnInbound: ChannelSetup['onInbound'],
): ChannelSetup['onInbound'] {
return (platformId, threadId, message) => {
void (async () => {
const botUsername = await botUsernamePromise;
if (!botUsername) {
hostOnInbound(platformId, threadId, message);
return;
}
const { text, authorUserId } = readInboundFields(message);
if (!text) {
hostOnInbound(platformId, threadId, message);
return;
}
const consumed = await tryConsume({
text,
botUsername,
platformId,
isGroup: isGroupPlatformId(platformId),
adminUserId: authorUserId,
});
if (!consumed) {
hostOnInbound(platformId, threadId, message);
return;
}
// Pairing matched — upsert the messaging_group with admin binding and
// short-circuit. Skip the router entirely so this code-bearing message
// never reaches an agent.
const existing = getMessagingGroupByPlatform('telegram', platformId);
if (existing) {
updateMessagingGroup(existing.id, {
admin_user_id: consumed.consumed!.adminUserId,
is_group: consumed.consumed!.isGroup ? 1 : 0,
});
} else {
createMessagingGroup({
id: `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
channel_type: 'telegram',
platform_id: platformId,
name: consumed.consumed!.name,
is_group: consumed.consumed!.isGroup ? 1 : 0,
admin_user_id: consumed.consumed!.adminUserId,
created_at: new Date().toISOString(),
});
}
log.info('Telegram pairing accepted — chat registered', {
platformId,
adminUserId: consumed.consumed!.adminUserId,
intent: consumed.intent,
});
})().catch((err) => {
log.error('Telegram pairing interceptor error', { err });
// Fail open: pass through so a pairing bug doesn't break normal traffic.
hostOnInbound(platformId, threadId, message);
});
};
}
registerChannelAdapter('telegram', {
factory: () => {
const env = readEnvFile(['TELEGRAM_BOT_TOKEN']);
if (!env.TELEGRAM_BOT_TOKEN) return null;
const token = env.TELEGRAM_BOT_TOKEN;
const telegramAdapter = createTelegramAdapter({
botToken: env.TELEGRAM_BOT_TOKEN,
botToken: token,
mode: 'polling',
});
return createChatSdkBridge({
const bridge = createChatSdkBridge({
adapter: telegramAdapter,
concurrency: 'concurrent',
extractReplyContext,
supportsThreads: false,
});
const botUsernamePromise = fetchBotUsername(token);
const wrapped: ChannelAdapter = {
...bridge,
async setup(hostConfig: ChannelSetup) {
const intercepted: ChannelSetup = {
...hostConfig,
onInbound: createPairingInterceptor(botUsernamePromise, hostConfig.onInbound),
};
return bridge.setup(intercepted);
},
};
return wrapped;
},
});