From 9c2c413363034307aa9ce918567e13771e9e2606 Mon Sep 17 00:00:00 2001 From: "PC-ROGE\\c" Date: Wed, 19 Apr 2023 21:52:30 +0300 Subject: [PATCH] sfu support added, IPC cirrus support added, session history added, advanced session info added --- config.js | 12 +-- lib | 2 +- src/controller/cirrus.js | 24 ++++++ src/controller/session.js | 29 ++++--- src/controller/status.js | 18 ++++- src/database/database.js | 128 +++++++++++++++++++++++++++++-- src/middlewares/error_handler.js | 3 + src/middlewares/logger.js | 2 +- src/modules/coordinator.js | 14 +++- src/modules/links.js | 2 +- src/modules/port_alloc.js | 80 ++++++++++++------- src/modules/run_process.js | 14 ++-- src/modules/session_observer.js | 14 ++-- src/modules/titles.js | 2 +- src/routes/cirrus.js | 13 ++++ src/routes/index.js | 2 + 16 files changed, 285 insertions(+), 74 deletions(-) create mode 100644 src/controller/cirrus.js create mode 100644 src/routes/cirrus.js diff --git a/config.js b/config.js index 2536f4e..1e5da9d 100644 --- a/config.js +++ b/config.js @@ -18,14 +18,16 @@ module.exports = { "-PixelStreamingIP=127.0.0.1", "-RenderOffScreen", "-ForceRes", - "-ResX=1600", - "-ResY=900", + "-ResX=1920", + "-ResY=1080", "-Unattended", "-MaxFPS=30", "-PixelStreamingWebRTCMaxFps=30", - "-PixelStreamingEncoderMinQP=25", - "-PixelStreamingEncoderMultipass=DISABLED", - "-PixelStreamingWebRTCDisableAudioSync" + "-PixelStreamingWebRTCDisableAudioSync", + "-PixelStreamingHideCursor=true", + "-PixelStreamingKeyFilter=ESC, CTRL", + "-SimulcastParameters=1.0,5000000,100000000,2.0,1000000,5000000,3.0,500000,2500000", + //"-PixelStreamingHudStats=true" // usefull for debug ] } } \ No newline at end of file diff --git a/lib b/lib index b3fa29e..2daac94 160000 --- a/lib +++ b/lib @@ -1 +1 @@ -Subproject commit b3fa29e4c5034524a15c57ff9cfcbf2b142032d3 +Subproject commit 2daac94e9224762fbec7578ae5b964e6d413c691 diff --git a/src/controller/cirrus.js b/src/controller/cirrus.js new file mode 100644 index 0000000..f48131f --- /dev/null +++ b/src/controller/cirrus.js @@ -0,0 +1,24 @@ +const database = require('../database/database') + + +const update_users_count = async (req, res, next) => { + const {session_id, users_count} = req.body + + database.set_session_current_uniq_users(session_id, users_count) + + let session = await database.get_running_session_by_id(session_id) + + if (!session) return + + if (users_count > session.max_uniq_users) { + database.set_session_max_uniq_users(session_id, users_count) + } + + res.end() +} + + + +module.exports = { + update_users_count +} \ No newline at end of file diff --git a/src/controller/session.js b/src/controller/session.js index c2294b0..d763ddb 100644 --- a/src/controller/session.js +++ b/src/controller/session.js @@ -1,43 +1,54 @@ const database = require('../database/database') -const {get_webrtc_port, get_app_port} = require('../modules/port_alloc') +const {get_webrtc_port, get_app_port, get_sfu_port} = require('../modules/port_alloc') const {get_app_path} = require('../modules/titles') const not_found_error = require('../../lib/src/http/errors/not_found_error') const server_error = require('../../lib/src/http/errors/server_error') const {run_webrtc, run_app, kill_proc} = require('../modules/run_process') const {create_websocket_url} = require('../modules/links') +const {port} = require('../../config') const run_session = async (req, res, next) => { const {title, session_id} = req.body - var app_path = get_app_path(title) + let app_path = get_app_path(title) if (!app_path) { next(new not_found_error('title or path does not exist')) return } - var webrtc_port = await get_webrtc_port() - var app_port = await get_app_port() + let webrtc_port = await get_webrtc_port() + let app_port = await get_app_port() + let sfu_port = await get_sfu_port() - if (!app_port || !webrtc_port) { - next(new server_error('all ports busy')) + if (!app_port) { + next(new server_error('all app ports busy')) + return + } else if (!webrtc_port) { + next(new server_error('all webrtc ports busy')) + return + } else if (!sfu_port) { + next(new server_error('all sfu ports busy')) return } - var app_pid = await run_app(app_path, app_port) + let app_pid = await run_app(app_path, app_port) if (!app_pid) { next(new server_error('can not run app')) return } - var webrtc_pid = await run_webrtc(webrtc_port, app_port) + const this_server_url = `http://127.0.0.1:${port}` + + let webrtc_pid = await run_webrtc(webrtc_port, app_port, sfu_port, this_server_url, session_id) if (!webrtc_pid) { kill_proc(app_pid) next(new server_error('can not run webrtc')) return } - var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid, app_port, webrtc_port) + + let add_runnning_session_result = await database.add_running_session(session_id, title, app_pid, webrtc_pid, app_port, webrtc_port, sfu_port) if(!add_runnning_session_result) { next(new server_error('can not add session to database')) diff --git a/src/controller/status.js b/src/controller/status.js index 1dd931c..81d2b97 100644 --- a/src/controller/status.js +++ b/src/controller/status.js @@ -1,9 +1,21 @@ const database = require('../database/database') +const {session_limit} = require('../../config') const get_overal_status = async (req, res, next) => { - //let sessions_count = database.get_running_sessions() - //console.log(sessions_count) - res.json({message:"overal_status"}) + let sessions = await database.get_running_sessions() + let sessions_count = sessions ? sessions.length : 0 + + let error_history = await database.get_error_history() + let session_history = await database.get_session_history() + + res.json({ + server_load: sessions_count / session_limit, + session_limit: session_limit, + running_sessions_count: sessions_count, + running_sessions: sessions, + error_history: error_history, + session_history: session_history + }) } module.exports = { diff --git a/src/database/database.js b/src/database/database.js index bcd3f9a..da7be76 100644 --- a/src/database/database.js +++ b/src/database/database.js @@ -1,5 +1,6 @@ const { MongoClient } = require("mongodb") const { database_url, database_name } = require('../../config') +const time = require('../../lib/src/time') let max_database_connection_timeout = 1000 const client = new MongoClient(database_url, { @@ -19,26 +20,32 @@ const get_db = async () => { } } -const add_running_session = async (session_id, app_pid, webrtc_pid, app_port, webrtc_port) => { - var db = await get_db() +const add_running_session = async (session_id, title, app_pid, webrtc_pid, app_port, webrtc_port, sfu_port) => { + let db = await get_db() if (!db) { return false } - var running_session = db.collection('running_session') + let running_session = db.collection('running_session') await running_session.insertOne({ session_id: session_id, + title: title, app_pid: app_pid, webrtc_pid: webrtc_pid, app_port: app_port, - webrtc_port: webrtc_port + webrtc_port: webrtc_port, + sfu_port: sfu_port, + current_uniq_users: 0, + max_uniq_users: 0, + time: time.get_date_time(), + unix_timestamp: time.get_unix_timestamp() }) return true } const remove_running_session = async (session_id) => { - var db = await get_db() + let db = await get_db() if (!db) { return false } @@ -47,16 +54,121 @@ const remove_running_session = async (session_id) => { } const get_running_sessions = async () => { + let db = await get_db() + if (!db) { + return null + } + let running_sessions = await db.collection('running_session').find().toArray() + return (!running_sessions.length) ? null : running_sessions +} + +const get_running_session_by_id = async (session_id) => { + let db = await get_db() + if (!db) { + return null + } + let running_session = await db.collection('running_session').findOne({session_id:session_id}) + return running_session +} + +const add_error_history = async (error_message) => { + var db = await get_db() + if (!db) { + return false + } + + var error_collection = db.collection('error_history') + + await error_collection.insertOne({ + error_message:error_message, + time: time.get_date_time(), + unix_timestamp: time.get_unix_timestamp() + }) + return true +} + +const get_error_history = async () => { var db = await get_db() if (!db) { return null } - var running_sessions = await db.collection('running_session').find().toArray() - return (!running_sessions.length) ? null : running_sessions + var errors = await db.collection('error_history').find().toArray() + return (!errors.length) ? null : errors +} + + +const add_running_session_to_history = async (session_id) => { + let db = await get_db() + if (!db) { + return false + } + + let session_history = db.collection('session_history') + + let running_session = await get_running_session_by_id(session_id) + if (!session_id) { + return false + } + + await session_history.insertOne({ + session_id: session_id, + title: running_session.title, + app_pid: running_session.app_pid, + webrtc_pid: running_session.webrtc_pid, + app_port: running_session.app_port, + webrtc_port: running_session.webrtc_port, + sfu_port: running_session.sfu_port, + current_uniq_users: running_session.current_uniq_users, + max_uniq_users: running_session.max_uniq_users, + time: running_session.time, + unix_timestamp: running_session.unix_timestamp + }) + return true +} + +const get_session_history = async () => { + let db = await get_db() + if (!db) { + return null + } + + let session_history = await db.collection('session_history').find().toArray() + + if (!session_history.length) { + return null + } + return session_history +} + +const set_session_current_uniq_users = async (session_id, current_uniq_users) => { + let db = await get_db() + if (!db) { + return null + } + await db.collection('running_session').updateOne({session_id:session_id}, {$set:{current_uniq_users:current_uniq_users}}) + + return true +} + +const set_session_max_uniq_users = async (session_id, max_uniq_users) => { + let db = await get_db() + if (!db) { + return null + } + await db.collection('running_session').updateOne({session_id:session_id}, {$set:{max_uniq_users:max_uniq_users}}) + + return true } module.exports = { add_running_session, remove_running_session, - get_running_sessions + get_running_sessions, + add_error_history, + get_error_history, + add_running_session_to_history, + get_session_history, + get_running_session_by_id, + set_session_current_uniq_users, + set_session_max_uniq_users } \ No newline at end of file diff --git a/src/middlewares/error_handler.js b/src/middlewares/error_handler.js index 68d60e5..670d317 100644 --- a/src/middlewares/error_handler.js +++ b/src/middlewares/error_handler.js @@ -1,10 +1,13 @@ const server_error = require('../../lib/src/http/errors/server_error') +const database = require('../database/database') const error_handler = (err, req, res, next) => { if (!err.status_code) { err = new server_error(err.message) } res.status(err.status_code).json({message:err.message}) + + database.add_error_history(err.message) next(err) } diff --git a/src/middlewares/logger.js b/src/middlewares/logger.js index c45ddd4..e65e3d6 100644 --- a/src/middlewares/logger.js +++ b/src/middlewares/logger.js @@ -4,7 +4,7 @@ const {log_path} = require('../../config') const logger_runtime = new _logger(log_path) const request_logger = (req, res, next) => { - var {url} = req + let {url} = req logger_runtime.log(url) next() } diff --git a/src/modules/coordinator.js b/src/modules/coordinator.js index e3506c0..a6e7258 100644 --- a/src/modules/coordinator.js +++ b/src/modules/coordinator.js @@ -1,9 +1,10 @@ const {coordinator_url} = require('../../config') const request = require('request') +const time = require('../../lib/src/time') const max_response_timeout = 1000 -const close_session = async (session_id) => { - var options = { +const close_session = async (session) => { + let options = { url: coordinator_url + '/session_server/session_closed', method: "POST", timeout: max_response_timeout, @@ -11,7 +12,14 @@ const close_session = async (session_id) => { "content-type": "application/json" }, json: { - session_id: session_id + session_id: session.session_id, + title: session.title, + max_uniq_users: session.max_uniq_users, + users_at_close: session.current_uniq_users, + time: session.time, + unix_timestamp: session.unix_timestamp, + time_end: time.get_date_time(), + unix_timestamp_end: time.get_unix_timestamp() } } diff --git a/src/modules/links.js b/src/modules/links.js index af9a7c4..848d387 100644 --- a/src/modules/links.js +++ b/src/modules/links.js @@ -1,7 +1,7 @@ const {external_domain} = require('../../config') const create_websocket_url = (webrtc_port) => { - var valid_external_domain = external_domain + let valid_external_domain = external_domain // if external domain contains http OR https part remove it valid_external_domain = valid_external_domain.replace('http://', '').replace('https://', '') diff --git a/src/modules/port_alloc.js b/src/modules/port_alloc.js index 3d1f1da..ebad94b 100644 --- a/src/modules/port_alloc.js +++ b/src/modules/port_alloc.js @@ -25,12 +25,12 @@ class port { class port_alloc { constructor(port_begin, count) { this.#ports = new Array() - for (var i = port_begin; i < port_begin + count; ++i) { + for (let i = port_begin; i < port_begin + count; ++i) { this.#ports.push(new port(i)) } } get() { - for (var i in this.#ports) { + for (let i in this.#ports) { if (this.#ports[i].is_free()) { this.#ports[i].set_busy() return this.#ports[i].get_value() @@ -39,7 +39,7 @@ class port_alloc { return null } free(busy_port) { - for (var i in this.#ports) { + for (let i in this.#ports) { if (this.#ports[i].get_value() == busy_port) { this.#ports[i].set_free() return @@ -47,14 +47,14 @@ class port_alloc { } } set_port_as_busy(busy_port) { - var first_port = this.#ports[0].get_value() - var last_port = this.#ports[this.#ports.length - 1].get_value() + let first_port = this.#ports[0].get_value() + let last_port = this.#ports[this.#ports.length - 1].get_value() if (!(busy_port >= first_port && busy_port <= last_port)) { return false } - var ports_count = this.#ports.length - for (var i = 0; i < ports_count; ++i) { + let ports_count = this.#ports.length + for (let i = 0; i < ports_count; ++i) { if (this.#ports[i].get_value() == busy_port) { this.#ports[i].set_busy() return true @@ -69,30 +69,40 @@ class port_alloc { const min_available_port = 10000 const max_available_port = 65535 -// user does not set app port, calculate it from webrtc port and session limit -const get_app_port_begin = (webrtc_port_begin, session_limit) => { - var app_port_begin = webrtc_port_begin + session_limit - if (app_port_begin > max_available_port) { - app_port_begin = webrtc_port_begin - session_limit +const get_begin_ports = (webrtc_port_begin, session_limit) => { + let pivot = max_available_port / 2 + + let coeff = (webrtc_port_begin > pivot) ? -1 : 1 + let port_diff = session_limit * coeff + + let sfu_port_begin = webrtc_port_begin + port_diff + let app_port_begin = sfu_port_begin + port_diff + + return { + sfu_port_begin: sfu_port_begin, + app_port_begin: app_port_begin } - return app_port_begin } -var webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit) -var app_port_alloc = new port_alloc(get_app_port_begin(webrtc_port_begin, session_limit), session_limit) +const {sfu_port_begin, app_port_begin} = get_begin_ports(webrtc_port_begin, session_limit) + +let webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit) +let sfu_port_alloc = new port_alloc(sfu_port_begin, session_limit) +let app_port_alloc = new port_alloc(app_port_begin, session_limit) // this will restore all allocated ports before session server reboot -var ports_restored = false +let ports_restored = false const restore_ports_from_database = async () => { - var sessions = await database.get_running_sessions() + let sessions = await database.get_running_sessions() if (!sessions) { return } sessions.forEach(session => { - app_port_alloc.set_port_as_busy(session.app_port) webrtc_port_alloc.set_port_as_busy(session.webrtc_port) + sfu_port_alloc.set_port_as_busy(session.sfu_port) + app_port_alloc.set_port_as_busy(session.app_port) }) ports_restored = true } @@ -106,29 +116,41 @@ const check_ports_restored = async () => { await restore_ports_from_database() } -// webrtc ports +// GET const get_webrtc_port = async () => { await check_ports_restored() return webrtc_port_alloc.get() } +const get_app_port = async () => { + await check_ports_restored() + return app_port_alloc.get() +} + +const get_sfu_port = async () => { + await check_ports_restored() + return sfu_port_alloc.get() +} + +// FREE +const free_app_port = (port) => { + app_port_alloc.free(port) +} + const free_webrtc_port = (port) => { webrtc_port_alloc.free(port) } -// app ports -const get_app_port = async () => { - await check_ports_restored() - return app_port_alloc.get() -} - -const free_app_port = (port) => { - app_port_alloc.free(port) +const free_sfu_port = (port) => { + sfu_port_alloc.free(port) } module.exports = { get_webrtc_port, - free_webrtc_port, get_app_port, - free_app_port + get_sfu_port, + + free_webrtc_port, + free_app_port, + free_sfu_port } \ No newline at end of file diff --git a/src/modules/run_process.js b/src/modules/run_process.js index 0f9957a..2a9b7bf 100644 --- a/src/modules/run_process.js +++ b/src/modules/run_process.js @@ -15,8 +15,8 @@ const is_proc_running = (pid) => { } const is_proc_running_async = async (pid, time_ms) => { - var timeout_count = 0 - var check_times = 10 + let timeout_count = 0 + let check_times = 10 while (true) { ++timeout_count @@ -30,14 +30,14 @@ const is_proc_running_async = async (pid, time_ms) => { } } -const run_webrtc = async (webrtc_port, app_port) => { +const run_webrtc = async (webrtc_port, app_port, sfu_port, session_server_url, session_id) => { if (!fs.existsSync(webrtc_server_path)) { return null } - var webrtc_proc + let webrtc_proc try { - var webrtc_proc_arg = JSON.stringify({webrtc_port:webrtc_port, app_port:app_port}) + let webrtc_proc_arg = JSON.stringify({webrtc_port:webrtc_port, app_port:app_port, sfu_port:sfu_port, session_server_url:session_server_url, session_id:session_id}) webrtc_proc = fork(webrtc_server_path, [webrtc_proc_arg], { detached: true }) @@ -47,13 +47,13 @@ const run_webrtc = async (webrtc_port, app_port) => { return null } - //var proc_status = await is_proc_running_async(webrtc_proc.pid, 500) + //let proc_status = await is_proc_running_async(webrtc_proc.pid, 500) return (webrtc_proc) ? webrtc_proc.pid : null } const run_app = async (app_path, app_port) => { - var app_proc + let app_proc try { app_proc = spawn(app_path, [].concat(app_args_static, [app_port_arg + app_port.toString()], { cwd: './', stdio: ['ignore', 'pipe', 'pipe']}), { diff --git a/src/modules/session_observer.js b/src/modules/session_observer.js index 43c5e4d..d5dbfc4 100644 --- a/src/modules/session_observer.js +++ b/src/modules/session_observer.js @@ -3,10 +3,10 @@ const database = require('../database/database') const {is_proc_running, kill_proc} = require('./run_process') const coordinator = require('./coordinator') const wait_list = require('./wait_list') -const {free_app_port, free_webrtc_port} = require('./port_alloc') +const {free_app_port, free_webrtc_port, free_sfu_port} = require('./port_alloc') const observer_timeout = 100 -var pending_list = new wait_list([]) +let pending_list = new wait_list([]) const start_observer = () => { check() @@ -17,8 +17,8 @@ const start_observer = () => { const check_sessions = async (sessions) => { await Promise.all(sessions.map(async (session) => { - var webrtc_running = is_proc_running(session.webrtc_pid) - var app_running = is_proc_running(session.app_pid) + let webrtc_running = is_proc_running(session.webrtc_pid) + let app_running = is_proc_running(session.app_pid) if (webrtc_running && app_running) { return @@ -38,9 +38,11 @@ const check_sessions = async (sessions) => { // free ports allocated for processes free_webrtc_port(session.webrtc_port) free_app_port(session.app_port) + free_sfu_port(session.sfu_port) - var close_session_result = await coordinator.close_session(session.session_id) + let close_session_result = await coordinator.close_session(session) if (close_session_result) { + await database.add_running_session_to_history(session.session_id) await database.remove_running_session(session.session_id) } else { console.error('can not connect to coordinator') @@ -51,7 +53,7 @@ const check_sessions = async (sessions) => { } const check = async () => { - var sessions = await database.get_running_sessions() + let sessions = await database.get_running_sessions() if (!sessions) { return diff --git a/src/modules/titles.js b/src/modules/titles.js index ad6d1c8..f52adea 100644 --- a/src/modules/titles.js +++ b/src/modules/titles.js @@ -2,7 +2,7 @@ const titles = require('../../titles.json') const fs = require('fs') const get_app_path = (title) => { - var path + let path titles.forEach((current_title) => { if (current_title.title == title) { if (fs.existsSync(current_title.path)) { diff --git a/src/routes/cirrus.js b/src/routes/cirrus.js new file mode 100644 index 0000000..92ea39c --- /dev/null +++ b/src/routes/cirrus.js @@ -0,0 +1,13 @@ +const router = require('express').Router() +const { celebrate, Joi, Segments} = require('celebrate') + +const {update_users_count} = require('../controller/cirrus') + +router.post('/update_users_count', celebrate({ + [Segments.BODY]: Joi.object().keys({ + session_id: Joi.string().required(), + users_count: Joi.number().integer().required() + }) +}), update_users_count) + +module.exports = router \ No newline at end of file diff --git a/src/routes/index.js b/src/routes/index.js index d639cfb..e3d6979 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -2,9 +2,11 @@ const router = require('express').Router() const router_session = require('./session') const router_ping = require('./ping') const router_status = require('./status') +const router_cirrus = require('./cirrus') router.use('/session', router_session) router.use('/ping', router_ping) router.use('/status', router_status) +router.use('/cirrus', router_cirrus) module.exports = router \ No newline at end of file