database separated and stability improved, celebrate argument parse added, fastest session server search added, session server work check added, session server websocket get added, error handling improved, overal stability improved, overal scalability improved, everything refactored
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
const {port} = require('./config')
|
||||
const app = require('express')()
|
||||
const helmet = require('helmet')
|
||||
const routes = require('./src/routes')
|
||||
const { errors } = require('celebrate')
|
||||
const body_parser = require('body-parser')
|
||||
const {request_logger, error_logger} = require('./src/middlewares/logger')
|
||||
const {options, cors} = require('./src/middlewares/cors')
|
||||
const {error_handler} = require('./src/middlewares/error_handler')
|
||||
//const limiter = require('./src/middlewares/limiter')
|
||||
|
||||
app.options(options)
|
||||
|
||||
app.use(cors)
|
||||
|
||||
app.use(helmet())
|
||||
|
||||
app.use(body_parser.json())
|
||||
|
||||
app.use(request_logger)
|
||||
|
||||
//app.use(limiter)
|
||||
|
||||
app.use(routes)
|
||||
|
||||
app.use(errors())
|
||||
|
||||
app.use(error_logger)
|
||||
|
||||
app.use(error_handler)
|
||||
|
||||
|
||||
// start listen server
|
||||
app.listen(port)
|
||||
console.log('all config.js tests passed successfully [not developed]')
|
||||
console.log(`Listening at http://localhost:${port}`)
|
||||
@@ -0,0 +1,17 @@
|
||||
module.exports = {
|
||||
port: process.env.PORT,
|
||||
database_url: process.env.DATABASE_URL,
|
||||
database_name: process.env.DATABASE_NAME,
|
||||
log_path: "./logs/runtime.log",
|
||||
allowed_cors: [
|
||||
"http://192.168.1.171:3000",
|
||||
"http://localhost:3000",
|
||||
"https://stream.graff.tech/",
|
||||
"http://212.220.216.185:3005",
|
||||
"https://stream.graff.tech"
|
||||
],
|
||||
limiter: {
|
||||
window_minutes: 10,
|
||||
max_requests_per_window: 1000
|
||||
}
|
||||
}
|
||||
+1
-1
Submodule lib updated: 92ebcbb613...b3fa29e4c5
Generated
+1849
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"name": "ls",
|
||||
"version": "1.0.0",
|
||||
"description": "coordinator",
|
||||
"main": "app.js",
|
||||
"directories": {
|
||||
"lib": "lib",
|
||||
"test": "tests"
|
||||
},
|
||||
"scripts": {
|
||||
"start": "node ./app.js",
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "http://192.168.1.163:3000/EgorSuv/pixel-streaming-coordinator.git"
|
||||
},
|
||||
"author": "egor",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"body-parser": "^1.20.1",
|
||||
"celebrate": "^15.0.1",
|
||||
"express": "^4.18.2",
|
||||
"express-rate-limit": "^6.7.0",
|
||||
"request": "^2.88.2"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
const database = require('../database/database')
|
||||
const not_found_error = require('../../lib/src/http/errors/not_found_error')
|
||||
const {get_fastest_session_server, run_session} = require('../modules/session_server')
|
||||
|
||||
const create_session = async (req, res, next) => {
|
||||
const {title} = req.body
|
||||
|
||||
var free_servers = await database.get_free_session_servers(title)
|
||||
|
||||
if (!free_servers) {
|
||||
next(new not_found_error('no free session server or title not exists'))
|
||||
return
|
||||
}
|
||||
|
||||
var session_server = await get_fastest_session_server(free_servers)
|
||||
|
||||
if (!session_server) {
|
||||
next(new not_found_error('all session servers are not available'))
|
||||
return
|
||||
}
|
||||
|
||||
var session_id = await database.generate_session_id()
|
||||
if (!session_id) {
|
||||
next(new not_found_error('can not generate session id'))
|
||||
return
|
||||
}
|
||||
|
||||
var websocket_url = await run_session(session_server.url, session_id, title)
|
||||
|
||||
if (!websocket_url) {
|
||||
next(new not_found_error('websocket_url not valid'))
|
||||
return
|
||||
}
|
||||
|
||||
var add_active_session_result = await database.add_active_session(session_server.url, session_id, websocket_url)
|
||||
|
||||
if (!add_active_session_result) {
|
||||
next(new not_found_error('add_active_session error'))
|
||||
return
|
||||
}
|
||||
res.json({session_id:session_id})
|
||||
}
|
||||
|
||||
const connect_session = async (req, res, next) => {
|
||||
var websocket_url = await database.get_session_websocket_url(req.query.session_id)
|
||||
if (!websocket_url) {
|
||||
next(new not_found_error('session not exists'))
|
||||
return
|
||||
}
|
||||
res.json({websocket_url:websocket_url})
|
||||
}
|
||||
|
||||
const schedule_session = async (req, res, next) => {
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
create_session,
|
||||
schedule_session,
|
||||
connect_session
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
const server_error = require('../../lib/src/http/errors/server_error')
|
||||
const database = require('../database/database')
|
||||
|
||||
const session_closed = async (req, res, next) => {
|
||||
var remove_result = await database.remove_active_session(req.body.session_id)
|
||||
if (!remove_result) {
|
||||
next(new server_error('remove session error'))
|
||||
return
|
||||
}
|
||||
res.json({message:"removed"})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
session_closed
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
const database = require('../database/database.js')
|
||||
const not_found_error = require('../../lib/src/http/errors/not_found_error')
|
||||
|
||||
const get_titles = async (req, res, next) => {
|
||||
var titles = await database.get_titles()
|
||||
|
||||
if (!titles) {
|
||||
next(new not_found_error('titles not found'))
|
||||
return
|
||||
}
|
||||
|
||||
res.json(titles)
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
get_titles
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
const { MongoClient } = require("mongodb")
|
||||
const { database_url, database_name } = require('../../config')
|
||||
const crypto = require("crypto")
|
||||
let max_database_connection_timeout = 1000
|
||||
|
||||
const get_db = async () => {
|
||||
const client = new MongoClient(database_url, {
|
||||
serverSelectionTimeoutMS: max_database_connection_timeout,
|
||||
useUnifiedTopology: true
|
||||
})
|
||||
try {
|
||||
await client.connect()
|
||||
} catch (err) {
|
||||
return null
|
||||
}
|
||||
return client.db(database_name)
|
||||
}
|
||||
|
||||
const get_titles = async (start, count) => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return null
|
||||
}
|
||||
var titles = await db.collection('title').find().skip(parseInt(start)).limit(parseInt(count)).toArray()
|
||||
return (!titles.length) ? null : titles
|
||||
}
|
||||
|
||||
const get_free_session_servers = async (title) => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return null
|
||||
}
|
||||
|
||||
var servers = await db.collection('session_server').find({title:title}).toArray()
|
||||
|
||||
if (!servers.length) {
|
||||
return null
|
||||
}
|
||||
|
||||
var active_session = db.collection('active_session')
|
||||
|
||||
var free_servers = []
|
||||
var servers_length = servers.length
|
||||
for (var i = 0; i < servers_length; ++i) {
|
||||
var sessions = await active_session.find({url:servers[i].url}).toArray()
|
||||
if (servers[i].limit > sessions.length) {
|
||||
free_servers.push(servers[i])
|
||||
}
|
||||
}
|
||||
return free_servers
|
||||
}
|
||||
|
||||
const add_active_session = async (server_url, session_id, websocket_url) => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return false
|
||||
}
|
||||
|
||||
var active_session = db.collection('active_session')
|
||||
|
||||
await active_session.insertOne({
|
||||
server_url: server_url,
|
||||
session_id: session_id,
|
||||
websocket_url: websocket_url
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
const generate_session_id = async () => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return null
|
||||
}
|
||||
|
||||
var active_session = db.collection('active_session')
|
||||
var session_id = {}
|
||||
|
||||
var max_id_len = 6
|
||||
var sessions_with_id = 0
|
||||
while(true) {
|
||||
session_id = crypto.randomBytes(max_id_len/2).toString('hex').toUpperCase()
|
||||
sessions_with_id = await active_session.find({session_id:session_id}).toArray().length
|
||||
if (!sessions_with_id) {
|
||||
return session_id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const remove_active_session = async (session_id) => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return false
|
||||
}
|
||||
|
||||
await db.collection('active_session').deleteOne({session_id:{$in:[session_id]}})
|
||||
return true
|
||||
}
|
||||
|
||||
const get_session_websocket_url = async (session_id) => {
|
||||
var db = await get_db()
|
||||
if (!db) {
|
||||
return null
|
||||
}
|
||||
var active_session = db.collection('active_session')
|
||||
var session = await active_session.findOne({session_id:session_id})
|
||||
if (!session) {
|
||||
return null
|
||||
}
|
||||
return session.websocket_url
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
get_titles,
|
||||
get_free_session_servers,
|
||||
add_active_session,
|
||||
generate_session_id,
|
||||
remove_active_session,
|
||||
get_session_websocket_url
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
const {allowed_cors} = require('../../config')
|
||||
|
||||
const options = (req, res) => {
|
||||
//'GET, POST, OPTIONS, PUT, PATCH, DELETE'
|
||||
res.setHeader('Access-Control-Allow-Methods', 'OPTIONS, GET, POST')
|
||||
res.end()
|
||||
}
|
||||
|
||||
const cors = (req, res, next) => {
|
||||
const {origin} = req.headers
|
||||
if (allowed_cors.includes(origin)) {
|
||||
res.setHeader('Access-Control-Allow-Origin', origin)
|
||||
}
|
||||
res.setHeader('Access-Control-Allow-Credentials', true)
|
||||
res.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type')
|
||||
next()
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
options,
|
||||
cors
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
const server_error = require('../../lib/src/http/errors/server_error')
|
||||
|
||||
const error_handler = (err, req, res, next) => {
|
||||
if (!err.status_code) {
|
||||
err = new server_error(err.message)
|
||||
}
|
||||
res.status(err.status_code).json({message:err.message})
|
||||
next(err)
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
error_handler
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
const rate_limit = require('express-rate-limit')
|
||||
const {window_minutes, max_requests_per_window} = require('../../config')
|
||||
|
||||
const limiter = rate_limit({
|
||||
window_ms: window_minutes * 60 * 1000,
|
||||
max: max_requests_per_window, // limit each IP per window_ms
|
||||
standardHeaders: true, // Return rate limit info in the `RateLimit-*` headers
|
||||
legacyHeaders: false, // Disable the `X-RateLimit-*` headers
|
||||
})
|
||||
|
||||
module.exports = limiter
|
||||
@@ -0,0 +1,20 @@
|
||||
const _logger = require('../../lib/src/logger')
|
||||
const {log_path} = require('../../config')
|
||||
|
||||
const logger_runtime = new _logger(log_path)
|
||||
|
||||
const request_logger = (req, res, next) => {
|
||||
var {url} = req
|
||||
logger_runtime.log(url)
|
||||
next()
|
||||
}
|
||||
|
||||
const error_logger = (err, req, res, next) => {
|
||||
logger_runtime.error(err.message)
|
||||
next(err)
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
request_logger,
|
||||
error_logger
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
var request = require('request')
|
||||
let max_response_timeout = 1000
|
||||
|
||||
const get_fastest_session_server = async (free_servers) => {
|
||||
var fastest_server = null
|
||||
var free_servers_length = free_servers.length
|
||||
|
||||
for (var i = 0; i < free_servers_length; ++i) {
|
||||
var options = {
|
||||
url: free_servers[i].url + '/ping',
|
||||
method: "GET",
|
||||
timeout: max_response_timeout
|
||||
}
|
||||
|
||||
var f = (async function(server) {
|
||||
request(options,
|
||||
async function(err, answer, data) {
|
||||
if (err) {
|
||||
return
|
||||
}
|
||||
if (!fastest_server) {
|
||||
fastest_server = server
|
||||
}
|
||||
}
|
||||
)
|
||||
})(free_servers[i])
|
||||
}
|
||||
|
||||
var timeout_count = 0
|
||||
var check_times = 1000
|
||||
while (true) {
|
||||
++timeout_count
|
||||
if (fastest_server) {
|
||||
return fastest_server
|
||||
}
|
||||
if (timeout_count > check_times) {
|
||||
return fastest_server
|
||||
}
|
||||
await new Promise(resolve => setTimeout(resolve, max_response_timeout / check_times))
|
||||
}
|
||||
}
|
||||
|
||||
const run_session = async (server_url, session_id, title) => {
|
||||
var options = {
|
||||
url: server_url + '/session/run',
|
||||
method: "POST",
|
||||
timeout: max_response_timeout,
|
||||
headers: {
|
||||
"content-type": "application/json"
|
||||
},
|
||||
json: {
|
||||
title: title,
|
||||
session_id: session_id
|
||||
}
|
||||
}
|
||||
|
||||
return new Promise(function (resolve, reject) {
|
||||
request(options,
|
||||
async function(err, answer, data) {
|
||||
if (err || !data.websocket_url) {
|
||||
resolve()
|
||||
} else {
|
||||
resolve(data.websocket_url)
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
get_fastest_session_server,
|
||||
run_session
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
const router = require('express').Router()
|
||||
const router_title = require('./title')
|
||||
const router_session = require('./session')
|
||||
const router_session_server = require('./session_server')
|
||||
|
||||
router.use('/title', router_title)
|
||||
router.use('/session', router_session)
|
||||
router.use('/session_server', router_session_server)
|
||||
|
||||
module.exports = router
|
||||
@@ -0,0 +1,24 @@
|
||||
const router = require('express').Router()
|
||||
const { celebrate, Joi, Segments} = require('celebrate')
|
||||
|
||||
const {
|
||||
create_session,
|
||||
schedule_session,
|
||||
connect_session
|
||||
} = require('../controllers/session')
|
||||
|
||||
router.post('/create', celebrate({
|
||||
[Segments.BODY]: Joi.object().keys({
|
||||
title: Joi.string().required()
|
||||
})
|
||||
}), create_session)
|
||||
|
||||
router.get('/connect', celebrate({
|
||||
[Segments.QUERY]: Joi.object().keys({
|
||||
session_id: Joi.string().required()
|
||||
})
|
||||
}), connect_session)
|
||||
|
||||
router.post('/schedule', schedule_session)
|
||||
|
||||
module.exports = router
|
||||
@@ -0,0 +1,11 @@
|
||||
const router = require('express').Router()
|
||||
const {session_closed} = require('../controllers/session_server')
|
||||
const { celebrate, Joi, Segments} = require('celebrate')
|
||||
|
||||
router.post('/session_closed', celebrate({
|
||||
[Segments.BODY]: Joi.object().keys({
|
||||
session_id: Joi.string().required()
|
||||
})
|
||||
}), session_closed)
|
||||
|
||||
module.exports = router
|
||||
@@ -0,0 +1,6 @@
|
||||
const router = require('express').Router()
|
||||
const {get_titles} = require('../controllers/title')
|
||||
|
||||
router.get('/get', get_titles)
|
||||
|
||||
module.exports = router
|
||||
Reference in New Issue
Block a user