Page MenuHomePhabricator

D3488.diff
No OneTemporary

D3488.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -9,6 +9,7 @@
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
+#include <condition_variable>
#include <iostream>
#include <memory>
#include <string>
@@ -33,21 +34,28 @@
const size_t chunkSize =
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
folly::MPMCQueue<std::string> dataChunks;
+ std::condition_variable *terminationNotifier;
public:
- BlobPutClientReactor(const std::string &holder, const std::string &hash);
+ BlobPutClientReactor(
+ const std::string &holder,
+ const std::string &hash,
+ std::condition_variable *terminationNotifier);
void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
std::unique_ptr<grpc::Status> prepareRequest(
blob::PutRequest &request,
std::shared_ptr<blob::PutResponse> previousResponse) override;
+ void doneCallback() override;
};
// Constructs a put reactor for the given holder and hash.
// The chunk queue is created with an argument of 100 (presumably its
// capacity - confirm against folly::MPMCQueue).
// terminationNotifier is stored as a raw, non-owning pointer and is
// dereferenced later in doneCallback(), so the condition variable it points
// to has to stay alive for as long as this reactor may invoke that callback.
BlobPutClientReactor::BlobPutClientReactor(
const std::string &holder,
- const std::string &hash)
+ const std::string &hash,
+ std::condition_variable *terminationNotifier)
: holder(holder),
hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)) {
+ dataChunks(folly::MPMCQueue<std::string>(100)),
+ terminationNotifier(terminationNotifier) {
}
void BlobPutClientReactor::scheduleSendingDataChunk(
@@ -83,6 +91,10 @@
return nullptr;
}
+// Overrides the base reactor's done hook (presumably invoked once the call
+// has finished - confirm against the base class) and wakes up one thread
+// waiting on the condition variable that was passed to the constructor.
+// NOTE(review): the notification is sent without holding the mutex the
+// waiter uses and without setting any shared flag, so a waiter that has not
+// started waiting yet will miss it.
+void BlobPutClientReactor::doneCallback() {
+ // the notifier is a plain pointer supplied by the caller and may be null;
+ // there is nobody to wake up in that case
+ if (this->terminationNotifier == nullptr) {
+ return;
+ }
+ this->terminationNotifier->notify_one();
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
--- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
@@ -17,31 +17,22 @@
class ServiceBlobClient {
std::unique_ptr<blob::BlobService::Stub> stub;
+public:
// Creates the gRPC stub used to talk to the blob service. The target address
// is hard-coded and the channel is created with insecure credentials.
ServiceBlobClient() {
- // TODO: handle other types of connection
+ // TODO: handle different types of connection (e.g. a load balancer)
std::string targetStr = "blob-server:50051";
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
this->stub = blob::BlobService::NewStub(channel);
}
-public:
- static ServiceBlobClient &getInstance() {
- // todo consider threads
- static ServiceBlobClient instance;
- return instance;
- }
-
- std::unique_ptr<reactor::BlobPutClientReactor> putReactor;
-
- void put(const std::string &holder, const std::string &hash) {
- if (this->putReactor != nullptr && !this->putReactor->isDone()) {
+ // Starts a Put call driven by the given reactor and triggers its first
+ // write. Throws std::runtime_error when no reactor is passed.
+ // The stub only receives raw pointers to the reactor and its context, so
+ // the caller has to keep its own shared_ptr alive while the call runs.
+ void put(std::shared_ptr<reactor::BlobPutClientReactor> putReactor) {
+ if (!putReactor) {
throw std::runtime_error(
- "trying to run reactor while the previous one is not finished yet");
+ "put reactor is being used but has not been initialized");
}
- this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash));
- this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor));
- this->putReactor->nextWrite();
+ reactor::BlobPutClientReactor *const rawReactor = putReactor.get();
+ this->stub->async()->Put(&rawReactor->context, rawReactor);
+ rawReactor->nextWrite();
}
// void get(const std::string &holder);
// void remove(const std::string &holder);
diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
@@ -7,7 +7,9 @@
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include <condition_variable>
#include <memory>
+#include <mutex>
#include <string>
namespace comm {
@@ -27,6 +29,11 @@
std::string keyEntropy;
std::string dataHash;
std::string backupID;
+ std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
+ ServiceBlobClient blobClient;
+ std::mutex reactorStateMutex;
+ std::condition_variable blobDoneCV;
+ std::mutex blobDoneCVMutex;
std::string generateBackupID();
@@ -45,6 +52,9 @@
std::unique_ptr<ServerBidiReactorStatus> CreateNewBackupReactor::handleRequest(
backup::CreateNewBackupRequest request,
backup::CreateNewBackupResponse *response) {
+ // Guard the reactor's state (including the blob put reactor) with a mutex:
+ // consecutive calls may be handled by different threads from the pool, and
+ // locking makes the writes done by one thread visible to the next one.
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
switch (this->state) {
case State::KEY_ENTROPY: {
if (!request.has_keyentropy()) {
@@ -64,20 +74,14 @@
// TODO confirm - holder may be a backup id
this->backupID = this->generateBackupID();
- ServiceBlobClient::getInstance().put(this->backupID, this->dataHash);
+ this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
+ this->backupID, this->dataHash, &this->blobDoneCV);
+ this->blobClient.put(this->putReactor);
return nullptr;
}
case State::DATA_CHUNKS: {
- // TODO initialize blob client reactor
- if (ServiceBlobClient::getInstance().putReactor == nullptr) {
- throw std::runtime_error(
- "blob client reactor has not been initialized");
- }
-
- ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(
- std::move(*request.mutable_newcompactionchunk())));
-
+ this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
+ std::move(*request.mutable_newcompactionchunk())));
return nullptr;
}
}
@@ -85,8 +89,10 @@
}
+// Finishes the blob upload belonging to this backup: schedules an empty
+// chunk on the blob put reactor and then blocks on blobDoneCV, which the put
+// reactor notifies from its own doneCallback().
+// Does nothing when no put reactor has been created for this reactor.
void CreateNewBackupReactor::doneCallback() {
- ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(""));
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+ // putReactor is only assigned inside handleRequest; if the stream ends
+ // before that happened there is nothing to flush or to wait for, and
+ // dereferencing the empty pointer would be undefined behavior
+ if (this->putReactor == nullptr) {
+ return;
+ }
+ // presumably the empty chunk marks the end of the data - confirm against
+ // BlobPutClientReactor::prepareRequest
+ this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
+ // NOTE(review): this wait has no predicate, so a spurious wakeup ends it
+ // early and a notification sent before the wait starts is lost (the
+ // callback then blocks forever). A completion flag checked under
+ // blobDoneCVMutex would fix both, but it needs a matching change in
+ // BlobPutClientReactor. reactorStateMutex also stays locked for the whole
+ // wait.
+ std::unique_lock<std::mutex> lock2(this->blobDoneCVMutex);
+ this->blobDoneCV.wait(lock2);
}
} // namespace reactor

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 1, 7:20 PM (21 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2400556
Default Alt Text
D3488.diff (6 KB)

Event Timeline