process kill updated to invisible, webrtc_timeout increased, port allocation changed for high concurrency, port information added to database, app process kill added if webrtc not started

This commit is contained in:
C
2023-02-07 18:22:10 +05:00
parent a0ecb483a1
commit f00d3a447d
5 changed files with 176 additions and 37 deletions
+3 -3
View File
@@ -3,7 +3,7 @@ const {get_webrtc_port, get_app_port} = require('../modules/port_alloc')
const {get_app_path} = require('../modules/titles')
const not_found_error = require('../../lib/src/http/errors/not_found_error')
const server_error = require('../../lib/src/http/errors/server_error')
const {run_webrtc, run_app} = require('../modules/run_process')
const {run_webrtc, run_app, kill_proc} = require('../modules/run_process')
const {create_websocket_url} = require('../modules/links')
@@ -32,17 +32,17 @@ const run_session = async (req, res, next) => {
var webrtc_pid = await run_webrtc(webrtc_port, app_port)
if (!webrtc_pid) {
kill_proc(app_pid)
next(new server_error('can not run webrtc'))
return
}
var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid)
var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid, app_port, webrtc_port)
if(!add_runnning_session_result) {
next(new server_error('can not add session to database'))
return
}
res.json({websocket_url:create_websocket_url(webrtc_port)})
}
+4 -2
View File
@@ -19,7 +19,7 @@ const get_db = async () => {
}
}
const add_running_session = async (session_id, app_pid, webrtc_pid) => {
const add_running_session = async (session_id, app_pid, webrtc_pid, app_port, webrtc_port) => {
var db = await get_db()
if (!db) {
return false
@@ -30,7 +30,9 @@ const add_running_session = async (session_id, app_pid, webrtc_pid) => {
await running_session.insertOne({
session_id: session_id,
app_pid: app_pid,
webrtc_pid: webrtc_pid
webrtc_pid: webrtc_pid,
app_port: app_port,
webrtc_port: webrtc_port
})
return true
}
+156 -29
View File
@@ -1,40 +1,167 @@
const portfinder = require('portfinder')
const {webrtc_port_begin, session_limit} = require('../../config')
const database = require('../database/database')
const min_available_port = 2000
const max_available_port = 65535
class port {
constructor(value) {
this.#value = value
this.#free = true
}
get_value() {
return this.#value
}
set_free() {
this.#free = true
}
set_busy() {
this.#free = false
}
is_free() {
return this.#free
}
#value
#free
}
const get_port_in_range = async (first, last) => {
//could be singleton problem
portfinder.setBasePort(first)
portfinder.setHighestPort(last)
try {
var port = await portfinder.getPortPromise()
return port
} catch(err) {
class port_alloc {
constructor(port_begin, count) {
this.#ports = new Array()
for (var i = port_begin; i < port_begin + count; ++i) {
this.#ports.push(new port(i))
}
}
get() {
for (var i in this.#ports) {
if (this.#ports[i].is_free()) {
this.#ports[i].set_busy()
return this.#ports[i].get_value()
}
}
return null
}
}
const get_webrtc_port = async () => {
var port = await get_port_in_range(webrtc_port_begin, webrtc_port_begin + session_limit - 1)
return port
}
const get_app_port = async () => {
const port_range_size_before_webrtc = webrtc_port_begin - min_available_port
const port_range_after_webrtc = max_available_port - (webrtc_port_begin + session_limit)
var port
if (port_range_size_before_webrtc > port_range_after_webrtc) {
port = await get_port_in_range(min_available_port, webrtc_port_begin - 1)
} else {
port = await get_port_in_range(webrtc_port_begin + session_limit, max_available_port)
free(busy_port) {
for (var i in this.#ports) {
if (this.#ports[i].get_value() == busy_port) {
this.#ports[i].set_free()
return
}
}
}
return port
set_port_as_busy(busy_port) {
var first_port = this.#ports[0].get_value()
var last_port = this.#ports[this.#ports.length - 1].get_value()
if (!(busy_port >= first_port && busy_port <= last_port)) {
return false
}
var ports_count = this.#ports.length
for (var i = 0; i < ports_count; ++i) {
if (this.#ports[i].get_value() == busy_port) {
this.#ports[i].set_busy()
return true
}
}
return false
}
#ports
}
const min_available_port = 10000
const max_available_port = 65535
// user does not set app port, calculate it from webrtc port and session limit
const get_app_port_begin = (webrtc_port_begin, session_limit) => {
var app_port_begin = webrtc_port_begin + session_limit
if (app_port_begin > max_available_port) {
app_port_begin = webrtc_port_begin - session_limit
}
return app_port_begin
}
var webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit)
var app_port_alloc = new port_alloc(get_app_port_begin(webrtc_port_begin, session_limit), session_limit)
// this will restore all allocated ports before session server reboot
var ports_restored = false
const restore_ports_from_database = async () => {
var sessions = await database.get_running_sessions()
if (!sessions) {
return
}
sessions.forEach(session => {
app_port_alloc.set_port_as_busy(session.app_port)
webrtc_port_alloc.set_port_as_busy(session.webrtc_port)
})
ports_restored = true
}
const check_ports_restored = async () => {
if (ports_restored) {
return
}
await restore_ports_from_database()
}
// webrtc ports
const get_webrtc_port = async () => {
await check_ports_restored()
return webrtc_port_alloc.get()
}
const free_webrtc_port = (port) => {
webrtc_port_alloc.free(port)
}
// app ports
const get_app_port = async () => {
await check_ports_restored()
return app_port_alloc.get()
}
const free_app_port = (port) => {
app_port_alloc.free(port)
}
// THIS MODULES DISABLED BECAUSE CAN NOT BIND PORT TO PROCESS IMMEDIATELY, CAUSE PROBLEMS IN HIGH LOAD
// const get_port_in_range = async (first, last) => {
// //could be singleton problem
// portfinder.setBasePort(first)
// portfinder.setHighestPort(last)
// try {
// var port = await portfinder.getPortPromise()
// return port
// } catch(err) {
// return null
// }
// }
// const get_webrtc_port = async () => {
// var port = await get_port_in_range(webrtc_port_begin, webrtc_port_begin + session_limit - 1)
// return port
// }
// const get_app_port = async () => {
// const port_range_size_before_webrtc = webrtc_port_begin - min_available_port
// const port_range_after_webrtc = max_available_port - (webrtc_port_begin + session_limit)
// var port
// if (port_range_size_before_webrtc > port_range_after_webrtc) {
// port = await get_port_in_range(min_available_port, webrtc_port_begin - 1)
// } else {
// port = await get_port_in_range(webrtc_port_begin + session_limit, max_available_port)
// }
// return port
// }
module.exports = {
get_webrtc_port,
get_app_port
}
free_webrtc_port,
get_app_port,
free_app_port
}
+8 -2
View File
@@ -7,7 +7,7 @@ const app_port_arg = config.app_args.runtime.port
const fs = require('fs')
const tcp_port_used = require('tcp-port-used')
const webrtc_timeout = 5000
const webrtc_timeout = 60000
const webrtc_retry_time = 100
const is_proc_running = (pid) => {
@@ -43,6 +43,7 @@ const run_webrtc = async (webrtc_port, app_port) => {
})
await tcp_port_used.waitUntilUsed(webrtc_port, webrtc_retry_time, webrtc_timeout)
} catch (err) {
console.error(err)
return null
}
@@ -80,7 +81,12 @@ const kill_proc = (pid) => {
return
}
if (process.platform === 'win32') {
spawn('taskkill', ['/pid', pid, '/f', '/t'])
spawn('taskkill', ['/pid', pid, '/f', '/t'], {
shell: false,
detached: true,
stdio: 'ignore',
windowsHide: true
})
}
else {
process.kill(pid)
+5 -1
View File
@@ -3,6 +3,7 @@ const database = require('../database/database')
const {is_proc_running, kill_proc} = require('./run_process')
const coordinator = require('./coordinator')
const wait_list = require('./wait_list')
const {free_app_port, free_webrtc_port} = require('./port_alloc')
const observer_timeout = 100
var pending_list = new wait_list([])
@@ -34,12 +35,15 @@ const check_sessions = async (sessions) => {
// kill process if running
kill_proc(session.webrtc_pid)
kill_proc(session.app_pid)
// free ports allocated for processes
free_webrtc_port(session.webrtc_port)
free_app_port(session.app_port)
var close_session_result = await coordinator.close_session(session.session_id)
if (close_session_result) {
await database.remove_running_session(session.session_id)
} else {
console.log('can not connect to coordinator')
console.error('can not connect to coordinator')
}
pending_list.remove(session.session_id)
}