mirror of
https://github.com/eggent-ai/eggent.git
synced 2026-05-13 15:46:00 +00:00
fix(telegram): harden polling reliability and mode/secret handling
This commit is contained in:
@@ -70,16 +70,42 @@ export function TelegramIntegrationManager() {
|
||||
|
||||
// Helper to detect if URL is localhost/private (needs polling) or public (can use webhook)
|
||||
const detectUrlMode = useCallback((url: string): "webhook" | "polling" => {
|
||||
if (!url.trim()) return "polling";
|
||||
const lowerUrl = url.toLowerCase().trim();
|
||||
// Check for localhost, private IPs, or non-HTTPS
|
||||
if (lowerUrl.includes("localhost")) return "polling";
|
||||
if (lowerUrl.includes("127.0.0.1")) return "polling";
|
||||
if (lowerUrl.includes("192.168.")) return "polling";
|
||||
if (lowerUrl.includes("10.0.")) return "polling";
|
||||
if (lowerUrl.includes("172.16.")) return "polling";
|
||||
if (lowerUrl.startsWith("http://")) return "polling";
|
||||
return "webhook";
|
||||
const normalized = url.trim();
|
||||
if (!normalized) return "polling";
|
||||
|
||||
try {
|
||||
const parsed = new URL(normalized);
|
||||
const hostname = parsed.hostname.toLowerCase();
|
||||
if (parsed.protocol !== "https:") return "polling";
|
||||
|
||||
if (
|
||||
hostname === "localhost" ||
|
||||
hostname === "::1" ||
|
||||
hostname.endsWith(".local")
|
||||
) {
|
||||
return "polling";
|
||||
}
|
||||
|
||||
const octets = hostname.split(".").map((part) => Number(part));
|
||||
if (
|
||||
octets.length === 4 &&
|
||||
octets.every((value) => Number.isInteger(value) && value >= 0 && value <= 255)
|
||||
) {
|
||||
const [first, second] = octets;
|
||||
if (
|
||||
first === 10 ||
|
||||
first === 127 ||
|
||||
(first === 192 && second === 168) ||
|
||||
(first === 172 && second >= 16 && second <= 31)
|
||||
) {
|
||||
return "polling";
|
||||
}
|
||||
}
|
||||
|
||||
return "webhook";
|
||||
} catch {
|
||||
return "polling";
|
||||
}
|
||||
}, []);
|
||||
const [allowedUserIdsInput, setAllowedUserIdsInput] = useState("");
|
||||
const [pendingAccessCodes, setPendingAccessCodes] = useState(0);
|
||||
|
||||
@@ -240,18 +240,36 @@ function normalizePollingInterval(raw: unknown): number {
|
||||
return Math.max(1000, Math.min(60000, numeric));
|
||||
}
|
||||
|
||||
function isPrivateIpv4Address(hostname: string): boolean {
|
||||
const octets = hostname.split(".").map((part) => Number(part));
|
||||
if (octets.length !== 4 || octets.some((value) => !Number.isInteger(value) || value < 0 || value > 255)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const [first, second] = octets;
|
||||
return (
|
||||
first === 10 ||
|
||||
first === 127 ||
|
||||
(first === 192 && second === 168) ||
|
||||
(first === 172 && second >= 16 && second <= 31)
|
||||
);
|
||||
}
|
||||
|
||||
function isLocalhostUrl(url: string): boolean {
|
||||
if (!url) return true;
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
const hostname = parsed.hostname.toLowerCase();
|
||||
if (hostname === "localhost" || hostname === "::1" || hostname.endsWith(".local")) {
|
||||
return true;
|
||||
}
|
||||
if (isPrivateIpv4Address(hostname)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return (
|
||||
hostname === "localhost" ||
|
||||
hostname === "127.0.0.1" ||
|
||||
hostname.startsWith("192.168.") ||
|
||||
hostname.startsWith("10.") ||
|
||||
hostname.startsWith("172.") ||
|
||||
hostname.endsWith(".local")
|
||||
/^fe[89ab][0-9a-f]*:/i.test(hostname) ||
|
||||
/^(fc|fd)[0-9a-f]*:/i.test(hostname)
|
||||
);
|
||||
} catch {
|
||||
return true;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import {
|
||||
generateTelegramWebhookSecret,
|
||||
getTelegramIntegrationRuntimeConfig,
|
||||
saveTelegramIntegrationStoredSettings,
|
||||
detectTelegramMode,
|
||||
type TelegramIntegrationRuntimeConfig,
|
||||
} from "@/lib/storage/telegram-integration-store";
|
||||
@@ -57,6 +59,14 @@ async function setupTelegramWebhook(
|
||||
throw new Error("Bot token and public base URL are required");
|
||||
}
|
||||
|
||||
let effectiveWebhookSecret = webhookSecret.trim();
|
||||
if (!effectiveWebhookSecret) {
|
||||
effectiveWebhookSecret = generateTelegramWebhookSecret();
|
||||
await saveTelegramIntegrationStoredSettings({
|
||||
webhookSecret: effectiveWebhookSecret,
|
||||
});
|
||||
}
|
||||
|
||||
const webhookUrl = `${publicBaseUrl.replace(/\/$/, "")}/api/integrations/telegram`;
|
||||
|
||||
const response = await fetch(
|
||||
@@ -66,7 +76,7 @@ async function setupTelegramWebhook(
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
url: webhookUrl,
|
||||
secret_token: webhookSecret.trim() || undefined,
|
||||
secret_token: effectiveWebhookSecret,
|
||||
allowed_updates: ["message"],
|
||||
}),
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import {
|
||||
detectTelegramMode,
|
||||
getTelegramIntegrationRuntimeConfig,
|
||||
type TelegramIntegrationRuntimeConfig,
|
||||
} from "@/lib/storage/telegram-integration-store";
|
||||
import {
|
||||
@@ -30,6 +32,12 @@ class TelegramPollingService {
|
||||
private runtimeConfig: TelegramIntegrationRuntimeConfig | null = null;
|
||||
private pollTimeout: NodeJS.Timeout | null = null;
|
||||
|
||||
private extractUpdateId(update: TelegramUpdate): number | null {
|
||||
return typeof update.update_id === "number" && Number.isInteger(update.update_id)
|
||||
? update.update_id
|
||||
: null;
|
||||
}
|
||||
|
||||
get status(): PollingStatus {
|
||||
return {
|
||||
isRunning: this.isRunning,
|
||||
@@ -52,6 +60,7 @@ class TelegramPollingService {
|
||||
this.runtimeConfig = runtimeConfig;
|
||||
this.isRunning = true;
|
||||
this.abortController = new AbortController();
|
||||
this.errorCount = 0;
|
||||
this.consecutiveErrors = 0;
|
||||
|
||||
console.log("[Telegram Polling] Starting polling service...");
|
||||
@@ -98,23 +107,63 @@ class TelegramPollingService {
|
||||
}
|
||||
|
||||
private async poll(): Promise<void> {
|
||||
if (!this.isRunning || !this.runtimeConfig) {
|
||||
if (!this.isRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { botToken } = this.runtimeConfig;
|
||||
|
||||
try {
|
||||
const runtimeConfig = await getTelegramIntegrationRuntimeConfig();
|
||||
const botToken = runtimeConfig.botToken.trim();
|
||||
if (!botToken) {
|
||||
throw new Error("Bot token is required");
|
||||
}
|
||||
|
||||
const detectedMode = detectTelegramMode(runtimeConfig);
|
||||
if (detectedMode !== "polling") {
|
||||
console.log("[Telegram Polling] Detected mode is webhook, stopping polling service");
|
||||
this.stop();
|
||||
return;
|
||||
}
|
||||
|
||||
this.runtimeConfig = runtimeConfig;
|
||||
const updates = await this.getUpdates(botToken);
|
||||
|
||||
this.consecutiveErrors = 0;
|
||||
this.lastPollTime = new Date().toISOString();
|
||||
|
||||
let retrySoon = false;
|
||||
|
||||
for (const update of updates) {
|
||||
if (!this.isRunning) break;
|
||||
await this.processUpdate(update);
|
||||
const updateId = this.extractUpdateId(update);
|
||||
if (updateId === null) {
|
||||
console.warn("[Telegram Polling] Received update without valid update_id, skipping");
|
||||
continue;
|
||||
}
|
||||
|
||||
const processed = await this.processUpdate(update, runtimeConfig);
|
||||
if (!processed) {
|
||||
retrySoon = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// Confirm only successfully processed updates to avoid data loss.
|
||||
this.lastUpdateId = updateId;
|
||||
}
|
||||
|
||||
if (retrySoon) {
|
||||
this.scheduleNextPoll(1000);
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
if (
|
||||
!this.isRunning &&
|
||||
error instanceof Error &&
|
||||
error.name === "AbortError"
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.errorCount++;
|
||||
this.consecutiveErrors++;
|
||||
|
||||
@@ -165,25 +214,27 @@ class TelegramPollingService {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Update lastUpdateId to the highest received
|
||||
for (const update of result) {
|
||||
const updateId = typeof update.update_id === "number" ? update.update_id : null;
|
||||
if (updateId !== null && (this.lastUpdateId === null || updateId > this.lastUpdateId)) {
|
||||
this.lastUpdateId = updateId;
|
||||
}
|
||||
}
|
||||
|
||||
return result as TelegramUpdate[];
|
||||
}
|
||||
|
||||
private async processUpdate(update: TelegramUpdate): Promise<void> {
|
||||
if (!this.runtimeConfig) return;
|
||||
|
||||
private async processUpdate(
|
||||
update: TelegramUpdate,
|
||||
runtimeConfig: TelegramIntegrationRuntimeConfig
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
await processTelegramUpdate(update, this.runtimeConfig);
|
||||
await processTelegramUpdate(update, runtimeConfig);
|
||||
return true;
|
||||
} catch (error) {
|
||||
this.errorCount++;
|
||||
this.consecutiveErrors++;
|
||||
console.error("[Telegram Polling] Error processing update:", error);
|
||||
// Don't throw - continue processing other updates
|
||||
|
||||
if (this.consecutiveErrors >= 10) {
|
||||
console.error("[Telegram Polling] Too many consecutive processing errors, stopping polling");
|
||||
this.stop();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user