#pragma once

// based on:
// https://morestina.net/blog/1400/minimalistic-blocking-bounded-queue-for-c

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

/**
 * Bounded FIFO queue whose every operation takes the same internal mutex.
 *
 * Items are appended at the back and removed from the front of a
 * `std::deque<T>`. Two condition variables are used: `not_empty` is signalled
 * after an item has been stored, `not_full` after an item has been removed.
 *
 * `T` only has to be move-constructible and move-assignable; items are never
 * copied. The queue itself can be neither copied nor moved.
 */
template <typename T> class MPMCQueue {
  std::deque<T> content;
  // Upper bound on content.size(); fixed at construction.
  const std::size_t capacity;

  // Guards `content`.
  std::mutex mutex;
  std::condition_variable not_empty;
  std::condition_variable not_full;

public:
  /**
   * Creates an empty queue holding at most `capacity` items.
   *
   * With a capacity of 0 `try_push` always fails and `push` never returns,
   * because the "has free space" condition can never become true.
   */
  explicit MPMCQueue(std::size_t capacity) : capacity(capacity) {
  }

  MPMCQueue(const MPMCQueue &) = delete;
  MPMCQueue(MPMCQueue &&) = delete;
  MPMCQueue &operator=(const MPMCQueue &) = delete;
  MPMCQueue &operator=(MPMCQueue &&) = delete;

  /**
   * Appends `item`, waiting for as long as the queue is full.
   *
   * @param item value to enqueue; it is moved into the queue.
   */
  void push(T &&item) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      not_full.wait(lock, [this]() { return content.size() < capacity; });
      content.push_back(std::move(item));
    }
    // Signal only after the lock has been released.
    not_empty.notify_one();
  }

  /**
   * Appends `item` only if there is free space; never waits for space.
   *
   * @param item value to enqueue; moved from only when the call succeeds.
   * @return true if the item was stored, false if the queue was full (in
   *         which case `item` is left untouched).
   */
  bool try_push(T &&item) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      if (content.size() >= capacity) {
        return false;
      }
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
    return true;
  }

  /**
   * Removes the oldest item, waiting for as long as the queue is empty.
   *
   * @param item receives the removed value through move assignment.
   */
  void pop(T &item) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      not_empty.wait(lock, [this]() { return !content.empty(); });
      item = std::move(content.front());
      content.pop_front();
    }
    // Signal only after the lock has been released.
    not_full.notify_one();
  }

  /**
   * Removes the oldest item only if one is available; never waits for one.
   *
   * @param item receives the removed value through move assignment; it is
   *        not modified when the queue is empty.
   * @return true if an item was removed, false if the queue was empty.
   */
  bool try_pop(T &item) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      if (content.empty()) {
        return false;
      }
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
    return true;
  }
};
diff --git a/native/cpp/CommonCpp/Tools/WorkerThread.cpp b/native/cpp/CommonCpp/Tools/WorkerThread.cpp --- a/native/cpp/CommonCpp/Tools/WorkerThread.cpp +++ b/native/cpp/CommonCpp/Tools/WorkerThread.cpp @@ -5,11 +5,11 @@ namespace comm { WorkerThread::WorkerThread(const std::string name) - : tasks(folly::MPMCQueue>(500)), name(name) { + : tasks(MPMCQueue>(500)), name(name) { auto job = [this]() { while (true) { std::unique_ptr lastTask; - this->tasks.blockingRead(lastTask); + this->tasks.pop(lastTask); if (lastTask == nullptr) { break; } @@ -20,7 +20,7 @@ } void WorkerThread::scheduleTask(const taskType task) { - if (!this->tasks.write(std::make_unique(std::move(task)))) { + if (!this->tasks.try_push(std::make_unique(std::move(task)))) { std::string errorMessage{ "Error scheduling task on the " + this->name + " worker thread"}; Logger::log(errorMessage); @@ -29,7 +29,7 @@ } WorkerThread::~WorkerThread() { - this->tasks.blockingWrite(nullptr); + this->tasks.push(nullptr); try { this->thread->join(); } catch (const std::system_error &error) {