stop http worker on shutdown

This commit is contained in:
Axel Cocat
2021-08-06 18:24:16 +02:00
parent d7260a0811
commit dd1b8aa71d
2 changed files with 77 additions and 38 deletions

View File

@@ -5,7 +5,7 @@ extern "C"
#include "lauxlib.h" #include "lauxlib.h"
}; };
#if defined TRINITYCORE || defined AZEROTHCORE #if defined TRINITY || defined AZEROTHCORE
#define CPPHTTPLIB_OPENSSL_SUPPORT #define CPPHTTPLIB_OPENSSL_SUPPORT
#endif #endif
#include "libs/httplib.h" #include "libs/httplib.h"
@@ -29,40 +29,76 @@ HttpResponse::HttpResponse(int funcRef, int statusCode, const std::string& body,
{ } { }
HttpManager::HttpManager() HttpManager::HttpManager()
: httpWorkQueue(16), : workQueue(16),
httpResponseQueue(16), responseQueue(16),
startedHttpWorkerThread(false), startedWorkerThread(false),
httpCancelationToken(false), cancelationToken(false),
httpCondVar(), condVar(),
httpCondVarMutex(), condVarMutex(),
parseUrlRegex("^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\\?([^#]*))?(#(.*))?") parseUrlRegex("^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\\?([^#]*))?(#(.*))?")
{ {
StartHttpWorker(); StartHttpWorker();
} }
HttpManager::~HttpManager()
{
StopHttpWorker();
}
void HttpManager::PushRequest(HttpWorkItem* item) void HttpManager::PushRequest(HttpWorkItem* item)
{ {
std::unique_lock<std::mutex> lock(httpCondVarMutex); std::unique_lock<std::mutex> lock(condVarMutex);
httpWorkQueue.push(item); workQueue.push(item);
httpCondVar.notify_one(); condVar.notify_one();
} }
void HttpManager::StartHttpWorker() void HttpManager::StartHttpWorker()
{ {
while (httpWorkQueue.front()) ClearQueues();
if (!startedWorkerThread)
{ {
httpWorkQueue.pop(); cancelationToken.store(false);
workerThread = std::thread(&HttpManager::HttpWorkerThread, this);
startedWorkerThread = true;
} }
while (httpResponseQueue.front()) }
void HttpManager::ClearQueues()
{
while (workQueue.front())
{ {
httpResponseQueue.pop(); HttpWorkItem* item = *workQueue.front();
if (item != nullptr)
{
delete item;
}
workQueue.pop();
} }
if (!startedHttpWorkerThread) while (responseQueue.front())
{ {
httpWorkerThread = std::thread(&HttpManager::HttpWorkerThread, this); HttpResponse* item = *responseQueue.front();
startedHttpWorkerThread = true; if (item != nullptr)
{
delete item;
} }
responseQueue.pop();
}
}
void HttpManager::StopHttpWorker()
{
if (!startedWorkerThread)
{
return;
}
cancelationToken.store(true);
condVar.notify_one();
workerThread.join();
ClearQueues();
startedWorkerThread = false;
} }
void HttpManager::HttpWorkerThread() void HttpManager::HttpWorkerThread()
@@ -70,21 +106,21 @@ void HttpManager::HttpWorkerThread()
while (true) while (true)
{ {
{ {
std::unique_lock<std::mutex> lock(httpCondVarMutex); std::unique_lock<std::mutex> lock(condVarMutex);
httpCondVar.wait(lock, [&] { return httpWorkQueue.front(); }); condVar.wait(lock, [&] { return workQueue.front() != nullptr || cancelationToken.load(); });
} }
if (!httpWorkQueue.front()) if (cancelationToken.load())
{
continue;
}
if (httpCancelationToken)
{ {
break; break;
} }
if (!workQueue.front())
{
continue;
}
HttpWorkItem* req = *httpWorkQueue.front(); HttpWorkItem* req = *workQueue.front();
httpWorkQueue.pop(); workQueue.pop();
if (!req) if (!req)
{ {
continue; continue;
@@ -131,7 +167,7 @@ void HttpManager::HttpWorkerThread()
res = DoRequest(cli2, req, path); res = DoRequest(cli2, req, path);
} }
httpResponseQueue.push(new HttpResponse(req->funcRef, res->status, res->body, res->headers)); responseQueue.push(new HttpResponse(req->funcRef, res->status, res->body, res->headers));
} }
catch (const std::exception& ex) catch (const std::exception& ex)
{ {
@@ -203,10 +239,10 @@ bool HttpManager::ParseUrl(const std::string& url, std::string& host, std::strin
void HttpManager::HandleHttpResponses() void HttpManager::HandleHttpResponses()
{ {
while (!httpResponseQueue.empty()) while (!responseQueue.empty())
{ {
HttpResponse* res = *httpResponseQueue.front(); HttpResponse* res = *responseQueue.front();
httpResponseQueue.pop(); responseQueue.pop();
if (res == nullptr) if (res == nullptr)
{ {

View File

@@ -35,23 +35,26 @@ class HttpManager
{ {
public: public:
HttpManager(); HttpManager();
~HttpManager();
void PushRequest(HttpWorkItem* item);
void StartHttpWorker(); void StartHttpWorker();
void StopHttpWorker();
void PushRequest(HttpWorkItem* item);
void HandleHttpResponses(); void HandleHttpResponses();
private: private:
void ClearQueues();
void HttpWorkerThread(); void HttpWorkerThread();
bool ParseUrl(const std::string& url, std::string& host, std::string& path); bool ParseUrl(const std::string& url, std::string& host, std::string& path);
httplib::Result DoRequest(httplib::Client& client, HttpWorkItem* req, const std::string& path); httplib::Result DoRequest(httplib::Client& client, HttpWorkItem* req, const std::string& path);
rigtorp::SPSCQueue<HttpWorkItem*> httpWorkQueue; rigtorp::SPSCQueue<HttpWorkItem*> workQueue;
rigtorp::SPSCQueue<HttpResponse*> httpResponseQueue; rigtorp::SPSCQueue<HttpResponse*> responseQueue;
std::thread httpWorkerThread; std::thread workerThread;
bool startedHttpWorkerThread; bool startedWorkerThread;
std::atomic_bool httpCancelationToken; std::atomic_bool cancelationToken;
std::condition_variable httpCondVar; std::condition_variable condVar;
std::mutex httpCondVarMutex; std::mutex condVarMutex;
std::regex parseUrlRegex; std::regex parseUrlRegex;
}; };