From e8b8eca0d6d88236dbfcf728f199749db0df78a6 Mon Sep 17 00:00:00 2001 From: inmake Date: Fri, 5 Dec 2025 19:52:18 +0500 Subject: [PATCH] Refactor server session selection logic to incorporate maxApps limits and improve error handling - Enhanced selectAvailableServer function to consider maxApps when selecting available servers for both stream and local sessions. - Added dynamic import of serverService to avoid circular dependencies. - Improved error messages for unavailable servers based on session type. - Updated startApplication function to check session status before launching and handle session termination more gracefully. - Ensured proper session management by processing ending sessions before starting new ones. --- server/src/services/serverSession/index.ts | 106 ++++++++++++++++++--- session-server/src/index.ts | 54 ++++++++--- 2 files changed, 133 insertions(+), 27 deletions(-) diff --git a/server/src/services/serverSession/index.ts b/server/src/services/serverSession/index.ts index 863a8fc..259500e 100644 --- a/server/src/services/serverSession/index.ts +++ b/server/src/services/serverSession/index.ts @@ -226,19 +226,57 @@ export const serverSessionService = { }, /** - * Выбрать доступный сервер + * Выбрать доступный сервер с учетом maxApps */ async selectAvailableServer(mode: SessionMode): Promise { + // Импортируем serverService динамически чтобы избежать циклических зависимостей + const { serverService } = await import("../server"); + if (mode === "stream") { - const server = await db.query.servers.findFirst({ - where: and(eq(servers.type, "stream"), eq(servers.tier, "prod")), - }); - return server?.id; + // Для stream-сессий используем логику из assignServer + const tier = undefined; // Для selectAvailableServer используем prod по умолчанию + const availableServers = await serverService.findAvailableStreamServers(tier); + + if (availableServers.length === 0) { + return undefined; + } + + // Проверяем количество активных сессий на каждом сервере + for (const server of availableServers) { + const maxApps = server.maxApps ?? 1; + const activeSessions = await this.findByServerId(server.id, {}); + const activeCount = activeSessions.filter( + (s) => s.status === "starting" || s.status === "started" + ).length; + + if (activeCount < maxApps) { + return server.id; + } + } + + return undefined; } else { - const server = await db.query.servers.findFirst({ - where: eq(servers.type, "local"), - }); - return server?.id; + // Для local-сессий проверяем maxApps + const availableLocalServers = await serverService.findAvailableLocalServers(); + + if (availableLocalServers.length === 0) { + return undefined; + } + + // Проверяем количество активных сессий на каждом сервере + for (const server of availableLocalServers) { + const maxApps = server.maxApps ?? 1; + const activeSessions = await this.findByServerId(server.id, {}); + const activeCount = activeSessions.filter( + (s) => s.status === "starting" || s.status === "started" + ).length; + + if (activeCount < maxApps) { + return server.id; + } + } + + return undefined; } }, @@ -283,13 +321,53 @@ export const serverSessionService = { ); } - // Для local-сессий выбираем сервер сразу - // Для stream-сессий сервер будет назначен динамически при запуске + // Проверить доступность серверов перед созданием сессии + // Импортируем serverService динамически чтобы избежать циклических зависимостей + const { serverService } = await import("../server"); + let selectedServerId = serverId; - if (mode === "local" && !selectedServerId) { - selectedServerId = await this.selectAvailableServer(mode); + + if (mode === "local") { + // Для local-сессий выбираем сервер сразу с проверкой maxApps if (!selectedServerId) { - throw new Error(`Нет доступных серверов для режима ${mode}`); + selectedServerId = await this.selectAvailableServer(mode); + if (!selectedServerId) { + throw new Error( + `Нет доступных local серверов (все серверы достигли лимита активных сессий)` + ); + } + } + } else { + // Для stream-сессий проверяем наличие доступных серверов перед созданием сессии + const preferredTier = tier || (userId ? undefined : "demo"); + const availableServers = await serverService.findAvailableStreamServers(preferredTier); + + if (availableServers.length === 0) { + const serverType = preferredTier === "demo" ? "demo " : ""; + throw new Error( + `Нет доступных ${serverType}stream серверов (проверьте, что stream серверы зарегистрированы)` + ); + } + + // Проверяем, есть ли хотя бы один сервер с доступным местом + let hasAvailableServer = false; + for (const server of availableServers) { + const maxApps = server.maxApps ?? 1; + const activeSessions = await this.findByServerId(server.id, {}); + const activeCount = activeSessions.filter( + (s) => s.status === "starting" || s.status === "started" + ).length; + + if (activeCount < maxApps) { + hasAvailableServer = true; + break; + } + } + + if (!hasAvailableServer) { + throw new Error( + `Нет доступных stream серверов (все серверы достигли лимита активных сессий)` + ); } } diff --git a/session-server/src/index.ts b/session-server/src/index.ts index 60accfc..b3418d8 100644 --- a/session-server/src/index.ts +++ b/session-server/src/index.ts @@ -329,7 +329,15 @@ async function updateSessionStatus( * Запустить приложение для сессии */ async function startApplication(session: SessionData): Promise { - const { id: sessionId, app, serverId, mode } = session; + const { id: sessionId, app, serverId, mode, status } = session; + + // Проверить статус сессии - не запускать, если статус "ending" или "ended" + if (status === "ending" || status === "ended") { + console.log( + `[${new Date().toISOString()}] ⚠️ Сессия ${sessionId} имеет статус "${status}", запуск отменен` + ); + return; + } // Проверить, не обрабатывается ли уже эта сессия if (processingSessions.has(sessionId)) { @@ -567,6 +575,10 @@ async function startApplication(session: SessionData): Promise { ); } + // Проверить, был ли процесс уже удален из activeProcesses в stopApplication + // Если процесса уже нет в activeProcesses, значит статус уже обновлен в stopApplication + const wasProcessRemoved = !activeProcesses.has(sessionId); + activeProcesses.delete(sessionId); // Для stream-режима также останавливаем Cirrus если он ещё работает @@ -581,8 +593,11 @@ async function startApplication(session: SessionData): Promise { } } - // Обновить статус на "ended" - await updateSessionStatus(sessionId, "ended"); + // Обновить статус на "ended" только если процесс не был удален в stopApplication + // (если был удален, статус уже обновлен в stopApplication) + if (!wasProcessRemoved) { + await updateSessionStatus(sessionId, "ended"); + } }); appProcess.on("error", async (error) => { @@ -590,6 +605,10 @@ async function startApplication(session: SessionData): Promise { `[${new Date().toISOString()}] ❌ Ошибка процесса приложения для сессии ${sessionId}:`, error ); + + // Проверить, был ли процесс уже удален из activeProcesses в stopApplication + const wasProcessRemoved = !activeProcesses.has(sessionId); + activeProcesses.delete(sessionId); // Для stream-режима также останавливаем Cirrus @@ -601,8 +620,10 @@ async function startApplication(session: SessionData): Promise { } } - // Обновить статус на "ended" в случае ошибки - await updateSessionStatus(sessionId, "ended"); + // Обновить статус на "ended" только если процесс не был удален в stopApplication + if (!wasProcessRemoved) { + await updateSessionStatus(sessionId, "ended"); + } }); // Обновить статус на "started" с PID приложения @@ -688,9 +709,14 @@ function killProcessTree(pid: number): void { async function stopApplication(session: SessionData): Promise { const { id: sessionId, appPid, cirrusPid, mode } = session; - // Проверить, не обрабатывается ли уже эта сессия - if (processingSessions.has(sessionId)) { - return; + // Если сессия уже обрабатывается, но статус "ending", нужно остановить процесс + // Удаляем из processingSessions, чтобы позволить остановку + const wasProcessing = processingSessions.has(sessionId); + if (wasProcessing) { + console.log( + `[${new Date().toISOString()}] ⚠️ Сессия ${sessionId} находится в процессе обработки, но статус "ending" - принудительная остановка` + ); + processingSessions.delete(sessionId); } const appProcess = activeProcesses.get(sessionId); @@ -834,16 +860,18 @@ async function checkSessions(): Promise { } } - for (const session of startingSessions) { - await startApplication(session); - } - - // Обработать сессии со статусом "ending" + // Обработать сессии со статусом "ending" ПЕРЕД запуском новых сессий + // Это гарантирует, что процессы будут остановлены до запуска новых const endingSessions = sessions.filter((s) => s.status === "ending"); for (const session of endingSessions) { await stopApplication(session); } + // Обработать сессии со статусом "starting" после остановки завершающихся сессий + for (const session of startingSessions) { + await startApplication(session); + } + // Проверить, что все активные процессы соответствуют активным сессиям // Сессия считается активной, если: // 1. Статус "started"