Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33430727
D3466.1769011201.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
4 KB
Referenced Files
None
Subscribers
None
D3466.1769011201.diff
View Options
diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include "ClientBidiReactorBase.h"
+#include "Constants.h"
+
+#include "../_generated/blob.grpc.pb.h"
+#include "../_generated/blob.pb.h"
+
+#include <folly/MPMCQueue.h>
+#include <grpcpp/grpcpp.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+class BlobPutClientReactor
+ : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
+
+ enum class State {
+ SEND_HOLDER = 0,
+ SEND_HASH = 1,
+ SEND_CHUNKS = 2,
+ };
+
+ State state = State::SEND_HOLDER;
+ const std::string hash;
+ const std::string holder;
+ size_t currentDataSize = 0;
+ const size_t chunkSize =
+ GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+ folly::MPMCQueue<std::string> dataChunks;
+
+public:
+ BlobPutClientReactor(const std::string &holder, const std::string &hash);
+ void scheduleSendingDataChunk(const std::string &dataChunk);
+ std::unique_ptr<grpc::Status> prepareRequest(
+ blob::PutRequest &request,
+ std::shared_ptr<blob::PutResponse> previousResponse) override;
+};
+
+BlobPutClientReactor::BlobPutClientReactor(
+ const std::string &holder,
+ const std::string &hash)
+ : holder(holder),
+ hash(hash),
+ dataChunks(folly::MPMCQueue<std::string>(100)) {
+}
+
+void BlobPutClientReactor::scheduleSendingDataChunk(
+ const std::string &dataChunk) {
+ // TODO: we may be copying a big chunk of data, but `write` seems to only
+ // accept `std::move`
+ std::string str = std::string(dataChunk);
+ if (!this->dataChunks.write(std::move(str))) {
+ throw std::runtime_error(
+ "Error scheduling sending a data chunk to send to the blob service");
+ }
+}
+
+std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
+ blob::PutRequest &request,
+ std::shared_ptr<blob::PutResponse> previousResponse) {
+ if (this->state == State::SEND_HOLDER) {
+ this->request.set_holder(this->holder);
+ this->state = State::SEND_HASH;
+ return nullptr;
+ }
+ if (this->state == State::SEND_HASH) {
+ request.set_blobhash(this->hash);
+ this->state = State::SEND_CHUNKS;
+ return nullptr;
+ }
+ if (previousResponse->dataexists()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ std::string dataChunk;
+ this->dataChunks.blockingRead(dataChunk);
+ if (dataChunk.empty()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ request.set_datachunk(dataChunk);
+ return nullptr;
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include "BlobPutClientReactor.h"
+
+#include "../_generated/blob.grpc.pb.h"
+#include "../_generated/blob.pb.h"
+
+#include <grpcpp/grpcpp.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+
+class ServiceBlobClient {
+ std::unique_ptr<blob::BlobService::Stub> stub;
+
+ ServiceBlobClient() {
+ // TODO: handle other types of connection
+ std::string targetStr = "blob-server:50051";
+ std::shared_ptr<grpc::Channel> channel =
+ grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
+ this->stub = blob::BlobService::NewStub(channel);
+ }
+
+public:
+ static ServiceBlobClient &getInstance() {
+ // todo consider threads
+ static ServiceBlobClient instance;
+ return instance;
+ }
+
+ std::unique_ptr<reactor::BlobPutClientReactor> putReactor;
+
+ void put(const std::string &holder, const std::string &hash) {
+ if (this->putReactor != nullptr && !this->putReactor->isDone()) {
+ throw std::runtime_error(
+ "trying to run reactor while the previous one is not finished yet");
+ }
+ this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash));
+ this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor));
+ this->putReactor->nextWrite();
+ }
+ // void get(const std::string &holder);
+ // void remove(const std::string &holder);
+};
+
+} // namespace network
+} // namespace comm
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jan 21, 4:00 PM (11 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5968026
Default Alt Text
D3466.1769011201.diff (4 KB)
Attached To
Mode
D3466: [services] Backup - Add blob client
Attached
Detach File
Event Timeline
Log In to Comment