observer added, database added to improve fault tolerance, config fixed, ping added, port allocation improved, session observer added, multiple checks added, run process improved, process running check added, fault tolerance improved

This commit is contained in:
C
2023-01-25 17:57:39 +05:00
parent c2bf9dce67
commit e3e2cc97aa
19 changed files with 2475 additions and 1 deletions
+28
View File
@@ -0,0 +1,28 @@
const {port} = require('./config')
const app = require('express')()
const routes = require('./src/routes')
const { errors } = require('celebrate')
const body_parser = require('body-parser')
const {request_logger, error_logger} = require('./src/middlewares/logger')
const {error_handler} = require('./src/middlewares/error_handler')
const {start_observer} = require('./src/modules/session_observer')
start_observer()
app.use(body_parser.json())
app.use(request_logger)
app.use(routes)
app.use(errors())
app.use(error_logger)
app.use(error_handler)
// start listen server
app.listen(port)
console.log('all config.js tests passed successfully [not developed]')
console.log(`Listening at http://localhost:${port}`)
+25
View File
@@ -0,0 +1,25 @@
module.exports = {
port: parseInt(process.env.PORT),
external_url: process.env.EXTERNAL_URL,
coordinator_url: process.env.COORDINATOR_URL,
database_url: process.env.DATABASE_URL,
database_name: process.env.DATABASE_NAME,
webrtc_port_begin: parseInt(process.env.WEBRTC_PORT_BEGIN),
session_limit: 100,
log_path: "./logs/runtime.log",
webrtc_server_path: "C:/Users/c/Documents/Projects/pixel-streaming-webrtc/WebServers/SignallingWebServer/cirrus.js",
app_args: {
runtime: {
port: "-PixelStreamingPort="
},
static: [
"-PixelStreamingIP=127.0.0.1",
"-RenderOffScreen",
"-PixelStreamingEncoderMaxBitrate=15000000",
"-ResX 1920 -ResY 1080",
"-PixelStreamingEncoderMinQP=25",
"-PixelStreamingEncoderMultipass=QUARTER",
"-PixelStreamingWebRTCMaxFps=60"
]
}
}
+1 -1
Submodule lib updated: 92ebcbb613...b3fa29e4c5
+1941
View File
File diff suppressed because it is too large Load Diff
+25
View File
@@ -0,0 +1,25 @@
{
"name": "pixel-streaming-session-server",
"version": "1.0.0",
"description": "pixel streaming session server",
"main": "index.js",
"directories": {
"lib": "lib"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "http://192.168.1.163:3000/EgorSuv/pixel-streaming-session-server.git"
},
"author": "",
"license": "ISC",
"dependencies": {
"celebrate": "^15.0.1",
"express": "^4.18.2",
"is-running": "^2.1.0",
"portfinder": "^1.0.32",
"request": "^2.88.2"
}
}
+7
View File
@@ -0,0 +1,7 @@
const ping = async (req, res, next) => {
res.json({message:"ping"})
}
module.exports = {
ping
}
+51
View File
@@ -0,0 +1,51 @@
const database = require('../database/database')
const {get_webrtc_port, get_app_port} = require('../modules/port_alloc')
const {get_app_path} = require('../modules/titles')
const not_found_error = require('../../lib/src/http/errors/not_found_error')
const server_error = require('../../lib/src/http/errors/server_error')
const {run_webrtc, run_app} = require('../modules/run_process')
const {external_url} = require('../../config')
const run_session = async (req, res, next) => {
const {title, session_id} = req.body
var app_path = get_app_path(title)
if (!app_path) {
next(new not_found_error('title or path does not exist'))
return
}
var webrtc_port = await get_webrtc_port()
var app_port = await get_app_port()
if (!app_port || !webrtc_port) {
next(new server_error('all ports busy'))
return
}
var webrtc_pid = await run_webrtc(webrtc_port, app_port)
if (!webrtc_pid) {
next(new server_error('can not run webrtc'))
return
}
var app_pid = await run_app(app_path, app_port)
if (!app_pid) {
next(new server_error('can not run app'))
return
}
var add_runnning_session_result = await database.add_running_session(session_id, app_pid, webrtc_pid)
if(!add_runnning_session_result) {
next(new server_error('can not add session to database'))
return
}
res.json({websocket_url:`wss://${external_url}${webrtc_port}/`})
}
module.exports = {
run_session
}
+58
View File
@@ -0,0 +1,58 @@
const { MongoClient } = require("mongodb")
const { database_url, database_name } = require('../../config')
let max_database_connection_timeout = 1000
const get_db = async () => {
const client = new MongoClient(database_url, {
serverSelectionTimeoutMS: max_database_connection_timeout,
useUnifiedTopology: true
})
try {
await client.connect()
} catch (err) {
console.log('can not connect to database')
return null
}
return client.db(database_name)
}
const add_running_session = async (session_id, app_pid, webrtc_pid) => {
var db = await get_db()
if (!db) {
return false
}
var running_session = db.collection('running_session')
await running_session.insertOne({
session_id: session_id,
app_pid: app_pid,
webrtc_pid: webrtc_pid
})
return true
}
const remove_running_session = async (session_id) => {
var db = await get_db()
if (!db) {
return false
}
await db.collection('running_session').deleteOne({session_id:session_id})
return true
}
const get_running_sessions = async () => {
var db = await get_db()
if (!db) {
return null
}
var running_sessions = await db.collection('running_session').find().toArray()
return (!running_sessions.length) ? null : running_sessions
}
module.exports = {
add_running_session,
remove_running_session,
get_running_sessions
}
+13
View File
@@ -0,0 +1,13 @@
const server_error = require('../../lib/src/http/errors/server_error')
const error_handler = (err, req, res, next) => {
if (!err.status_code) {
err = new server_error(err.message)
}
res.status(err.status_code).json({message:err.message})
next(err)
}
module.exports = {
error_handler
}
+20
View File
@@ -0,0 +1,20 @@
const _logger = require('../../lib/src/logger')
const {log_path} = require('../../config')
const logger_runtime = new _logger(log_path)
const request_logger = (req, res, next) => {
var {url} = req
logger_runtime.log(url)
next()
}
const error_logger = (err, req, res, next) => {
logger_runtime.error(err.message)
next(err)
}
module.exports = {
request_logger,
error_logger
}
+33
View File
@@ -0,0 +1,33 @@
const {coordinator_url} = require('../../config')
const request = require('request')
const max_response_timeout = 1000
const close_session = async (session_id) => {
var options = {
url: coordinator_url + '/session_server/session_closed',
method: "POST",
timeout: max_response_timeout,
headers: {
"content-type": "application/json"
},
json: {
session_id: session_id
}
}
return new Promise(function (resolve, reject) {
request(options,
async function(err, answer, data) {
if (err || answer.statusCode != 200) {
resolve(false)
} else {
resolve(true)
}
}
)
})
}
module.exports = {
close_session
}
+88
View File
@@ -0,0 +1,88 @@
const portfinder = require('portfinder')
const {webrtc_port_begin, session_limit} = require('../../config')
const min_available_port = 2000
const max_available_port = 65535
const get_port_in_range = async (first, last) => {
//could be singleton problem
portfinder.setBasePort(first)
portfinder.setHighestPort(last)
try {
var port = await portfinder.getPortPromise()
return port
} catch(err) {
return null
}
}
const get_webrtc_port = async () => {
var port = await get_port_in_range(webrtc_port_begin, webrtc_port_begin + session_limit - 1)
return port
}
const get_app_port = async () => {
const port_range_size_before_webrtc = webrtc_port_begin - min_available_port
const port_range_after_webrtc = max_available_port - (webrtc_port_begin + session_limit)
var port
if (port_range_size_before_webrtc > port_range_after_webrtc) {
port = await get_port_in_range(min_available_port, webrtc_port_begin - 1)
} else {
port = await get_port_in_range(webrtc_port_begin + session_limit, max_available_port)
}
return port
}
module.exports = {
get_webrtc_port,
get_app_port
}
// class port {
// constructor(value) {
// this.#value = value
// this.#free = true
// }
// get_value() {
// return this.#value
// }
// set_free() {
// this.#free = true
// }
// set_busy() {
// this.#free = false
// }
// is_free() {
// return this.#free
// }
// #value
// #free
// }
// class port_alloc {
// constructor(port_begin, count) {
// this.#ports = new Array()
// for (var i = port_begin; i < port_begin + count; ++i) {
// this.#ports.push(new port(i))
// }
// }
// get() {
// for (var i in this.#ports) {
// if (this.#ports[i].is_free()) {
// this.#ports[i].set_busy()
// return this.#ports[i].get_value()
// }
// }
// throw new Error('no free ports')
// }
// free(busy_port) {
// for (var i in this.#ports) {
// if (this.#ports[i].get_value() == busy_port) {
// this.#ports[i].set_free()
// return
// }
// }
// }
// #ports
// }
+78
View File
@@ -0,0 +1,78 @@
const {spawn, fork} = require('node:child_process')
const is_running = require('is-running')
const config = require('../../config')
const webrtc_server_path = config.webrtc_server_path
const app_args_static = config.app_args.static
const app_port_arg = config.app_args.runtime.port
const fs = require('fs')
const is_proc_running = (pid) => {
return is_running(pid)
}
const is_proc_running_async = async (pid, check_time) => {
var timeout_count = 0
var check_times = 10
while (true) {
++timeout_count
if (!is_proc_running(pid)) {
return false
}
if (timeout_count > check_times) {
return true
}
await new Promise(resolve => setTimeout(resolve, check_time / check_times))
}
}
const run_webrtc = async (webrtc_port, app_port) => {
if (!fs.existsSync(webrtc_server_path)) {
return null
}
var webrtc_proc
try {
webrtc_proc = fork(webrtc_server_path, [webrtc_port.toString(), app_port.toString()], {
detached: true
})
} catch (err) {
return null
}
//var proc_status = await is_proc_running_async(webrtc_proc.pid, 500)
return (webrtc_proc) ? webrtc_proc.pid : null
}
const run_app = async (app_path, app_port) => {
var app_proc
try {
app_proc = spawn(app_path, [].concat(app_args_static,
[app_port_arg + app_port.toString()]), {
detached: true
})
} catch (err) {
return null
}
return (app_proc) ? app_proc.pid : null
}
const kill_proc = (pid) => {
if (process.platform === 'win32') {
spawn('taskkill', ['/pid', pid, '/f', '/t'])
}
else {
process.kill(pid)
}
}
module.exports = {
run_webrtc,
run_app,
is_proc_running,
is_proc_running_async,
kill_proc
}
+49
View File
@@ -0,0 +1,49 @@
const run_process = require('../modules/run_process')
const database = require('../database/database')
const {is_proc_running, kill_proc} = require('./run_process')
const coordinator = require('./coordinator')
const observer_timeout = 100
const start_observer = () => {
check()
setTimeout(() => start_observer(), observer_timeout)
}
// check sessions in parallel
const check_sessions = async (sessions) => {
await Promise.all(sessions.map(async (session) => {
var webrtc_running = is_proc_running(session.webrtc_pid)
var app_running = is_proc_running(session.app_pid)
if (webrtc_running && app_running) {
return
}
// if something running kill process and skip remove from database
if (!webrtc_running && !app_running) {
var close_session_result = await coordinator.close_session(session.session_id)
if (close_session_result) {
database.remove_running_session(session.session_id)
}
} else {
kill_proc(session.webrtc_pid)
kill_proc(session.app_pid)
}
}))
}
const check = async () => {
var sessions = await database.get_running_sessions()
if (!sessions) {
return
}
check_sessions(sessions)
}
module.exports = {
start_observer
}
+20
View File
@@ -0,0 +1,20 @@
const titles = require('../../titles.json')
const fs = require('fs')
const get_app_path = (title) => {
var path
titles.forEach((current_title) => {
if (current_title.title == title) {
if (fs.existsSync(current_title.path)) {
path = current_title.path
} else {
console.log(`file not exists ${current_title.path}`)
}
}
})
return (path) ? path : null
}
module.exports = {
get_app_path
}
+8
View File
@@ -0,0 +1,8 @@
const router = require('express').Router()
const router_session = require('./session')
const router_ping = require('./ping')
router.use('/session', router_session)
router.use('/ping', router_ping)
module.exports = router
+7
View File
@@ -0,0 +1,7 @@
const router = require('express').Router()
const {ping} = require('../controller/ping')
router.get('/', ping)
module.exports = router
+13
View File
@@ -0,0 +1,13 @@
const router = require('express').Router()
const { celebrate, Joi, Segments} = require('celebrate')
const {run_session} = require('../controller/session')
router.post('/run', celebrate({
[Segments.BODY]: Joi.object().keys({
title: Joi.string().required(),
session_id: Joi.string().required()
})
}), run_session)
module.exports = router
+10
View File
@@ -0,0 +1,10 @@
[
{
"title":"test-fortis",
"path":"D:/shared/Builds/Fortis_UnStable_64/WindowsNoEditor/FORTIS_Taktika.exe"
},
{
"title":"test-ivazowsky",
"path":"D:/shared/Builds/ivazowsky/Ivaz_Optimized_2/Ivazowsky.exe"
}
]