Page MenuHomePhorge

D3466.1769011201.diff
No OneTemporary

Size
4 KB
Referenced Files
None
Subscribers
None

D3466.1769011201.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include "ClientBidiReactorBase.h"
+#include "Constants.h"
+
+#include "../_generated/blob.grpc.pb.h"
+#include "../_generated/blob.pb.h"
+
+#include <folly/MPMCQueue.h>
+#include <grpcpp/grpcpp.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+// Client-side reactor for the blob service `Put` bidirectional stream.
+// As implemented by `prepareRequest` in this header, the outgoing requests
+// carry, in order: the holder, the blob hash, and then the data chunks that
+// were queued through `scheduleSendingDataChunk`.
+class BlobPutClientReactor
+ : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
+
+ // Phase of the upload; selects which field the next request carries.
+ enum class State {
+ SEND_HOLDER = 0,
+ SEND_HASH = 1,
+ SEND_CHUNKS = 2,
+ };
+
+ // Every reactor starts by sending the holder.
+ State state = State::SEND_HOLDER;
+ // NOTE: `hash` is declared before `holder`, so it is initialized first
+ // regardless of the order written in a constructor's initializer list.
+ const std::string hash;
+ const std::string holder;
+ // NOTE(review): `currentDataSize` and `chunkSize` are not read by any code
+ // visible in this header - presumably reserved for chunk splitting; confirm.
+ size_t currentDataSize = 0;
+ const size_t chunkSize =
+ GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+ // Chunks waiting to be sent: filled by `scheduleSendingDataChunk`, drained
+ // by `prepareRequest`.
+ folly::MPMCQueue<std::string> dataChunks;
+
+public:
+ // Stores the holder and the hash that the first two requests will carry.
+ BlobPutClientReactor(const std::string &holder, const std::string &hash);
+ // Queues a copy of `dataChunk` for sending; throws `std::runtime_error`
+ // when the queue does not accept it.
+ void scheduleSendingDataChunk(const std::string &dataChunk);
+ // Fills `request` for the current state and returns nullptr, or returns an
+ // OK status when the previous response reports that the data already exists
+ // or when an empty chunk is dequeued.
+ std::unique_ptr<grpc::Status> prepareRequest(
+ blob::PutRequest &request,
+ std::shared_ptr<blob::PutResponse> previousResponse) override;
+};
+
+// Creates a reactor that will put the blob identified by `hash` on behalf of
+// `holder`. Both strings are copied into the reactor.
+//
+// The chunk queue is created with room for 100 pending chunks.
+//
+// Marked `inline` because this definition lives in a header: without it,
+// including the header from more than one translation unit produces multiple
+// definitions of the constructor at link time.
+inline BlobPutClientReactor::BlobPutClientReactor(
+    const std::string &holder,
+    const std::string &hash)
+    // Initializers are listed in member declaration order (`hash` is declared
+    // before `holder`), which is the order they actually run in.
+    : hash(hash),
+      holder(holder),
+      dataChunks(100) {
+}
+
+// Queues `dataChunk` so that a later `prepareRequest` call can send it.
+//
+// @param dataChunk bytes to send; copied, the caller keeps ownership.
+// @throws std::runtime_error when the queue does not accept the chunk.
+//
+// Marked `inline` because this definition lives in a header and would
+// otherwise be defined once per including translation unit.
+inline void BlobPutClientReactor::scheduleSendingDataChunk(
+    const std::string &dataChunk) {
+  // TODO: we may be copying a big chunk of data here. The queue's `write`
+  // does not accept a const lvalue, so an owned copy is made and moved in.
+  std::string chunkCopy(dataChunk);
+  const bool queued = this->dataChunks.write(std::move(chunkCopy));
+  if (!queued) {
+    throw std::runtime_error(
+        "Error scheduling sending a data chunk to send to the blob service");
+  }
+}
+
+// Fills `request` with the next message of the put stream.
+//
+// The first call sets the holder, the second sets the blob hash, and every
+// later call sends one queued data chunk.
+//
+// @param request          message to populate for the next write.
+// @param previousResponse last response received from the blob service; only
+//                         read once the holder and the hash have been sent.
+// @return nullptr after populating `request`; an OK status when the previous
+//         response reports that the data already exists, or when an empty
+//         chunk is dequeued (an empty chunk acts as the end-of-data marker).
+//
+// Marked `inline` because this definition lives in a header and would
+// otherwise be defined once per including translation unit.
+inline std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
+    blob::PutRequest &request,
+    std::shared_ptr<blob::PutResponse> previousResponse) {
+  if (this->state == State::SEND_HOLDER) {
+    // Write to the `request` parameter, like every other branch does, rather
+    // than reaching for a `request` member of the base class.
+    request.set_holder(this->holder);
+    this->state = State::SEND_HASH;
+    return nullptr;
+  }
+  if (this->state == State::SEND_HASH) {
+    request.set_blobhash(this->hash);
+    this->state = State::SEND_CHUNKS;
+    return nullptr;
+  }
+  // NOTE(review): `previousResponse` is dereferenced without a null check;
+  // this assumes the base class always supplies a response by the time the
+  // reactor reaches SEND_CHUNKS - confirm against ClientBidiReactorBase.
+  if (previousResponse->dataexists()) {
+    // Nothing to upload, so finish the stream successfully.
+    return std::make_unique<grpc::Status>(grpc::Status::OK);
+  }
+  std::string dataChunk;
+  // Blocks until a chunk has been scheduled.
+  this->dataChunks.blockingRead(dataChunk);
+  if (dataChunk.empty()) {
+    return std::make_unique<grpc::Status>(grpc::Status::OK);
+  }
+  // The local is not used afterwards, so hand its buffer over instead of
+  // copying a potentially large chunk.
+  request.set_datachunk(std::move(dataChunk));
+  return nullptr;
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include "BlobPutClientReactor.h"
+
+#include "../_generated/blob.grpc.pb.h"
+#include "../_generated/blob.pb.h"
+
+#include <grpcpp/grpcpp.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+
+// Process-wide client for the blob service, reached via `getInstance()`.
+// It owns the gRPC stub and at most one put reactor at a time.
+//
+// NOTE: `put` and `putReactor` are not synchronized; callers that use the
+// client from several threads have to serialize access themselves.
+class ServiceBlobClient {
+  std::unique_ptr<blob::BlobService::Stub> stub;
+
+  // Private so that the only instance is the one created by `getInstance()`.
+  ServiceBlobClient() {
+    // TODO: handle other types of connection
+    std::string targetStr = "blob-server:50051";
+    std::shared_ptr<grpc::Channel> channel =
+        grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
+    this->stub = blob::BlobService::NewStub(channel);
+  }
+
+public:
+  // Returns the single shared client, creating it on first use.
+  static ServiceBlobClient &getInstance() {
+    // todo consider threads
+    static ServiceBlobClient instance;
+    return instance;
+  }
+
+  // Reactor of the most recently started put; null until `put` is called.
+  std::unique_ptr<reactor::BlobPutClientReactor> putReactor;
+
+  // Starts a new put stream for `holder` / `hash`.
+  //
+  // Replaces `putReactor` with a fresh reactor, registers it with the stub
+  // and triggers its first write.
+  //
+  // @throws std::runtime_error when the previous reactor exists and has not
+  //         finished yet.
+  void put(const std::string &holder, const std::string &hash) {
+    if (this->putReactor != nullptr && !this->putReactor->isDone()) {
+      throw std::runtime_error(
+          "trying to run reactor while the previous one is not finished yet");
+    }
+    this->putReactor =
+        std::make_unique<reactor::BlobPutClientReactor>(holder, hash);
+    this->stub->async()->Put(
+        &this->putReactor->context, this->putReactor.get());
+    this->putReactor->nextWrite();
+  }
+  // void get(const std::string &holder);
+  // void remove(const std::string &holder);
+};
+
+} // namespace network
+} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Wed, Jan 21, 4:00 PM (11 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5968026
Default Alt Text
D3466.1769011201.diff (4 KB)

Event Timeline