From f00d3a447dab319735c5092f994d919ef70a337c Mon Sep 17 00:00:00 2001 From: C Date: Tue, 7 Feb 2023 18:22:10 +0500 Subject: [PATCH] process kill updated to invisible, webrtc_timeout increased, port allocation changed for high concurrency, port information added to database, app process kill added if webrtc not started --- src/controller/session.js | 6 +- src/database/database.js | 6 +- src/modules/port_alloc.js | 185 +++++++++++++++++++++++++++----- src/modules/run_process.js | 10 +- src/modules/session_observer.js | 6 +- 5 files changed, 176 insertions(+), 37 deletions(-) diff --git a/src/controller/session.js b/src/controller/session.js index 9eab745..c2294b0 100644 --- a/src/controller/session.js +++ b/src/controller/session.js @@ -3,7 +3,7 @@ const {get_webrtc_port, get_app_port} = require('../modules/port_alloc') const {get_app_path} = require('../modules/titles') const not_found_error = require('../../lib/src/http/errors/not_found_error') const server_error = require('../../lib/src/http/errors/server_error') -const {run_webrtc, run_app} = require('../modules/run_process') +const {run_webrtc, run_app, kill_proc} = require('../modules/run_process') const {create_websocket_url} = require('../modules/links') @@ -32,17 +32,17 @@ const run_session = async (req, res, next) => { var webrtc_pid = await run_webrtc(webrtc_port, app_port) if (!webrtc_pid) { + kill_proc(app_pid) next(new server_error('can not run webrtc')) return } - var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid) + var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid, app_port, webrtc_port) if(!add_runnning_session_result) { next(new server_error('can not add session to database')) return } - res.json({websocket_url:create_websocket_url(webrtc_port)}) } diff --git a/src/database/database.js b/src/database/database.js index fbc4fff..bcd3f9a 100644 --- a/src/database/database.js +++ b/src/database/database.js @@ -19,7 +19,7 @@ const get_db = async () => { } } -const add_running_session = async (session_id, app_pid, webrtc_pid) => { +const add_running_session = async (session_id, app_pid, webrtc_pid, app_port, webrtc_port) => { var db = await get_db() if (!db) { return false @@ -30,7 +30,9 @@ const add_running_session = async (session_id, app_pid, webrtc_pid) => { await running_session.insertOne({ session_id: session_id, app_pid: app_pid, - webrtc_pid: webrtc_pid + webrtc_pid: webrtc_pid, + app_port: app_port, + webrtc_port: webrtc_port }) return true } diff --git a/src/modules/port_alloc.js b/src/modules/port_alloc.js index 275c488..42b0c86 100644 --- a/src/modules/port_alloc.js +++ b/src/modules/port_alloc.js @@ -1,40 +1,167 @@ const portfinder = require('portfinder') const {webrtc_port_begin, session_limit} = require('../../config') +const database = require('../database/database') -const min_available_port = 2000 -const max_available_port = 65535 +class port { + constructor(value) { + this.#value = value + this.#free = true + } + get_value() { + return this.#value + } + set_free() { + this.#free = true + } + set_busy() { + this.#free = false + } + is_free() { + return this.#free + } + #value + #free +} -const get_port_in_range = async (first, last) => { - //could be singleton problem - portfinder.setBasePort(first) - portfinder.setHighestPort(last) - try { - var port = await portfinder.getPortPromise() - return port - } catch(err) { +class port_alloc { + constructor(port_begin, count) { + this.#ports = new Array() + for (var i = port_begin; i < port_begin + count; ++i) { + this.#ports.push(new port(i)) + } + } + get() { + for (var i in this.#ports) { + if (this.#ports[i].is_free()) { + this.#ports[i].set_busy() + return this.#ports[i].get_value() + } + } return null } -} - -const get_webrtc_port = async () => { - var port = await get_port_in_range(webrtc_port_begin, webrtc_port_begin + session_limit - 1) - return port -} - -const get_app_port = async () => { - const port_range_size_before_webrtc = webrtc_port_begin - min_available_port - const port_range_after_webrtc = max_available_port - (webrtc_port_begin + session_limit) - - var port - if (port_range_size_before_webrtc > port_range_after_webrtc) { - port = await get_port_in_range(min_available_port, webrtc_port_begin - 1) - } else { - port = await get_port_in_range(webrtc_port_begin + session_limit, max_available_port) + free(busy_port) { + for (var i in this.#ports) { + if (this.#ports[i].get_value() == busy_port) { + this.#ports[i].set_free() + return + } + } } - return port + set_port_as_busy(busy_port) { + var first_port = this.#ports[0].get_value() + var last_port = this.#ports[this.#ports.length - 1].get_value() + + if (!(busy_port >= first_port && busy_port <= last_port)) { + return false + } + var ports_count = this.#ports.length + for (var i = 0; i < ports_count; ++i) { + if (this.#ports[i].get_value() == busy_port) { + this.#ports[i].set_busy() + return true + } + } + return false + } + #ports } + +const min_available_port = 10000 +const max_available_port = 65535 + +// user does not set app port, calculate it from webrtc port and session limit +const get_app_port_begin = (webrtc_port_begin, session_limit) => { + var app_port_begin = webrtc_port_begin + session_limit + if (app_port_begin > max_available_port) { + app_port_begin = webrtc_port_begin - session_limit + } + return app_port_begin +} + +var webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit) +var app_port_alloc = new port_alloc(get_app_port_begin(webrtc_port_begin, session_limit), session_limit) + +// this will restore all allocated ports before session server reboot +var ports_restored = false +const restore_ports_from_database = async () => { + var sessions = await database.get_running_sessions() + + if (!sessions) { + return + } + + sessions.forEach(session => { + app_port_alloc.set_port_as_busy(session.app_port) + webrtc_port_alloc.set_port_as_busy(session.webrtc_port) + }) + ports_restored = true +} + + + +const check_ports_restored = async () => { + if (ports_restored) { + return + } + await restore_ports_from_database() +} + +// webrtc ports +const get_webrtc_port = async () => { + await check_ports_restored() + return webrtc_port_alloc.get() +} + +const free_webrtc_port = (port) => { + webrtc_port_alloc.free(port) +} + +// app ports +const get_app_port = async () => { + await check_ports_restored() + return app_port_alloc.get() +} + +const free_app_port = (port) => { + app_port_alloc.free(port) +} + +// THIS MODULES DISABLED BECAUSE CAN NOT BIND PORT TO PROCESS IMMEDIATELY, CAUSE PROBLEMS IN HIGH LOAD + +// const get_port_in_range = async (first, last) => { +// //could be singleton problem +// portfinder.setBasePort(first) +// portfinder.setHighestPort(last) +// try { +// var port = await portfinder.getPortPromise() +// return port +// } catch(err) { +// return null +// } +// } + +// const get_webrtc_port = async () => { +// var port = await get_port_in_range(webrtc_port_begin, webrtc_port_begin + session_limit - 1) +// return port +// } + +// const get_app_port = async () => { +// const port_range_size_before_webrtc = webrtc_port_begin - min_available_port +// const port_range_after_webrtc = max_available_port - (webrtc_port_begin + session_limit) + +// var port +// if (port_range_size_before_webrtc > port_range_after_webrtc) { +// port = await get_port_in_range(min_available_port, webrtc_port_begin - 1) +// } else { +// port = await get_port_in_range(webrtc_port_begin + session_limit, max_available_port) +// } +// return port +// } + module.exports = { get_webrtc_port, - get_app_port -} + free_webrtc_port, + get_app_port, + free_app_port +} \ No newline at end of file diff --git a/src/modules/run_process.js b/src/modules/run_process.js index cc83ae0..0f9957a 100644 --- a/src/modules/run_process.js +++ b/src/modules/run_process.js @@ -7,7 +7,7 @@ const app_port_arg = config.app_args.runtime.port const fs = require('fs') const tcp_port_used = require('tcp-port-used') -const webrtc_timeout = 5000 +const webrtc_timeout = 60000 const webrtc_retry_time = 100 const is_proc_running = (pid) => { @@ -43,6 +43,7 @@ const run_webrtc = async (webrtc_port, app_port) => { }) await tcp_port_used.waitUntilUsed(webrtc_port, webrtc_retry_time, webrtc_timeout) } catch (err) { + console.error(err) return null } @@ -80,7 +81,12 @@ const kill_proc = (pid) => { return } if (process.platform === 'win32') { - spawn('taskkill', ['/pid', pid, '/f', '/t']) + spawn('taskkill', ['/pid', pid, '/f', '/t'], { + shell: false, + detached: true, + stdio: 'ignore', + windowsHide: true + }) } else { process.kill(pid) diff --git a/src/modules/session_observer.js b/src/modules/session_observer.js index 29b63d5..43c5e4d 100644 --- a/src/modules/session_observer.js +++ b/src/modules/session_observer.js @@ -3,6 +3,7 @@ const database = require('../database/database') const {is_proc_running, kill_proc} = require('./run_process') const coordinator = require('./coordinator') const wait_list = require('./wait_list') +const {free_app_port, free_webrtc_port} = require('./port_alloc') const observer_timeout = 100 var pending_list = new wait_list([]) @@ -34,12 +35,15 @@ const check_sessions = async (sessions) => { // kill process if running kill_proc(session.webrtc_pid) kill_proc(session.app_pid) + // free ports allocated for processes + free_webrtc_port(session.webrtc_port) + free_app_port(session.app_port) var close_session_result = await coordinator.close_session(session.session_id) if (close_session_result) { await database.remove_running_session(session.session_id) } else { - console.log('can not connect to coordinator') + console.error('can not connect to coordinator') } pending_list.remove(session.session_id) }