Implement exporter using nocap from nostr-watch along with tests
This commit is contained in:
+216
@@ -0,0 +1,216 @@
|
||||
import { URL } from "node:url";
|
||||
import { generateSecretKey, nip19 } from "nostr-tools";
|
||||
|
||||
export type LogLevel = "debug" | "info" | "warn" | "error";
|
||||
|
||||
export interface RelayTarget {
|
||||
relay: string;
|
||||
host: string;
|
||||
}
|
||||
|
||||
export interface WriteCheckConfig {
|
||||
enabled: boolean;
|
||||
kind: number;
|
||||
privkey?: string;
|
||||
}
|
||||
|
||||
export interface AppConfig {
|
||||
relays: RelayTarget[];
|
||||
probeIntervalMs: number;
|
||||
probeTimeoutMs: number;
|
||||
listenAddr: string;
|
||||
port: number;
|
||||
logLevel: LogLevel;
|
||||
probeConcurrency: number;
|
||||
staleAfterMs: number;
|
||||
writeCheck: WriteCheckConfig;
|
||||
warnings: string[];
|
||||
}
|
||||
|
||||
const DEFAULT_PROBE_INTERVAL_MS = 0;
|
||||
const DEFAULT_PROBE_TIMEOUT_MS = 10_000;
|
||||
const DEFAULT_LISTEN_ADDR = "0.0.0.0";
|
||||
const DEFAULT_PORT = 9464;
|
||||
const DEFAULT_LOG_LEVEL: LogLevel = "info";
|
||||
const DEFAULT_WRITE_CHECK_ENABLED = true;
|
||||
const DEFAULT_WRITE_CHECK_KIND = 30078;
|
||||
|
||||
const MIN_TIMEOUT_MS = 1_000;
|
||||
const MIN_PORT = 1;
|
||||
const MAX_PORT = 65_535;
|
||||
|
||||
// Kept as a constant so operators can tune by editing code.
|
||||
export const DEFAULT_PROBE_CONCURRENCY = 4;
|
||||
|
||||
function parseBoolean(raw: string | undefined, fallback: boolean): boolean {
|
||||
if (raw === undefined || raw.trim() === "") return fallback;
|
||||
const value = raw.trim().toLowerCase();
|
||||
if (["1", "true", "yes", "on"].includes(value)) return true;
|
||||
if (["0", "false", "no", "off"].includes(value)) return false;
|
||||
return fallback;
|
||||
}
|
||||
|
||||
function parseInteger(
|
||||
raw: string | undefined,
|
||||
fallback: number,
|
||||
min: number,
|
||||
name: string,
|
||||
): number {
|
||||
if (raw === undefined || raw.trim() === "") return fallback;
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
if (!Number.isFinite(parsed) || Number.isNaN(parsed)) {
|
||||
throw new Error(`${name} must be an integer`);
|
||||
}
|
||||
if (parsed < min) {
|
||||
throw new Error(`${name} must be >= ${min}`);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function parseLogLevel(raw: string | undefined): LogLevel {
|
||||
if (!raw) return DEFAULT_LOG_LEVEL;
|
||||
const normalized = raw.trim().toLowerCase();
|
||||
if (
|
||||
normalized === "debug" ||
|
||||
normalized === "info" ||
|
||||
normalized === "warn" ||
|
||||
normalized === "error"
|
||||
) {
|
||||
return normalized;
|
||||
}
|
||||
throw new Error("LOG_LEVEL must be one of: debug, info, warn, error");
|
||||
}
|
||||
|
||||
function parseRelay(rawRelay: string): RelayTarget | null {
|
||||
const relayValue = rawRelay.trim();
|
||||
if (!relayValue) return null;
|
||||
let relayUrl: URL;
|
||||
try {
|
||||
relayUrl = new URL(relayValue);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (relayUrl.protocol !== "wss:") return null;
|
||||
return {
|
||||
relay: relayUrl.toString(),
|
||||
host: relayUrl.hostname,
|
||||
};
|
||||
}
|
||||
|
||||
function parseRelays(raw: string): { relays: RelayTarget[]; warnings: string[] } {
|
||||
const warnings: string[] = [];
|
||||
const unique = new Set<string>();
|
||||
const relays: RelayTarget[] = [];
|
||||
|
||||
for (const chunk of raw.split(",")) {
|
||||
const parsed = parseRelay(chunk);
|
||||
if (!parsed) {
|
||||
const clean = chunk.trim();
|
||||
if (clean) warnings.push(`Ignoring invalid relay URL: ${clean}`);
|
||||
continue;
|
||||
}
|
||||
if (unique.has(parsed.relay)) continue;
|
||||
unique.add(parsed.relay);
|
||||
relays.push(parsed);
|
||||
}
|
||||
|
||||
return { relays, warnings };
|
||||
}
|
||||
|
||||
function isHex64(value: string): boolean {
|
||||
return /^[0-9a-f]{64}$/i.test(value);
|
||||
}
|
||||
|
||||
function isValidWriteCheckPrivkey(raw: string): boolean {
|
||||
const value = raw.trim();
|
||||
if (value.startsWith("nsec1")) {
|
||||
try {
|
||||
const decoded = nip19.decode(value);
|
||||
return decoded.type === "nsec";
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return isHex64(value);
|
||||
}
|
||||
|
||||
function generateEphemeralWriteCheckPrivkey(): string {
|
||||
return Buffer.from(generateSecretKey()).toString("hex");
|
||||
}
|
||||
|
||||
export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig {
|
||||
const relaysRaw = env.RELAYS;
|
||||
if (!relaysRaw || relaysRaw.trim() === "") {
|
||||
throw new Error("RELAYS is required (comma-separated list of wss:// URLs)");
|
||||
}
|
||||
|
||||
const { relays, warnings } = parseRelays(relaysRaw);
|
||||
if (relays.length === 0) {
|
||||
throw new Error("RELAYS did not contain any valid wss:// relay URLs");
|
||||
}
|
||||
|
||||
const probeIntervalMs = parseInteger(
|
||||
env.PROBE_INTERVAL_MS,
|
||||
DEFAULT_PROBE_INTERVAL_MS,
|
||||
0,
|
||||
"PROBE_INTERVAL_MS",
|
||||
);
|
||||
const probeTimeoutMs = parseInteger(
|
||||
env.PROBE_TIMEOUT_MS,
|
||||
DEFAULT_PROBE_TIMEOUT_MS,
|
||||
MIN_TIMEOUT_MS,
|
||||
"PROBE_TIMEOUT_MS",
|
||||
);
|
||||
const port = parseInteger(env.PORT, DEFAULT_PORT, MIN_PORT, "PORT");
|
||||
if (port > MAX_PORT) {
|
||||
throw new Error(`PORT must be <= ${MAX_PORT}`);
|
||||
}
|
||||
|
||||
const writeEnabledFlag = parseBoolean(env.WRITE_CHECK_ENABLED, DEFAULT_WRITE_CHECK_ENABLED);
|
||||
const writeCheckKind = parseInteger(
|
||||
env.WRITE_CHECK_KIND,
|
||||
DEFAULT_WRITE_CHECK_KIND,
|
||||
0,
|
||||
"WRITE_CHECK_KIND",
|
||||
);
|
||||
const writePrivkey = env.WRITE_CHECK_PRIVKEY?.trim();
|
||||
let resolvedWritePrivkey = writePrivkey;
|
||||
if (env.WRITE_CHECK_PUBKEY?.trim()) {
|
||||
warnings.push(
|
||||
"WRITE_CHECK_PUBKEY is ignored; write-check pubkey is always derived from WRITE_CHECK_PRIVKEY",
|
||||
);
|
||||
}
|
||||
if (writeEnabledFlag) {
|
||||
if (!writePrivkey) {
|
||||
resolvedWritePrivkey = generateEphemeralWriteCheckPrivkey();
|
||||
warnings.push(
|
||||
"WRITE_CHECK_PRIVKEY not set; generated ephemeral write-check key for this process",
|
||||
);
|
||||
} else if (!isValidWriteCheckPrivkey(writePrivkey)) {
|
||||
resolvedWritePrivkey = generateEphemeralWriteCheckPrivkey();
|
||||
warnings.push(
|
||||
"WRITE_CHECK_PRIVKEY invalid; generated ephemeral write-check key for this process",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const staleAfterMs = probeIntervalMs + probeTimeoutMs + 5_000;
|
||||
const writeCheck: WriteCheckConfig = {
|
||||
enabled: writeEnabledFlag,
|
||||
kind: writeCheckKind,
|
||||
...(resolvedWritePrivkey ? { privkey: resolvedWritePrivkey } : {}),
|
||||
};
|
||||
|
||||
return {
|
||||
relays,
|
||||
probeIntervalMs,
|
||||
probeTimeoutMs,
|
||||
listenAddr: env.LISTEN_ADDR?.trim() || DEFAULT_LISTEN_ADDR,
|
||||
port,
|
||||
logLevel: parseLogLevel(env.LOG_LEVEL),
|
||||
probeConcurrency: DEFAULT_PROBE_CONCURRENCY,
|
||||
staleAfterMs,
|
||||
writeCheck,
|
||||
warnings,
|
||||
};
|
||||
}
|
||||
+147
@@ -0,0 +1,147 @@
|
||||
import http from "node:http";
|
||||
import process from "node:process";
|
||||
|
||||
import { loadConfig, type LogLevel } from "./config.js";
|
||||
import { RelayMetrics } from "./metrics.js";
|
||||
import { RelayProber, type Logger } from "./prober.js";
|
||||
|
||||
const LOG_LEVEL_ORDER: Record<LogLevel, number> = {
|
||||
debug: 10,
|
||||
info: 20,
|
||||
warn: 30,
|
||||
error: 40,
|
||||
};
|
||||
|
||||
function sanitizeFields(fields: Record<string, unknown>): Record<string, unknown> {
|
||||
const sanitized: Record<string, unknown> = {};
|
||||
for (const [key, value] of Object.entries(fields)) {
|
||||
if (key.toLowerCase().includes("privkey") || key.toLowerCase().includes("secret")) {
|
||||
sanitized[key] = "[redacted]";
|
||||
continue;
|
||||
}
|
||||
sanitized[key] = value;
|
||||
}
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
function createLogger(level: LogLevel): Logger {
|
||||
const minLevel = LOG_LEVEL_ORDER[level];
|
||||
|
||||
const emit = (lvl: LogLevel, fields: Record<string, unknown>, message: string): void => {
|
||||
if (LOG_LEVEL_ORDER[lvl] < minLevel) return;
|
||||
const line = {
|
||||
ts: new Date().toISOString(),
|
||||
level: lvl,
|
||||
msg: message,
|
||||
...sanitizeFields(fields),
|
||||
};
|
||||
process.stdout.write(`${JSON.stringify(line)}\n`);
|
||||
};
|
||||
|
||||
return {
|
||||
debug: (fields, message) => emit("debug", fields, message),
|
||||
info: (fields, message) => emit("info", fields, message),
|
||||
warn: (fields, message) => emit("warn", fields, message),
|
||||
error: (fields, message) => emit("error", fields, message),
|
||||
};
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const config = loadConfig();
|
||||
const logger = createLogger(config.logLevel);
|
||||
for (const warning of config.warnings) {
|
||||
logger.warn({ component: "config" }, warning);
|
||||
}
|
||||
|
||||
const metrics = new RelayMetrics();
|
||||
const prober = new RelayProber(config, metrics, logger);
|
||||
prober.start();
|
||||
|
||||
let shuttingDown = false;
|
||||
const server = http.createServer(async (req, res) => {
|
||||
if (!req.url) {
|
||||
res.statusCode = 400;
|
||||
res.end("bad request");
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/metrics") {
|
||||
if (config.probeIntervalMs <= 0) {
|
||||
await prober.runOnDemand();
|
||||
}
|
||||
const nowUnix = Math.floor(Date.now() / 1000);
|
||||
prober.markStale(nowUnix);
|
||||
res.statusCode = 200;
|
||||
res.setHeader("Content-Type", metrics.registry.contentType);
|
||||
res.end(await metrics.registry.metrics());
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/healthz") {
|
||||
const nowUnix = Math.floor(Date.now() / 1000);
|
||||
const health = prober.health(nowUnix);
|
||||
const ok = !shuttingDown && health.ok;
|
||||
const body = JSON.stringify(
|
||||
{
|
||||
status: ok ? "ok" : "degraded",
|
||||
shuttingDown,
|
||||
maxLastProbeAgeMs: health.minLastProbeAgeMs,
|
||||
},
|
||||
null,
|
||||
2,
|
||||
);
|
||||
res.statusCode = ok ? 200 : 503;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(body);
|
||||
return;
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain");
|
||||
res.end("not found");
|
||||
});
|
||||
|
||||
const shutdown = (signal: NodeJS.Signals): void => {
|
||||
if (shuttingDown) return;
|
||||
shuttingDown = true;
|
||||
logger.info({ signal }, "shutdown requested");
|
||||
prober.stop();
|
||||
|
||||
const forceExitTimer = setTimeout(() => {
|
||||
logger.error({ signal }, "forced shutdown after timeout");
|
||||
process.exit(1);
|
||||
}, 10_000);
|
||||
|
||||
server.close((error?: Error) => {
|
||||
clearTimeout(forceExitTimer);
|
||||
if (error) {
|
||||
logger.error({ error: error.message }, "http server close failed");
|
||||
process.exit(1);
|
||||
return;
|
||||
}
|
||||
logger.info({ signal }, "shutdown complete");
|
||||
process.exit(0);
|
||||
});
|
||||
};
|
||||
|
||||
process.on("SIGINT", shutdown);
|
||||
process.on("SIGTERM", shutdown);
|
||||
|
||||
server.listen(config.port, config.listenAddr, () => {
|
||||
logger.info(
|
||||
{
|
||||
listenAddr: config.listenAddr,
|
||||
port: config.port,
|
||||
relays: config.relays.length,
|
||||
writeCheckEnabled: config.writeCheck.enabled,
|
||||
},
|
||||
"relay exporter started",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
main().catch((error: unknown) => {
|
||||
const message = error instanceof Error ? error.message : "startup failure";
|
||||
process.stderr.write(`${message}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
+130
@@ -0,0 +1,130 @@
|
||||
import { Counter, Gauge, Registry, collectDefaultMetrics } from "prom-client";
|
||||
|
||||
export interface RelayProbeUpdate {
|
||||
relay: string;
|
||||
openOk: boolean;
|
||||
readOk: boolean;
|
||||
writeConfirmOk: boolean;
|
||||
up: boolean;
|
||||
openDurationMs: number;
|
||||
readDurationMs: number;
|
||||
writeDurationMs?: number;
|
||||
lastSuccessUnix?: number;
|
||||
}
|
||||
|
||||
export class RelayMetrics {
|
||||
public readonly registry: Registry;
|
||||
|
||||
private readonly relayUp: Gauge<"relay">;
|
||||
private readonly relayOpenOk: Gauge<"relay">;
|
||||
private readonly relayReadOk: Gauge<"relay">;
|
||||
private readonly relayWriteConfirmOk: Gauge<"relay">;
|
||||
private readonly relayOpenDurationMs: Gauge<"relay">;
|
||||
private readonly relayReadDurationMs: Gauge<"relay">;
|
||||
private readonly relayWriteDurationMs: Gauge<"relay">;
|
||||
private readonly relayLastSuccessUnix: Gauge<"relay">;
|
||||
private readonly relayProbeErrorsTotal: Counter<"relay" | "check">;
|
||||
private readonly relayProbeRunsTotal: Counter<"relay" | "result">;
|
||||
|
||||
constructor() {
|
||||
this.registry = new Registry();
|
||||
collectDefaultMetrics({ register: this.registry });
|
||||
|
||||
this.relayUp = new Gauge({
|
||||
name: "nostr_relay_up",
|
||||
help: "1 if open+read(+write confirm when enabled) succeeds, else 0",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayOpenOk = new Gauge({
|
||||
name: "nostr_relay_open_ok",
|
||||
help: "Relay open check status",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayReadOk = new Gauge({
|
||||
name: "nostr_relay_read_ok",
|
||||
help: "Relay read check status",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayWriteConfirmOk = new Gauge({
|
||||
name: "nostr_relay_write_confirm_ok",
|
||||
help: "Relay write+read-after-write confirmation status",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayOpenDurationMs = new Gauge({
|
||||
name: "nostr_relay_open_duration_ms",
|
||||
help: "Relay open check duration in milliseconds",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayReadDurationMs = new Gauge({
|
||||
name: "nostr_relay_read_duration_ms",
|
||||
help: "Relay read check duration in milliseconds",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayWriteDurationMs = new Gauge({
|
||||
name: "nostr_relay_write_duration_ms",
|
||||
help: "Relay write-confirm duration in milliseconds",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayLastSuccessUnix = new Gauge({
|
||||
name: "nostr_relay_last_success_unixtime",
|
||||
help: "Unix timestamp of the last successful full probe",
|
||||
labelNames: ["relay"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayProbeErrorsTotal = new Counter({
|
||||
name: "nostr_relay_probe_errors_total",
|
||||
help: "Total relay probe errors by check",
|
||||
labelNames: ["relay", "check"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
this.relayProbeRunsTotal = new Counter({
|
||||
name: "nostr_relay_probe_runs_total",
|
||||
help: "Total relay probe runs by final result",
|
||||
labelNames: ["relay", "result"],
|
||||
registers: [this.registry],
|
||||
});
|
||||
}
|
||||
|
||||
public initializeRelay(relay: string): void {
|
||||
this.relayUp.labels(relay).set(0);
|
||||
this.relayOpenOk.labels(relay).set(0);
|
||||
this.relayReadOk.labels(relay).set(0);
|
||||
this.relayWriteConfirmOk.labels(relay).set(0);
|
||||
this.relayOpenDurationMs.labels(relay).set(-1);
|
||||
this.relayReadDurationMs.labels(relay).set(-1);
|
||||
this.relayWriteDurationMs.labels(relay).set(-1);
|
||||
this.relayLastSuccessUnix.labels(relay).set(0);
|
||||
}
|
||||
|
||||
public applyUpdate(update: RelayProbeUpdate): void {
|
||||
this.relayUp.labels(update.relay).set(update.up ? 1 : 0);
|
||||
this.relayOpenOk.labels(update.relay).set(update.openOk ? 1 : 0);
|
||||
this.relayReadOk.labels(update.relay).set(update.readOk ? 1 : 0);
|
||||
this.relayWriteConfirmOk.labels(update.relay).set(update.writeConfirmOk ? 1 : 0);
|
||||
this.relayOpenDurationMs.labels(update.relay).set(update.openDurationMs);
|
||||
this.relayReadDurationMs.labels(update.relay).set(update.readDurationMs);
|
||||
this.relayWriteDurationMs.labels(update.relay).set(update.writeDurationMs ?? -1);
|
||||
if (update.lastSuccessUnix !== undefined) {
|
||||
this.relayLastSuccessUnix.labels(update.relay).set(update.lastSuccessUnix);
|
||||
}
|
||||
}
|
||||
|
||||
public markStale(relay: string): void {
|
||||
this.relayUp.labels(relay).set(0);
|
||||
}
|
||||
|
||||
public incProbeError(relay: string, check: string): void {
|
||||
this.relayProbeErrorsTotal.labels(relay, check).inc();
|
||||
}
|
||||
|
||||
public incProbeRun(relay: string, result: "success" | "failure"): void {
|
||||
this.relayProbeRunsTotal.labels(relay, result).inc();
|
||||
}
|
||||
}
|
||||
Vendored
+4
@@ -0,0 +1,4 @@
|
||||
declare module "@nostrwatch/nocap-every-adapter-default" {
|
||||
const adapters: Record<string, new (...args: unknown[]) => unknown>;
|
||||
export default adapters;
|
||||
}
|
||||
Vendored
+7
@@ -0,0 +1,7 @@
|
||||
declare module "@nostrwatch/nocap" {
|
||||
export class Nocap {
|
||||
constructor(url: string, config?: unknown);
|
||||
check(keys: string[], headers?: boolean): Promise<unknown>;
|
||||
useAdapters(adapters: unknown[]): Promise<void>;
|
||||
}
|
||||
}
|
||||
+296
@@ -0,0 +1,296 @@
|
||||
import pLimit from "p-limit";
|
||||
import { Nocap } from "@nostrwatch/nocap";
|
||||
import NocapEveryAdapterDefault from "@nostrwatch/nocap-every-adapter-default";
|
||||
|
||||
import type { AppConfig, RelayTarget } from "./config.js";
|
||||
import type { RelayMetrics } from "./metrics.js";
|
||||
import { runWriteConfirm } from "./writeConfirm.js";
|
||||
|
||||
export interface Logger {
|
||||
debug(fields: Record<string, unknown>, message: string): void;
|
||||
info(fields: Record<string, unknown>, message: string): void;
|
||||
warn(fields: Record<string, unknown>, message: string): void;
|
||||
error(fields: Record<string, unknown>, message: string): void;
|
||||
}
|
||||
|
||||
interface RelayCheckNode {
|
||||
data?: boolean;
|
||||
duration?: number;
|
||||
status?: string;
|
||||
message?: string;
|
||||
}
|
||||
|
||||
interface NocapResult {
|
||||
open?: RelayCheckNode;
|
||||
read?: RelayCheckNode;
|
||||
}
|
||||
|
||||
interface RelayProbeState {
|
||||
lastProbeUnix?: number;
|
||||
lastSuccessUnix?: number;
|
||||
}
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, label: string): Promise<T> {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const timer = setTimeout(() => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), timeoutMs);
|
||||
promise
|
||||
.then((value) => {
|
||||
clearTimeout(timer);
|
||||
resolve(value);
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
clearTimeout(timer);
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function durationOrDefault(value: number | undefined): number {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value >= 0) {
|
||||
return value;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
function messageOf(error: unknown): string {
|
||||
if (error instanceof Error) return error.message;
|
||||
return "unknown error";
|
||||
}
|
||||
|
||||
export class RelayProber {
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
private running = false;
|
||||
private rerunRequested = false;
|
||||
private stopped = false;
|
||||
private readonly states = new Map<string, RelayProbeState>();
|
||||
|
||||
constructor(
|
||||
private readonly config: AppConfig,
|
||||
private readonly metrics: RelayMetrics,
|
||||
private readonly logger: Logger,
|
||||
) {
|
||||
for (const relay of this.config.relays) {
|
||||
this.metrics.initializeRelay(relay.relay);
|
||||
this.states.set(relay.relay, {});
|
||||
}
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
if (this.config.probeIntervalMs <= 0) {
|
||||
this.logger.info(
|
||||
{ probeIntervalMs: this.config.probeIntervalMs },
|
||||
"probe interval disabled; probes run on /metrics scrape",
|
||||
);
|
||||
return;
|
||||
}
|
||||
void this.runCycle();
|
||||
this.timer = setInterval(() => {
|
||||
void this.runCycle();
|
||||
}, this.config.probeIntervalMs);
|
||||
}
|
||||
|
||||
public async runOnDemand(): Promise<void> {
|
||||
await this.runCycle();
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
this.stopped = true;
|
||||
if (this.timer) {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
public markStale(nowUnix: number): void {
|
||||
for (const relay of this.config.relays) {
|
||||
const state = this.states.get(relay.relay);
|
||||
if (!state?.lastProbeUnix) {
|
||||
this.metrics.markStale(relay.relay);
|
||||
continue;
|
||||
}
|
||||
const ageMs = nowUnix * 1000 - state.lastProbeUnix * 1000;
|
||||
if (ageMs > this.config.staleAfterMs) {
|
||||
this.metrics.markStale(relay.relay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public health(nowUnix: number): { ok: boolean; minLastProbeAgeMs: number | null } {
|
||||
let maxAgeMs = 0;
|
||||
let seen = false;
|
||||
for (const relay of this.config.relays) {
|
||||
const state = this.states.get(relay.relay);
|
||||
if (!state?.lastProbeUnix) continue;
|
||||
seen = true;
|
||||
const age = nowUnix * 1000 - state.lastProbeUnix * 1000;
|
||||
if (age > maxAgeMs) maxAgeMs = age;
|
||||
}
|
||||
if (!seen) return { ok: false, minLastProbeAgeMs: null };
|
||||
return { ok: maxAgeMs <= this.config.staleAfterMs, minLastProbeAgeMs: maxAgeMs };
|
||||
}
|
||||
|
||||
private async runCycle(): Promise<void> {
|
||||
if (this.stopped) return;
|
||||
if (this.running) {
|
||||
this.rerunRequested = true;
|
||||
return;
|
||||
}
|
||||
|
||||
this.running = true;
|
||||
const cycleStarted = Date.now();
|
||||
this.logger.debug(
|
||||
{
|
||||
relayCount: this.config.relays.length,
|
||||
concurrency: this.config.probeConcurrency,
|
||||
},
|
||||
"probe cycle started",
|
||||
);
|
||||
|
||||
try {
|
||||
const limit = pLimit(this.config.probeConcurrency);
|
||||
await Promise.allSettled(
|
||||
this.config.relays.map((relay) =>
|
||||
limit(async () => {
|
||||
await this.probeRelay(relay);
|
||||
}),
|
||||
),
|
||||
);
|
||||
} finally {
|
||||
this.running = false;
|
||||
this.logger.debug(
|
||||
{ durationMs: Date.now() - cycleStarted, rerunRequested: this.rerunRequested },
|
||||
"probe cycle finished",
|
||||
);
|
||||
if (this.rerunRequested && !this.stopped) {
|
||||
this.rerunRequested = false;
|
||||
void this.runCycle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async probeRelay(relay: RelayTarget): Promise<void> {
|
||||
const relayLabel = relay.relay;
|
||||
const probeStarted = Date.now();
|
||||
const state = this.states.get(relayLabel) ?? {};
|
||||
|
||||
let openOk = false;
|
||||
let readOk = false;
|
||||
let writeConfirmOk = !this.config.writeCheck.enabled;
|
||||
let openDurationMs = -1;
|
||||
let readDurationMs = -1;
|
||||
let writeDurationMs = -1;
|
||||
|
||||
try {
|
||||
const nocap = new Nocap(relay.relay, {
|
||||
logLevel: this.config.logLevel,
|
||||
timeout: {
|
||||
open: this.config.probeTimeoutMs,
|
||||
read: this.config.probeTimeoutMs,
|
||||
},
|
||||
failAllChecksOnConnectFailure: true,
|
||||
});
|
||||
const adapters = Object.values(NocapEveryAdapterDefault);
|
||||
await nocap.useAdapters(adapters);
|
||||
|
||||
const result = (await withTimeout(
|
||||
nocap.check(["open", "read"], true) as Promise<NocapResult>,
|
||||
this.config.probeTimeoutMs * 2,
|
||||
"nocap open/read",
|
||||
)) as NocapResult;
|
||||
|
||||
openOk = Boolean(result.open?.data);
|
||||
readOk = Boolean(result.read?.data);
|
||||
openDurationMs = durationOrDefault(result.open?.duration);
|
||||
readDurationMs = durationOrDefault(result.read?.duration);
|
||||
|
||||
if (!openOk) {
|
||||
this.metrics.incProbeError(relayLabel, "open");
|
||||
}
|
||||
if (!readOk) {
|
||||
this.metrics.incProbeError(relayLabel, "read");
|
||||
}
|
||||
|
||||
if (this.config.writeCheck.enabled) {
|
||||
const privkey = this.config.writeCheck.privkey;
|
||||
if (!privkey) {
|
||||
writeConfirmOk = false;
|
||||
this.metrics.incProbeError(relayLabel, "write_confirm");
|
||||
} else {
|
||||
const writeResult = await runWriteConfirm({
|
||||
relay: relay.relay,
|
||||
host: relay.host,
|
||||
kind: this.config.writeCheck.kind,
|
||||
privkey,
|
||||
timeoutMs: this.config.probeTimeoutMs,
|
||||
});
|
||||
writeConfirmOk = writeResult.ok;
|
||||
writeDurationMs = durationOrDefault(writeResult.durationMs);
|
||||
if (!writeResult.ok) {
|
||||
this.metrics.incProbeError(relayLabel, "write_confirm");
|
||||
this.logger.warn(
|
||||
{ relay: relayLabel, check: "write_confirm", reason: writeResult.reason },
|
||||
"write confirmation failed",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const up = openOk && readOk && writeConfirmOk;
|
||||
const nowUnix = Math.floor(Date.now() / 1000);
|
||||
state.lastProbeUnix = nowUnix;
|
||||
if (up) {
|
||||
state.lastSuccessUnix = nowUnix;
|
||||
this.metrics.incProbeRun(relayLabel, "success");
|
||||
} else {
|
||||
this.metrics.incProbeRun(relayLabel, "failure");
|
||||
}
|
||||
this.states.set(relayLabel, state);
|
||||
|
||||
this.metrics.applyUpdate({
|
||||
relay: relayLabel,
|
||||
openOk,
|
||||
readOk,
|
||||
writeConfirmOk,
|
||||
up,
|
||||
openDurationMs,
|
||||
readDurationMs,
|
||||
writeDurationMs,
|
||||
...(state.lastSuccessUnix !== undefined ? { lastSuccessUnix: state.lastSuccessUnix } : {}),
|
||||
});
|
||||
|
||||
this.logger.info(
|
||||
{
|
||||
relay: relayLabel,
|
||||
up,
|
||||
openOk,
|
||||
readOk,
|
||||
writeConfirmOk,
|
||||
durationMs: Date.now() - probeStarted,
|
||||
},
|
||||
"relay probe completed",
|
||||
);
|
||||
} catch (error: unknown) {
|
||||
const nowUnix = Math.floor(Date.now() / 1000);
|
||||
state.lastProbeUnix = nowUnix;
|
||||
this.states.set(relayLabel, state);
|
||||
|
||||
this.metrics.incProbeError(relayLabel, "probe");
|
||||
this.metrics.incProbeRun(relayLabel, "failure");
|
||||
this.metrics.applyUpdate({
|
||||
relay: relayLabel,
|
||||
openOk: false,
|
||||
readOk: false,
|
||||
writeConfirmOk: false,
|
||||
up: false,
|
||||
openDurationMs,
|
||||
readDurationMs,
|
||||
writeDurationMs,
|
||||
...(state.lastSuccessUnix !== undefined ? { lastSuccessUnix: state.lastSuccessUnix } : {}),
|
||||
});
|
||||
this.logger.error(
|
||||
{ relay: relayLabel, error: messageOf(error), durationMs: Date.now() - probeStarted },
|
||||
"relay probe failed",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
import { performance } from "node:perf_hooks";
|
||||
import { finalizeEvent, getPublicKey, nip19, Relay, type Filter } from "nostr-tools";
|
||||
|
||||
interface WriteConfirmInput {
|
||||
relay: string;
|
||||
host: string;
|
||||
kind: number;
|
||||
privkey: string;
|
||||
timeoutMs: number;
|
||||
}
|
||||
|
||||
export interface WriteConfirmResult {
|
||||
ok: boolean;
|
||||
durationMs: number;
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
function isHex64(value: string): boolean {
|
||||
return /^[0-9a-f]{64}$/i.test(value);
|
||||
}
|
||||
|
||||
function decodePrivateKey(raw: string): Uint8Array {
|
||||
const value = raw.trim();
|
||||
if (value.startsWith("nsec1")) {
|
||||
const decoded = nip19.decode(value);
|
||||
if (decoded.type !== "nsec") {
|
||||
throw new Error("WRITE_CHECK_PRIVKEY is not a valid nsec key");
|
||||
}
|
||||
return decoded.data;
|
||||
}
|
||||
if (!isHex64(value)) {
|
||||
throw new Error("WRITE_CHECK_PRIVKEY must be nsec or 64-char hex");
|
||||
}
|
||||
return Uint8Array.from(Buffer.from(value, "hex"));
|
||||
}
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, label: string): Promise<T> {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const timer = setTimeout(() => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), timeoutMs);
|
||||
promise
|
||||
.then((value) => {
|
||||
clearTimeout(timer);
|
||||
resolve(value);
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
clearTimeout(timer);
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function waitForEvent(
|
||||
relay: Relay,
|
||||
filter: Filter,
|
||||
expectedEventId: string,
|
||||
timeoutMs: number,
|
||||
): Promise<boolean> {
|
||||
return withTimeout<boolean>(
|
||||
new Promise<boolean>((resolve) => {
|
||||
let settled = false;
|
||||
const sub = relay.subscribe([filter], {
|
||||
onevent: (evt) => {
|
||||
if (settled) return;
|
||||
if (evt.id === expectedEventId) {
|
||||
settled = true;
|
||||
sub.close();
|
||||
resolve(true);
|
||||
}
|
||||
},
|
||||
oneose: () => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
sub.close();
|
||||
resolve(false);
|
||||
},
|
||||
});
|
||||
}),
|
||||
timeoutMs,
|
||||
"write confirm read-after-write",
|
||||
);
|
||||
}
|
||||
|
||||
export async function runWriteConfirm(input: WriteConfirmInput): Promise<WriteConfirmResult> {
|
||||
const startedAt = performance.now();
|
||||
let relay: Relay | null = null;
|
||||
try {
|
||||
const secretKey = decodePrivateKey(input.privkey);
|
||||
const derivedPubkey = getPublicKey(secretKey);
|
||||
|
||||
relay = await withTimeout(
|
||||
Relay.connect(input.relay, { enablePing: false, enableReconnect: false }),
|
||||
input.timeoutMs,
|
||||
"relay connect",
|
||||
);
|
||||
|
||||
const createdAt = Math.floor(Date.now() / 1000);
|
||||
const dTag = `healthcheck:${input.host}`;
|
||||
|
||||
const event = finalizeEvent(
|
||||
{
|
||||
kind: input.kind,
|
||||
created_at: createdAt,
|
||||
tags: [["d", dTag]],
|
||||
content: "ok",
|
||||
},
|
||||
secretKey,
|
||||
);
|
||||
|
||||
await withTimeout(relay.publish(event), input.timeoutMs, "relay publish");
|
||||
|
||||
const elapsedMs = performance.now() - startedAt;
|
||||
const remainingMs = Math.max(1, input.timeoutMs - Math.floor(elapsedMs));
|
||||
|
||||
const confirmed = await waitForEvent(
|
||||
relay,
|
||||
{
|
||||
authors: [derivedPubkey],
|
||||
kinds: [input.kind],
|
||||
"#d": [dTag],
|
||||
limit: 1,
|
||||
since: Math.max(0, createdAt - 5),
|
||||
},
|
||||
event.id,
|
||||
remainingMs,
|
||||
);
|
||||
|
||||
return {
|
||||
ok: confirmed,
|
||||
durationMs: Math.round(performance.now() - startedAt),
|
||||
...(confirmed ? {} : { reason: "event not observed on read-after-write query" }),
|
||||
};
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : "unknown write confirmation error";
|
||||
return {
|
||||
ok: false,
|
||||
durationMs: Math.round(performance.now() - startedAt),
|
||||
reason: message,
|
||||
};
|
||||
} finally {
|
||||
relay?.close();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user