diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts
index 3f0e364..cd7b7e1 100644
--- a/container/agent-runner/src/poll-loop.ts
+++ b/container/agent-runner/src/poll-loop.ts
@@ -1,7 +1,7 @@
 import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
 import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
 import { writeMessageOut } from './db/messages-out.js';
-import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
+import { touchHeartbeat, clearStaleProcessingAcks, getOutboundDb } from './db/connection.js';
 import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js';
 import { formatMessages, extractRouting, categorizeMessage, stripInternalTags, type RoutingContext } from './formatter.js';
 import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
@@ -280,6 +280,17 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise
   let queryContinuation: string | undefined;
   let done = false;
 
+  // Track the outbound row count between result events. When the agent uses
+  // send_message (or any MCP tool that writes to messages_out) during a turn,
+  // the count grows. We pass that signal to dispatchResultText so it can tell
+  // the difference between "agent wrote text meant as the reply" (send the
+  // scratchpad) and "agent did explicit tool sends AND then emitted a trailing
+  // status line" (don't echo the status line back to the channel).
+  //
+  // Re-sampled after each result dispatch so subsequent turns in the same query
+  // (follow-up messages pushed into the stream) are evaluated independently.
+  let outboundAtLastResult = getOutboundCount();
+
   // Concurrent polling: push follow-ups into the active query as they arrive.
   // We do NOT force-end the stream on silence — keeping the query open is
   // strictly cheaper than close+reopen (no cold prompt cache, no reconnect).
@@ -323,7 +334,9 @@ async function processQuery(query: AgentQuery, routing: RoutingContext): Promise
       if (event.type === 'init') {
         queryContinuation = event.continuation;
       } else if (event.type === 'result' && event.text) {
-        dispatchResultText(event.text, routing);
+        const hasExplicitSends = getOutboundCount() > outboundAtLastResult;
+        dispatchResultText(event.text, routing, hasExplicitSends);
+        outboundAtLastResult = getOutboundCount();
       }
     }
   } finally {
@@ -363,7 +376,7 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
  * This preserves the simple case of one user on one channel — the agent
  * doesn't need to know about wrapping syntax at all.
  */
-function dispatchResultText(text: string, routing: RoutingContext): void {
+function dispatchResultText(text: string, routing: RoutingContext, hasExplicitSends: boolean): void {
   const MESSAGE_RE = /([\s\S]*?)<\/message>/g;
   let match: RegExpExecArray | null;
 
@@ -397,7 +410,15 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
   // Single-destination shortcut: the agent wrote plain text — send to
   // the session's originating channel (from session_routing) if available,
   // otherwise fall back to the single destination.
-  if (sent === 0 && scratchpad) {
+  //
+  // If the agent already sent messages explicitly this turn (via send_message
+  // or another MCP tool that writes to outbound), treat trailing plain text as
+  // a status/summary line and DO NOT echo it back to the channel. Without this
+  // guard, task-driven flows like the onboarding /welcome cause duplicate
+  // delivery: the skill uses `send_message` to greet the new user, then the
+  // model emits "Welcome message sent." which used to be dispatched as a
+  // second chat message to the same recipient.
+  if (sent === 0 && scratchpad && !hasExplicitSends) {
     if (routing.channelType && routing.platformId) {
       // Reply to the channel/thread the message came from
       writeMessageOut({
@@ -422,11 +443,22 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
     log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
   }
 
-  if (sent === 0 && text.trim()) {
+  if (sent === 0 && text.trim() && !hasExplicitSends) {
     log(`WARNING: agent output had no blocks — nothing was sent`);
   }
 }
 
+/**
+ * Current number of rows in `messages_out`, read from the outbound DB.
+ * processQuery samples this around each result event to detect writes made
+ * during the turn.
+ * NOTE(review): a row count only signals new writes if nothing deletes rows
+ * mid-query — confirm, or switch to a monotonic column.
+ */
+function getOutboundCount(): number {
+  return (getOutboundDb().prepare('SELECT COUNT(*) AS c FROM messages_out').get() as { c: number }).c;
+}
+
 function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
   const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
   const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
diff --git a/src/modules/permissions/channel-approval.test.ts b/src/modules/permissions/channel-approval.test.ts
index f3ea7e9..340a9ed 100644
--- a/src/modules/permissions/channel-approval.test.ts
+++ b/src/modules/permissions/channel-approval.test.ts
@@ -247,6 +247,88 @@ describe('unknown-channel registration flow', () => {
     expect(wakeContainer).toHaveBeenCalled();
   });
 
+  it('approve → seeds a /welcome onboarding task into the session', async () => {
+    const { routeInbound } = await import('../../router.js');
+    const { getResponseHandlers } = await import('../../response-registry.js');
+
+    // `/start` is filtered by the agent-runner (Claude Code slash command),
+    // so without the seeded onboarding task a Telegram user's first DM would
+    // produce zero response. The seed ensures the agent runs /welcome regardless.
+    const startDm = {
+      channelType: 'telegram',
+      platformId: 'dm-new-friend',
+      threadId: null,
+      message: {
+        id: 'msg-start',
+        kind: 'chat' as const,
+        content: JSON.stringify({ senderId: 'friend', senderName: 'Friend', text: '/start' }),
+        timestamp: now(),
+        isMention: true,
+      },
+    };
+    await routeInbound(startDm);
+    await new Promise((r) => setTimeout(r, 10));
+
+    const { getDb } = await import('../../db/connection.js');
+    const pending = getDb()
+      .prepare('SELECT messaging_group_id, agent_group_id FROM pending_channel_approvals')
+      .get() as { messaging_group_id: string; agent_group_id: string };
+    expect(pending).toBeDefined();
+
+    for (const handler of getResponseHandlers()) {
+      const claimed = await handler({
+        questionId: pending.messaging_group_id,
+        value: 'approve',
+        userId: 'owner',
+        channelType: 'telegram',
+        platformId: 'dm-owner',
+        threadId: null,
+      });
+      if (claimed) break;
+    }
+
+    // Look up the session that got created, then open its inbound.db and
+    // confirm an onboarding task with a /welcome prompt landed before the
+    // replayed chat message.
+    const session = getDb()
+      .prepare('SELECT id FROM sessions WHERE agent_group_id = ? AND messaging_group_id = ?')
+      .get(pending.agent_group_id, pending.messaging_group_id) as { id: string } | undefined;
+    expect(session).toBeDefined();
+
+    const Database = (await import('better-sqlite3')).default;
+    const path = await import('path');
+    const inboundPath = path.join(TEST_DIR, 'v2-sessions', pending.agent_group_id, session!.id, 'inbound.db');
+    const inbound = new Database(inboundPath, { readonly: true });
+    const rows = inbound
+      .prepare('SELECT kind, content, seq FROM messages_in ORDER BY seq')
+      .all() as { kind: string; content: string; seq: number }[];
+    inbound.close();
+
+    const taskRow = rows.find((r) => r.kind === 'task');
+    expect(taskRow).toBeDefined();
+    const prompt: string = JSON.parse(taskRow!.content).prompt;
+    expect(prompt).toMatch(/\/welcome/);
+    // Prompt must name the new user — otherwise with multiple destinations
+    // configured the model may greet the owner instead of the new sender
+    // (see bug where "Hey Daniel!" landed in the owner's DM).
+    expect(prompt).toContain('Friend');
+    expect(prompt).toContain('dm-new-friend');
+
+    // Prompt must pin the exact destination by its agent_destinations
+    // local_name. That name is auto-created by createMessagingGroupAgent
+    // above; look it up and assert it appears in the prompt verbatim.
+    const destRow = getDb()
+      .prepare('SELECT local_name FROM agent_destinations WHERE agent_group_id = ? AND target_id = ?')
+      .get(pending.agent_group_id, pending.messaging_group_id) as { local_name: string };
+    expect(destRow).toBeDefined();
+    expect(prompt).toContain(`send_message(to: '${destRow.local_name}'`);
+
+    // Order: task seeded before the replayed /start chat message.
+    const chatRow = rows.find((r) => r.kind === 'chat');
+    expect(chatRow).toBeDefined();
+    expect(taskRow!.seq).toBeLessThan(chatRow!.seq);
+  });
+
   it('approve on a DM wires with pattern="." defaults', async () => {
     const { routeInbound } = await import('../../router.js');
     const { getResponseHandlers } = await import('../../response-registry.js');
diff --git a/src/modules/permissions/index.ts b/src/modules/permissions/index.ts
index 83390d8..a1cd03a 100644
--- a/src/modules/permissions/index.ts
+++ b/src/modules/permissions/index.ts
@@ -28,6 +28,7 @@ import {
 import type { InboundEvent } from '../../channels/adapter.js';
 import { registerResponseHandler, type ResponsePayload } from '../../response-registry.js';
 import { log } from '../../log.js';
+import { resolveSession, writeSessionMessage } from '../../session-manager.js';
 import type { MessagingGroup, MessagingGroupAgent } from '../../types.js';
 import { canAccessAgentGroup } from './access.js';
 import { requestChannelApproval } from './channel-approval.js';
@@ -379,6 +380,83 @@ async function handleChannelApprovalResponse(payload: ResponsePayload): Promise<
   // path normally.
   deletePendingChannelApproval(row.messaging_group_id);
 
+  // Seed a /welcome onboarding task into the session *before* the replayed
+  // message so the agent greets the new user on first contact. Mirrors the
+  // owner-setup path (setup/register.ts). Without this, a Telegram user's
+  // default `/start` greeting gets silently dropped by the agent-runner's
+  // command filter and the first interaction produces nothing.
+  //
+  // The prompt pins the exact destination by name: createMessagingGroupAgent
+  // above auto-created an `agent_destinations` row pointing at this messaging
+  // group, and we look it up here. Without that, the agent — which already
+  // knows about the owner's DM and earlier friends' DMs as named destinations
+  // — tends to greet the owner (CLAUDE.md anchors on them). Passing the
+  // specific destination name removes the ambiguity entirely.
+  try {
+    const { session } = resolveSession(row.agent_group_id, row.messaging_group_id, event.threadId, 'shared');
+    // Normalise the parsed content to a record before probing it: an unguarded
+    // property read on a null/undefined parse result would throw here and the
+    // catch below would skip the whole onboarding seed.
+    const parsed: unknown = safeParseContent(event.message.content);
+    const fields: Record<string, unknown> =
+      parsed !== null && typeof parsed === 'object' ? (parsed as Record<string, unknown>) : {};
+    const author =
+      fields.author !== null && typeof fields.author === 'object'
+        ? (fields.author as Record<string, unknown>)
+        : undefined;
+    // First string among the known sender-name fields, in priority order.
+    const rawSenderName =
+      [fields.senderName, fields.sender, author?.fullName, author?.userName].find(
+        (candidate): candidate is string => typeof candidate === 'string',
+      ) ?? null;
+    // The name is user-controlled and is embedded in the task prompt below:
+    // collapse control characters (newlines included) and cap the length so
+    // it cannot inject extra prompt lines.
+    const senderName = rawSenderName?.replace(/\p{Cc}+/gu, ' ').trim().slice(0, 80) || null;
+    const senderLabel = senderName ? `${senderName} (${event.platformId})` : event.platformId;
+
+    // Pin the destination. Guarded behind hasTable in case the agent-to-agent
+    // module isn't installed — without it there are no named destinations at
+    // all, so the agent's send_message call falls through to session default
+    // routing (which is this new user). Either way, unambiguous.
+    let destinationClause: string;
+    const { hasTable, getDb } = await import('../../db/connection.js');
+    if (hasTable(getDb(), 'agent_destinations')) {
+      const { getDestinationByTarget } = await import('../agent-to-agent/db/agent-destinations.js');
+      const dest = getDestinationByTarget(row.agent_group_id, 'channel', row.messaging_group_id);
+      destinationClause = dest
+        ? `Send your welcome with send_message(to: '${dest.local_name}', text: ...) — that destination resolves to their DM.`
+        : `Reply using send_message with no \`to\` — the session's default routing points at their DM.`;
+    } else {
+      destinationClause = `Reply using send_message — it will land in their DM.`;
+    }
+
+    writeSessionMessage(row.agent_group_id, session.id, {
+      id: `onboard-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
+      kind: 'task',
+      timestamp: new Date().toISOString(),
+      platformId: event.platformId,
+      channelType: event.channelType,
+      content: JSON.stringify({
+        prompt:
+          `A new ${event.channelType} user — ${senderLabel} — just started a conversation. ` +
+          `Run /welcome to introduce yourself to them. ${destinationClause}`,
+      }),
+    });
+    log.info('Onboarding message seeded after channel approval', {
+      sessionId: session.id,
+      messagingGroupId: row.messaging_group_id,
+      agentGroupId: row.agent_group_id,
+      senderName,
+    });
+  } catch (err) {
+    // Don't block the replay if onboarding write fails.
+    log.error('Failed to seed onboarding message after channel approval', {
+      messagingGroupId: row.messaging_group_id,
+      err,
+    });
+  }
+
   try {
     await routeInbound(event);
   } catch (err) {