Refactor server session selection logic to incorporate maxApps limits and improve error handling

- Enhanced selectAvailableServer function to consider maxApps when selecting available servers for both stream and local sessions.
- Added dynamic import of serverService to avoid circular dependencies.
- Improved error messages for unavailable servers based on session type.
- Updated startApplication function to check session status before launching and handle session termination more gracefully.
- Ensured proper session management by processing ending sessions before starting new ones.
This commit is contained in:
2025-12-05 19:52:18 +05:00
parent c384087f38
commit e8b8eca0d6
2 changed files with 133 additions and 27 deletions
+92 -14
View File
@@ -226,19 +226,57 @@ export const serverSessionService = {
}, },
/** /**
* Выбрать доступный сервер * Выбрать доступный сервер с учетом maxApps
*/ */
async selectAvailableServer(mode: SessionMode): Promise<string | undefined> { async selectAvailableServer(mode: SessionMode): Promise<string | undefined> {
// Импортируем serverService динамически чтобы избежать циклических зависимостей
const { serverService } = await import("../server");
if (mode === "stream") { if (mode === "stream") {
const server = await db.query.servers.findFirst({ // Для stream-сессий используем логику из assignServer
where: and(eq(servers.type, "stream"), eq(servers.tier, "prod")), const tier = undefined; // Для selectAvailableServer используем prod по умолчанию
}); const availableServers = await serverService.findAvailableStreamServers(tier);
return server?.id;
if (availableServers.length === 0) {
return undefined;
}
// Проверяем количество активных сессий на каждом сервере
for (const server of availableServers) {
const maxApps = server.maxApps ?? 1;
const activeSessions = await this.findByServerId(server.id, {});
const activeCount = activeSessions.filter(
(s) => s.status === "starting" || s.status === "started"
).length;
if (activeCount < maxApps) {
return server.id;
}
}
return undefined;
} else { } else {
const server = await db.query.servers.findFirst({ // Для local-сессий проверяем maxApps
where: eq(servers.type, "local"), const availableLocalServers = await serverService.findAvailableLocalServers();
});
return server?.id; if (availableLocalServers.length === 0) {
return undefined;
}
// Проверяем количество активных сессий на каждом сервере
for (const server of availableLocalServers) {
const maxApps = server.maxApps ?? 1;
const activeSessions = await this.findByServerId(server.id, {});
const activeCount = activeSessions.filter(
(s) => s.status === "starting" || s.status === "started"
).length;
if (activeCount < maxApps) {
return server.id;
}
}
return undefined;
} }
}, },
@@ -283,13 +321,53 @@ export const serverSessionService = {
); );
} }
// Для local-сессий выбираем сервер сразу // Проверить доступность серверов перед созданием сессии
// Для stream-сессий сервер будет назначен динамически при запуске // Импортируем serverService динамически чтобы избежать циклических зависимостей
const { serverService } = await import("../server");
let selectedServerId = serverId; let selectedServerId = serverId;
if (mode === "local" && !selectedServerId) {
selectedServerId = await this.selectAvailableServer(mode); if (mode === "local") {
// Для local-сессий выбираем сервер сразу с проверкой maxApps
if (!selectedServerId) { if (!selectedServerId) {
throw new Error(`Нет доступных серверов для режима ${mode}`); selectedServerId = await this.selectAvailableServer(mode);
if (!selectedServerId) {
throw new Error(
`Нет доступных local серверов (все серверы достигли лимита активных сессий)`
);
}
}
} else {
// Для stream-сессий проверяем наличие доступных серверов перед созданием сессии
const preferredTier = tier || (userId ? undefined : "demo");
const availableServers = await serverService.findAvailableStreamServers(preferredTier);
if (availableServers.length === 0) {
const serverType = preferredTier === "demo" ? "demo " : "";
throw new Error(
`Нет доступных ${serverType}stream серверов (проверьте, что stream серверы зарегистрированы)`
);
}
// Проверяем, есть ли хотя бы один сервер с доступным местом
let hasAvailableServer = false;
for (const server of availableServers) {
const maxApps = server.maxApps ?? 1;
const activeSessions = await this.findByServerId(server.id, {});
const activeCount = activeSessions.filter(
(s) => s.status === "starting" || s.status === "started"
).length;
if (activeCount < maxApps) {
hasAvailableServer = true;
break;
}
}
if (!hasAvailableServer) {
throw new Error(
`Нет доступных stream серверов (все серверы достигли лимита активных сессий)`
);
} }
} }
+41 -13
View File
@@ -329,7 +329,15 @@ async function updateSessionStatus(
* Запустить приложение для сессии * Запустить приложение для сессии
*/ */
async function startApplication(session: SessionData): Promise<void> { async function startApplication(session: SessionData): Promise<void> {
const { id: sessionId, app, serverId, mode } = session; const { id: sessionId, app, serverId, mode, status } = session;
// Проверить статус сессии - не запускать, если статус "ending" или "ended"
if (status === "ending" || status === "ended") {
console.log(
`[${new Date().toISOString()}] ⚠️ Сессия ${sessionId} имеет статус "${status}", запуск отменен`
);
return;
}
// Проверить, не обрабатывается ли уже эта сессия // Проверить, не обрабатывается ли уже эта сессия
if (processingSessions.has(sessionId)) { if (processingSessions.has(sessionId)) {
@@ -567,6 +575,10 @@ async function startApplication(session: SessionData): Promise<void> {
); );
} }
// Проверить, был ли процесс уже удален из activeProcesses в stopApplication
// Если процесса уже нет в activeProcesses, значит статус уже обновлен в stopApplication
const wasProcessRemoved = !activeProcesses.has(sessionId);
activeProcesses.delete(sessionId); activeProcesses.delete(sessionId);
// Для stream-режима также останавливаем Cirrus если он ещё работает // Для stream-режима также останавливаем Cirrus если он ещё работает
@@ -581,8 +593,11 @@ async function startApplication(session: SessionData): Promise<void> {
} }
} }
// Обновить статус на "ended" // Обновить статус на "ended" только если процесс не был удален в stopApplication
await updateSessionStatus(sessionId, "ended"); // (если был удален, статус уже обновлен в stopApplication)
if (!wasProcessRemoved) {
await updateSessionStatus(sessionId, "ended");
}
}); });
appProcess.on("error", async (error) => { appProcess.on("error", async (error) => {
@@ -590,6 +605,10 @@ async function startApplication(session: SessionData): Promise<void> {
`[${new Date().toISOString()}] ❌ Ошибка процесса приложения для сессии ${sessionId}:`, `[${new Date().toISOString()}] ❌ Ошибка процесса приложения для сессии ${sessionId}:`,
error error
); );
// Проверить, был ли процесс уже удален из activeProcesses в stopApplication
const wasProcessRemoved = !activeProcesses.has(sessionId);
activeProcesses.delete(sessionId); activeProcesses.delete(sessionId);
// Для stream-режима также останавливаем Cirrus // Для stream-режима также останавливаем Cirrus
@@ -601,8 +620,10 @@ async function startApplication(session: SessionData): Promise<void> {
} }
} }
// Обновить статус на "ended" в случае ошибки // Обновить статус на "ended" только если процесс не был удален в stopApplication
await updateSessionStatus(sessionId, "ended"); if (!wasProcessRemoved) {
await updateSessionStatus(sessionId, "ended");
}
}); });
// Обновить статус на "started" с PID приложения // Обновить статус на "started" с PID приложения
@@ -688,9 +709,14 @@ function killProcessTree(pid: number): void {
async function stopApplication(session: SessionData): Promise<void> { async function stopApplication(session: SessionData): Promise<void> {
const { id: sessionId, appPid, cirrusPid, mode } = session; const { id: sessionId, appPid, cirrusPid, mode } = session;
// Проверить, не обрабатывается ли уже эта сессия // Если сессия уже обрабатывается, но статус "ending", нужно остановить процесс
if (processingSessions.has(sessionId)) { // Удаляем из processingSessions, чтобы позволить остановку
return; const wasProcessing = processingSessions.has(sessionId);
if (wasProcessing) {
console.log(
`[${new Date().toISOString()}] ⚠️ Сессия ${sessionId} находится в процессе обработки, но статус "ending" - принудительная остановка`
);
processingSessions.delete(sessionId);
} }
const appProcess = activeProcesses.get(sessionId); const appProcess = activeProcesses.get(sessionId);
@@ -834,16 +860,18 @@ async function checkSessions(): Promise<void> {
} }
} }
for (const session of startingSessions) { // Обработать сессии со статусом "ending" ПЕРЕД запуском новых сессий
await startApplication(session); // Это гарантирует, что процессы будут остановлены до запуска новых
}
// Обработать сессии со статусом "ending"
const endingSessions = sessions.filter((s) => s.status === "ending"); const endingSessions = sessions.filter((s) => s.status === "ending");
for (const session of endingSessions) { for (const session of endingSessions) {
await stopApplication(session); await stopApplication(session);
} }
// Обработать сессии со статусом "starting" после остановки завершающихся сессий
for (const session of startingSessions) {
await startApplication(session);
}
// Проверить, что все активные процессы соответствуют активным сессиям // Проверить, что все активные процессы соответствуют активным сессиям
// Сессия считается активной, если: // Сессия считается активной, если:
// 1. Статус "started" // 1. Статус "started"