sfu support added, IPC cirrus support added, session history added, advanced session info added

This commit is contained in:
2023-04-19 21:52:30 +03:00
parent b78bdd81fa
commit 9c2c413363
16 changed files with 285 additions and 74 deletions
+7 -5
View File
@@ -18,14 +18,16 @@ module.exports = {
"-PixelStreamingIP=127.0.0.1",
"-RenderOffScreen",
"-ForceRes",
"-ResX=1600",
"-ResY=900",
"-ResX=1920",
"-ResY=1080",
"-Unattended",
"-MaxFPS=30",
"-PixelStreamingWebRTCMaxFps=30",
"-PixelStreamingEncoderMinQP=25",
"-PixelStreamingEncoderMultipass=DISABLED",
"-PixelStreamingWebRTCDisableAudioSync"
"-PixelStreamingWebRTCDisableAudioSync",
"-PixelStreamingHideCursor=true",
"-PixelStreamingKeyFilter=ESC, CTRL",
"-SimulcastParameters=1.0,5000000,100000000,2.0,1000000,5000000,3.0,500000,2500000",
//"-PixelStreamingHudStats=true" // usefull for debug
]
}
}
+1 -1
Submodule lib updated: b3fa29e4c5...2daac94e92
+24
View File
@@ -0,0 +1,24 @@
const database = require('../database/database')
const update_users_count = async (req, res, next) => {
const {session_id, users_count} = req.body
database.set_session_current_uniq_users(session_id, users_count)
let session = await database.get_running_session_by_id(session_id)
if (!session) return
if (users_count > session.max_uniq_users) {
database.set_session_max_uniq_users(session_id, users_count)
}
res.end()
}
module.exports = {
update_users_count
}
+20 -9
View File
@@ -1,43 +1,54 @@
const database = require('../database/database')
const {get_webrtc_port, get_app_port} = require('../modules/port_alloc')
const {get_webrtc_port, get_app_port, get_sfu_port} = require('../modules/port_alloc')
const {get_app_path} = require('../modules/titles')
const not_found_error = require('../../lib/src/http/errors/not_found_error')
const server_error = require('../../lib/src/http/errors/server_error')
const {run_webrtc, run_app, kill_proc} = require('../modules/run_process')
const {create_websocket_url} = require('../modules/links')
const {port} = require('../../config')
const run_session = async (req, res, next) => {
const {title, session_id} = req.body
var app_path = get_app_path(title)
let app_path = get_app_path(title)
if (!app_path) {
next(new not_found_error('title or path does not exist'))
return
}
var webrtc_port = await get_webrtc_port()
var app_port = await get_app_port()
let webrtc_port = await get_webrtc_port()
let app_port = await get_app_port()
let sfu_port = await get_sfu_port()
if (!app_port || !webrtc_port) {
next(new server_error('all ports busy'))
if (!app_port) {
next(new server_error('all app ports busy'))
return
} else if (!webrtc_port) {
next(new server_error('all webrtc ports busy'))
return
} else if (!sfu_port) {
next(new server_error('all sfu ports busy'))
return
}
var app_pid = await run_app(app_path, app_port)
let app_pid = await run_app(app_path, app_port)
if (!app_pid) {
next(new server_error('can not run app'))
return
}
var webrtc_pid = await run_webrtc(webrtc_port, app_port)
const this_server_url = `http://127.0.0.1:${port}`
let webrtc_pid = await run_webrtc(webrtc_port, app_port, sfu_port, this_server_url, session_id)
if (!webrtc_pid) {
kill_proc(app_pid)
next(new server_error('can not run webrtc'))
return
}
var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid, app_port, webrtc_port)
let add_runnning_session_result = await database.add_running_session(session_id, title, app_pid, webrtc_pid, app_port, webrtc_port, sfu_port)
if(!add_runnning_session_result) {
next(new server_error('can not add session to database'))
+15 -3
View File
@@ -1,9 +1,21 @@
const database = require('../database/database')
const {session_limit} = require('../../config')
const get_overal_status = async (req, res, next) => {
//let sessions_count = database.get_running_sessions()
//console.log(sessions_count)
res.json({message:"overal_status"})
let sessions = await database.get_running_sessions()
let sessions_count = sessions ? sessions.length : 0
let error_history = await database.get_error_history()
let session_history = await database.get_session_history()
res.json({
server_load: sessions_count / session_limit,
session_limit: session_limit,
running_sessions_count: sessions_count,
running_sessions: sessions,
error_history: error_history,
session_history: session_history
})
}
module.exports = {
+120 -8
View File
@@ -1,5 +1,6 @@
const { MongoClient } = require("mongodb")
const { database_url, database_name } = require('../../config')
const time = require('../../lib/src/time')
let max_database_connection_timeout = 1000
const client = new MongoClient(database_url, {
@@ -19,26 +20,32 @@ const get_db = async () => {
}
}
const add_running_session = async (session_id, app_pid, webrtc_pid, app_port, webrtc_port) => {
var db = await get_db()
const add_running_session = async (session_id, title, app_pid, webrtc_pid, app_port, webrtc_port, sfu_port) => {
let db = await get_db()
if (!db) {
return false
}
var running_session = db.collection('running_session')
let running_session = db.collection('running_session')
await running_session.insertOne({
session_id: session_id,
title: title,
app_pid: app_pid,
webrtc_pid: webrtc_pid,
app_port: app_port,
webrtc_port: webrtc_port
webrtc_port: webrtc_port,
sfu_port: sfu_port,
current_uniq_users: 0,
max_uniq_users: 0,
time: time.get_date_time(),
unix_timestamp: time.get_unix_timestamp()
})
return true
}
const remove_running_session = async (session_id) => {
var db = await get_db()
let db = await get_db()
if (!db) {
return false
}
@@ -47,16 +54,121 @@ const remove_running_session = async (session_id) => {
}
const get_running_sessions = async () => {
let db = await get_db()
if (!db) {
return null
}
let running_sessions = await db.collection('running_session').find().toArray()
return (!running_sessions.length) ? null : running_sessions
}
const get_running_session_by_id = async (session_id) => {
let db = await get_db()
if (!db) {
return null
}
let running_session = await db.collection('running_session').findOne({session_id:session_id})
return running_session
}
const add_error_history = async (error_message) => {
var db = await get_db()
if (!db) {
return false
}
var error_collection = db.collection('error_history')
await error_collection.insertOne({
error_message:error_message,
time: time.get_date_time(),
unix_timestamp: time.get_unix_timestamp()
})
return true
}
const get_error_history = async () => {
var db = await get_db()
if (!db) {
return null
}
var running_sessions = await db.collection('running_session').find().toArray()
return (!running_sessions.length) ? null : running_sessions
var errors = await db.collection('error_history').find().toArray()
return (!errors.length) ? null : errors
}
const add_running_session_to_history = async (session_id) => {
let db = await get_db()
if (!db) {
return false
}
let session_history = db.collection('session_history')
let running_session = await get_running_session_by_id(session_id)
if (!session_id) {
return false
}
await session_history.insertOne({
session_id: session_id,
title: running_session.title,
app_pid: running_session.app_pid,
webrtc_pid: running_session.webrtc_pid,
app_port: running_session.app_port,
webrtc_port: running_session.webrtc_port,
sfu_port: running_session.sfu_port,
current_uniq_users: running_session.current_uniq_users,
max_uniq_users: running_session.max_uniq_users,
time: running_session.time,
unix_timestamp: running_session.unix_timestamp
})
return true
}
const get_session_history = async () => {
let db = await get_db()
if (!db) {
return null
}
let session_history = await db.collection('session_history').find().toArray()
if (!session_history.length) {
return null
}
return session_history
}
const set_session_current_uniq_users = async (session_id, current_uniq_users) => {
let db = await get_db()
if (!db) {
return null
}
await db.collection('running_session').updateOne({session_id:session_id}, {$set:{current_uniq_users:current_uniq_users}})
return true
}
const set_session_max_uniq_users = async (session_id, max_uniq_users) => {
let db = await get_db()
if (!db) {
return null
}
await db.collection('running_session').updateOne({session_id:session_id}, {$set:{max_uniq_users:max_uniq_users}})
return true
}
module.exports = {
add_running_session,
remove_running_session,
get_running_sessions
get_running_sessions,
add_error_history,
get_error_history,
add_running_session_to_history,
get_session_history,
get_running_session_by_id,
set_session_current_uniq_users,
set_session_max_uniq_users
}
+3
View File
@@ -1,10 +1,13 @@
const server_error = require('../../lib/src/http/errors/server_error')
const database = require('../database/database')
const error_handler = (err, req, res, next) => {
if (!err.status_code) {
err = new server_error(err.message)
}
res.status(err.status_code).json({message:err.message})
database.add_error_history(err.message)
next(err)
}
+1 -1
View File
@@ -4,7 +4,7 @@ const {log_path} = require('../../config')
const logger_runtime = new _logger(log_path)
const request_logger = (req, res, next) => {
var {url} = req
let {url} = req
logger_runtime.log(url)
next()
}
+11 -3
View File
@@ -1,9 +1,10 @@
const {coordinator_url} = require('../../config')
const request = require('request')
const time = require('../../lib/src/time')
const max_response_timeout = 1000
const close_session = async (session_id) => {
var options = {
const close_session = async (session) => {
let options = {
url: coordinator_url + '/session_server/session_closed',
method: "POST",
timeout: max_response_timeout,
@@ -11,7 +12,14 @@ const close_session = async (session_id) => {
"content-type": "application/json"
},
json: {
session_id: session_id
session_id: session.session_id,
title: session.title,
max_uniq_users: session.max_uniq_users,
users_at_close: session.current_uniq_users,
time: session.time,
unix_timestamp: session.unix_timestamp,
time_end: time.get_date_time(),
unix_timestamp_end: time.get_unix_timestamp()
}
}
+1 -1
View File
@@ -1,7 +1,7 @@
const {external_domain} = require('../../config')
const create_websocket_url = (webrtc_port) => {
var valid_external_domain = external_domain
let valid_external_domain = external_domain
// if external domain contains http OR https part remove it
valid_external_domain = valid_external_domain.replace('http://', '').replace('https://', '')
+51 -29
View File
@@ -25,12 +25,12 @@ class port {
class port_alloc {
constructor(port_begin, count) {
this.#ports = new Array()
for (var i = port_begin; i < port_begin + count; ++i) {
for (let i = port_begin; i < port_begin + count; ++i) {
this.#ports.push(new port(i))
}
}
get() {
for (var i in this.#ports) {
for (let i in this.#ports) {
if (this.#ports[i].is_free()) {
this.#ports[i].set_busy()
return this.#ports[i].get_value()
@@ -39,7 +39,7 @@ class port_alloc {
return null
}
free(busy_port) {
for (var i in this.#ports) {
for (let i in this.#ports) {
if (this.#ports[i].get_value() == busy_port) {
this.#ports[i].set_free()
return
@@ -47,14 +47,14 @@ class port_alloc {
}
}
set_port_as_busy(busy_port) {
var first_port = this.#ports[0].get_value()
var last_port = this.#ports[this.#ports.length - 1].get_value()
let first_port = this.#ports[0].get_value()
let last_port = this.#ports[this.#ports.length - 1].get_value()
if (!(busy_port >= first_port && busy_port <= last_port)) {
return false
}
var ports_count = this.#ports.length
for (var i = 0; i < ports_count; ++i) {
let ports_count = this.#ports.length
for (let i = 0; i < ports_count; ++i) {
if (this.#ports[i].get_value() == busy_port) {
this.#ports[i].set_busy()
return true
@@ -69,30 +69,40 @@ class port_alloc {
const min_available_port = 10000
const max_available_port = 65535
// user does not set app port, calculate it from webrtc port and session limit
const get_app_port_begin = (webrtc_port_begin, session_limit) => {
var app_port_begin = webrtc_port_begin + session_limit
if (app_port_begin > max_available_port) {
app_port_begin = webrtc_port_begin - session_limit
const get_begin_ports = (webrtc_port_begin, session_limit) => {
let pivot = max_available_port / 2
let coeff = (webrtc_port_begin > pivot) ? -1 : 1
let port_diff = session_limit * coeff
let sfu_port_begin = webrtc_port_begin + port_diff
let app_port_begin = sfu_port_begin + port_diff
return {
sfu_port_begin: sfu_port_begin,
app_port_begin: app_port_begin
}
return app_port_begin
}
var webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit)
var app_port_alloc = new port_alloc(get_app_port_begin(webrtc_port_begin, session_limit), session_limit)
const {sfu_port_begin, app_port_begin} = get_begin_ports(webrtc_port_begin, session_limit)
let webrtc_port_alloc = new port_alloc(webrtc_port_begin, session_limit)
let sfu_port_alloc = new port_alloc(sfu_port_begin, session_limit)
let app_port_alloc = new port_alloc(app_port_begin, session_limit)
// this will restore all allocated ports before session server reboot
var ports_restored = false
let ports_restored = false
const restore_ports_from_database = async () => {
var sessions = await database.get_running_sessions()
let sessions = await database.get_running_sessions()
if (!sessions) {
return
}
sessions.forEach(session => {
app_port_alloc.set_port_as_busy(session.app_port)
webrtc_port_alloc.set_port_as_busy(session.webrtc_port)
sfu_port_alloc.set_port_as_busy(session.sfu_port)
app_port_alloc.set_port_as_busy(session.app_port)
})
ports_restored = true
}
@@ -106,29 +116,41 @@ const check_ports_restored = async () => {
await restore_ports_from_database()
}
// webrtc ports
// GET
const get_webrtc_port = async () => {
await check_ports_restored()
return webrtc_port_alloc.get()
}
const get_app_port = async () => {
await check_ports_restored()
return app_port_alloc.get()
}
const get_sfu_port = async () => {
await check_ports_restored()
return sfu_port_alloc.get()
}
// FREE
const free_app_port = (port) => {
app_port_alloc.free(port)
}
const free_webrtc_port = (port) => {
webrtc_port_alloc.free(port)
}
// app ports
const get_app_port = async () => {
await check_ports_restored()
return app_port_alloc.get()
}
const free_app_port = (port) => {
app_port_alloc.free(port)
const free_sfu_port = (port) => {
sfu_port_alloc.free(port)
}
module.exports = {
get_webrtc_port,
free_webrtc_port,
get_app_port,
free_app_port
get_sfu_port,
free_webrtc_port,
free_app_port,
free_sfu_port
}
+7 -7
View File
@@ -15,8 +15,8 @@ const is_proc_running = (pid) => {
}
const is_proc_running_async = async (pid, time_ms) => {
var timeout_count = 0
var check_times = 10
let timeout_count = 0
let check_times = 10
while (true) {
++timeout_count
@@ -30,14 +30,14 @@ const is_proc_running_async = async (pid, time_ms) => {
}
}
const run_webrtc = async (webrtc_port, app_port) => {
const run_webrtc = async (webrtc_port, app_port, sfu_port, session_server_url, session_id) => {
if (!fs.existsSync(webrtc_server_path)) {
return null
}
var webrtc_proc
let webrtc_proc
try {
var webrtc_proc_arg = JSON.stringify({webrtc_port:webrtc_port, app_port:app_port})
let webrtc_proc_arg = JSON.stringify({webrtc_port:webrtc_port, app_port:app_port, sfu_port:sfu_port, session_server_url:session_server_url, session_id:session_id})
webrtc_proc = fork(webrtc_server_path, [webrtc_proc_arg], {
detached: true
})
@@ -47,13 +47,13 @@ const run_webrtc = async (webrtc_port, app_port) => {
return null
}
//var proc_status = await is_proc_running_async(webrtc_proc.pid, 500)
//let proc_status = await is_proc_running_async(webrtc_proc.pid, 500)
return (webrtc_proc) ? webrtc_proc.pid : null
}
const run_app = async (app_path, app_port) => {
var app_proc
let app_proc
try {
app_proc = spawn(app_path, [].concat(app_args_static,
[app_port_arg + app_port.toString()], { cwd: './', stdio: ['ignore', 'pipe', 'pipe']}), {
+8 -6
View File
@@ -3,10 +3,10 @@ const database = require('../database/database')
const {is_proc_running, kill_proc} = require('./run_process')
const coordinator = require('./coordinator')
const wait_list = require('./wait_list')
const {free_app_port, free_webrtc_port} = require('./port_alloc')
const {free_app_port, free_webrtc_port, free_sfu_port} = require('./port_alloc')
const observer_timeout = 100
var pending_list = new wait_list([])
let pending_list = new wait_list([])
const start_observer = () => {
check()
@@ -17,8 +17,8 @@ const start_observer = () => {
const check_sessions = async (sessions) => {
await Promise.all(sessions.map(async (session) => {
var webrtc_running = is_proc_running(session.webrtc_pid)
var app_running = is_proc_running(session.app_pid)
let webrtc_running = is_proc_running(session.webrtc_pid)
let app_running = is_proc_running(session.app_pid)
if (webrtc_running && app_running) {
return
@@ -38,9 +38,11 @@ const check_sessions = async (sessions) => {
// free ports allocated for processes
free_webrtc_port(session.webrtc_port)
free_app_port(session.app_port)
free_sfu_port(session.sfu_port)
var close_session_result = await coordinator.close_session(session.session_id)
let close_session_result = await coordinator.close_session(session)
if (close_session_result) {
await database.add_running_session_to_history(session.session_id)
await database.remove_running_session(session.session_id)
} else {
console.error('can not connect to coordinator')
@@ -51,7 +53,7 @@ const check_sessions = async (sessions) => {
}
const check = async () => {
var sessions = await database.get_running_sessions()
let sessions = await database.get_running_sessions()
if (!sessions) {
return
+1 -1
View File
@@ -2,7 +2,7 @@ const titles = require('../../titles.json')
const fs = require('fs')
const get_app_path = (title) => {
var path
let path
titles.forEach((current_title) => {
if (current_title.title == title) {
if (fs.existsSync(current_title.path)) {
+13
View File
@@ -0,0 +1,13 @@
const router = require('express').Router()
const { celebrate, Joi, Segments} = require('celebrate')
const {update_users_count} = require('../controller/cirrus')
router.post('/update_users_count', celebrate({
[Segments.BODY]: Joi.object().keys({
session_id: Joi.string().required(),
users_count: Joi.number().integer().required()
})
}), update_users_count)
module.exports = router
+2
View File
@@ -2,9 +2,11 @@ const router = require('express').Router()
const router_session = require('./session')
const router_ping = require('./ping')
const router_status = require('./status')
const router_cirrus = require('./cirrus')
router.use('/session', router_session)
router.use('/ping', router_ping)
router.use('/status', router_status)
router.use('/cirrus', router_cirrus)
module.exports = router