Page MenuHomePhorge

D14970.1765041711.diff
No OneTemporary

Size
3 KB
Referenced Files
None
Subscribers
None

D14970.1765041711.diff

diff --git a/native/cpp/CommonCpp/Tools/MPMCQueue.h b/native/cpp/CommonCpp/Tools/MPMCQueue.h
new file mode 100644
--- /dev/null
+++ b/native/cpp/CommonCpp/Tools/MPMCQueue.h
@@ -0,0 +1,65 @@
+// based on:
+// https://morestina.net/blog/1400/minimalistic-blocking-bounded-queue-for-c
+
+#include <deque>
+#include <mutex>
+
+template <typename T> class MPMCQueue { // bounded FIFO queue; every access to `content` below happens while holding `mutex`
+ std::deque<T> content; // queued items: appended at the back, removed from the front
+ size_t capacity; // upper bound on content.size() enforced by push/try_push
+ // NOTE(review): std::condition_variable is used below but this header only includes <deque> and <mutex> — confirm <condition_variable> is available.
+ std::mutex mutex; // guards `content`
+ std::condition_variable not_empty; // waited on by pop; notified by push/try_push
+ std::condition_variable not_full; // waited on by push; notified by pop/try_pop
+ // Copy and move construction/assignment are deleted (declared here in the private section).
+ MPMCQueue(const MPMCQueue &) = delete;
+ MPMCQueue(MPMCQueue &&) = delete;
+ MPMCQueue &operator=(const MPMCQueue &) = delete;
+ MPMCQueue &operator=(MPMCQueue &&) = delete;
+
+public:
+ MPMCQueue(size_t capacity) : capacity(capacity) { // stores the bound unvalidated; NOTE(review): capacity 0 makes push() wait forever and try_push() always return false
+ }
+ // push: blocks until content.size() < capacity, then moves `item` onto the back of the queue.
+ void push(T &&item) {
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ not_full.wait(lk, [this]() { return content.size() < capacity; }); // predicate form: re-checked after every wakeup
+ content.push_back(std::move(item));
+ } // lock released here, before the notification below
+ not_empty.notify_one(); // wake one thread waiting in pop()
+ }
+ // try_push: non-waiting variant; returns false (without calling push_back, so `item` is not consumed) when the queue is at capacity, true otherwise.
+ bool try_push(T &&item) {
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ if (content.size() == capacity)
+ return false; // full: nothing enqueued, nobody notified
+ content.push_back(std::move(item));
+ }
+ not_empty.notify_one();
+ return true;
+ }
+ // pop: blocks until the queue is non-empty, then move-assigns the front element into `item` and removes it.
+ void pop(T &item) {
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ not_empty.wait(lk, [this]() { return !content.empty(); }); // predicate form: re-checked after every wakeup
+ item = std::move(content.front());
+ content.pop_front();
+ } // lock released here, before the notification below
+ not_full.notify_one(); // wake one thread waiting in push()
+ }
+ // try_pop: non-waiting variant; returns false and leaves `item` untouched when the queue is empty, true after moving the front element into `item`.
+ bool try_pop(T &item) {
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ if (content.empty())
+ return false; // empty: `item` is not assigned, nobody notified
+ item = std::move(content.front());
+ content.pop_front();
+ }
+ not_full.notify_one();
+ return true;
+ }
+};
\ No newline at end of file
diff --git a/native/cpp/CommonCpp/Tools/WorkerThread.h b/native/cpp/CommonCpp/Tools/WorkerThread.h
--- a/native/cpp/CommonCpp/Tools/WorkerThread.h
+++ b/native/cpp/CommonCpp/Tools/WorkerThread.h
@@ -1,6 +1,6 @@
#pragma once
-#include <folly/MPMCQueue.h>
+#include "MPMCQueue.h"
#include <memory>
#include <string>
#include <thread>
@@ -11,7 +11,7 @@
class WorkerThread {
std::unique_ptr<std::thread> thread;
- folly::MPMCQueue<std::unique_ptr<taskType>> tasks;
+ MPMCQueue<std::unique_ptr<taskType>> tasks;
const std::string name;
public:
diff --git a/native/cpp/CommonCpp/Tools/WorkerThread.cpp b/native/cpp/CommonCpp/Tools/WorkerThread.cpp
--- a/native/cpp/CommonCpp/Tools/WorkerThread.cpp
+++ b/native/cpp/CommonCpp/Tools/WorkerThread.cpp
@@ -5,11 +5,11 @@
namespace comm {
WorkerThread::WorkerThread(const std::string name)
- : tasks(folly::MPMCQueue<std::unique_ptr<taskType>>(500)), name(name) {
+ : tasks(MPMCQueue<std::unique_ptr<taskType>>(500)), name(name) {
auto job = [this]() {
while (true) {
std::unique_ptr<taskType> lastTask;
- this->tasks.blockingRead(lastTask);
+ this->tasks.pop(lastTask);
if (lastTask == nullptr) {
break;
}
@@ -20,7 +20,7 @@
}
void WorkerThread::scheduleTask(const taskType task) {
- if (!this->tasks.write(std::make_unique<taskType>(std::move(task)))) {
+ if (!this->tasks.try_push(std::make_unique<taskType>(std::move(task)))) {
std::string errorMessage{
"Error scheduling task on the " + this->name + " worker thread"};
Logger::log(errorMessage);
@@ -29,7 +29,7 @@
}
WorkerThread::~WorkerThread() {
- this->tasks.blockingWrite(nullptr);
+ this->tasks.push(nullptr);
try {
this->thread->join();
} catch (const std::system_error &error) {

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 6, 5:21 PM (18 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5839770
Default Alt Text
D14970.1765041711.diff (3 KB)

Event Timeline