From 67ee5974bed566238773220fe5616af71f3e00cf Mon Sep 17 00:00:00 2001 From: C Date: Thu, 24 Nov 2022 19:11:53 +0500 Subject: [PATCH] session creation added --- prototype/coordinator/coordinator.js | 32 ++++++----- prototype/session_server/config.json | 22 +++++++- prototype/session_server/port_alloc.js | 48 +++++++++++++++++ prototype/session_server/session_server.js | 63 ++++++++++++++++++---- 4 files changed, 143 insertions(+), 22 deletions(-) create mode 100644 prototype/session_server/port_alloc.js diff --git a/prototype/coordinator/coordinator.js b/prototype/coordinator/coordinator.js index 10fc94a..c5d5a5f 100644 --- a/prototype/coordinator/coordinator.js +++ b/prototype/coordinator/coordinator.js @@ -21,7 +21,6 @@ if (!fs.existsSync(logs_dir)) fs.appendFileSync(`./${logs_dir}/launch_history.log`, (node_time.create().format('Y-m-d H:M:S')).toString() + '\n') - async function PostSessionServer(data, ip, port, callback) { // Build the post string from an object // An object of options to indicate where to post to @@ -34,18 +33,20 @@ async function PostSessionServer(data, ip, port, callback) { 'Content-Length': Buffer.byteLength(data) } } - // Set up the request - var post_req = http.request(post_options, function(res) { + var request = http.request(post_options, function(res) { res.setEncoding('utf8') res.on('data', async function (answer) { callback(answer) }) }) - + request.on('error', function(e) { + console.error(e) + callback('{"msg":"SESSION_SERVER_DISABLED"}') + }) // post the data - post_req.write(data) - post_req.end() + request.write(data) + request.end() } // response function @@ -64,7 +65,7 @@ async function create_response(request, callback) { } else if (request.verb == 'PLAN_SESSION') { var session_sheduled = database.collection('session_sheduled') var sessions = await session_sheduled.find().toArray() - + callback(response) } else if (request.verb == 'CREATE_SESSION') { var session_server = database.collection('session_server') @@ -73,6 +74,7 @@ async function create_response(request, callback) { if (!servers.length) { response = '{"msg":"SERVERS_NOT_FOUND"}' callback(response) + return } // find free server and create session var session_active = database.collection('session_active') @@ -90,10 +92,16 @@ async function create_response(request, callback) { if (free_server == null) { response = '{"msg":"SESSION_LIMIT_REACHED"}' callback(response) + return } - PostSessionServer(`{"verb":"CREATE_SESSION", "title":"${request.title}"}`, free_server.ip, free_server.port, async function(answer) { - var json_answer = JSON.parse(answer) + var json_answer = await JSON.parse(answer) + + if (json_answer.link == null) { + response = '{"msg":"SESSION_SERVER_NOT_WORKING"}' + callback(response) + return + } // add session to database await session_active.insertOne({ server_id:free_server.server_id, @@ -109,8 +117,7 @@ async function create_response(request, callback) { response = '{"msg":"UNKNOWN_VERB"}' callback(response) } - } - catch(e) { + } catch(e) { console.error(e) } } @@ -126,16 +133,17 @@ const server = http.createServer(function(request, response) { }) request.on('end', function() { + console.log('received: "', body, '"') response.writeHead(200, {'Content-Type': 'message'}) try { create_response(JSON.parse(body), function(message) { response.end(message) + console.log('response:', message) }) } catch(e) { console.error(e) } - console.log('received: "', body, '"') }) } else { diff --git a/prototype/session_server/config.json b/prototype/session_server/config.json index 5a292a1..9acfcd8 100644 --- a/prototype/session_server/config.json +++ b/prototype/session_server/config.json @@ -1,4 +1,24 @@ { "ip" : "127.0.0.1", - "port" : "3002" + "port" : 3002, + "session_ports":{ + "count": 50, + "app_begin": 47000, + "http_begin": 47500 + }, + "webrtc_args_static":{ + "cirrus_path":"D:/shared/Builds/Ivaz_Optimized_2/Samples/PixelStreaming/WebServers/SignallingWebServer/cirrus.js" + }, + "app_args_runtime":{ + "ip":"-PixelStreamingIP=", + "port":"-PixelStreamingPort=" + }, + "app_args_static":[ + "-RenderOffScreen", + "-PixelStreamingEncoderMaxBitrate=15000000", + "-ResX 1280 -ResY 720", + "-PixelStreamingEncoderMinQP=25", + "-PixelStreamingEncoderMultipass=QUARTER", + "-PixelStreamingWebRTCMaxFps=60" + ] } \ No newline at end of file diff --git a/prototype/session_server/port_alloc.js b/prototype/session_server/port_alloc.js new file mode 100644 index 0000000..d3eaa79 --- /dev/null +++ b/prototype/session_server/port_alloc.js @@ -0,0 +1,48 @@ + +class port { + constructor(value) { + this.#value = value + this.#free = true + } + get_value() { + return this.#value + } + set_free() { + this.#free = true + } + set_busy() { + this.#free = false + } + is_free() { + return this.#free + } + #value + #free +} + +module.exports = class port_alloc { + constructor(port_begin, count) { + this.#ports = new Array() + for (var i = port_begin; i < port_begin + count; ++i) { + this.#ports.push(new port(i)) + } + } + get() { + for (var i in this.#ports) { + if (this.#ports[i].is_free()) { + this.#ports[i].set_busy() + return this.#ports[i].get_value() + } + } + throw new Error('no free ports') + } + free(busy_port) { + for (var i in this.#ports) { + if (this.#ports[i].get_value() == busy_port) { + this.#ports[i].set_free() + return + } + } + } + #ports +} \ No newline at end of file diff --git a/prototype/session_server/session_server.js b/prototype/session_server/session_server.js index 07663c9..8a263e6 100644 --- a/prototype/session_server/session_server.js +++ b/prototype/session_server/session_server.js @@ -12,6 +12,10 @@ const titles = require('./titles.json') const { spawn } = require('node:child_process') +const port_alloc = require('./port_alloc.js') + +const child_process = require('child_process') + // write startup server history var logs_dir = './logs' if (!fs.existsSync(logs_dir)) @@ -19,30 +23,70 @@ if (!fs.existsSync(logs_dir)) fs.appendFileSync(`./${logs_dir}/launch_history.log`, (node_time.create().format('Y-m-d H:M:S')).toString() + '\n') +var app_port_alloc = new port_alloc(config.session_ports.app_begin, config.session_ports.count) +var http_port_alloc = new port_alloc(config.session_ports.http_begin, config.session_ports.count) + // response function async function create_response(request, callback) { var response = '' try { switch(request.verb) { case 'CREATE_SESSION': - var title_info + var app_info titles.forEach((title) => { if (title.title == request.title) { - title_info = title + app_info = title } }) - if (title_info == null) { + if (app_info == null) { response = '{"msg":"TITLE_NOT_EXISTS"}' break } - const process = spawn(title_info.path, ['', '']) - - process.on('close', (code) => { - console.log(`child process exited with code ${code}`); + var app_port + var http_port + try { + app_port = app_port_alloc.get() + http_port = http_port_alloc.get() + } catch(e) { + console.error(e) + response = '{"msg":"PORT_BUSY"}' + break + } + + // start app_proc + const app_proc = spawn(app_info.path, [].concat(config.app_args_static, + [config.app_args_runtime.ip + config.ip, config.app_args_runtime.port + app_port.toString()]), { + detached: true + }) + app_proc.on('error', function(err) { + console.error(err) + response = '{"msg":"APP_PROC_ERROR"}' + callback(response) + return + }) + app_proc.on('close', (code) => { + app_port_alloc.free(app_port) }) - response = `{"msg":"SESSION_CREATED", "link":"http://${title_info.title}"}` + // start webrtc server + //console.log('http_port: ', http_port, ' app_port: ', app_port) + + const webrtc_proc = child_process.fork(config.webrtc_args_static.cirrus_path, [http_port.toString(), app_port.toString()], { + detached: true + }) + webrtc_proc.on('error', function(err) { + console.error(err) + response = '{"msg":"WEBRTC_PROC_ERROR"}' + callback(response) + return + }) + webrtc_proc.on('close', (code) => { + http_port_alloc.free(http_port) + }) + + + response = `{"msg":"SESSION_CREATED", "link":"http://${config.ip+':'+http_port}"}` break case 'CONNECT_SESSION': break @@ -67,16 +111,17 @@ const server = http.createServer(function(request, response) { }) request.on('end', function() { + console.log('received: "', body, '"') response.writeHead(200, {'Content-Type': 'message'}) try { create_response(JSON.parse(body), function(message) { response.end(message) + console.log('response:', message) }) } catch(e) { console.error(e) } - console.log('received: "', body, '"') }) } else {