session close added

This commit is contained in:
C
2022-11-30 16:07:56 +05:00
parent 12080c106a
commit 288718779d
14 changed files with 29 additions and 4 deletions
+6
View File
@@ -0,0 +1,6 @@
{
"ip" : "192.168.1.115",
"port" : "3001",
"mongodb_url" : "mongodb://127.0.0.1:27017/",
"database_name" : "pixel_streaming"
}
+157
View File
@@ -0,0 +1,157 @@
// coordinator
// http
const http = require('http')
// filesystem
const fs = require('fs')
// server config
const config = require('./config.json')
// node date time
const node_time = require('node-datetime')
const mongodb_client = require('./mongodb_client.js')
const crypto = require('crypto')
var querystring = require('querystring')
const http_client = require('../lib/http_client.js')
const logger = require('../lib/logger')
// logger
const logger_init = new logger('./logs/init.log')
const logger_runtime = new logger('./logs/runtime.log')
// response function
async function create_response(request, callback) {
var mongo_client = new mongodb_client(config.mongodb_url)
var database = await mongo_client.connect_database(config.database_name)
var response = ''
try {
if (request.verb == 'GET_TITLES') {
var title = database.collection('title')
response = await title.find().skip(request.start).limit(request.count).toArray()
callback(response)
} else if (request.verb == 'PLAN_SESSION') {
var session_sheduled = database.collection('session_sheduled')
var sessions = await session_sheduled.find().toArray()
callback(response)
} else if (request.verb == 'CREATE_SESSION') {
var session_server = database.collection('session_server')
var servers = await session_server.find({'title':request.title}).toArray()
if (!servers.length) {
response = {msg:'SERVERS_NOT_FOUND'}
callback(response)
return
}
// find free server and create session
var session_active = database.collection('session_active')
// find free server
var free_server
var servers_length = servers.length
for (var i = 0; i < servers_length; ++i) {
var sessions = await session_active.find({'server_id':servers[i].server_id}).toArray()
if (servers[i].limit > sessions.length) {
free_server = servers[i]
break
}
}
if (free_server == null) {
response = {msg:'SESSION_LIMIT_REACHED'}
callback(response)
return
}
var session_id = crypto.randomBytes(16).toString('base64')
// create session on session server
new http_client(free_server.ip, free_server.port).post(
JSON.stringify({verb:'CREATE_SESSION', title:request.title, session_id:session_id}), async function(answer) {
var json_answer = await JSON.parse(answer)
if (json_answer.link == null) {
response = {msg:'SESSION_SERVER_NOT_WORKING'}
callback(response)
return
}
// generate random code for session access
var code = Math.floor(1000 + Math.random() * 9000)
while ((await session_active.find({'code':code}).toArray()).length) {
code = Math.floor(1000 + Math.random() * 9000)
}
// add session to database
await session_active.insertOne({
server_id:free_server.server_id,
session_id:session_id,
connection_link:json_answer.link,
connection_code:code
})
response = {msg:'SESSION_CREATED', link:json_answer.link}
callback(response)
return
},
function(error) {
logger_runtime.error(error)
callback({msg:'SESSION_SERVER_DISABLED'})
return
})
} else if (request.verb == 'CONNECT_SESSION') {
callback(response)
return
} else if (request.verb == 'CLOSE_SESSION') {
await database.collection('session_active').deleteOne({session_id:request.session_id})
response = {msg:"SESSION_CLOSED", session_id:request.session_id}
callback(response)
return
} else {
response = {msg:'UNKNOWN_VERB'}
callback(response)
return
}
} catch(e) {
logger_runtime.error(e)
}
}
// start http server
const server = http.createServer(function(request, response) {
if (request.method == 'POST') {
var body = ''
request.on('data', function(data) {
body += data
})
request.on('end', function() {
logger_runtime.log('received: ', body)
response.writeHead(200, {'Content-Type': 'application/json'})
try {
create_response(JSON.parse(body), function(message) {
response.end(JSON.stringify(message))
logger_runtime.log('response:', message)
})
}
catch(e) {
logger_runtime.error(e)
}
})
}
else {
response.writeHead(501, {'Content-Type': 'application/json'})
response.end(JSON.stringify({msg:'UNKNOWN_METHOD'}))
}
})
try {
server.listen(config.port, config.ip)
logger_init.log(`Listening at http://${config.ip}:${config.port}`)
} catch (e) {
logger_init.error(e)
}
+26
View File
@@ -0,0 +1,26 @@
module.exports = class mongodb_client {
mongo_client
constructor(url) {
const {MongoClient} = require('mongodb')
this.mongo_client = new MongoClient(url)
}
// database connect function
async connect_database(database_name) {
try {
await this.mongo_client.connect()
const database = this.mongo_client.db(database_name)
console.log('Database connection status:', await database.command({ping: 1}))
return database
}
catch(e) {
console.error(e)
}
// finally
// {
// await mongodb_client.close()
// console.log("Database connection closed")
// }
}
}
File diff suppressed because it is too large Load Diff
+18
View File
@@ -0,0 +1,18 @@
{
"name": "coordinator",
"version": "1.0.0",
"description": "",
"main": "coordinator.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"type": "commonjs",
"dependencies": {
"express": "^4.18.2",
"mongodb": "^4.12.0",
"node-datetime": "^2.1.2"
}
}
+37
View File
@@ -0,0 +1,37 @@
const http = require('http')
module.exports = class http_client {
constructor(ip, port) {
this.#ip = ip
this.#port = port
}
async post(data, callback_answer, callback_error) {
// Build the post string from an object
// An object of options to indicate where to post to
var post_options = {
host: this.#ip,
port: this.#port,
method: 'POST',
headers: {
'Content-Type': 'applicaiton/json',
'Content-Length': Buffer.byteLength(data)
}
}
// Set up the request
var request = http.request(post_options, function(res) {
res.setEncoding('utf8')
res.on('data', async function (answer) {
callback_answer(answer)
})
})
request.on('error', function(e) {
callback_error(e)
})
// post the data
request.write(data)
request.end()
}
#ip
#port
}
+34
View File
@@ -0,0 +1,34 @@
const fs = require('fs')
const path = require('path')
const node_time = require('node-datetime')
const util = require('util')
module.exports = class logger {
constructor(filepath) {
this.#filepath = filepath
this.#filename = path.basename(filepath)
var dirname = path.dirname(filepath)
if (!fs.existsSync(dirname))
fs.mkdirSync(dirname, {recursive:true})
}
error(...arg) {
this.#log('error', arg)
}
log(...arg) {
this.#log('', arg)
}
#log(message = '', ...arg) {
var args = ''
arg.forEach(value => {
args += util.format(value) + ' '
})
args = (node_time.create().format('Y-m-d H:M:S')).toString() + ((message != '') ? ': ' + message : '') + ': ' + args
console.log(this.#filename + ': ' + args)
fs.appendFileSync(this.#filepath, args + '\n')
}
#filepath
#filename
}
+47
View File
@@ -0,0 +1,47 @@
class port {
constructor(value) {
this.#value = value
this.#free = true
}
get_value() {
return this.#value
}
set_free() {
this.#free = true
}
set_busy() {
this.#free = false
}
is_free() {
return this.#free
}
#value
#free
}
module.exports = class port_alloc {
constructor(port_begin, count) {
this.#ports = new Array()
for (var i = port_begin; i < port_begin + count; ++i) {
this.#ports.push(new port(i))
}
}
get() {
for (var i in this.#ports) {
if (this.#ports[i].is_free()) {
this.#ports[i].set_busy()
return this.#ports[i].get_value()
}
}
throw new Error('no free ports')
}
free(busy_port) {
for (var i in this.#ports) {
if (this.#ports[i].get_value() == busy_port) {
this.#ports[i].set_free()
return
}
}
}
#ports
}
+28
View File
@@ -0,0 +1,28 @@
{
"ip" : "127.0.0.1",
"port" : 3002,
"session_ports":{
"count": 50,
"app_begin": 47000,
"http_begin": 47500
},
"coordinator":{
"ip":"192.168.1.115",
"port":3001
},
"webrtc_args_static":{
"cirrus_path":"D:/shared/Builds/Ivaz_Optimized_2/Samples/PixelStreaming/WebServers/SignallingWebServer/cirrus.js"
},
"app_args_runtime":{
"ip":"-PixelStreamingIP=",
"port":"-PixelStreamingPort="
},
"app_args_static":[
"-RenderOffScreen",
"-PixelStreamingEncoderMaxBitrate=15000000",
"-ResX 1280 -ResY 720",
"-PixelStreamingEncoderMinQP=25",
"-PixelStreamingEncoderMultipass=QUARTER",
"-PixelStreamingWebRTCMaxFps=60"
]
}
File diff suppressed because it is too large Load Diff
+19
View File
@@ -0,0 +1,19 @@
{
"name": "coordinator",
"version": "1.0.0",
"description": "",
"main": "coordinator.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"type": "commonjs",
"dependencies": {
"express": "^4.18.2",
"mongodb": "^4.12.0",
"node-datetime": "^2.1.2",
"signal": "^7.0.6"
}
}
+153
View File
@@ -0,0 +1,153 @@
// session server
// http
const http = require('http')
// filesystem
const fs = require('fs')
// server config
const config = require('./config.json')
// node date time
const node_time = require('node-datetime')
// titles for process creation
const titles = require('./titles.json')
const { spawn } = require('node:child_process')
const child_process = require('child_process')
const port_alloc = require('../lib/port_alloc.js')
const http_client = require('../lib/http_client.js')
const logger = require('../lib/logger')
// logger
const logger_init = new logger('./logs/init.log')
const logger_runtime = new logger('./logs/runtime.log')
var app_port_alloc = new port_alloc(config.session_ports.app_begin, config.session_ports.count)
var http_port_alloc = new port_alloc(config.session_ports.http_begin, config.session_ports.count)
// response function
async function create_response(request, callback) {
var response = ''
try {
if (request.verb == 'CREATE_SESSION') {
var app_info
titles.forEach((title) => {
if (title.title == request.title) {
app_info = title
}
})
if (app_info == null) {
response = {msg:'TITLE_NOT_EXISTS'}
callback(response)
return
}
var app_port
var http_port
try {
app_port = app_port_alloc.get()
http_port = http_port_alloc.get()
} catch(e) {
logger_runtime.error(e)
response = {msg:'PORT_BUSY'}
callback(response)
return
}
// start app_proc
const app_proc = spawn(app_info.path, [].concat(config.app_args_static,
[config.app_args_runtime.ip + config.ip, config.app_args_runtime.port + app_port.toString()]), {
detached: true
})
app_proc.on('error', function(err) {
logger_runtime.error(err)
response = {msg:'APP_PROC_ERROR'}
callback(response)
return
})
app_proc.on('close', (code) => {
app_port_alloc.free(app_port)
})
// start webrtc server
const webrtc_proc = child_process.fork(config.webrtc_args_static.cirrus_path, [http_port.toString(), app_port.toString()], {
detached: true
})
webrtc_proc.on('error', function(err) {
logger_runtime.error(err)
response = {msg:'WEBRTC_PROC_ERROR'}
callback(response)
return
})
webrtc_proc.on('close', (code) => {
http_port_alloc.free(http_port)
// if webrtc server closed, kill app proc and send session end message to server
if (process.platform === 'win32') {
spawn('taskkill', ['/pid', app_proc.pid, '/f', '/t'])
}
else {
app_proc.kill('SIGINT')
}
new http_client(config.coordinator.ip, config.coordinator.port).post(
JSON.stringify({verb:'CLOSE_SESSION', session_id:request.session_id}), function(answer) {
},
function(error) {
})
})
response = {msg:'SESSION_CREATED', link:`http://${config.ip+':'+http_port}`}
callback(response)
return
} else if(request.verb == 'CONNECT_SESSION') {
response = ''
callback(response)
return
} else {
response = {msg:'UNKNOWN_VERB'}
callback(response)
return
}
}
catch(e) {
logger_runtime.error(e)
}
}
// start http server
const server = http.createServer(function(request, response) {
if (request.method == 'POST') {
var body = ''
request.on('data', function(data) {
body += data
})
request.on('end', function() {
logger_runtime.log('received: ', body)
response.writeHead(200, {'Content-Type': 'application/json'})
try {
create_response(JSON.parse(body), function(message) {
response.end(JSON.stringify(message))
logger_runtime.log('response:', message)
})
}
catch(e) {
logger_runtime.error(e)
}
})
}
else {
response.writeHead(501, {'Content-Type': 'application/json'})
response.end(JSON.stringify({msg:'UNKNOWN_METHOD'}))
}
})
try {
server.listen(config.port, config.ip)
logger_init.log(`Listening at http://${config.ip}:${config.port}`)
} catch (e) {
logger_init.error(e)
}
+10
View File
@@ -0,0 +1,10 @@
[
{
"title":"Fortis",
"path":"D:/shared/Builds/Fortis_UnStable_64/WindowsNoEditor/FORTIS_Taktika.exe"
},
{
"title":"Ivazowsky",
"path":"D:/shared/Builds/Ivaz_Optimized_2/Ivazowsky.exe"
}
]