refactor(telegram): Simplify and modularize the handling of Telegram updates

- Removal of complex internal functions and previously integrated helper modules
- Centralized integration of update processing via `processTelegramUpdate`
- Reduced code for direct handling of sessions, files, commands, and messages
- Improved error handling with rollback for duplicate or failed updates
- Updated the React TelegramIntegrationManager component to support connection modes
- Added support for “auto,” “webhook,” and “polling” modes with control and status in the interface
- Implementation of automatic mode detection based on public URL or local environment
- UI improvements for configuration and connection statuses, including polling and webhook
- Initialization of the Telegram lifecycle (polling or webhook) upon server startup
- Updated documentation and text to reflect new integration and usage options
This commit is contained in:
Anton Sychev
2026-03-25 21:05:14 +01:00
parent aca51e1e19
commit 292bb62ed5
9 changed files with 1743 additions and 907 deletions

View File

@@ -0,0 +1,104 @@
import { NextRequest } from "next/server";
import {
getTelegramIntegrationRuntimeConfig,
detectTelegramMode,
} from "@/lib/storage/telegram-integration-store";
import { telegramPollingService } from "@/lib/telegram/polling-service";
export const maxDuration = 300;
export async function GET() {
const runtime = await getTelegramIntegrationRuntimeConfig();
const detectedMode = detectTelegramMode(runtime);
return Response.json({
status: "ok",
polling: telegramPollingService.status,
config: {
mode: runtime.mode,
detectedMode,
canStartPolling: !!runtime.botToken && detectedMode === "polling",
},
});
}
export async function POST(req: NextRequest) {
try {
const runtime = await getTelegramIntegrationRuntimeConfig();
const detectedMode = detectTelegramMode(runtime);
if (!runtime.botToken.trim()) {
return Response.json(
{ error: "Telegram bot token is not configured" },
{ status: 503 }
);
}
// Only allow polling if detected mode is polling or user explicitly forces it
const body = (await req.json().catch(() => ({}))) as { force?: boolean };
const force = body.force === true;
if (detectedMode === "webhook" && !force) {
return Response.json(
{
error: "Detected mode is webhook. Use force=true to start polling anyway.",
detectedMode,
},
{ status: 400 }
);
}
if (telegramPollingService.status.isRunning) {
return Response.json(
{
error: "Polling is already running",
polling: telegramPollingService.status,
},
{ status: 409 }
);
}
await telegramPollingService.start(runtime);
return Response.json({
ok: true,
message: "Polling started",
polling: telegramPollingService.status,
});
} catch (error) {
console.error("[Telegram Polling API] Error starting polling:", error);
return Response.json(
{
error: error instanceof Error ? error.message : "Failed to start polling",
},
{ status: 500 }
);
}
}
export async function DELETE() {
try {
if (!telegramPollingService.status.isRunning) {
return Response.json(
{ error: "Polling is not running" },
{ status: 409 }
);
}
telegramPollingService.stop();
return Response.json({
ok: true,
message: "Polling stopped",
polling: telegramPollingService.status,
});
} catch (error) {
console.error("[Telegram Polling API] Error stopping polling:", error);
return Response.json(
{
error: error instanceof Error ? error.message : "Failed to stop polling",
},
{ status: 500 }
);
}
}

View File

@@ -1,136 +1,12 @@
import { NextRequest } from "next/server";
import { timingSafeEqual } from "node:crypto";
import {
ExternalMessageError,
handleExternalMessage,
} from "@/lib/external/handle-external-message";
import {
createDefaultTelegramSessionId,
createFreshTelegramSessionId,
getTelegramChatSessionId,
setTelegramChatSessionId,
} from "@/lib/storage/telegram-session-store";
import {
claimTelegramUpdate,
releaseTelegramUpdate,
} from "@/lib/storage/telegram-update-store";
import {
consumeTelegramAccessCode,
getTelegramIntegrationRuntimeConfig,
normalizeTelegramUserId,
} from "@/lib/storage/telegram-integration-store";
import { saveChatFile } from "@/lib/storage/chat-files-store";
import { createChat, getChat } from "@/lib/storage/chat-store";
import {
contextKey,
type ExternalSession,
getOrCreateExternalSession,
saveExternalSession,
} from "@/lib/storage/external-session-store";
import { getAllProjects } from "@/lib/storage/project-store";
const TELEGRAM_TEXT_LIMIT = 4096;
const TELEGRAM_FILE_MAX_BYTES = 30 * 1024 * 1024;
interface TelegramUpdate {
update_id?: unknown;
message?: TelegramMessage;
}
interface TelegramMessage {
message_id?: unknown;
text?: unknown;
caption?: unknown;
from?: {
id?: unknown;
};
chat?: {
id?: unknown;
type?: unknown;
};
document?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
photo?: Array<{
file_id?: unknown;
width?: unknown;
height?: unknown;
}>;
audio?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
video?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
voice?: {
file_id?: unknown;
mime_type?: unknown;
};
}
interface TelegramApiResponse {
ok?: boolean;
description?: string;
result?: Record<string, unknown>;
}
interface TelegramIncomingFile {
fileId: string;
fileName: string;
}
interface TelegramExternalChatContext {
chatId: string;
projectId?: string;
currentPath: string;
}
function normalizeTelegramCurrentPath(rawPath: string | undefined): string {
const value = (rawPath ?? "").trim();
if (!value || value === "/telegram") {
return "";
}
return value;
}
interface TelegramResolvedProjectContext {
session: ExternalSession;
resolvedProjectId?: string;
projectName?: string;
}
function parseTelegramError(status: number, payload: TelegramApiResponse | null): string {
const description = payload?.description?.trim();
return description
? `Telegram API error (${status}): ${description}`
: `Telegram API error (${status})`;
}
async function callTelegramApi(
botToken: string,
method: string,
body?: Record<string, unknown>
): Promise<TelegramApiResponse> {
const response = await fetch(`https://api.telegram.org/bot${botToken}/${method}`, {
method: body ? "POST" : "GET",
headers: body ? { "Content-Type": "application/json" } : undefined,
body: body ? JSON.stringify(body) : undefined,
});
const payload = (await response.json().catch(() => null)) as
| TelegramApiResponse
| null;
if (!response.ok || !payload?.ok) {
throw new Error(parseTelegramError(response.status, payload));
}
return payload;
}
processTelegramUpdate,
type TelegramUpdate,
} from "@/lib/telegram/telegram-message-handler";
function safeTokenMatch(actual: string, expected: string): boolean {
const actualBytes = Buffer.from(actual);
@@ -142,334 +18,6 @@ function safeTokenMatch(actual: string, expected: string): boolean {
return timingSafeEqual(actualBytes, expectedBytes);
}
function getBotId(botToken: string): string {
const [rawBotId] = botToken.trim().split(":", 1);
const botId = rawBotId?.trim() || "default";
return botId.replace(/[^a-zA-Z0-9._:-]/g, "_").slice(0, 128) || "default";
}
function chatBelongsToProject(
chatProjectId: string | undefined,
projectId: string | undefined
): boolean {
const left = chatProjectId ?? null;
const right = projectId ?? null;
return left === right;
}
async function ensureTelegramExternalChatContext(params: {
sessionId: string;
defaultProjectId?: string;
}): Promise<TelegramExternalChatContext> {
const { session, resolvedProjectId } = await resolveTelegramProjectContext({
sessionId: params.sessionId,
defaultProjectId: params.defaultProjectId,
});
const projectKey = contextKey(resolvedProjectId);
let resolvedChatId = session.activeChats[projectKey];
if (resolvedChatId) {
const existing = await getChat(resolvedChatId);
if (!existing || !chatBelongsToProject(existing.projectId, resolvedProjectId)) {
resolvedChatId = "";
}
}
if (!resolvedChatId) {
resolvedChatId = crypto.randomUUID();
await createChat(
resolvedChatId,
`External session ${session.id}`,
resolvedProjectId
);
}
session.activeChats[projectKey] = resolvedChatId;
session.currentPaths[projectKey] = normalizeTelegramCurrentPath(
session.currentPaths[projectKey]
);
session.updatedAt = new Date().toISOString();
await saveExternalSession(session);
return {
chatId: resolvedChatId,
projectId: resolvedProjectId,
currentPath: session.currentPaths[projectKey] ?? "",
};
}
async function resolveTelegramProjectContext(params: {
sessionId: string;
defaultProjectId?: string;
}): Promise<TelegramResolvedProjectContext> {
const session = await getOrCreateExternalSession(params.sessionId);
const projects = await getAllProjects();
const projectById = new Map(projects.map((project) => [project.id, project]));
let resolvedProjectId: string | undefined;
const explicitProjectId = params.defaultProjectId?.trim() || "";
if (explicitProjectId) {
if (!projectById.has(explicitProjectId)) {
throw new Error(`Project "${explicitProjectId}" not found`);
}
resolvedProjectId = explicitProjectId;
session.activeProjectId = explicitProjectId;
} else if (session.activeProjectId && projectById.has(session.activeProjectId)) {
resolvedProjectId = session.activeProjectId;
} else if (projects.length > 0) {
resolvedProjectId = projects[0].id;
session.activeProjectId = projects[0].id;
} else {
session.activeProjectId = null;
}
return {
session,
resolvedProjectId,
projectName: resolvedProjectId ? projectById.get(resolvedProjectId)?.name : undefined,
};
}
function extensionFromMime(mimeType: string): string {
const lower = mimeType.toLowerCase();
if (lower.includes("pdf")) return ".pdf";
if (lower.includes("png")) return ".png";
if (lower.includes("jpeg") || lower.includes("jpg")) return ".jpg";
if (lower.includes("webp")) return ".webp";
if (lower.includes("gif")) return ".gif";
if (lower.includes("mp4")) return ".mp4";
if (lower.includes("mpeg") || lower.includes("mp3")) return ".mp3";
if (lower.includes("ogg")) return ".ogg";
if (lower.includes("wav")) return ".wav";
if (lower.includes("plain")) return ".txt";
return "";
}
function buildIncomingFileName(params: {
base: string;
messageId?: number;
mimeType?: string;
}): string {
const suffix = params.messageId ?? Date.now();
const ext = params.mimeType ? extensionFromMime(params.mimeType) : "";
return `${params.base}-${suffix}${ext}`;
}
function sanitizeFileName(value: string): string {
const base = value.trim().replace(/[\\/]+/g, "_");
return base || `file-${Date.now()}`;
}
function withMessageIdPrefix(fileName: string, messageId?: number): string {
if (typeof messageId !== "number") return fileName;
return `${messageId}-${fileName}`;
}
function extractIncomingFile(
message: TelegramMessage,
messageId?: number
): TelegramIncomingFile | null {
const documentFileId =
typeof message.document?.file_id === "string"
? message.document.file_id.trim()
: "";
if (documentFileId) {
const docNameRaw =
typeof message.document?.file_name === "string"
? message.document.file_name
: "";
const fallback = buildIncomingFileName({
base: "document",
messageId,
mimeType:
typeof message.document?.mime_type === "string"
? message.document.mime_type
: undefined,
});
return {
fileId: documentFileId,
fileName: withMessageIdPrefix(sanitizeFileName(docNameRaw || fallback), messageId),
};
}
const photos: Array<{ file_id?: unknown }> = Array.isArray(message.photo)
? message.photo
: [];
for (let i = photos.length - 1; i >= 0; i -= 1) {
const photo = photos[i];
const fileId = typeof photo?.file_id === "string" ? photo.file_id.trim() : "";
if (fileId) {
return {
fileId,
fileName: sanitizeFileName(
buildIncomingFileName({ base: "photo", messageId, mimeType: "image/jpeg" })
),
};
}
}
const audioFileId =
typeof message.audio?.file_id === "string" ? message.audio.file_id.trim() : "";
if (audioFileId) {
const audioNameRaw =
typeof message.audio?.file_name === "string" ? message.audio.file_name : "";
const fallback = buildIncomingFileName({
base: "audio",
messageId,
mimeType:
typeof message.audio?.mime_type === "string"
? message.audio.mime_type
: undefined,
});
return {
fileId: audioFileId,
fileName: withMessageIdPrefix(sanitizeFileName(audioNameRaw || fallback), messageId),
};
}
const videoFileId =
typeof message.video?.file_id === "string" ? message.video.file_id.trim() : "";
if (videoFileId) {
const videoNameRaw =
typeof message.video?.file_name === "string" ? message.video.file_name : "";
const fallback = buildIncomingFileName({
base: "video",
messageId,
mimeType:
typeof message.video?.mime_type === "string"
? message.video.mime_type
: undefined,
});
return {
fileId: videoFileId,
fileName: withMessageIdPrefix(sanitizeFileName(videoNameRaw || fallback), messageId),
};
}
const voiceFileId =
typeof message.voice?.file_id === "string" ? message.voice.file_id.trim() : "";
if (voiceFileId) {
return {
fileId: voiceFileId,
fileName: sanitizeFileName(
buildIncomingFileName({
base: "voice",
messageId,
mimeType:
typeof message.voice?.mime_type === "string"
? message.voice.mime_type
: undefined,
})
),
};
}
return null;
}
async function downloadTelegramFile(botToken: string, fileId: string): Promise<Buffer> {
const payload = await callTelegramApi(botToken, "getFile", {
file_id: fileId,
});
const result = payload.result ?? {};
const filePath = typeof result.file_path === "string" ? result.file_path : "";
if (!filePath) {
throw new Error("Telegram getFile returned empty file_path");
}
const fileUrl = `https://api.telegram.org/file/bot${botToken}/${filePath}`;
const response = await fetch(fileUrl);
if (!response.ok) {
throw new Error(`Failed to download Telegram file (${response.status})`);
}
const bytes = await response.arrayBuffer();
if (bytes.byteLength > TELEGRAM_FILE_MAX_BYTES) {
throw new Error(
`Telegram file is too large (${bytes.byteLength} bytes). Max supported size is ${TELEGRAM_FILE_MAX_BYTES} bytes.`
);
}
return Buffer.from(bytes);
}
function extractCommand(text: string): string | null {
const first = text.trim().split(/\s+/, 1)[0];
if (!first || !first.startsWith("/")) return null;
return first.split("@", 1)[0].toLowerCase();
}
function extractAccessCodeCandidate(text: string): string | null {
const value = text.trim();
if (!value) return null;
const fromCommand = value.match(
/^\/(?:code|start)(?:@[a-zA-Z0-9_]+)?\s+([A-Za-z0-9_-]{6,64})$/i
);
if (fromCommand?.[1]) {
return fromCommand[1];
}
if (/^[A-Za-z0-9_-]{6,64}$/.test(value)) {
return value;
}
return null;
}
function normalizeOutgoingText(text: string): string {
const value = text.trim();
if (!value) return "Пустой ответ от агента.";
if (value.length <= TELEGRAM_TEXT_LIMIT) return value;
return `${value.slice(0, TELEGRAM_TEXT_LIMIT - 1)}`;
}
async function sendTelegramMessage(
botToken: string,
chatId: number | string,
text: string,
replyToMessageId?: number
): Promise<void> {
const response = await fetch(`https://api.telegram.org/bot${botToken}/sendMessage`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
chat_id: chatId,
text: normalizeOutgoingText(text),
...(typeof replyToMessageId === "number" ? { reply_to_message_id: replyToMessageId } : {}),
}),
});
const payload = (await response.json().catch(() => null)) as
| { ok?: boolean; description?: string }
| null;
if (!response.ok || !payload?.ok) {
throw new Error(
`Telegram sendMessage failed (${response.status})${payload?.description ? `: ${payload.description}` : ""}`
);
}
}
function helpText(activeProject?: { id?: string; name?: string }): string {
const activeProjectLine = activeProject?.id
? `Active project: ${activeProject.name ? `${activeProject.name} (${activeProject.id})` : activeProject.id}`
: "Active project: not selected";
return [
"Telegram connection is active.",
activeProjectLine,
"",
"Commands:",
"/start - show this help",
"/help - show this help",
"/code <access_code> - activate access for your Telegram user",
"/new - start a new conversation (reset context)",
"",
"Text messages are sent to the agent.",
"File uploads are saved into chat files.",
"You can also ask the agent to send a local file back to Telegram.",
].join("\n");
}
export const maxDuration = 300;
export async function GET() {
@@ -484,8 +32,6 @@ export async function POST(req: NextRequest) {
const runtime = await getTelegramIntegrationRuntimeConfig();
const botToken = runtime.botToken.trim();
const webhookSecret = runtime.webhookSecret.trim();
const defaultProjectId = runtime.defaultProjectId || undefined;
const allowedUserIds = new Set(runtime.allowedUserIds);
if (!botToken || !webhookSecret) {
return Response.json(
@@ -502,235 +48,11 @@ export async function POST(req: NextRequest) {
return Response.json({ error: "Unauthorized" }, { status: 401 });
}
let botIdForRollback: string | null = null;
let updateIdForRollback: number | null = null;
try {
const body = (await req.json()) as TelegramUpdate;
const updateId =
typeof body.update_id === "number" && Number.isInteger(body.update_id)
? body.update_id
: null;
if (updateId === null) {
return Response.json({ error: "Invalid update_id" }, { status: 400 });
}
const botId = getBotId(botToken);
botIdForRollback = botId;
updateIdForRollback = updateId;
const isNewUpdate = await claimTelegramUpdate(botId, updateId);
if (!isNewUpdate) {
return Response.json({ ok: true, duplicate: true });
}
const message = body.message;
const chatId =
typeof message?.chat?.id === "number" || typeof message?.chat?.id === "string"
? message.chat.id
: null;
const chatType = typeof message?.chat?.type === "string" ? message.chat.type : "";
const messageId =
typeof message?.message_id === "number" ? message.message_id : undefined;
if (chatId === null || !chatType) {
return Response.json({ ok: true, ignored: true, reason: "unsupported_update" });
}
if (chatType !== "private") {
return Response.json({ ok: true, ignored: true, reason: "private_only" });
}
const text = typeof message?.text === "string" ? message.text.trim() : "";
const caption =
typeof message?.caption === "string" ? message.caption.trim() : "";
const incomingText = text || caption;
const fromUserId = normalizeTelegramUserId(message?.from?.id);
if (!fromUserId) {
return Response.json({
ok: true,
ignored: true,
reason: "missing_user_id",
});
}
if (!allowedUserIds.has(fromUserId)) {
const accessCode = extractAccessCodeCandidate(text);
const granted =
accessCode &&
(await consumeTelegramAccessCode({
code: accessCode,
userId: fromUserId,
}));
if (granted) {
await sendTelegramMessage(
botToken,
chatId,
"Доступ выдан. Теперь можно отправлять сообщения агенту.",
messageId
);
return Response.json({
ok: true,
accessGranted: true,
userId: fromUserId,
});
}
await sendTelegramMessage(
botToken,
chatId,
[
"Доступ запрещён: ваш user_id не в списке разрешённых.",
"Отправьте код активации командой /code <код> или /start <код>.",
`Ваш user_id: ${fromUserId}`,
].join("\n"),
messageId
);
return Response.json({
ok: true,
ignored: true,
reason: "user_not_allowed",
userId: fromUserId,
});
}
let sessionId = await getTelegramChatSessionId(botId, chatId);
if (!sessionId) {
sessionId = createDefaultTelegramSessionId(botId, chatId);
await setTelegramChatSessionId(botId, chatId, sessionId);
}
const command = extractCommand(text);
if (command === "/start" || command === "/help") {
const resolvedProject = await resolveTelegramProjectContext({
sessionId,
defaultProjectId,
});
await saveExternalSession({
...resolvedProject.session,
updatedAt: new Date().toISOString(),
});
await sendTelegramMessage(
botToken,
chatId,
helpText({
id: resolvedProject.resolvedProjectId,
name: resolvedProject.projectName,
}),
messageId
);
return Response.json({ ok: true, command });
}
if (command === "/new") {
const freshSessionId = createFreshTelegramSessionId(botId, chatId);
await setTelegramChatSessionId(botId, chatId, freshSessionId);
await sendTelegramMessage(
botToken,
chatId,
"Начал новый диалог. Контекст очищен для следующего сообщения.",
messageId
);
return Response.json({ ok: true, command });
}
let incomingSavedFile:
| {
name: string;
path: string;
size: number;
}
| null = null;
const incomingFile = message ? extractIncomingFile(message, messageId) : null;
let externalContext: TelegramExternalChatContext | null = null;
if (incomingFile) {
externalContext = await ensureTelegramExternalChatContext({
sessionId,
defaultProjectId,
});
const fileBuffer = await downloadTelegramFile(botToken, incomingFile.fileId);
const saved = await saveChatFile(
externalContext.chatId,
fileBuffer,
incomingFile.fileName
);
incomingSavedFile = {
name: saved.name,
path: saved.path,
size: saved.size,
};
}
if (!incomingText) {
if (incomingSavedFile) {
await sendTelegramMessage(
botToken,
chatId,
`File "${incomingSavedFile.name}" saved to chat files.`,
messageId
);
return Response.json({
ok: true,
fileSaved: true,
file: incomingSavedFile,
});
}
await sendTelegramMessage(
botToken,
chatId,
"Only text messages and file uploads are supported right now.",
messageId
);
return Response.json({ ok: true, ignored: true, reason: "non_text" });
}
try {
const result = await handleExternalMessage({
sessionId,
message: incomingSavedFile
? `${incomingText}\n\nAttached file: ${incomingSavedFile.name}`
: incomingText,
projectId: externalContext?.projectId ?? defaultProjectId,
chatId: externalContext?.chatId,
currentPath: normalizeTelegramCurrentPath(externalContext?.currentPath),
runtimeData: {
telegram: {
botToken,
chatId,
replyToMessageId: messageId ?? null,
},
},
});
await sendTelegramMessage(botToken, chatId, result.reply, messageId);
return Response.json({ ok: true });
} catch (error) {
if (error instanceof ExternalMessageError) {
const errorMessage =
typeof error.payload.error === "string"
? error.payload.error
: "Не удалось обработать сообщение.";
await sendTelegramMessage(botToken, chatId, `Ошибка: ${errorMessage}`, messageId);
return Response.json({ ok: true, handledError: true, status: error.status });
}
throw error;
}
const result = await processTelegramUpdate(body, runtime);
return Response.json(result);
} catch (error) {
if (
botIdForRollback &&
typeof updateIdForRollback === "number" &&
Number.isInteger(updateIdForRollback)
) {
try {
await releaseTelegramUpdate(botIdForRollback, updateIdForRollback);
} catch (releaseError) {
console.error("Telegram rollback error:", releaseError);
}
}
console.error("Telegram webhook error:", error);
return Response.json(
{

View File

@@ -54,7 +54,7 @@ export default function ApiPage() {
</section>
<section className="rounded-lg border bg-card p-4 space-y-3">
<h3 className="text-lg font-medium">Telegram Webhook</h3>
<h3 className="text-lg font-medium">Telegram Integration</h3>
<p className="text-sm text-muted-foreground">
Telegram endpoint: <span className="font-mono">POST /api/integrations/telegram</span>.
It reuses the same external session context engine as{" "}
@@ -64,14 +64,45 @@ export default function ApiPage() {
Configure credentials in <span className="font-mono">Dashboard -&gt; Messengers</span>
(bot token is enough; webhook secret/url are configured automatically).
</p>
<CodeBlock
code={`curl -X POST "https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/setWebhook" \\
<div className="space-y-2">
<h4 className="text-sm font-medium">Connection Modes</h4>
<ul className="list-disc pl-5 space-y-1 text-sm text-muted-foreground">
<li><strong>Webhook</strong> (default for public HTTPS URLs): Telegram pushes updates to your server. Requires a public HTTPS URL.</li>
<li><strong>Long Polling</strong> (default for localhost): Your server periodically fetches updates from Telegram. Works without HTTPS, perfect for local development.</li>
<li><strong>Auto</strong> (recommended): Automatically selects the best mode based on your Public Base URL configuration.</li>
</ul>
</div>
<div className="space-y-2">
<h4 className="text-sm font-medium">Webhook Setup (Production)</h4>
<CodeBlock
code={`curl -X POST "https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/setWebhook" \\
-H "Content-Type: application/json" \\
-d '{
"url": "https://YOUR_PUBLIC_BASE_URL/api/integrations/telegram",
"secret_token": "'$TELEGRAM_WEBHOOK_SECRET'"
}'`}
/>
/>
</div>
<div className="space-y-2">
<h4 className="text-sm font-medium">Long Polling API (Development)</h4>
<p className="text-sm text-muted-foreground">
When using long polling mode, control the polling service via API:
</p>
<CodeBlock
code={`# Get polling status
GET /api/integrations/telegram/polling
# Start polling
POST /api/integrations/telegram/polling
# Stop polling
DELETE /api/integrations/telegram/polling`}
/>
</div>
<p className="text-sm text-muted-foreground">
Supported commands: <span className="font-mono">/start</span>,{" "}
<span className="font-mono">/help</span>,{" "}

View File

@@ -2,6 +2,7 @@ import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
import { unstable_noStore as noStore } from "next/cache";
import { getSettings } from "@/lib/storage/settings-store";
import { initTelegramLifecycle } from "@/lib/telegram/polling-lifecycle";
import "./globals.css";
const geistSans = Geist({
@@ -19,6 +20,13 @@ export const metadata: Metadata = {
description: "AI Agent Terminal - Execute code, manage memory, search the web",
};
// Initialize Telegram lifecycle (polling or webhook) on server startup
if (typeof window === "undefined") {
initTelegramLifecycle().catch((error) => {
console.error("Failed to initialize Telegram lifecycle:", error);
});
}
export default async function RootLayout({
children,
}: Readonly<{

View File

@@ -1,7 +1,7 @@
"use client";
import { useCallback, useEffect, useMemo, useState } from "react";
import { KeyRound, Loader2, Link2, RotateCcw, ShieldCheck, Trash2 } from "lucide-react";
import { KeyRound, Loader2, Link2, ShieldCheck, Trash2, Play, Square, Radio, Globe } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
@@ -14,9 +14,13 @@ interface TelegramSettingsResponse {
allowedUserIds: string[];
pendingAccessCodes: number;
updatedAt: string | null;
mode: "auto" | "webhook" | "polling";
pollingInterval: number;
detectedMode: "webhook" | "polling";
sources: {
botToken: "stored" | "env" | "none";
webhookSecret: "stored" | "env" | "none";
mode: "stored" | "env" | "none";
};
error?: string;
}
@@ -29,19 +33,24 @@ interface TelegramAccessCodeResponse {
error?: string;
}
interface WebhookStatusResponse {
configured: boolean;
message?: string;
webhook: {
url: string;
pendingUpdateCount: number;
lastErrorDate: number | null;
lastErrorMessage: string | null;
} | null;
error?: string;
interface PollingStatusResponse {
status: string;
polling: {
isRunning: boolean;
lastUpdateId: number | null;
lastPollTime: string | null;
errorCount: number;
consecutiveErrors: number;
};
config: {
mode: "auto" | "webhook" | "polling";
detectedMode: "webhook" | "polling";
canStartPolling: boolean;
};
}
type ActionState = "idle" | "loading";
type TelegramMode = "auto" | "webhook" | "polling";
function sourceLabel(source: "stored" | "env" | "none"): string {
if (source === "stored") return "stored in app";
@@ -56,6 +65,22 @@ export function TelegramIntegrationManager() {
const [tokenSource, setTokenSource] = useState<"stored" | "env" | "none">(
"none"
);
const [mode, setMode] = useState<TelegramMode>("auto");
const [detectedMode, setDetectedMode] = useState<"webhook" | "polling">("polling");
// Helper to detect if URL is localhost/private (needs polling) or public (can use webhook)
const detectUrlMode = useCallback((url: string): "webhook" | "polling" => {
if (!url.trim()) return "polling";
const lowerUrl = url.toLowerCase().trim();
// Check for localhost, private IPs, or non-HTTPS
if (lowerUrl.includes("localhost")) return "polling";
if (lowerUrl.includes("127.0.0.1")) return "polling";
if (lowerUrl.includes("192.168.")) return "polling";
if (lowerUrl.includes("10.0.")) return "polling";
if (lowerUrl.includes("172.16.")) return "polling";
if (lowerUrl.startsWith("http://")) return "polling";
return "webhook";
}, []);
const [allowedUserIdsInput, setAllowedUserIdsInput] = useState("");
const [pendingAccessCodes, setPendingAccessCodes] = useState(0);
const [generatedAccessCode, setGeneratedAccessCode] = useState<string | null>(null);
@@ -63,16 +88,14 @@ export function TelegramIntegrationManager() {
string | null
>(null);
const [updatedAt, setUpdatedAt] = useState<string | null>(null);
const [webhookStatus, setWebhookStatus] = useState<WebhookStatusResponse | null>(
null
);
const [pollingStatus, setPollingStatus] = useState<PollingStatusResponse | null>(null);
const [loadingSettings, setLoadingSettings] = useState(true);
const [connectState, setConnectState] = useState<ActionState>("idle");
const [reconnectState, setReconnectState] = useState<ActionState>("idle");
const [disconnectState, setDisconnectState] = useState<ActionState>("idle");
const [saveAllowedUsersState, setSaveAllowedUsersState] = useState<ActionState>("idle");
const [generateCodeState, setGenerateCodeState] = useState<ActionState>("idle");
const [webhookState, setWebhookState] = useState<ActionState>("idle");
const [pollingState, setPollingState] = useState<ActionState>("idle");
const [modeState, setModeState] = useState<ActionState>("idle");
const [error, setError] = useState<string | null>(null);
const [success, setSuccess] = useState<string | null>(null);
@@ -90,6 +113,8 @@ export function TelegramIntegrationManager() {
setStoredMaskedToken(data.botToken || "");
setPublicBaseUrl(data.publicBaseUrl || "");
setTokenSource(data.sources.botToken);
setMode(data.mode || "auto");
setDetectedMode(data.detectedMode || "polling");
setAllowedUserIdsInput((data.allowedUserIds || []).join(", "));
setPendingAccessCodes(
typeof data.pendingAccessCodes === "number" ? data.pendingAccessCodes : 0
@@ -102,28 +127,37 @@ export function TelegramIntegrationManager() {
}
}, []);
const loadWebhookStatus = useCallback(async () => {
setWebhookState("loading");
const loadPollingStatus = useCallback(async () => {
setPollingState("loading");
try {
const res = await fetch("/api/integrations/telegram/webhook", {
const res = await fetch("/api/integrations/telegram/polling", {
cache: "no-store",
});
const data = (await res.json()) as WebhookStatusResponse;
const data = (await res.json()) as PollingStatusResponse;
if (!res.ok) {
throw new Error(data.error || "Failed to load webhook status");
throw new Error("Failed to load polling status");
}
setWebhookStatus(data);
setPollingStatus(data);
} catch {
setWebhookStatus(null);
setPollingStatus(null);
} finally {
setWebhookState("idle");
setPollingState("idle");
}
}, []);
useEffect(() => {
loadSettings();
loadWebhookStatus();
}, [loadSettings, loadWebhookStatus]);
loadPollingStatus();
// Refresh polling status every 5 seconds when in polling mode
const interval = setInterval(() => {
if (detectedMode === "polling" || mode === "polling") {
loadPollingStatus();
}
}, 5000);
return () => clearInterval(interval);
}, [loadSettings, loadPollingStatus, detectedMode, mode]);
const connectTelegram = useCallback(async () => {
setConnectState("loading");
@@ -133,9 +167,6 @@ export function TelegramIntegrationManager() {
const trimmedToken = botToken.trim();
const trimmedBaseUrl = publicBaseUrl.trim();
if (!trimmedBaseUrl) {
throw new Error("Public Base URL is required");
}
if (!trimmedToken && tokenSource === "none") {
throw new Error("Telegram bot token is required");
}
@@ -169,43 +200,15 @@ export function TelegramIntegrationManager() {
throw new Error(setupData.error || "Failed to connect Telegram");
}
setSuccess(setupData.message || "Telegram connected");
setSuccess(setupData.message || "Webhook configured");
setBotToken("");
await Promise.all([loadSettings(), loadWebhookStatus()]);
await loadSettings();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to connect Telegram");
} finally {
setConnectState("idle");
}
}, [botToken, loadSettings, loadWebhookStatus, publicBaseUrl, tokenSource]);
const reconnectTelegram = useCallback(async () => {
setReconnectState("loading");
setError(null);
setSuccess(null);
try {
const res = await fetch("/api/integrations/telegram/setup", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({}),
});
const data = (await res.json()) as {
success?: boolean;
message?: string;
error?: string;
};
if (!res.ok) {
throw new Error(data.error || "Failed to reconnect Telegram");
}
setSuccess(data.message || "Telegram reconnected");
await Promise.all([loadSettings(), loadWebhookStatus()]);
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to reconnect Telegram");
} finally {
setReconnectState("idle");
}
}, [loadSettings, loadWebhookStatus]);
}, [botToken, loadSettings, publicBaseUrl, tokenSource]);
const disconnectTelegram = useCallback(async () => {
setDisconnectState("loading");
@@ -231,13 +234,13 @@ export function TelegramIntegrationManager() {
setSuccess(messages.join(" "));
setBotToken("");
await Promise.all([loadSettings(), loadWebhookStatus()]);
await loadSettings();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to disconnect Telegram");
} finally {
setDisconnectState("idle");
}
}, [loadSettings, loadWebhookStatus]);
}, [loadSettings]);
const saveAllowedUsers = useCallback(async () => {
setSaveAllowedUsersState("loading");
@@ -295,23 +298,84 @@ export function TelegramIntegrationManager() {
}
}, [loadSettings]);
const hasTokenConfigured = tokenSource !== "none";
const hasBaseUrlConfigured = publicBaseUrl.trim().length > 0;
const isConnected = hasTokenConfigured && hasBaseUrlConfigured;
const startPolling = useCallback(async () => {
setPollingState("loading");
setError(null);
setSuccess(null);
try {
const res = await fetch("/api/integrations/telegram/polling", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({}),
});
const data = (await res.json()) as { ok?: boolean; message?: string; error?: string };
if (!res.ok) {
throw new Error(data.error || "Failed to start polling");
}
setSuccess(data.message || "Polling started");
await loadPollingStatus();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to start polling");
} finally {
setPollingState("idle");
}
}, [loadPollingStatus]);
const canConnect = useMemo(() => {
if (!publicBaseUrl.trim()) return false;
if (botToken.trim()) return true;
return tokenSource !== "none";
}, [botToken, publicBaseUrl, tokenSource]);
const stopPolling = useCallback(async () => {
setPollingState("loading");
setError(null);
setSuccess(null);
try {
const res = await fetch("/api/integrations/telegram/polling", {
method: "DELETE",
});
const data = (await res.json()) as { ok?: boolean; message?: string; error?: string };
if (!res.ok) {
throw new Error(data.error || "Failed to stop polling");
}
setSuccess(data.message || "Polling stopped");
await loadPollingStatus();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to stop polling");
} finally {
setPollingState("idle");
}
}, [loadPollingStatus]);
const saveMode = useCallback(async (newMode: TelegramMode) => {
setModeState("loading");
setError(null);
setSuccess(null);
try {
const res = await fetch("/api/integrations/telegram/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ mode: newMode }),
});
const data = (await res.json()) as TelegramSettingsResponse;
if (!res.ok) {
throw new Error(data.error || "Failed to save mode");
}
setMode(data.mode || "auto");
setDetectedMode(data.detectedMode || "polling");
setSuccess(`Mode updated to ${newMode}`);
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to save mode");
} finally {
setModeState("idle");
}
}, []);
const hasTokenConfigured = tokenSource !== "none";
const isBusy =
loadingSettings ||
connectState === "loading" ||
reconnectState === "loading" ||
disconnectState === "loading" ||
saveAllowedUsersState === "loading" ||
generateCodeState === "loading";
generateCodeState === "loading" ||
pollingState === "loading" ||
modeState === "loading";
const updatedAtLabel = useMemo(() => {
if (!updatedAt) return null;
@@ -320,127 +384,311 @@ export function TelegramIntegrationManager() {
return date.toLocaleString();
}, [updatedAt]);
// Determine effective mode considering auto detection
const effectiveMode = mode === "auto" ? detectedMode : mode;
return (
<div className="space-y-4">
{/* Step 1: Bot Token */}
<section className="rounded-lg border bg-card p-4 space-y-4">
<div className="space-y-1">
<h3 className="text-lg font-medium">Telegram</h3>
{!isConnected ? (
<p className="text-sm text-muted-foreground">
Enter the bot token and Public Base URL, then click Connect Telegram.
</p>
) : (
<p className="text-sm text-muted-foreground">
Telegram is connected. You can reconnect or disconnect it.
<h3 className="text-lg font-medium">1. Bot Token</h3>
<p className="text-sm text-muted-foreground">
Enter your Telegram bot token from @BotFather.
</p>
</div>
<div className="space-y-2">
<Label htmlFor="telegram-bot-token">Bot Token</Label>
<Input
id="telegram-bot-token"
type="password"
value={botToken}
onChange={(e) => setBotToken(e.target.value)}
placeholder="123456789:AA..."
disabled={isBusy || hasTokenConfigured}
/>
{hasTokenConfigured && (
<p className="text-xs text-muted-foreground">
Token saved ({sourceLabel(tokenSource)})
{storedMaskedToken ? `: ${storedMaskedToken}` : ""}
</p>
)}
</div>
{!isConnected ? (
<>
<div className="space-y-2">
<Label htmlFor="telegram-bot-token">Bot Token</Label>
<Input
id="telegram-bot-token"
type="password"
value={botToken}
onChange={(e) => setBotToken(e.target.value)}
placeholder="123456789:AA..."
disabled={isBusy}
/>
<p className="text-xs text-muted-foreground">
Current source: {sourceLabel(tokenSource)}
{storedMaskedToken ? ` (${storedMaskedToken})` : ""}
</p>
</div>
{!hasTokenConfigured && (
<div className="flex flex-wrap items-center gap-2">
<Button
onClick={async () => {
const trimmedToken = botToken.trim();
if (!trimmedToken) {
setError("Bot token is required");
return;
}
setConnectState("loading");
setError(null);
try {
const res = await fetch("/api/integrations/telegram/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ botToken: trimmedToken }),
});
const data = (await res.json()) as { error?: string };
if (!res.ok) {
throw new Error(data.error || "Failed to save bot token");
}
setSuccess("Bot token saved");
setBotToken("");
await loadSettings();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to save bot token");
} finally {
setConnectState("idle");
}
}}
disabled={!botToken.trim() || isBusy}
>
{connectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Saving...
</>
) : (
<>
<Link2 className="size-4" />
Save Token
</>
)}
</Button>
</div>
)}
</section>
<div className="space-y-2">
<Label htmlFor="telegram-public-base-url">Public Base URL</Label>
<Input
id="telegram-public-base-url"
type="text"
value={publicBaseUrl}
onChange={(e) => setPublicBaseUrl(e.target.value)}
placeholder="https://your-public-host.example.com"
disabled={isBusy}
/>
<p className="text-xs text-muted-foreground">
Webhook endpoint:{" "}
<span className="font-mono">{publicBaseUrl || "https://..."}/api/integrations/telegram</span>
</p>
</div>
{/* Step 2: Connection Mode */}
{hasTokenConfigured && (
<section className="rounded-lg border bg-card p-4 space-y-4">
<div className="space-y-1">
<h3 className="text-lg font-medium">2. Connection Mode</h3>
<p className="text-sm text-muted-foreground">
Choose how Telegram connects to your bot.
</p>
</div>
<div className="flex flex-wrap items-center gap-2">
<Button onClick={connectTelegram} disabled={!canConnect || isBusy}>
{connectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Connecting...
</>
) : (
<>
<Link2 className="size-4" />
Connect Telegram
</>
)}
</Button>
</div>
</>
) : (
<>
<div className="rounded-md border bg-muted/20 p-3 text-sm space-y-1">
<div>
Token source: {sourceLabel(tokenSource)}
{storedMaskedToken ? ` (${storedMaskedToken})` : ""}
<div className="space-y-4">
<div className="flex items-center gap-4">
<div className="flex-1">
<Label htmlFor="telegram-mode" className="text-sm">Mode</Label>
<select
id="telegram-mode"
value={mode}
onChange={(e) => saveMode(e.target.value as TelegramMode)}
disabled={isBusy}
className="mt-1 w-full rounded-md border border-input bg-background px-3 py-2 text-sm"
>
<option value="auto">Auto (recommended)</option>
<option value="webhook">Webhook</option>
<option value="polling">Long Polling</option>
</select>
</div>
<div className="flex-1">
<Label className="text-sm">Active Mode</Label>
<div className="mt-1 flex items-center gap-2 text-sm">
{effectiveMode === "webhook" ? (
<>
<Globe className="size-4 text-blue-500" />
<span>Webhook</span>
</>
) : (
<>
<Radio className="size-4 text-green-500" />
<span>Long Polling</span>
</>
)}
</div>
</div>
</div>
{mode === "auto" && (
<div className="rounded-md border bg-muted/20 p-3 text-sm">
<p className="text-muted-foreground">
<strong>Auto mode:</strong>{" "}
{detectedMode === "webhook"
? "Webhook will be used when a public HTTPS URL is configured."
: "Long polling is active. Add a public HTTPS URL to switch to webhook."}
</p>
</div>
)}
{/* Webhook URL Input - only show when webhook mode is active */}
{effectiveMode === "webhook" && (
<div className="space-y-2 rounded-md border bg-muted/20 p-3">
<Label htmlFor="telegram-public-base-url">Public Base URL (HTTPS required)</Label>
<Input
id="telegram-public-base-url"
type="text"
value={publicBaseUrl}
onChange={(e) => {
const newUrl = e.target.value;
setPublicBaseUrl(newUrl);
const detected = detectUrlMode(newUrl);
setDetectedMode(detected);
}}
placeholder="https://your-public-host.example.com"
disabled={isBusy}
/>
<p className="text-xs text-muted-foreground">
Webhook endpoint:{" "}
<span className="font-mono">{publicBaseUrl || "https://..."}/api/integrations/telegram</span>
</p>
<div className="flex flex-wrap items-center gap-2 pt-2">
<Button
onClick={connectTelegram}
disabled={!publicBaseUrl.trim() || isBusy}
size="sm"
>
{connectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Connecting...
</>
) : (
<>
<Link2 className="size-4" />
Setup Webhook
</>
)}
</Button>
</div>
</div>
)}
{/* Polling Controls - only show when polling mode is active */}
{effectiveMode === "polling" && (
<div className="space-y-2 rounded-md border bg-muted/20 p-3">
<div className="flex items-center justify-between">
<div>
<p className="font-medium text-sm">Long Polling</p>
<p className="text-xs text-muted-foreground">
Bot will receive messages via long polling (no HTTPS required).
</p>
</div>
{!pollingStatus?.polling?.isRunning ? (
<Button
variant="outline"
onClick={startPolling}
disabled={isBusy}
size="sm"
>
{pollingState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Starting...
</>
) : (
<>
<Play className="size-4" />
Start Polling
</>
)}
</Button>
) : (
<Button
variant="outline"
onClick={stopPolling}
disabled={isBusy}
size="sm"
>
{pollingState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Stopping...
</>
) : (
<>
<Square className="size-4" />
Stop Polling
</>
)}
</Button>
)}
</div>
{pollingStatus?.polling && (
<div className="text-sm space-y-1 pt-2 border-t">
<div className="flex items-center gap-2">
Status:{" "}
{pollingStatus.polling.isRunning ? (
<span className="flex items-center gap-1 text-green-600">
<span className="relative flex h-2 w-2">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-green-400 opacity-75"></span>
<span className="relative inline-flex rounded-full h-2 w-2 bg-green-500"></span>
</span>
Running
</span>
) : (
<span className="text-gray-500">Stopped</span>
)}
</div>
{pollingStatus.polling.lastUpdateId !== null && (
<div className="text-xs text-muted-foreground">
Last update ID: {pollingStatus.polling.lastUpdateId}
</div>
)}
</div>
)}
</div>
)}
</div>
</section>
)}
{/* Connected Status */}
{hasTokenConfigured && (
<section className="rounded-lg border bg-card p-4 space-y-4">
<div className="space-y-1">
<h4 className="font-medium">Connection Status</h4>
</div>
<div className="rounded-md border bg-muted/20 p-3 text-sm space-y-1">
<div>
Token: {sourceLabel(tokenSource)}
{storedMaskedToken ? ` (${storedMaskedToken})` : ""}
</div>
{publicBaseUrl && (
<div>
Public Base URL:{" "}
<span className="font-mono text-xs break-all">{publicBaseUrl}</span>
</div>
{updatedAtLabel && (
<div className="text-xs text-muted-foreground">Updated: {updatedAtLabel}</div>
)}
)}
<div>
Mode: <span className="font-medium">{effectiveMode === "webhook" ? "Webhook" : "Long Polling"}</span>
</div>
{updatedAtLabel && (
<div className="text-xs text-muted-foreground">Updated: {updatedAtLabel}</div>
)}
</div>
<div className="flex flex-wrap items-center gap-2">
<Button
variant="outline"
onClick={reconnectTelegram}
disabled={isBusy}
>
{reconnectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Reconnecting...
</>
) : (
<>
<RotateCcw className="size-4" />
Reconnect Telegram
</>
)}
</Button>
<Button
variant="outline"
onClick={disconnectTelegram}
disabled={isBusy}
>
{disconnectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Disconnecting...
</>
) : (
<>
<Trash2 className="size-4" />
Disconnect Telegram
</>
)}
</Button>
</div>
</>
)}
</section>
<div className="flex flex-wrap items-center gap-2">
<Button
variant="outline"
onClick={disconnectTelegram}
disabled={isBusy}
>
{disconnectState === "loading" ? (
<>
<Loader2 className="size-4 animate-spin" />
Disconnecting...
</>
) : (
<>
<Trash2 className="size-4" />
Disconnect
</>
)}
</Button>
</div>
</section>
)}
<section className="rounded-lg border bg-card p-4 space-y-4">
<div className="space-y-1">
@@ -518,50 +766,6 @@ export function TelegramIntegrationManager() {
</div>
</section>
{isConnected && (
<section className="rounded-lg border bg-card p-4 space-y-4">
<div className="space-y-1">
<h4 className="font-medium">Webhook Status</h4>
<p className="text-sm text-muted-foreground">
Current webhook status from the latest check.
</p>
</div>
{webhookState === "loading" && (
<p className="text-sm text-muted-foreground">Loading webhook status...</p>
)}
{webhookStatus?.webhook && (
<div className="rounded-md border bg-muted/20 p-3 text-sm space-y-1">
<div>
URL:{" "}
<span className="font-mono text-xs break-all">
{webhookStatus.webhook.url || "(empty)"}
</span>
</div>
<div>Pending updates: {webhookStatus.webhook.pendingUpdateCount}</div>
{webhookStatus.webhook.lastErrorMessage && (
<div className="text-red-600">
Last error: {webhookStatus.webhook.lastErrorMessage}
</div>
)}
{webhookStatus.webhook.lastErrorDate && (
<div className="text-xs text-muted-foreground">
Last error at:{" "}
{new Date(webhookStatus.webhook.lastErrorDate * 1000).toLocaleString()}
</div>
)}
</div>
)}
{webhookState !== "loading" && !webhookStatus?.webhook && (
<p className="text-sm text-muted-foreground">
{webhookStatus?.message || "Webhook status is not loaded yet."}
</p>
)}
</section>
)}
{success && <p className="text-sm text-emerald-600">{success}</p>}
{error && <p className="text-sm text-red-600">{error}</p>}
</div>

View File

@@ -10,6 +10,7 @@ const TELEGRAM_SETTINGS_FILE = path.join(
);
export type TelegramConfigSource = "stored" | "env" | "none";
export type TelegramMode = "auto" | "webhook" | "polling";
interface TelegramAccessCodeRecord {
hash: string;
@@ -24,6 +25,8 @@ interface TelegramIntegrationFileRecord {
defaultProjectId?: string;
allowedUserIds?: unknown;
accessCodes?: unknown;
mode?: unknown;
pollingInterval?: unknown;
createdAt?: string;
updatedAt?: string;
}
@@ -35,6 +38,8 @@ export interface TelegramIntegrationStoredSettings {
defaultProjectId: string;
allowedUserIds: string[];
accessCodes: TelegramAccessCodeRecord[];
mode: TelegramMode;
pollingInterval: number;
createdAt: string;
updatedAt: string;
}
@@ -45,9 +50,13 @@ export interface TelegramIntegrationRuntimeConfig {
publicBaseUrl: string;
defaultProjectId: string;
allowedUserIds: string[];
mode: TelegramMode;
pollingInterval: number;
detectedMode: TelegramMode;
sources: {
botToken: TelegramConfigSource;
webhookSecret: TelegramConfigSource;
mode: TelegramConfigSource;
};
}
@@ -221,6 +230,43 @@ async function readStoredRecord(): Promise<TelegramIntegrationFileRecord> {
}
}
function normalizeMode(raw: unknown): TelegramMode {
if (raw === "webhook" || raw === "polling") return raw;
return "auto";
}
function normalizePollingInterval(raw: unknown): number {
const numeric = typeof raw === "number" && Number.isFinite(raw) ? raw : 5000;
return Math.max(1000, Math.min(60000, numeric));
}
function isLocalhostUrl(url: string): boolean {
if (!url) return true;
try {
const parsed = new URL(url);
const hostname = parsed.hostname.toLowerCase();
return (
hostname === "localhost" ||
hostname === "127.0.0.1" ||
hostname.startsWith("192.168.") ||
hostname.startsWith("10.") ||
hostname.startsWith("172.") ||
hostname.endsWith(".local")
);
} catch {
return true;
}
}
export function detectTelegramMode(config: {
mode: TelegramMode;
publicBaseUrl: string;
}): "webhook" | "polling" {
if (config.mode !== "auto") return config.mode;
if (isLocalhostUrl(config.publicBaseUrl)) return "polling";
return "webhook";
}
function normalizeStoredRecord(
record: TelegramIntegrationFileRecord
): TelegramIntegrationStoredSettings {
@@ -231,6 +277,8 @@ function normalizeStoredRecord(
defaultProjectId: trimString(record.defaultProjectId),
allowedUserIds: normalizeAllowedUserIds(record.allowedUserIds),
accessCodes: normalizeAccessCodeRecords(record.accessCodes),
mode: normalizeMode(record.mode),
pollingInterval: normalizePollingInterval(record.pollingInterval),
createdAt: trimString(record.createdAt),
updatedAt: trimString(record.updatedAt),
};
@@ -259,6 +307,8 @@ export async function saveTelegramIntegrationStoredSettings(input: {
defaultProjectId?: string;
allowedUserIds?: string[];
accessCodes?: TelegramAccessCodeRecord[];
mode?: TelegramMode;
pollingInterval?: number;
}): Promise<TelegramIntegrationStoredSettings> {
const current = await getTelegramIntegrationStoredSettings();
@@ -284,6 +334,12 @@ export async function saveTelegramIntegrationStoredSettings(input: {
input.accessCodes !== undefined
? normalizeAccessCodeRecords(input.accessCodes)
: current.accessCodes;
const nextMode =
input.mode !== undefined ? normalizeMode(input.mode) : current.mode;
const nextPollingInterval =
input.pollingInterval !== undefined
? normalizePollingInterval(input.pollingInterval)
: current.pollingInterval;
const now = new Date().toISOString();
const next: TelegramIntegrationStoredSettings = {
@@ -293,6 +349,8 @@ export async function saveTelegramIntegrationStoredSettings(input: {
defaultProjectId: nextDefaultProjectId,
allowedUserIds: nextAllowedUserIds,
accessCodes: nextAccessCodes,
mode: nextMode,
pollingInterval: nextPollingInterval,
createdAt: current.createdAt || now,
updatedAt: now,
};
@@ -310,6 +368,7 @@ export async function getTelegramIntegrationRuntimeConfig(): Promise<TelegramInt
const envAllowedUserIds = parseAllowedUserIdsFromEnv(
trimString(process.env.TELEGRAM_ALLOWED_USER_IDS)
);
const envMode = normalizeMode(process.env.TELEGRAM_MODE);
const botToken = stored.botToken || envBotToken;
const webhookSecret = stored.webhookSecret || envWebhookSecret;
@@ -319,6 +378,8 @@ export async function getTelegramIntegrationRuntimeConfig(): Promise<TelegramInt
stored.allowedUserIds,
envAllowedUserIds
);
const mode = stored.mode !== "auto" ? stored.mode : envMode !== "auto" ? envMode : "auto";
const pollingInterval = stored.pollingInterval || 5000;
const botTokenSource: TelegramConfigSource = stored.botToken
? "stored"
@@ -330,6 +391,13 @@ export async function getTelegramIntegrationRuntimeConfig(): Promise<TelegramInt
: envWebhookSecret
? "env"
: "none";
const modeSource: TelegramConfigSource = stored.mode !== "auto"
? "stored"
: envMode !== "auto"
? "env"
: "none";
const detectedMode = detectTelegramMode({ mode, publicBaseUrl });
return {
botToken,
@@ -337,9 +405,13 @@ export async function getTelegramIntegrationRuntimeConfig(): Promise<TelegramInt
publicBaseUrl,
defaultProjectId,
allowedUserIds,
mode,
pollingInterval,
detectedMode,
sources: {
botToken: botTokenSource,
webhookSecret: webhookSecretSource,
mode: modeSource,
},
};
}
@@ -350,11 +422,15 @@ export async function getTelegramIntegrationPublicSettings(): Promise<{
publicBaseUrl: string;
defaultProjectId: string;
allowedUserIds: string[];
mode: TelegramMode;
pollingInterval: number;
detectedMode: TelegramMode;
pendingAccessCodes: number;
updatedAt: string | null;
sources: {
botToken: TelegramConfigSource;
webhookSecret: TelegramConfigSource;
mode: TelegramConfigSource;
};
}> {
const stored = await getTelegramIntegrationStoredSettings();
@@ -365,6 +441,9 @@ export async function getTelegramIntegrationPublicSettings(): Promise<{
publicBaseUrl: runtime.publicBaseUrl,
defaultProjectId: runtime.defaultProjectId,
allowedUserIds: runtime.allowedUserIds,
mode: runtime.mode,
pollingInterval: runtime.pollingInterval,
detectedMode: runtime.detectedMode,
pendingAccessCodes: stored.accessCodes.length,
updatedAt: stored.updatedAt || null,
sources: runtime.sources,
@@ -377,6 +456,8 @@ export async function saveTelegramIntegrationFromPublicInput(input: {
publicBaseUrl?: unknown;
defaultProjectId?: unknown;
allowedUserIds?: unknown;
mode?: unknown;
pollingInterval?: unknown;
}): Promise<void> {
const currentStored = await getTelegramIntegrationStoredSettings();
@@ -407,6 +488,14 @@ export async function saveTelegramIntegrationFromPublicInput(input: {
? input.defaultProjectId
: undefined;
const allowedUserIds = parseAllowedUserIdsInput(input.allowedUserIds);
const mode =
typeof input.mode === "string"
? normalizeMode(input.mode)
: undefined;
const pollingInterval =
typeof input.pollingInterval === "number"
? normalizePollingInterval(input.pollingInterval)
: undefined;
await saveTelegramIntegrationStoredSettings({
botToken,
@@ -414,6 +503,8 @@ export async function saveTelegramIntegrationFromPublicInput(input: {
publicBaseUrl,
defaultProjectId,
allowedUserIds,
mode,
pollingInterval,
});
}

View File

@@ -0,0 +1,149 @@
import {
getTelegramIntegrationRuntimeConfig,
detectTelegramMode,
type TelegramIntegrationRuntimeConfig,
} from "@/lib/storage/telegram-integration-store";
import { telegramPollingService } from "@/lib/telegram/polling-service";
let lifecycleInitialized = false;
interface TelegramLifecycleOptions {
autoStartPolling?: boolean;
autoSetupWebhook?: boolean;
}
export async function initTelegramLifecycle(
options: TelegramLifecycleOptions = {}
): Promise<void> {
if (lifecycleInitialized) {
return;
}
const runtime = await getTelegramIntegrationRuntimeConfig();
const detectedMode = detectTelegramMode(runtime);
console.log(`[Telegram Lifecycle] Mode: ${runtime.mode}, Detected: ${detectedMode}`);
if (detectedMode === "polling") {
if (options.autoStartPolling !== false && runtime.botToken.trim()) {
try {
await telegramPollingService.start(runtime);
console.log("[Telegram Lifecycle] Polling started automatically");
} catch (error) {
console.error("[Telegram Lifecycle] Failed to start polling:", error);
}
}
} else if (detectedMode === "webhook") {
if (options.autoSetupWebhook !== false && runtime.botToken.trim() && runtime.publicBaseUrl.trim()) {
try {
await setupTelegramWebhook(runtime);
console.log("[Telegram Lifecycle] Webhook configured");
} catch (error) {
console.error("[Telegram Lifecycle] Failed to setup webhook:", error);
}
}
}
setupGracefulShutdown();
lifecycleInitialized = true;
}
async function setupTelegramWebhook(
runtime: TelegramIntegrationRuntimeConfig
): Promise<void> {
const { botToken, publicBaseUrl, webhookSecret } = runtime;
if (!botToken.trim() || !publicBaseUrl.trim()) {
throw new Error("Bot token and public base URL are required");
}
const webhookUrl = `${publicBaseUrl.replace(/\/$/, "")}/api/integrations/telegram`;
const response = await fetch(
`https://api.telegram.org/bot${botToken}/setWebhook`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
url: webhookUrl,
secret_token: webhookSecret.trim() || undefined,
allowed_updates: ["message"],
}),
}
);
const payload = (await response.json().catch(() => null)) as
| { ok?: boolean; description?: string }
| null;
if (!response.ok || !payload?.ok) {
throw new Error(
`Failed to set webhook: ${payload?.description || response.statusText}`
);
}
}
export async function migrateToWebhook(
runtime: TelegramIntegrationRuntimeConfig
): Promise<void> {
// Stop polling if running
if (telegramPollingService.status.isRunning) {
telegramPollingService.stop();
console.log("[Telegram Migration] Polling stopped");
}
// Setup webhook
await setupTelegramWebhook(runtime);
console.log("[Telegram Migration] Migrated to webhook mode");
}
export async function migrateToPolling(
runtime: TelegramIntegrationRuntimeConfig
): Promise<void> {
// Delete webhook
await deleteTelegramWebhook(runtime.botToken);
// Start polling
if (!telegramPollingService.status.isRunning) {
await telegramPollingService.start(runtime);
console.log("[Telegram Migration] Migrated to polling mode");
}
}
async function deleteTelegramWebhook(botToken: string): Promise<void> {
const response = await fetch(
`https://api.telegram.org/bot${botToken}/deleteWebhook`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ drop_pending_updates: false }),
}
);
const payload = (await response.json().catch(() => null)) as
| { ok?: boolean; description?: string }
| null;
if (payload?.ok) {
console.log("[Telegram Lifecycle] Webhook deleted");
} else {
console.warn("[Telegram Lifecycle] Failed to delete webhook:", payload?.description);
}
}
function setupGracefulShutdown(): void {
const shutdown = () => {
console.log("[Telegram Lifecycle] Shutting down...");
if (telegramPollingService.status.isRunning) {
telegramPollingService.stop();
}
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
}
export function isLifecycleInitialized(): boolean {
return lifecycleInitialized;
}

View File

@@ -0,0 +1,216 @@
import {
type TelegramIntegrationRuntimeConfig,
} from "@/lib/storage/telegram-integration-store";
import {
processTelegramUpdate,
type TelegramUpdate,
} from "@/lib/telegram/telegram-message-handler";
interface TelegramApiResponse {
ok?: boolean;
description?: string;
result?: Record<string, unknown> | Array<Record<string, unknown>>;
}
export interface PollingStatus {
isRunning: boolean;
lastUpdateId: number | null;
lastPollTime: string | null;
errorCount: number;
consecutiveErrors: number;
}
class TelegramPollingService {
private isRunning = false;
private abortController: AbortController | null = null;
private lastUpdateId: number | null = null;
private errorCount = 0;
private consecutiveErrors = 0;
private lastPollTime: string | null = null;
private runtimeConfig: TelegramIntegrationRuntimeConfig | null = null;
private pollTimeout: NodeJS.Timeout | null = null;
get status(): PollingStatus {
return {
isRunning: this.isRunning,
lastUpdateId: this.lastUpdateId,
lastPollTime: this.lastPollTime,
errorCount: this.errorCount,
consecutiveErrors: this.consecutiveErrors,
};
}
async start(runtimeConfig: TelegramIntegrationRuntimeConfig): Promise<void> {
if (this.isRunning) {
throw new Error("Polling is already running");
}
if (!runtimeConfig.botToken.trim()) {
throw new Error("Bot token is required");
}
this.runtimeConfig = runtimeConfig;
this.isRunning = true;
this.abortController = new AbortController();
this.consecutiveErrors = 0;
console.log("[Telegram Polling] Starting polling service...");
// Delete webhook if exists to ensure polling works
await this.deleteWebhook(runtimeConfig.botToken);
// Start first poll immediately
this.scheduleNextPoll(0);
}
stop(): void {
if (!this.isRunning) {
return;
}
console.log("[Telegram Polling] Stopping polling service...");
this.isRunning = false;
if (this.pollTimeout) {
clearTimeout(this.pollTimeout);
this.pollTimeout = null;
}
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
this.runtimeConfig = null;
}
private scheduleNextPoll(delay?: number): void {
if (!this.isRunning) {
return;
}
const actualDelay = delay ?? this.runtimeConfig?.pollingInterval ?? 5000;
this.pollTimeout = setTimeout(() => {
this.poll();
}, actualDelay);
}
private async poll(): Promise<void> {
if (!this.isRunning || !this.runtimeConfig) {
return;
}
const { botToken } = this.runtimeConfig;
try {
const updates = await this.getUpdates(botToken);
this.consecutiveErrors = 0;
this.lastPollTime = new Date().toISOString();
for (const update of updates) {
if (!this.isRunning) break;
await this.processUpdate(update);
}
} catch (error) {
this.errorCount++;
this.consecutiveErrors++;
const errorMessage = error instanceof Error ? error.message : String(error);
console.error(`[Telegram Polling] Error (consecutive: ${this.consecutiveErrors}):`, errorMessage);
if (this.consecutiveErrors >= 10) {
console.error("[Telegram Polling] Too many consecutive errors, stopping polling");
this.stop();
return;
}
}
this.scheduleNextPoll();
}
private async getUpdates(botToken: string): Promise<TelegramUpdate[]> {
const params: Record<string, unknown> = {
limit: 100,
timeout: 30,
};
if (this.lastUpdateId !== null) {
params.offset = this.lastUpdateId + 1;
}
const response = await fetch(
`https://api.telegram.org/bot${botToken}/getUpdates`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(params),
signal: this.abortController?.signal,
}
);
const payload = (await response.json().catch(() => null)) as
| TelegramApiResponse
| null;
if (!response.ok || !payload?.ok) {
const description = payload?.description || "Unknown error";
throw new Error(`getUpdates failed (${response.status}): ${description}`);
}
const result = payload.result;
if (!Array.isArray(result)) {
return [];
}
// Update lastUpdateId to the highest received
for (const update of result) {
const updateId = typeof update.update_id === "number" ? update.update_id : null;
if (updateId !== null && (this.lastUpdateId === null || updateId > this.lastUpdateId)) {
this.lastUpdateId = updateId;
}
}
return result as TelegramUpdate[];
}
private async processUpdate(update: TelegramUpdate): Promise<void> {
if (!this.runtimeConfig) return;
try {
await processTelegramUpdate(update, this.runtimeConfig);
} catch (error) {
console.error("[Telegram Polling] Error processing update:", error);
// Don't throw - continue processing other updates
}
}
private async deleteWebhook(botToken: string): Promise<void> {
try {
const response = await fetch(
`https://api.telegram.org/bot${botToken}/deleteWebhook`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ drop_pending_updates: true }),
}
);
const payload = (await response.json().catch(() => null)) as
| TelegramApiResponse
| null;
if (payload?.ok) {
console.log("[Telegram Polling] Webhook deleted successfully");
} else {
console.warn("[Telegram Polling] Failed to delete webhook:", payload?.description);
}
} catch (error) {
console.warn("[Telegram Polling] Error deleting webhook:", error);
}
}
}
export const telegramPollingService = new TelegramPollingService();

View File

@@ -0,0 +1,711 @@
import {
handleExternalMessage,
ExternalMessageError,
} from "@/lib/external/handle-external-message";
import {
createDefaultTelegramSessionId,
createFreshTelegramSessionId,
getTelegramChatSessionId,
setTelegramChatSessionId,
} from "@/lib/storage/telegram-session-store";
import {
claimTelegramUpdate,
releaseTelegramUpdate,
} from "@/lib/storage/telegram-update-store";
import {
consumeTelegramAccessCode,
normalizeTelegramUserId,
type TelegramIntegrationRuntimeConfig,
} from "@/lib/storage/telegram-integration-store";
import { saveChatFile } from "@/lib/storage/chat-files-store";
import { createChat, getChat } from "@/lib/storage/chat-store";
import {
contextKey,
type ExternalSession,
getOrCreateExternalSession,
saveExternalSession,
} from "@/lib/storage/external-session-store";
import { getAllProjects } from "@/lib/storage/project-store";
import crypto from "node:crypto";
const TELEGRAM_TEXT_LIMIT = 4096;
const TELEGRAM_FILE_MAX_BYTES = 30 * 1024 * 1024;
export interface TelegramUpdate {
update_id?: unknown;
message?: TelegramMessage;
}
export interface TelegramMessage {
message_id?: unknown;
text?: unknown;
caption?: unknown;
from?: {
id?: unknown;
};
chat?: {
id?: unknown;
type?: unknown;
};
document?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
photo?: Array<{
file_id?: unknown;
width?: unknown;
height?: unknown;
}>;
audio?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
video?: {
file_id?: unknown;
file_name?: unknown;
mime_type?: unknown;
};
voice?: {
file_id?: unknown;
mime_type?: unknown;
};
}
interface TelegramApiResponse {
ok?: boolean;
description?: string;
result?: Record<string, unknown>;
}
interface TelegramFileResult {
file_id?: string;
file_unique_id?: string;
file_size?: number;
file_path?: string;
}
export interface TelegramIncomingFile {
fileId: string;
fileName: string;
}
export interface TelegramExternalChatContext {
chatId: string;
projectId?: string;
currentPath: string;
}
interface TelegramResolvedProjectContext {
session: ExternalSession;
resolvedProjectId?: string;
projectName?: string;
}
export interface ProcessTelegramUpdateResult {
ok: boolean;
duplicate?: boolean;
ignored?: boolean;
reason?: string;
command?: string;
accessGranted?: boolean;
userId?: string;
fileSaved?: boolean;
file?: {
name: string;
path: string;
size: number;
};
handledError?: boolean;
status?: number;
}
function normalizeTelegramCurrentPath(rawPath: string | undefined): string {
const value = (rawPath ?? "").trim();
if (!value || value === "/telegram") {
return "";
}
return value;
}
function parseTelegramError(status: number, payload: TelegramApiResponse | null): string {
const description = payload?.description?.trim();
return description
? `Telegram API error (${status}): ${description}`
: `Telegram API error (${status})`;
}
async function callTelegramApi(
botToken: string,
method: string,
body?: Record<string, unknown>
): Promise<TelegramApiResponse> {
const response = await fetch(`https://api.telegram.org/bot${botToken}/${method}`, {
method: body ? "POST" : "GET",
headers: body ? { "Content-Type": "application/json" } : undefined,
body: body ? JSON.stringify(body) : undefined,
});
const payload = (await response.json().catch(() => null)) as
| TelegramApiResponse
| null;
if (!response.ok || !payload?.ok) {
throw new Error(parseTelegramError(response.status, payload));
}
return payload;
}
function getBotId(botToken: string): string {
const [rawBotId] = botToken.trim().split(":", 1);
const botId = rawBotId?.trim() || "default";
return botId.replace(/[^a-zA-Z0-9._:-]/g, "_").slice(0, 128) || "default";
}
function chatBelongsToProject(
chatProjectId: string | undefined,
projectId: string | undefined
): boolean {
const left = chatProjectId ?? null;
const right = projectId ?? null;
return left === right;
}
async function ensureTelegramExternalChatContext(params: {
sessionId: string;
defaultProjectId?: string;
}): Promise<TelegramExternalChatContext> {
const { session, resolvedProjectId } = await resolveTelegramProjectContext({
sessionId: params.sessionId,
defaultProjectId: params.defaultProjectId,
});
const projectKey = contextKey(resolvedProjectId);
let resolvedChatId = session.activeChats[projectKey];
if (resolvedChatId) {
const existing = await getChat(resolvedChatId);
if (!existing || !chatBelongsToProject(existing.projectId, resolvedProjectId)) {
resolvedChatId = "";
}
}
if (!resolvedChatId) {
resolvedChatId = crypto.randomUUID();
await createChat(
resolvedChatId,
`External session ${session.id}`,
resolvedProjectId
);
}
session.activeChats[projectKey] = resolvedChatId;
session.currentPaths[projectKey] = normalizeTelegramCurrentPath(
session.currentPaths[projectKey]
);
session.updatedAt = new Date().toISOString();
await saveExternalSession(session);
return {
chatId: resolvedChatId,
projectId: resolvedProjectId,
currentPath: session.currentPaths[projectKey] ?? "",
};
}
async function resolveTelegramProjectContext(params: {
sessionId: string;
defaultProjectId?: string;
}): Promise<TelegramResolvedProjectContext> {
const session = await getOrCreateExternalSession(params.sessionId);
const projects = await getAllProjects();
const projectById = new Map(projects.map((project) => [project.id, project]));
let resolvedProjectId: string | undefined;
const explicitProjectId = params.defaultProjectId?.trim() || "";
if (explicitProjectId) {
if (!projectById.has(explicitProjectId)) {
throw new Error(`Project "${explicitProjectId}" not found`);
}
resolvedProjectId = explicitProjectId;
session.activeProjectId = explicitProjectId;
} else if (session.activeProjectId && projectById.has(session.activeProjectId)) {
resolvedProjectId = session.activeProjectId;
} else if (projects.length > 0) {
resolvedProjectId = projects[0].id;
session.activeProjectId = projects[0].id;
} else {
session.activeProjectId = null;
}
return {
session,
resolvedProjectId,
projectName: resolvedProjectId ? projectById.get(resolvedProjectId)?.name : undefined,
};
}
function extensionFromMime(mimeType: string): string {
const lower = mimeType.toLowerCase();
if (lower.includes("pdf")) return ".pdf";
if (lower.includes("png")) return ".png";
if (lower.includes("jpeg") || lower.includes("jpg")) return ".jpg";
if (lower.includes("webp")) return ".webp";
if (lower.includes("gif")) return ".gif";
if (lower.includes("mp4")) return ".mp4";
if (lower.includes("mpeg") || lower.includes("mp3")) return ".mp3";
if (lower.includes("ogg")) return ".ogg";
if (lower.includes("wav")) return ".wav";
if (lower.includes("plain")) return ".txt";
return "";
}
function buildIncomingFileName(params: {
base: string;
messageId?: number;
mimeType?: string;
}): string {
const suffix = params.messageId ?? Date.now();
const ext = params.mimeType ? extensionFromMime(params.mimeType) : "";
return `${params.base}-${suffix}${ext}`;
}
function sanitizeFileName(value: string): string {
const base = value.trim().replace(/[\\/]+/g, "_");
return base || `file-${Date.now()}`;
}
function withMessageIdPrefix(fileName: string, messageId?: number): string {
if (typeof messageId !== "number") return fileName;
return `${messageId}-${fileName}`;
}
export function extractIncomingFile(
message: TelegramMessage,
messageId?: number
): TelegramIncomingFile | null {
const documentFileId =
typeof message.document?.file_id === "string"
? message.document.file_id.trim()
: "";
if (documentFileId) {
const docNameRaw =
typeof message.document?.file_name === "string"
? message.document.file_name
: "";
const fallback = buildIncomingFileName({
base: "document",
messageId,
mimeType:
typeof message.document?.mime_type === "string"
? message.document.mime_type
: undefined,
});
return {
fileId: documentFileId,
fileName: withMessageIdPrefix(sanitizeFileName(docNameRaw || fallback), messageId),
};
}
const photos: Array<{ file_id?: unknown }> = Array.isArray(message.photo)
? message.photo
: [];
for (let i = photos.length - 1; i >= 0; i -= 1) {
const photo = photos[i];
const fileId = typeof photo?.file_id === "string" ? photo.file_id.trim() : "";
if (fileId) {
return {
fileId,
fileName: sanitizeFileName(
buildIncomingFileName({ base: "photo", messageId, mimeType: "image/jpeg" })
),
};
}
}
const audioFileId =
typeof message.audio?.file_id === "string" ? message.audio.file_id.trim() : "";
if (audioFileId) {
const audioNameRaw =
typeof message.audio?.file_name === "string" ? message.audio.file_name : "";
const fallback = buildIncomingFileName({
base: "audio",
messageId,
mimeType:
typeof message.audio?.mime_type === "string"
? message.audio.mime_type
: undefined,
});
return {
fileId: audioFileId,
fileName: withMessageIdPrefix(sanitizeFileName(audioNameRaw || fallback), messageId),
};
}
const videoFileId =
typeof message.video?.file_id === "string" ? message.video.file_id.trim() : "";
if (videoFileId) {
const videoNameRaw =
typeof message.video?.file_name === "string" ? message.video.file_name : "";
const fallback = buildIncomingFileName({
base: "video",
messageId,
mimeType:
typeof message.video?.mime_type === "string"
? message.video.mime_type
: undefined,
});
return {
fileId: videoFileId,
fileName: withMessageIdPrefix(sanitizeFileName(videoNameRaw || fallback), messageId),
};
}
const voiceFileId =
typeof message.voice?.file_id === "string" ? message.voice.file_id.trim() : "";
if (voiceFileId) {
return {
fileId: voiceFileId,
fileName: sanitizeFileName(
buildIncomingFileName({
base: "voice",
messageId,
mimeType:
typeof message.voice?.mime_type === "string"
? message.voice.mime_type
: undefined,
})
),
};
}
return null;
}
export async function downloadTelegramFile(botToken: string, fileId: string): Promise<Buffer> {
const payload = await callTelegramApi(botToken, "getFile", {
file_id: fileId,
});
const result = payload.result as TelegramFileResult | undefined;
const filePath = result?.file_path ?? "";
if (!filePath) {
throw new Error("Telegram getFile returned empty file_path");
}
const fileUrl = `https://api.telegram.org/file/bot${botToken}/${filePath}`;
const response = await fetch(fileUrl);
if (!response.ok) {
throw new Error(`Failed to download Telegram file (${response.status})`);
}
const bytes = await response.arrayBuffer();
if (bytes.byteLength > TELEGRAM_FILE_MAX_BYTES) {
throw new Error(
`Telegram file is too large (${bytes.byteLength} bytes). Max supported size is ${TELEGRAM_FILE_MAX_BYTES} bytes.`
);
}
return Buffer.from(bytes);
}
function extractCommand(text: string): string | null {
const first = text.trim().split(/\s+/, 1)[0];
if (!first || !first.startsWith("/")) return null;
return first.split("@", 1)[0].toLowerCase();
}
function extractAccessCodeCandidate(text: string): string | null {
const value = text.trim();
if (!value) return null;
const fromCommand = value.match(
/^\/(?:code|start)(?:@[a-zA-Z0-9_]+)?\s+([A-Za-z0-9_-]{6,64})$/i
);
if (fromCommand?.[1]) {
return fromCommand[1];
}
if (/^[A-Za-z0-9_-]{6,64}$/.test(value)) {
return value;
}
return null;
}
function normalizeOutgoingText(text: string): string {
const value = text.trim();
if (!value) return "Пустой ответ от агента.";
if (value.length <= TELEGRAM_TEXT_LIMIT) return value;
return `${value.slice(0, TELEGRAM_TEXT_LIMIT - 1)}`;
}
export async function sendTelegramMessage(
botToken: string,
chatId: number | string,
text: string,
replyToMessageId?: number
): Promise<void> {
const response = await fetch(`https://api.telegram.org/bot${botToken}/sendMessage`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
chat_id: chatId,
text: normalizeOutgoingText(text),
...(typeof replyToMessageId === "number" ? { reply_to_message_id: replyToMessageId } : {}),
}),
});
const payload = (await response.json().catch(() => null)) as
| { ok?: boolean; description?: string }
| null;
if (!response.ok || !payload?.ok) {
throw new Error(
`Telegram sendMessage failed (${response.status})${payload?.description ? `: ${payload.description}` : ""}`
);
}
}
function helpText(activeProject?: { id?: string; name?: string }): string {
const activeProjectLine = activeProject?.id
? `Active project: ${activeProject.name ? `${activeProject.name} (${activeProject.id})` : activeProject.id}`
: "Active project: not selected";
return [
"Telegram connection is active.",
activeProjectLine,
"",
"Commands:",
"/start - show this help",
"/help - show this help",
"/code <access_code> - activate access for your Telegram user",
"/new - start a new conversation (reset context)",
"",
"Text messages are sent to the agent.",
"File uploads are saved into chat files.",
"You can also ask the agent to send a local file back to Telegram.",
].join("\n");
}
export async function processTelegramUpdate(
update: TelegramUpdate,
runtime: TelegramIntegrationRuntimeConfig
): Promise<ProcessTelegramUpdateResult> {
const botToken = runtime.botToken.trim();
const defaultProjectId = runtime.defaultProjectId || undefined;
const allowedUserIds = new Set(runtime.allowedUserIds);
if (!botToken) {
throw new Error("Telegram bot token is not configured");
}
const updateId =
typeof update.update_id === "number" && Number.isInteger(update.update_id)
? update.update_id
: null;
if (updateId === null) {
throw new Error("Invalid update_id");
}
const botId = getBotId(botToken);
const isNewUpdate = await claimTelegramUpdate(botId, updateId);
if (!isNewUpdate) {
return { ok: true, duplicate: true };
}
try {
const message = update.message;
const chatId =
typeof message?.chat?.id === "number" || typeof message?.chat?.id === "string"
? message.chat.id
: null;
const chatType = typeof message?.chat?.type === "string" ? message.chat.type : "";
const messageId =
typeof message?.message_id === "number" ? message.message_id : undefined;
if (chatId === null || !chatType) {
return { ok: true, ignored: true, reason: "unsupported_update" };
}
if (chatType !== "private") {
return { ok: true, ignored: true, reason: "private_only" };
}
const text = typeof message?.text === "string" ? message.text.trim() : "";
const caption =
typeof message?.caption === "string" ? message.caption.trim() : "";
const incomingText = text || caption;
const fromUserId = normalizeTelegramUserId(message?.from?.id);
if (!fromUserId) {
return {
ok: true,
ignored: true,
reason: "missing_user_id",
};
}
if (!allowedUserIds.has(fromUserId)) {
const accessCode = extractAccessCodeCandidate(text);
const granted =
accessCode &&
(await consumeTelegramAccessCode({
code: accessCode,
userId: fromUserId,
}));
if (granted) {
await sendTelegramMessage(
botToken,
chatId,
"Доступ выдан. Теперь можно отправлять сообщения агенту.",
messageId
);
return {
ok: true,
accessGranted: true,
userId: fromUserId,
};
}
await sendTelegramMessage(
botToken,
chatId,
[
"Доступ запрещён: ваш user_id не в списке разрешённых.",
"Отправьте код активации командой /code <код> или /start <код>.",
`Ваш user_id: ${fromUserId}`,
].join("\n"),
messageId
);
return {
ok: true,
ignored: true,
reason: "user_not_allowed",
userId: fromUserId,
};
}
let sessionId = await getTelegramChatSessionId(botId, chatId);
if (!sessionId) {
sessionId = createDefaultTelegramSessionId(botId, chatId);
await setTelegramChatSessionId(botId, chatId, sessionId);
}
const command = extractCommand(text);
if (command === "/start" || command === "/help") {
const resolvedProject = await resolveTelegramProjectContext({
sessionId,
defaultProjectId,
});
await saveExternalSession({
...resolvedProject.session,
updatedAt: new Date().toISOString(),
});
await sendTelegramMessage(
botToken,
chatId,
helpText({
id: resolvedProject.resolvedProjectId,
name: resolvedProject.projectName,
}),
messageId
);
return { ok: true, command };
}
if (command === "/new") {
const freshSessionId = createFreshTelegramSessionId(botId, chatId);
await setTelegramChatSessionId(botId, chatId, freshSessionId);
await sendTelegramMessage(
botToken,
chatId,
"Начал новый диалог. Контекст очищен для следующего сообщения.",
messageId
);
return { ok: true, command };
}
let incomingSavedFile: {
name: string;
path: string;
size: number;
} | null = null;
const incomingFile = message ? extractIncomingFile(message, messageId) : null;
let externalContext: TelegramExternalChatContext | null = null;
if (incomingFile) {
externalContext = await ensureTelegramExternalChatContext({
sessionId,
defaultProjectId,
});
const fileBuffer = await downloadTelegramFile(botToken, incomingFile.fileId);
const saved = await saveChatFile(
externalContext.chatId,
fileBuffer,
incomingFile.fileName
);
incomingSavedFile = {
name: saved.name,
path: saved.path,
size: saved.size,
};
}
if (!incomingText) {
if (incomingSavedFile) {
await sendTelegramMessage(
botToken,
chatId,
`File "${incomingSavedFile.name}" saved to chat files.`,
messageId
);
return {
ok: true,
fileSaved: true,
file: incomingSavedFile,
};
}
await sendTelegramMessage(
botToken,
chatId,
"Only text messages and file uploads are supported right now.",
messageId
);
return { ok: true, ignored: true, reason: "non_text" };
}
try {
const result = await handleExternalMessage({
sessionId,
message: incomingSavedFile
? `${incomingText}\n\nAttached file: ${incomingSavedFile.name}`
: incomingText,
projectId: externalContext?.projectId ?? defaultProjectId,
chatId: externalContext?.chatId,
currentPath: normalizeTelegramCurrentPath(externalContext?.currentPath),
runtimeData: {
telegram: {
botToken,
chatId,
replyToMessageId: messageId ?? null,
},
},
});
await sendTelegramMessage(botToken, chatId, result.reply, messageId);
return { ok: true };
} catch (error) {
if (error instanceof ExternalMessageError) {
const errorMessage =
typeof error.payload.error === "string"
? error.payload.error
: "Не удалось обработать сообщение.";
await sendTelegramMessage(botToken, chatId, `Ошибка: ${errorMessage}`, messageId);
return { ok: true, handledError: true, status: error.status };
}
throw error;
}
} catch (error) {
await releaseTelegramUpdate(botId, updateId);
throw error;
}
}