From a78c41c75e783ab482640f2d6b3d13c0522beaca Mon Sep 17 00:00:00 2001 From: ilya-bov <111734093+ilya-bov@users.noreply.github.com> Date: Thu, 9 Apr 2026 20:28:49 +0300 Subject: [PATCH] fix(telegram): harden polling reliability and mode/secret handling --- .../telegram-integration-manager.tsx | 46 +++++++--- src/lib/storage/telegram-integration-store.ts | 30 +++++-- src/lib/telegram/polling-lifecycle.ts | 12 ++- src/lib/telegram/polling-service.ts | 85 +++++++++++++++---- 4 files changed, 139 insertions(+), 34 deletions(-) diff --git a/src/components/telegram-integration-manager.tsx b/src/components/telegram-integration-manager.tsx index 1c71e9d..5b0db83 100644 --- a/src/components/telegram-integration-manager.tsx +++ b/src/components/telegram-integration-manager.tsx @@ -70,16 +70,42 @@ export function TelegramIntegrationManager() { // Helper to detect if URL is localhost/private (needs polling) or public (can use webhook) const detectUrlMode = useCallback((url: string): "webhook" | "polling" => { - if (!url.trim()) return "polling"; - const lowerUrl = url.toLowerCase().trim(); - // Check for localhost, private IPs, or non-HTTPS - if (lowerUrl.includes("localhost")) return "polling"; - if (lowerUrl.includes("127.0.0.1")) return "polling"; - if (lowerUrl.includes("192.168.")) return "polling"; - if (lowerUrl.includes("10.0.")) return "polling"; - if (lowerUrl.includes("172.16.")) return "polling"; - if (lowerUrl.startsWith("http://")) return "polling"; - return "webhook"; + const normalized = url.trim(); + if (!normalized) return "polling"; + + try { + const parsed = new URL(normalized); + const hostname = parsed.hostname.toLowerCase(); + if (parsed.protocol !== "https:") return "polling"; + + if ( + hostname === "localhost" || + hostname === "::1" || + hostname.endsWith(".local") + ) { + return "polling"; + } + + const octets = hostname.split(".").map((part) => Number(part)); + if ( + octets.length === 4 && + octets.every((value) => Number.isInteger(value) && value >= 0 && value <= 255) + ) { + const [first, second] = octets; + if ( + first === 10 || + first === 127 || + (first === 192 && second === 168) || + (first === 172 && second >= 16 && second <= 31) + ) { + return "polling"; + } + } + + return "webhook"; + } catch { + return "polling"; + } }, []); const [allowedUserIdsInput, setAllowedUserIdsInput] = useState(""); const [pendingAccessCodes, setPendingAccessCodes] = useState(0); diff --git a/src/lib/storage/telegram-integration-store.ts b/src/lib/storage/telegram-integration-store.ts index 0f6a9ee..e3acd62 100644 --- a/src/lib/storage/telegram-integration-store.ts +++ b/src/lib/storage/telegram-integration-store.ts @@ -240,18 +240,36 @@ function normalizePollingInterval(raw: unknown): number { return Math.max(1000, Math.min(60000, numeric)); } +function isPrivateIpv4Address(hostname: string): boolean { + const octets = hostname.split(".").map((part) => Number(part)); + if (octets.length !== 4 || octets.some((value) => !Number.isInteger(value) || value < 0 || value > 255)) { + return false; + } + + const [first, second] = octets; + return ( + first === 10 || + first === 127 || + (first === 192 && second === 168) || + (first === 172 && second >= 16 && second <= 31) + ); +} + function isLocalhostUrl(url: string): boolean { if (!url) return true; try { const parsed = new URL(url); const hostname = parsed.hostname.toLowerCase(); + if (hostname === "localhost" || hostname === "::1" || hostname.endsWith(".local")) { + return true; + } + if (isPrivateIpv4Address(hostname)) { + return true; + } + return ( - hostname === "localhost" || - hostname === "127.0.0.1" || - hostname.startsWith("192.168.") || - hostname.startsWith("10.") || - hostname.startsWith("172.") || - hostname.endsWith(".local") + /^fe[89ab][0-9a-f]*:/i.test(hostname) || + /^(fc|fd)[0-9a-f]*:/i.test(hostname) ); } catch { return true; diff --git a/src/lib/telegram/polling-lifecycle.ts b/src/lib/telegram/polling-lifecycle.ts index 529a845..18bc529 100644 --- a/src/lib/telegram/polling-lifecycle.ts +++ b/src/lib/telegram/polling-lifecycle.ts @@ -1,5 +1,7 @@ import { + generateTelegramWebhookSecret, getTelegramIntegrationRuntimeConfig, + saveTelegramIntegrationStoredSettings, detectTelegramMode, type TelegramIntegrationRuntimeConfig, } from "@/lib/storage/telegram-integration-store"; @@ -57,6 +59,14 @@ async function setupTelegramWebhook( throw new Error("Bot token and public base URL are required"); } + let effectiveWebhookSecret = webhookSecret.trim(); + if (!effectiveWebhookSecret) { + effectiveWebhookSecret = generateTelegramWebhookSecret(); + await saveTelegramIntegrationStoredSettings({ + webhookSecret: effectiveWebhookSecret, + }); + } + const webhookUrl = `${publicBaseUrl.replace(/\/$/, "")}/api/integrations/telegram`; const response = await fetch( @@ -66,7 +76,7 @@ async function setupTelegramWebhook( headers: { "Content-Type": "application/json" }, body: JSON.stringify({ url: webhookUrl, - secret_token: webhookSecret.trim() || undefined, + secret_token: effectiveWebhookSecret, allowed_updates: ["message"], }), } diff --git a/src/lib/telegram/polling-service.ts b/src/lib/telegram/polling-service.ts index af09b46..7b9efc3 100644 --- a/src/lib/telegram/polling-service.ts +++ b/src/lib/telegram/polling-service.ts @@ -1,4 +1,6 @@ import { + detectTelegramMode, + getTelegramIntegrationRuntimeConfig, type TelegramIntegrationRuntimeConfig, } from "@/lib/storage/telegram-integration-store"; import { @@ -30,6 +32,12 @@ class TelegramPollingService { private runtimeConfig: TelegramIntegrationRuntimeConfig | null = null; private pollTimeout: NodeJS.Timeout | null = null; + private extractUpdateId(update: TelegramUpdate): number | null { + return typeof update.update_id === "number" && Number.isInteger(update.update_id) + ? update.update_id + : null; + } + get status(): PollingStatus { return { isRunning: this.isRunning, @@ -52,6 +60,7 @@ class TelegramPollingService { this.runtimeConfig = runtimeConfig; this.isRunning = true; this.abortController = new AbortController(); + this.errorCount = 0; this.consecutiveErrors = 0; console.log("[Telegram Polling] Starting polling service..."); @@ -98,23 +107,63 @@ class TelegramPollingService { } private async poll(): Promise { - if (!this.isRunning || !this.runtimeConfig) { + if (!this.isRunning) { return; } - const { botToken } = this.runtimeConfig; - try { + const runtimeConfig = await getTelegramIntegrationRuntimeConfig(); + const botToken = runtimeConfig.botToken.trim(); + if (!botToken) { + throw new Error("Bot token is required"); + } + + const detectedMode = detectTelegramMode(runtimeConfig); + if (detectedMode !== "polling") { + console.log("[Telegram Polling] Detected mode is webhook, stopping polling service"); + this.stop(); + return; + } + + this.runtimeConfig = runtimeConfig; const updates = await this.getUpdates(botToken); this.consecutiveErrors = 0; this.lastPollTime = new Date().toISOString(); + let retrySoon = false; + for (const update of updates) { if (!this.isRunning) break; - await this.processUpdate(update); + const updateId = this.extractUpdateId(update); + if (updateId === null) { + console.warn("[Telegram Polling] Received update without valid update_id, skipping"); + continue; + } + + const processed = await this.processUpdate(update, runtimeConfig); + if (!processed) { + retrySoon = true; + break; + } + + // Confirm only successfully processed updates to avoid data loss. + this.lastUpdateId = updateId; + } + + if (retrySoon) { + this.scheduleNextPoll(1000); + return; } } catch (error) { + if ( + !this.isRunning && + error instanceof Error && + error.name === "AbortError" + ) { + return; + } + this.errorCount++; this.consecutiveErrors++; @@ -165,25 +214,27 @@ class TelegramPollingService { return []; } - // Update lastUpdateId to the highest received - for (const update of result) { - const updateId = typeof update.update_id === "number" ? update.update_id : null; - if (updateId !== null && (this.lastUpdateId === null || updateId > this.lastUpdateId)) { - this.lastUpdateId = updateId; - } - } - return result as TelegramUpdate[]; } - private async processUpdate(update: TelegramUpdate): Promise { - if (!this.runtimeConfig) return; - + private async processUpdate( + update: TelegramUpdate, + runtimeConfig: TelegramIntegrationRuntimeConfig + ): Promise { try { - await processTelegramUpdate(update, this.runtimeConfig); + await processTelegramUpdate(update, runtimeConfig); + return true; } catch (error) { + this.errorCount++; + this.consecutiveErrors++; console.error("[Telegram Polling] Error processing update:", error); - // Don't throw - continue processing other updates + + if (this.consecutiveErrors >= 10) { + console.error("[Telegram Polling] Too many consecutive processing errors, stopping polling"); + this.stop(); + } + + return false; } }