Files
2022-08-29 12:54:49 +05:00

579 lines
18 KiB
C++

// websocket
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <iostream>
#include <string>
#include <thread>
#include <functional>
// json
#include <nlohmann/json.hpp>
// windows child process
#include <windows.h>
#include <stdio.h>
#include <tchar.h>
// windows terminate process
#include <tlhelp32.h>
//hash
#include <boost/functional/hash.hpp>
// time
#include <chrono>
#include <ctime>
// fixed int
#include <cstdint>
// user
#include "user.h"
#include <vector>
// session
#include "sessionManager.h"
// current directory
#include <filesystem>
// file read/write
#include <fstream>
// openssl
#define BOOST_NETWORK_ENABLE_HTTPS
// using SSL
#define USE_SSL
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
// end process by process id
void EndProc(DWORD procId)
{
PROCESSENTRY32 pe;
memset(&pe, 0, sizeof(PROCESSENTRY32));
pe.dwSize = sizeof(PROCESSENTRY32);
HANDLE hSnap = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
if (::Process32First(hSnap, &pe))
{
BOOL bContinue = TRUE;
// kill child processes
while (bContinue)
{
// only kill child processes
if (pe.th32ParentProcessID == procId)
{
HANDLE hChildProc = ::OpenProcess(PROCESS_ALL_ACCESS, FALSE, pe.th32ProcessID);
if (hChildProc)
{
::TerminateProcess(hChildProc, 1);
::CloseHandle(hChildProc);
}
}
bContinue = ::Process32Next(hSnap, &pe);
}
// kill the main process
HANDLE hProc = ::OpenProcess(PROCESS_ALL_ACCESS, FALSE, procId);
if (hProc)
{
::TerminateProcess(hProc, 1);
::CloseHandle(hProc);
}
}
}
// create process
bool CreateProc(
std::string path,
std::string args,
PROCESS_INFORMATION* processInfo
)
{
std::wstring path_ws = std::wstring(path.begin(), path.end());
const wchar_t* path_cw = path_ws.c_str();
std::wstring cmd_ws = std::wstring(args.begin(), args.end());
wchar_t* cmd_w = const_cast<wchar_t*>(cmd_ws.c_str());
// creating process
STARTUPINFO startupInfo = { sizeof(startupInfo) };
if (!CreateProcess(path_cw, cmd_w, NULL, NULL, TRUE, 0, NULL, NULL, &startupInfo, processInfo))
return false;
return true;
}
template <typename T>
uint32_t hash6(T arg)
{
boost::hash<T> h;
return h(arg) % 1000000;
}
LRESULT CALLBACK GLWindowProc(HWND hWnd, UINT msg, WPARAM wParam, LPARAM lParam)
{
if (msg == WM_DESTROY) PostQuitMessage(0);
return DefWindowProc(hWnd, msg, wParam, lParam);
}
// log
HANDLE hConsoleOutputColor;
std::ofstream logFile;
void LOG(std::stringstream&& s, WORD color)
{
SetConsoleTextAttribute(hConsoleOutputColor, color);
std::cout << s.str() << std::endl;
if (logFile.is_open())
{
logFile << s.str() << std::endl;
}
}
#ifdef USE_SSL
void do_session(tcp::socket sock, ssl::context& ctx, sessionManager& sessManager, std::string ip, uint64_t& user_n, std::string appPath, std::string appArgs, uint16_t portStream, uint16_t portHttp);
#endif
#ifndef USE_SSL
void do_session(tcp::socket sock, /*ssl::context& ctx,*/ sessionManager& sessManager, std::string ip, uint64_t& user_n, std::string appPath, std::string appArgs, uint16_t portStream, uint16_t portHttp);
#endif
// creating listen socket and validate client messages
int main(int argc, char* argv[])
{
// disable console input
HANDLE hInput = GetStdHandle(STD_INPUT_HANDLE);
DWORD prev_mode;
GetConsoleMode(hInput, &prev_mode);
SetConsoleMode(hInput, ENABLE_EXTENDED_FLAGS |
(prev_mode & ~ENABLE_QUICK_EDIT_MODE));
// configure logs function
{
// get output handle for log function
hConsoleOutputColor = GetStdHandle(STD_OUTPUT_HANDLE);
// open log file
std::string logsFolder = std::filesystem::current_path().string() + "\\ServerLogs";
std::string logFileName = std::to_string(std::chrono::system_clock::now().time_since_epoch().count()) + ".log";
std::wstring logsFolder_LPCWSTR = std::wstring(logsFolder.begin(), logsFolder.end());
logFile.open(logsFolder + "\\" + logFileName, std::ios::out);
if (!logFile)
{
if (CreateDirectory(logsFolder_LPCWSTR.c_str(), NULL) || ERROR_ALREADY_EXISTS == GetLastError())
{
logFile.open(logsFolder + "\\" + logFileName, std::ios::out);
}
else
{
std::cout << "create direcotry failed: " << logsFolder << std::endl;
}
}
}
// args
// 1 - copies amount
// 2 - server ip
// 3 - server port
// 4 - path_to_node.js
// 5 - start_http_port
// 6 - start_streamer_port
// 7 - path to crt
// 8 - path to key
// 9 - path to pem
// 10 - path_to_application
// 11 - application_args (without -pixelStreamingPort cos its argv[3])
// sessions
uint16_t sessionLimit = std::atoi(argv[1]);
sessionManager sessManager;
sessManager.setlimit(sessionLimit);
// user count
uint64_t user_n = 0;
// merging app arguments
std::string appPath = argv[10];
std::string appArgs;
for (int i = 11; i < argc; ++i)
appArgs += argv[i] + std::string(" ");
appArgs += "-PixelStreamingPort=";
// first app ports
uint16_t portHttp = std::atoi(argv[5]);
uint16_t portStream = std::atoi(argv[6]);
// starting node js servers
for (uint16_t i = 0; i < sessionLimit; ++i)
{
system((std::string("start node ") + argv[4] + " " + std::to_string(portHttp + i) + " " + std::to_string(portStream + i)).c_str());
}
// ip and socket init
std::string ip(argv[2]);
LOG((std::stringstream() << "ip: " << ip), 7);
auto const address = net::ip::make_address(ip);
auto const port = static_cast<uint16_t>(std::atoi(argv[3]));
LOG((std::stringstream() << "port: " << port), 7);
try
{
// the io_context is required for all I/O
net::io_context ioc{ 1 };
#ifdef USE_SSL
// the SSL context is required, and holds certificates
ssl::context ctx{ ssl::context::tlsv12 };
//// loading certificate, key, pem
ctx.use_certificate_chain_file(argv[7]);
ctx.use_private_key_file(argv[8], boost::asio::ssl::context::file_format::pem);
ctx.use_tmp_dh_file(argv[9]);
#endif
// the acceptor receives incoming connections
tcp::acceptor acceptor{ ioc, {address, port} };
while (true)
{
// This will receive the new connection
tcp::socket socket(ioc);
// Block until we get a connection
acceptor.accept(socket);
// Launch the session, transferring ownership of the socket
std::thread
{
&do_session,
std::move(socket),
#ifdef USE_SSL
std::ref(ctx),
#endif
std::ref(sessManager),
ip,
std::ref(user_n),
appPath,
appArgs,
portStream,
portHttp
}.detach();
}
}
catch (const std::exception& e)
{
LOG(std::stringstream() << "Error: " << e.what(), 12);
return EXIT_FAILURE;
}
logFile.close();
return 0;
}
#ifdef USE_SSL
void do_session(tcp::socket sock, ssl::context& ctx, sessionManager& sessManager, std::string ip, uint64_t& user_n, std::string appPath, std::string appArgs, uint16_t portStream, uint16_t portHttp)
#endif
#ifndef USE_SSL
void do_session(tcp::socket sock, /*ssl::context& ctx,*/ sessionManager& sessManager, std::string ip, uint64_t& user_n, std::string appPath, std::string appArgs, uint16_t portStream, uint16_t portHttp)
#endif
{
std::string user_ip = sock.remote_endpoint().address().to_string();
uint64_t user_id = std::chrono::system_clock::now().time_since_epoch().count();
user_id = hash6(user_ip + std::to_string(user_id));
uint64_t thisUserNum = ++user_n;
LOG((std::stringstream() << "user(" << thisUserNum << ") connected (ip: " << user_ip << "), (id: " << user_id << ")"), 7);
#ifdef USE_SSL
// Construct the websocket stream around the socket
websocket::stream<beast::ssl_stream<tcp::socket&>> ws(sock, ctx);
#endif
#ifndef USE_SSL
websocket::stream<tcp::socket> ws{ std::move(sock) };
#endif
try
{
#ifdef USE_SSL
// Perform the SSL handshake
ws.next_layer().handshake(ssl::stream_base::server);
#endif
// Set a decorator to change the Server of the handshake
ws.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-sync-ssl");
}));
ws.accept();
}
catch (const std::exception& e)
{
LOG(std::stringstream() << "Error: " << e.what(), 12);
return;
}
catch (beast::system_error const& se)
{
LOG(std::stringstream() << "Error: " << se.what(), 12);
return;
}
nlohmann::json jsonData;
std::string message;
std::string content;
//session sessTmp;
session* sessCur = nullptr;
uint32_t sessId = 0;
while (true)
{
try
{
// This buffer will hold the incoming message
// buffer types https://www.boost.org/doc/libs/1_75_0/libs/beast/doc/html/beast/using_io/buffer_types.html
// check for the best one
// beast::multi_buffer buffer;
beast::flat_buffer buffer;
// Read a message
ws.read(buffer);
// cout buffer to string
//std::cout << beast::buffers_to_string(buffer.data()) << std::endl;
// validate json errors
try
{
jsonData = nlohmann::json::parse(beast::buffers_to_string(buffer.data()));
message = jsonData.at("message");
}
catch (nlohmann::json::parse_error& e)
{
std::cout << "Parse error:" << e.what() << std::endl;
}
catch (nlohmann::json::out_of_range& e)
{
std::cout << "Out of range:" << e.what() << std::endl;
}
catch (nlohmann::json::exception& e)
{
std::cout << "JSON exception: " << e.what() << std::endl;
}
catch (std::exception& e)
{
std::cout << "Unknown exception" << e.what() << std::endl;
}
// validate client message
if (message == "NEW_USER")
{
buffer.clear();
boost::beast::ostream(buffer) << "connected";
ws.write(buffer.data());
}
else if (message == "NEW_SESS")
{
buffer.clear();
if (sessManager.isLimitReached())
{
boost::beast::ostream(buffer)
<< std::string("{\"message\" : \"SESS_LIMIT\"}").c_str();
ws.write(buffer.data());
continue;
}
if (sessManager.existsOwnerId(user_id))
{
boost::beast::ostream(buffer)
<< std::string("{\"message\" : \"PERSON_LIMIT\"}").c_str();
ws.write(buffer.data());
continue;
}
// get session port
//uint16_t sessMinPort = sessManager.getMinPort(portHttp);
uint16_t sessHttpPort = sessManager.getMinPort(portHttp);//(sessMinPort == 0) ? portHttp : sessMinPort + 1;
uint16_t sessStreamPort = portStream + (sessHttpPort - portHttp);
// creating process
PROCESS_INFORMATION appProcInfo;
if (!CreateProc(appPath, appPath + " " + appArgs + std::to_string(sessStreamPort), &appProcInfo))
{
LOG((std::stringstream() << "CreateProcess failed (" << GetLastError() << ")."), 12);
continue;
}
boost::beast::ostream(buffer)
<< std::string("{\"message\" : \"CREATING_SESSION\", \"content\" : \"true\"}").c_str();
ws.write(buffer.data());
// bind app process close listen
std::thread([appProcInfo]()
{
WaitForSingleObject(appProcInfo.hProcess, INFINITE);
CloseHandle(appProcInfo.hProcess);
CloseHandle(appProcInfo.hThread);
}).detach();
sessId = hash6(user_id);
// create session
sessManager.add(
session(
sessId,
ip + std::to_string(sessHttpPort),
user_ip,
sessHttpPort
));
if (sessManager.getById(sessId, &sessCur))
{
// add user
sessCur->addUser(user(user_id));
// add process ids
sessCur->addProcId(GetProcessId(appProcInfo.hProcess));
//sessCur->addProcId(GetProcessId(nodeProcInfo.hProcess));
}
LOG((std::stringstream() << "session ["
<< sessId << "] created, (httpPort: " << sessHttpPort << "), (streamPort: " << std::to_string(sessStreamPort) + ")"), 10);
// send respond
buffer.clear();
boost::beast::ostream(buffer)
<< (std::string("{\"message\" : \"SESS_CREATION\", \"id\" : \"")
+ std::to_string(sessId) + "\", \"port\" : \"" + std::to_string(sessHttpPort) + "\"}");
ws.write(buffer.data());
}
else if (message == "SESS_CONNECT")
{
// read session id
try
{
jsonData = nlohmann::json::parse(beast::buffers_to_string(buffer.data()));
content = jsonData.at("content");
}
catch (nlohmann::json::parse_error& e)
{
std::cout << "Parse error:" << e.what() << std::endl;
}
catch (nlohmann::json::out_of_range& e)
{
std::cout << "Out of range:" << e.what() << std::endl;
}
catch (nlohmann::json::exception& e)
{
std::cout << "JSON exception: " << e.what() << std::endl;
}
catch (std::exception& e)
{
std::cout << "Unknown exception" << e.what() << std::endl;
}
sessId = atoi(content.c_str());
// connect to session
if (sessManager.getById(sessId, &sessCur))
{
if (sessCur->existsUserId(user_id))
{
LOG((std::stringstream() << "user exists: " << user_id), 7);
continue;
}
sessCur->addUser(user(user_id));
LOG((std::stringstream() << "user(" << thisUserNum << ") connected to session (id: " << sessCur->getId()
<< "), (users: " << sessCur->getUsersCount() << ")"), 14);
buffer.clear();
boost::beast::ostream(buffer)
<< (std::string("{\"message\" : \"SESS_CONNECT\", \"content\" : \"") + std::to_string(sessCur->getPort()) + "\"}");
ws.write(buffer.data());
}
else
{
LOG((std::stringstream() << "no session (id: " << content << ")"), 14);
buffer.clear();
boost::beast::ostream(buffer)
<< (std::string("{\"message\" : \"SESS_NOT_EXISTS\"}"));
ws.write(buffer.data());
}
}
// Echo the message back
// ws.text(ws.got_text());
// boost::beast::ostream(buffer) << "something";
// ws.write(buffer.data());
}
catch (const std::exception& e)
{
LOG((std::stringstream() << "Error: " << e.what()), 14);
if (sessId)
{
sessManager.getById(sessId, &sessCur);
if (sessCur == nullptr)
{
std::cout << "nullptr session" << std::endl;
}
else
{
sessCur->removeUserById(user_id);
if (!sessCur->getUsersCount())
{
LOG((std::stringstream() << "session [" << sessCur->getId() << "] destroyed"), 12);
std::vector<DWORD> procIds(sessCur->getProcIds());
for (auto& id : procIds)
EndProc(id);
sessManager.remove(sessId);
}
}
}
LOG((std::stringstream() << "user(" << thisUserNum << ") disconnected"), 7);
break;
}
catch (beast::system_error const& se)
{
LOG((std::stringstream() << "user(" << thisUserNum << ") fdisconnected"), 7);
break;
/* if (se.code() != websocket::error::closed)
{
std::cerr << "Error: " << se.code().message() << std::endl;
break;
}*/
}
}
}