Merge branch 'main' into feat/migrate-from-v1
This commit is contained in:
@@ -28,11 +28,37 @@ let _inbound: Database | null = null;
|
||||
let _outbound: Database | null = null;
|
||||
let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH;
|
||||
|
||||
/** Inbound DB — container opens read-only (host is the sole writer). */
|
||||
/**
|
||||
* Avoid all cached db reads; open inbound.db read-only with mmap and page cache disabled.
|
||||
*
|
||||
* Use this (not getInboundDb) for readers that need to see host-written rows
|
||||
* promptly — e.g. messages_in polling. Caller must .close() the returned
|
||||
* connection (try/finally).
|
||||
*
|
||||
* Needed for mounts where host writes don't reliably invalidate
|
||||
* SQLite's caches: virtiofs (Colima, Lima, Podman Machine, Apple
|
||||
* Container), NFS.
|
||||
*
|
||||
* Cost is microseconds per query, so safe for universal use.
|
||||
*/
|
||||
export function openInboundDb(): Database {
|
||||
const db = new Database(DEFAULT_INBOUND_PATH, { readonly: true });
|
||||
db.exec('PRAGMA busy_timeout = 5000');
|
||||
db.exec('PRAGMA mmap_size = 0');
|
||||
return db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inbound DB — long-lived singleton, OK for tables the host writes once
|
||||
* at spawn and never again (destinations, session_routing). For
|
||||
* messages_in polling — where the host writes continuously and a stale
|
||||
* view causes the pollHandle hang — use `openInboundDb()` instead.
|
||||
*/
|
||||
export function getInboundDb(): Database {
|
||||
if (!_inbound) {
|
||||
_inbound = new Database(DEFAULT_INBOUND_PATH, { readonly: true });
|
||||
_inbound.exec('PRAGMA busy_timeout = 5000');
|
||||
_inbound.exec('PRAGMA mmap_size = 0');
|
||||
}
|
||||
return _inbound;
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
* processing_ack. The host reads processing_ack to sync message lifecycle.
|
||||
*/
|
||||
import { getConfig } from '../config.js';
|
||||
import { getInboundDb, getOutboundDb } from './connection.js';
|
||||
import { openInboundDb, getOutboundDb } from './connection.js';
|
||||
|
||||
export interface MessageInRow {
|
||||
id: string;
|
||||
@@ -50,31 +50,35 @@ function getMaxMessagesPerPrompt(): number {
|
||||
* trigger=1 separately (see src/db/session-db.ts).
|
||||
*/
|
||||
export function getPendingMessages(): MessageInRow[] {
|
||||
const inbound = getInboundDb();
|
||||
const inbound = openInboundDb();
|
||||
const outbound = getOutboundDb();
|
||||
|
||||
const pending = inbound
|
||||
.prepare(
|
||||
`SELECT * FROM messages_in
|
||||
WHERE status = 'pending'
|
||||
AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))
|
||||
ORDER BY seq DESC
|
||||
LIMIT ?`,
|
||||
)
|
||||
.all(getMaxMessagesPerPrompt()) as MessageInRow[];
|
||||
try {
|
||||
const pending = inbound
|
||||
.prepare(
|
||||
`SELECT * FROM messages_in
|
||||
WHERE status = 'pending'
|
||||
AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))
|
||||
ORDER BY seq DESC
|
||||
LIMIT ?`,
|
||||
)
|
||||
.all(getMaxMessagesPerPrompt()) as MessageInRow[];
|
||||
|
||||
if (pending.length === 0) return [];
|
||||
if (pending.length === 0) return [];
|
||||
|
||||
// Filter out messages already acknowledged in outbound.db
|
||||
const ackedIds = new Set(
|
||||
(outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map(
|
||||
(r) => r.message_id,
|
||||
),
|
||||
);
|
||||
// Filter out messages already acknowledged in outbound.db
|
||||
const ackedIds = new Set(
|
||||
(outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map(
|
||||
(r) => r.message_id,
|
||||
),
|
||||
);
|
||||
|
||||
// Reverse: we fetched DESC to take the most recent N, but the agent
|
||||
// should see them in chronological order (oldest first).
|
||||
return pending.filter((m) => !ackedIds.has(m.id)).reverse();
|
||||
// Reverse: we fetched DESC to take the most recent N, but the agent
|
||||
// should see them in chronological order (oldest first).
|
||||
return pending.filter((m) => !ackedIds.has(m.id)).reverse();
|
||||
} finally {
|
||||
inbound.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Mark messages as processing — writes to processing_ack in outbound.db. */
|
||||
@@ -112,7 +116,12 @@ export function markFailed(id: string): void {
|
||||
|
||||
/** Get a message by ID (read from inbound.db). */
|
||||
export function getMessageIn(id: string): MessageInRow | undefined {
|
||||
return getInboundDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined;
|
||||
const inbound = openInboundDb();
|
||||
try {
|
||||
return inbound.prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined;
|
||||
} finally {
|
||||
inbound.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -120,19 +129,23 @@ export function getMessageIn(id: string): MessageInRow | undefined {
|
||||
* Reads from inbound.db, checks processing_ack to skip already-handled responses.
|
||||
*/
|
||||
export function findQuestionResponse(questionId: string): MessageInRow | undefined {
|
||||
const inbound = getInboundDb();
|
||||
const inbound = openInboundDb();
|
||||
const outbound = getOutboundDb();
|
||||
|
||||
const response = inbound
|
||||
.prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?")
|
||||
.get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined;
|
||||
try {
|
||||
const response = inbound
|
||||
.prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?")
|
||||
.get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined;
|
||||
|
||||
if (!response) return undefined;
|
||||
if (!response) return undefined;
|
||||
|
||||
// Check it hasn't been acked already
|
||||
const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
|
||||
if (acked) return undefined;
|
||||
// Check it hasn't been acked already
|
||||
const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
|
||||
if (acked) return undefined;
|
||||
|
||||
return response;
|
||||
return response;
|
||||
} finally {
|
||||
inbound.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "nanoclaw",
|
||||
"version": "2.0.23",
|
||||
"version": "2.0.25",
|
||||
"description": "Personal Claude assistant. Lightweight, secure, customizable.",
|
||||
"type": "module",
|
||||
"packageManager": "pnpm@10.33.0",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="139k tokens, 69% of context window">
|
||||
<title>139k tokens, 69% of context window</title>
|
||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="140k tokens, 70% of context window">
|
||||
<title>140k tokens, 70% of context window</title>
|
||||
<linearGradient id="s" x2="0" y2="100%">
|
||||
<stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
|
||||
<stop offset="1" stop-opacity=".1"/>
|
||||
@@ -10,13 +10,13 @@
|
||||
<a xlink:href="https://github.com/qwibitai/nanoclaw/tree/main/repo-tokens">
|
||||
<g clip-path="url(#r)">
|
||||
<rect width="52" height="20" fill="#555"/>
|
||||
<rect x="52" width="38" height="20" fill="#dfb317"/>
|
||||
<rect x="52" width="38" height="20" fill="#e05d44"/>
|
||||
<rect width="90" height="20" fill="url(#s)"/>
|
||||
<g fill="#fff" text-anchor="middle" font-family="Verdana,Geneva,DejaVu Sans,sans-serif" font-size="11">
|
||||
<text aria-hidden="true" x="26" y="15" fill="#010101" fill-opacity=".3">tokens</text>
|
||||
<text x="26" y="14">tokens</text>
|
||||
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">139k</text>
|
||||
<text x="71" y="14">139k</text>
|
||||
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">140k</text>
|
||||
<text x="71" y="14">140k</text>
|
||||
</g>
|
||||
</g>
|
||||
</a>
|
||||
|
||||
|
Before Width: | Height: | Size: 1.1 KiB After Width: | Height: | Size: 1.1 KiB |
@@ -495,14 +495,6 @@ async function main(): Promise<void> {
|
||||
6,
|
||||
),
|
||||
);
|
||||
} else {
|
||||
const agentPing = res.terminal?.fields.AGENT_PING;
|
||||
if (agentPing && agentPing !== 'ok' && agentPing !== 'skipped') {
|
||||
notes.push(
|
||||
"• Your assistant didn't reply to a test message. " +
|
||||
'Check `logs/nanoclaw.log` for clues, then try `pnpm run chat hi`.',
|
||||
);
|
||||
}
|
||||
}
|
||||
if (!res.terminal?.fields.CONFIGURED_CHANNELS) {
|
||||
notes.push(
|
||||
@@ -522,7 +514,6 @@ async function main(): Promise<void> {
|
||||
unresolved_count: notes.length,
|
||||
service_running: res.terminal?.fields.SERVICE === 'running',
|
||||
has_credentials: res.terminal?.fields.CREDENTIALS === 'configured',
|
||||
agent_responds: res.terminal?.fields.AGENT_PING === 'ok',
|
||||
});
|
||||
await offerClaudeAssist({
|
||||
stepName: 'verify',
|
||||
@@ -781,15 +772,25 @@ async function runPasteAuth(method: 'oauth' | 'api'): Promise<void> {
|
||||
message: `Paste your ${label}`,
|
||||
clearOnError: true,
|
||||
validate: (v) => {
|
||||
if (!v || !v.trim()) return 'Required';
|
||||
if (!v.trim().startsWith(prefix)) {
|
||||
// Strip any internal whitespace so a line-wrapped paste that did
|
||||
// survive into clack can still validate. The mid-token-newline
|
||||
// case where clack only sees the first line is caught by the
|
||||
// shape check below.
|
||||
const cleaned = (v ?? '').replace(/\s+/g, '');
|
||||
if (!cleaned) return 'Required';
|
||||
if (!cleaned.startsWith(prefix)) {
|
||||
return `Should start with ${prefix}…`;
|
||||
}
|
||||
if (method === 'oauth' && !/^sk-ant-oat[A-Za-z0-9_-]{80,500}AA$/.test(cleaned)) {
|
||||
return cleaned.length < 90
|
||||
? 'Token looks truncated — line breaks in the paste can cut it off. Widen your terminal so the token fits on one line, then paste again.'
|
||||
: "Token shape doesn't look right (expected sk-ant-oat…AA).";
|
||||
}
|
||||
return undefined;
|
||||
},
|
||||
}),
|
||||
);
|
||||
const token = (answer as string).trim();
|
||||
const token = (answer as string).replace(/\s+/g, '');
|
||||
|
||||
const res = await runQuietChild(
|
||||
'auth',
|
||||
|
||||
@@ -20,6 +20,15 @@ describe('classifyPingResult', () => {
|
||||
expect(classifyPingResult(1, '', 'Authentication error')).toBe('auth_error');
|
||||
});
|
||||
|
||||
it('detects Claude Code login banners printed as a chat reply', () => {
|
||||
expect(
|
||||
classifyPingResult(0, 'Invalid API key · Please run /login'),
|
||||
).toBe('auth_error');
|
||||
expect(
|
||||
classifyPingResult(0, 'Not logged in · Please run /login'),
|
||||
).toBe('auth_error');
|
||||
});
|
||||
|
||||
it('preserves socket errors', () => {
|
||||
expect(classifyPingResult(2, '')).toBe('socket_error');
|
||||
});
|
||||
|
||||
@@ -20,7 +20,10 @@ export function classifyPingResult(exitCode: number | null, stdout: string, stde
|
||||
if (
|
||||
/Invalid bearer token/i.test(output) ||
|
||||
/authentication[_ ]error/i.test(output) ||
|
||||
/Failed to authenticate/i.test(output)
|
||||
/Failed to authenticate/i.test(output) ||
|
||||
/Please run \/login/i.test(output) ||
|
||||
/Not logged in/i.test(output) ||
|
||||
/Invalid API key/i.test(output)
|
||||
) {
|
||||
return 'auth_error';
|
||||
}
|
||||
|
||||
@@ -5,45 +5,14 @@ import { determineVerifyStatus } from './verify.js';
|
||||
const healthyBase = {
|
||||
service: 'running' as const,
|
||||
credentials: 'configured',
|
||||
anyChannelConfigured: false,
|
||||
registeredGroups: 1,
|
||||
agentPing: 'ok' as const,
|
||||
};
|
||||
|
||||
describe('determineVerifyStatus', () => {
|
||||
it('accepts a working CLI-only install', () => {
|
||||
it('accepts a healthy install with at least one wired agent group', () => {
|
||||
expect(determineVerifyStatus(healthyBase)).toBe('success');
|
||||
});
|
||||
|
||||
it('accepts a messaging-channel install when CLI ping is skipped', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
...healthyBase,
|
||||
anyChannelConfigured: true,
|
||||
agentPing: 'skipped',
|
||||
}),
|
||||
).toBe('success');
|
||||
});
|
||||
|
||||
it('fails when neither CLI nor messaging channels are usable', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
...healthyBase,
|
||||
agentPing: 'skipped',
|
||||
}),
|
||||
).toBe('failed');
|
||||
});
|
||||
|
||||
it('fails when the CLI agent does not respond', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
...healthyBase,
|
||||
anyChannelConfigured: true,
|
||||
agentPing: 'no_reply',
|
||||
}),
|
||||
).toBe('failed');
|
||||
});
|
||||
|
||||
it('fails when no agent groups are registered', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
@@ -52,4 +21,22 @@ describe('determineVerifyStatus', () => {
|
||||
}),
|
||||
).toBe('failed');
|
||||
});
|
||||
|
||||
it('fails when the service is not running', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
...healthyBase,
|
||||
service: 'stopped',
|
||||
}),
|
||||
).toBe('failed');
|
||||
});
|
||||
|
||||
it('fails when credentials are missing', () => {
|
||||
expect(
|
||||
determineVerifyStatus({
|
||||
...healthyBase,
|
||||
credentials: 'missing',
|
||||
}),
|
||||
).toBe('failed');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -14,7 +14,6 @@ import Database from 'better-sqlite3';
|
||||
import { DATA_DIR } from '../src/config.js';
|
||||
import { readEnvFile } from '../src/env.js';
|
||||
import { log } from '../src/log.js';
|
||||
import { pingCliAgent, type PingResult } from './lib/agent-ping.js';
|
||||
import { getLaunchdLabel, getSystemdUnit } from '../src/install-slug.js';
|
||||
import {
|
||||
getPlatform,
|
||||
@@ -33,11 +32,12 @@ export async function run(_args: string[]): Promise<void> {
|
||||
|
||||
// 1. Check service status + detect checkout mismatch.
|
||||
//
|
||||
// Why the mismatch matters: the host binds `<DATA_DIR>/cli.sock` relative
|
||||
// to the project root it was started from. If the running service is from
|
||||
// a sibling checkout (common for developers with multiple clones), this
|
||||
// repo's `data/cli.sock` won't exist — AGENT_PING would return a
|
||||
// misleading `socket_error`. Surface the mismatch directly instead.
|
||||
// Why the mismatch matters: the host reads `<projectRoot>/data/v2.db` and
|
||||
// binds `<DATA_DIR>/cli.sock` relative to the project root it was started
|
||||
// from. If the running service is from a sibling checkout (common for
|
||||
// developers with multiple clones), nothing in this checkout is actually
|
||||
// wired up. Surface the mismatch directly so the user knows to point the
|
||||
// service at the right folder.
|
||||
let service:
|
||||
| 'not_found'
|
||||
| 'stopped'
|
||||
@@ -186,7 +186,6 @@ export async function run(_args: string[]): Promise<void> {
|
||||
if (has('IMESSAGE_ENABLED')) channelAuth.imessage = 'configured';
|
||||
|
||||
const configuredChannels = Object.keys(channelAuth);
|
||||
const anyChannelConfigured = configuredChannels.length > 0;
|
||||
|
||||
// 5. Check registered groups in v2 central DB (agent_groups + messaging_group_agents)
|
||||
let registeredGroups = 0;
|
||||
@@ -218,23 +217,12 @@ export async function run(_args: string[]): Promise<void> {
|
||||
mountAllowlist = 'configured';
|
||||
}
|
||||
|
||||
// 7. End-to-end: ping the CLI agent and confirm it replies. Only run if
|
||||
// everything upstream looks healthy, since a broken socket would just hang.
|
||||
let agentPing: 'ok' | 'no_reply' | 'socket_error' | 'auth_error' | 'skipped' = 'skipped';
|
||||
if (service === 'running' && registeredGroups > 0) {
|
||||
log.info('Pinging CLI agent');
|
||||
agentPing = await pingCliAgent();
|
||||
log.info('Agent ping result', { agentPing });
|
||||
}
|
||||
|
||||
// Determine overall status. A CLI-only install is valid when the local
|
||||
// agent round-trip succeeds; messaging app credentials are optional.
|
||||
// Determine overall status. The cli-agent step earlier in setup already
|
||||
// proved the agent round-trip works; verify is a static health check.
|
||||
const status = determineVerifyStatus({
|
||||
service,
|
||||
credentials,
|
||||
anyChannelConfigured,
|
||||
registeredGroups,
|
||||
agentPing,
|
||||
});
|
||||
|
||||
log.info('Verification complete', { status, channelAuth });
|
||||
@@ -247,7 +235,6 @@ export async function run(_args: string[]): Promise<void> {
|
||||
CHANNEL_AUTH: JSON.stringify(channelAuth),
|
||||
REGISTERED_GROUPS: registeredGroups,
|
||||
MOUNT_ALLOWLIST: mountAllowlist,
|
||||
AGENT_PING: agentPing,
|
||||
STATUS: status,
|
||||
LOG: 'logs/setup.log',
|
||||
});
|
||||
@@ -258,18 +245,11 @@ export async function run(_args: string[]): Promise<void> {
|
||||
export function determineVerifyStatus(input: {
|
||||
service: 'not_found' | 'stopped' | 'running' | 'running_other_checkout';
|
||||
credentials: string;
|
||||
anyChannelConfigured: boolean;
|
||||
registeredGroups: number;
|
||||
agentPing: PingResult | 'skipped';
|
||||
}): 'success' | 'failed' {
|
||||
const cliAgentResponds = input.agentPing === 'ok';
|
||||
const hasUsableChannel = input.anyChannelConfigured || cliAgentResponds;
|
||||
|
||||
return input.service === 'running' &&
|
||||
input.credentials !== 'missing' &&
|
||||
hasUsableChannel &&
|
||||
input.registeredGroups > 0 &&
|
||||
(cliAgentResponds || input.agentPing === 'skipped')
|
||||
input.registeredGroups > 0
|
||||
? 'success'
|
||||
: 'failed';
|
||||
}
|
||||
|
||||
@@ -180,6 +180,19 @@ export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[]
|
||||
.all() as ProcessingClaim[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete orphan 'processing' rows. Called by the host after killing a
|
||||
* container so the leftover claim doesn't trip claim-stuck on the next sweep
|
||||
* tick (which would kill the freshly respawned container before its
|
||||
* agent-runner can run its own startup cleanup).
|
||||
*
|
||||
* Safe because the host only writes to outbound.db when no container is
|
||||
* running (we just killed it). Returns the number of rows deleted.
|
||||
*/
|
||||
export function deleteOrphanProcessingClaims(outDb: Database.Database): number {
|
||||
return outDb.prepare("DELETE FROM processing_ack WHERE status = 'processing'").run().changes;
|
||||
}
|
||||
|
||||
export interface ContainerState {
|
||||
current_tool: string | null;
|
||||
tool_declared_timeout_ms: number | null;
|
||||
|
||||
@@ -3,9 +3,17 @@
|
||||
* ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we
|
||||
* don't have to mock the filesystem or the container runner.
|
||||
*/
|
||||
import Database from 'better-sqlite3';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js';
|
||||
import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js';
|
||||
import {
|
||||
ABSOLUTE_CEILING_MS,
|
||||
CLAIM_STUCK_MS,
|
||||
_resetStuckProcessingRowsForTesting,
|
||||
decideStuckAction,
|
||||
} from './host-sweep.js';
|
||||
import type { Session } from './types.js';
|
||||
|
||||
const BASE = Date.parse('2026-04-20T12:00:00.000Z');
|
||||
|
||||
@@ -144,3 +152,143 @@ describe('decideStuckAction', () => {
|
||||
expect(res.action).toBe('ok');
|
||||
});
|
||||
});
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Orphan claim cleanup (regression test for the SIGKILL → claim-stuck loop)
|
||||
//
|
||||
// Repro of the production bug seen 2026-04-30: container A claimed message M
|
||||
// (writes processing_ack row with status='processing'). Host kills A by
|
||||
// absolute-ceiling. Old behavior: messages_in.M was reset to pending but
|
||||
// processing_ack.M survived. On the next sweep tick, wakeContainer spawned B,
|
||||
// the same-tick SLA check saw M's stale claim age (hours), and SIGKILL'd B
|
||||
// before agent-runner could run clearStaleProcessingAcks(). Loop. The fix
|
||||
// deletes processing_ack 'processing' rows when the host kills/cleans the
|
||||
// container, breaking the loop atomically.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
function makeSessionDbs(): { inDb: Database.Database; outDb: Database.Database } {
|
||||
const inDb = new Database(':memory:');
|
||||
inDb.exec(`
|
||||
CREATE TABLE messages_in (
|
||||
id TEXT PRIMARY KEY,
|
||||
seq INTEGER UNIQUE,
|
||||
kind TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
process_after TEXT,
|
||||
recurrence TEXT,
|
||||
series_id TEXT,
|
||||
tries INTEGER DEFAULT 0,
|
||||
trigger INTEGER NOT NULL DEFAULT 1,
|
||||
platform_id TEXT,
|
||||
channel_type TEXT,
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
const outDb = new Database(':memory:');
|
||||
outDb.exec(`
|
||||
CREATE TABLE processing_ack (
|
||||
message_id TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL,
|
||||
status_changed TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
return { inDb, outDb };
|
||||
}
|
||||
|
||||
function fakeSession(): Session {
|
||||
return {
|
||||
id: 'sess-test',
|
||||
agent_group_id: 'ag-test',
|
||||
messaging_group_id: null,
|
||||
thread_id: null,
|
||||
agent_provider: null,
|
||||
status: 'active',
|
||||
container_status: 'stopped',
|
||||
last_active: null,
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
describe('deleteOrphanProcessingClaims', () => {
|
||||
it('removes only processing rows, leaves completed/failed alone', () => {
|
||||
const { outDb } = makeSessionDbs();
|
||||
const ts = new Date().toISOString();
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-proc', 'processing', ?)").run(ts);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-done', 'completed', ?)").run(ts);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-fail', 'failed', ?)").run(ts);
|
||||
|
||||
const removed = deleteOrphanProcessingClaims(outDb);
|
||||
|
||||
expect(removed).toBe(1);
|
||||
const remaining = outDb.prepare('SELECT message_id, status FROM processing_ack ORDER BY message_id').all();
|
||||
expect(remaining).toEqual([
|
||||
{ message_id: 'm-done', status: 'completed' },
|
||||
{ message_id: 'm-fail', status: 'failed' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('returns 0 when nothing to clear', () => {
|
||||
const { outDb } = makeSessionDbs();
|
||||
expect(deleteOrphanProcessingClaims(outDb)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('resetStuckProcessingRows — orphan claim cleanup', () => {
|
||||
it('deletes orphan processing_ack rows so next sweep tick does not see them', () => {
|
||||
const { inDb, outDb } = makeSessionDbs();
|
||||
const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); // 2h ago
|
||||
|
||||
// messages_in.status stays 'pending' during processing — only the
|
||||
// container's processing_ack moves to 'processing'. See
|
||||
// src/db/schema.ts header comment on processing_ack.
|
||||
inDb
|
||||
.prepare(
|
||||
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES ('m-1', 1, 'chat', ?, 'pending', '{}')",
|
||||
)
|
||||
.run(claimedAt);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-1', 'processing', ?)").run(claimedAt);
|
||||
|
||||
// Sanity: the orphan claim is what would trip claim-stuck.
|
||||
expect(getProcessingClaims(outDb)).toHaveLength(1);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling');
|
||||
|
||||
// Regression assertion: orphan claim is gone — next sweep tick will see
|
||||
// an empty claims list and not kill the freshly respawned container.
|
||||
expect(getProcessingClaims(outDb)).toEqual([]);
|
||||
|
||||
// And the message itself was rescheduled with backoff (existing behavior).
|
||||
const row = inDb.prepare('SELECT status, tries, process_after FROM messages_in WHERE id = ?').get('m-1') as {
|
||||
status: string;
|
||||
tries: number;
|
||||
process_after: string | null;
|
||||
};
|
||||
expect(row.status).toBe('pending');
|
||||
expect(row.tries).toBe(1);
|
||||
expect(row.process_after).not.toBeNull();
|
||||
});
|
||||
|
||||
it('still clears orphan claims even when the inbound message has already been retried (skip path)', () => {
|
||||
// Edge case: the inbound row was already rescheduled (process_after in
|
||||
// future), so the per-message retry loop skips it. The orphan in
|
||||
// processing_ack must still be removed — otherwise the bug remains.
|
||||
const { inDb, outDb } = makeSessionDbs();
|
||||
const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
|
||||
const future = new Date(Date.now() + 60_000).toISOString();
|
||||
|
||||
inDb
|
||||
.prepare(
|
||||
"INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, tries, content) VALUES ('m-2', 2, 'chat', ?, 'pending', ?, 1, '{}')",
|
||||
)
|
||||
.run(claimedAt, future);
|
||||
outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt);
|
||||
|
||||
_resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck');
|
||||
|
||||
expect(getProcessingClaims(outDb)).toEqual([]);
|
||||
const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number };
|
||||
expect(row.tries).toBe(1); // not bumped, the skip path held
|
||||
});
|
||||
});
|
||||
|
||||
@@ -33,6 +33,7 @@ import { getActiveSessions } from './db/sessions.js';
|
||||
import { getAgentGroup } from './db/agent-groups.js';
|
||||
import {
|
||||
countDueMessages,
|
||||
deleteOrphanProcessingClaims,
|
||||
getContainerState,
|
||||
getMessageForRetry,
|
||||
getProcessingClaims,
|
||||
@@ -249,6 +250,15 @@ function enforceRunningContainerSla(
|
||||
resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck');
|
||||
}
|
||||
|
||||
export function _resetStuckProcessingRowsForTesting(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
session: Session,
|
||||
reason: string,
|
||||
): void {
|
||||
resetStuckProcessingRows(inDb, outDb, session, reason);
|
||||
}
|
||||
|
||||
function resetStuckProcessingRows(
|
||||
inDb: Database.Database,
|
||||
outDb: Database.Database,
|
||||
@@ -285,4 +295,15 @@ function resetStuckProcessingRows(
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the orphan 'processing' rows. Without this, the next sweep tick
|
||||
// would re-read them, see the old status_changed timestamp, conclude the
|
||||
// freshly respawned container is stuck, and SIGKILL it before its
|
||||
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
|
||||
// We're safe to write outbound.db here because we just killed the container
|
||||
// that owned it (or it crashed and left no writer behind).
|
||||
const cleared = deleteOrphanProcessingClaims(outDb);
|
||||
if (cleared > 0) {
|
||||
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user