Page MenuHomePhabricator

D3488.id10730.diff
No OneTemporary

D3488.id10730.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -9,6 +9,7 @@
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
+#include <condition_variable>
#include <iostream>
#include <memory>
#include <string>
@@ -34,9 +35,13 @@
const size_t chunkSize =
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
folly::MPMCQueue<std::string> dataChunks;
+ std::condition_variable *terminationNotifier;
public:
- BlobPutClientReactor(const std::string &holder, const std::string &hash);
+ BlobPutClientReactor(
+ const std::string &holder,
+ const std::string &hash,
+ std::condition_variable *terminationNotifier);
void scheduleSendingDataChunk(std::string &dataChunk);
std::unique_ptr<grpc::Status> prepareRequest(
blob::PutRequest &request,
@@ -46,25 +51,23 @@
BlobPutClientReactor::BlobPutClientReactor(
const std::string &holder,
- const std::string &hash)
+ const std::string &hash,
+ std::condition_variable *terminationNotifier)
: holder(holder),
hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)) {
+ dataChunks(folly::MPMCQueue<std::string>(100)),
+ terminationNotifier(terminationNotifier) {
}
void BlobPutClientReactor::scheduleSendingDataChunk(std::string &dataChunk) {
std::cout << "[BC] here schedule sending data chunks 1: "
<< std::hash<std::thread::id>{}(std::this_thread::get_id())
<< std::endl;
- // std::cout << "here schedule sending data chunks 1.1" << std::endl;
- // std::unique_ptr<std::string> upt = std::make_unique<std::string>(str);
- // std::cout << "here schedule sending data chunks 1.2" << std::endl;
if (!this->dataChunks.write(std::move(dataChunk))) {
std::cout << "here schedule sending data chunks 2" << std::endl;
throw std::runtime_error(
"Error scheduling sending a data chunk to send to the blob service");
}
- // this->dataChunks.blockingWrite(std::move(str));
std::cout << "[BC] here schedule sending data chunks 3" << std::endl;
}
@@ -108,6 +111,7 @@
void BlobPutClientReactor::doneCallback() {
std::cout << "[BC] blob put client done " << this->status.error_code() << "/"
<< this->status.error_message() << std::endl;
+ this->terminationNotifier->notify_one();
}
} // namespace reactor
diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
--- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
@@ -17,31 +17,23 @@
class ServiceBlobClient {
std::unique_ptr<blob::BlobService::Stub> stub;
+public:
ServiceBlobClient() {
+ // todo handle different types of connection(e.g. load balancer)
std::string targetStr = "blob-server:50051";
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
this->stub = blob::BlobService::NewStub(channel);
}
-public:
- static ServiceBlobClient &getInstance() {
- // todo consider threads
- static ServiceBlobClient instance;
- return instance;
- }
-
- std::unique_ptr<reactor::BlobPutClientReactor> putReactor;
-
- void put(const std::string &holder, const std::string &hash) {
+ void put(std::shared_ptr<reactor::BlobPutClientReactor> putReactor) {
std::cout << "blob client - put initialize" << std::endl;
- if (this->putReactor != nullptr && !this->putReactor->isDone()) {
+ if (putReactor == nullptr) {
throw std::runtime_error(
- "trying to run reactor while the previous one is not finished yet");
+ "put reactor is being used but has not been initialized");
}
- this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash));
- this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor));
- this->putReactor->nextWrite();
+ this->stub->async()->Put(&putReactor->context, &(*putReactor));
+ putReactor->nextWrite();
}
// void get(const std::string &holder);
// void remove(const std::string &holder);
diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h
@@ -7,8 +7,10 @@
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include <condition_variable>
#include <iostream>
#include <memory>
+#include <mutex>
#include <string>
#include <chrono>
@@ -31,6 +33,11 @@
std::string keyEntropy;
std::string dataHash;
std::string backupID;
+ std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
+ ServiceBlobClient blobClient;
+ std::mutex blobPutClientReactorMutex;
+ std::condition_variable waitingForBlobClientCV;
+ std::mutex waitingForBlobClientCVMutex;
std::string generateBackupID();
@@ -39,6 +46,10 @@
backup::CreateNewBackupRequest request,
backup::CreateNewBackupResponse *response) override;
void doneCallback();
+
+ virtual ~CreateNewBackupReactor() {
+ std::cout << "[CNR] DTOR" << std::endl;
+ }
};
std::string CreateNewBackupReactor::generateBackupID() {
@@ -49,6 +60,9 @@
std::unique_ptr<ServerBidiReactorStatus> CreateNewBackupReactor::handleRequest(
backup::CreateNewBackupRequest request,
backup::CreateNewBackupResponse *response) {
+ // we make sure that the blob client's state is flushed to the main memory
+ // as there may be multiple threads from the pool taking over here
+ const std::lock_guard<std::mutex> lock(this->blobPutClientReactorMutex);
std::cout << "[CNR] here handle request" << std::endl;
switch (this->state) {
case State::KEY_ENTROPY: {
@@ -70,20 +84,17 @@
// TODO confirm - holder may be a backup id
this->backupID = this->generateBackupID();
- ServiceBlobClient::getInstance().put(this->backupID, this->dataHash);
+ this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
+ this->backupID, this->dataHash, &this->waitingForBlobClientCV);
+ this->blobClient.put(this->putReactor);
return nullptr;
}
case State::DATA_CHUNKS: {
std::cout << "[CNR] here handle request data chunk "
<< request.newcompactionchunk().size() << std::endl;
- // TODO initialize blob client reactor
- if (ServiceBlobClient::getInstance().putReactor == nullptr) {
- throw std::runtime_error(
- "blob client reactor has not been initialized");
- }
std::cout << "[CNR] here enqueueing data chunk" << std::endl;
- ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk(
+ this->putReactor->scheduleSendingDataChunk(
*request.mutable_newcompactionchunk());
return nullptr;
@@ -93,12 +104,17 @@
}
void CreateNewBackupReactor::doneCallback() {
+ const std::lock_guard<std::mutex> lock(this->blobPutClientReactorMutex);
std::cout << "[CNR] create new backup done " << this->status.status.error_code()
<< "/" << this->status.status.error_message() << std::endl;
std::cout << "[CNR] enqueueing empty chunk to end blob upload" << std::endl;
std::string emptyString = "";
- ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk(
- emptyString);
+ this->putReactor->scheduleSendingDataChunk(emptyString);
+ std::cout << "[CNR] waiting for the blob client to complete" << std::endl;
+ std::unique_lock<std::mutex> lock2(this->waitingForBlobClientCVMutex);
+ this->waitingForBlobClientCV.wait(lock2);
+ std::cout << "[CNR] the blob client to completed, CNR can exit gracefully"
+ << std::endl;
}
} // namespace reactor

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 24, 2:38 PM (17 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2576382
Default Alt Text
D3488.id10730.diff (8 KB)

Event Timeline