Page MenuHomePhabricator

D3521.id10731.diff
No OneTemporary

D3521.id10731.diff

diff --git a/services/blob/src/Reactors/server/PutReactor.h b/services/blob/src/Reactors/server/PutReactor.h
new file mode 100644
--- /dev/null
+++ b/services/blob/src/Reactors/server/PutReactor.h
@@ -0,0 +1,158 @@
+#pragma once
+
+#include "ServerBidiReactorBase.h"
+
+#include "../_generated/blob.grpc.pb.h"
+#include "../_generated/blob.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+/**
+ * Server-side reactor for the bidirectional blob `Put` stream.
+ *
+ * Requests are interpreted by position, based on which members are still
+ * empty: the first one carries the holder, the second one the blob hash, and
+ * every following one a data chunk. Chunks are buffered in `currentChunk` and
+ * handed to the `MultiPartUploader` once the buffer grows past
+ * `AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE`.
+ *
+ * Database records are written in `doneCallback`, never in `handleRequest`.
+ */
+class PutReactor
+    : public ServerBidiReactorBase<blob::PutRequest, blob::PutResponse> {
+  std::string holder;
+  std::string blobHash;
+  // Data received so far that has not been passed to the uploader yet.
+  std::string currentChunk;
+  std::unique_ptr<database::S3Path> s3Path;
+  std::shared_ptr<database::BlobItem> blobItem;
+  // Created lazily when the first data chunk arrives.
+  std::unique_ptr<MultiPartUploader> uploader;
+  // Set when the requested blob hash was already found in the database, so
+  // that `doneCallback` still records the holder for it.
+  bool dataExists = false;
+
+  // Stores the (holder, blobHash) pair in the reverse index.
+  void putHolderRecord() const {
+    const database::ReverseIndexItem reverseIndexItem(
+        this->holder, this->blobHash);
+    database::DatabaseManager::getInstance().putReverseIndexItem(
+        reverseIndexItem);
+  }
+
+public:
+  /**
+   * Handles a single request of the stream.
+   *
+   * @param request the message just read from the client
+   * @param response the message to fill in; `dataexists` is set while
+   * handling the blob hash
+   * @return `nullptr` after a request was consumed without finishing the
+   * stream, or a status object: OK when the blob is already stored,
+   * INVALID_ARGUMENT when a data chunk was expected but is empty
+   * @throws std::runtime_error when the holder or the blob hash is empty
+   */
+  std::unique_ptr<ServerBidiReactorStatus> handleRequest(
+      blob::PutRequest request,
+      blob::PutResponse *response) override {
+    // Step 1: the holder.
+    if (this->holder.empty()) {
+      if (request.holder().empty()) {
+        throw std::runtime_error("holder has not been provided");
+      }
+      this->holder = request.holder();
+      return nullptr;
+    }
+    // Step 2: the blob hash; also decides whether an upload is needed at all.
+    if (this->blobHash.empty()) {
+      if (request.blobhash().empty()) {
+        throw std::runtime_error("blob hash has not been provided");
+      }
+      this->blobHash = request.blobhash();
+      this->blobItem =
+          database::DatabaseManager::getInstance().findBlobItem(this->blobHash);
+      if (this->blobItem != nullptr) {
+        this->s3Path =
+            std::make_unique<database::S3Path>(this->blobItem->getS3Path());
+        response->set_dataexists(true);
+        this->dataExists = true;
+        std::cout << "data does exist" << std::endl;
+        // NOTE(review): the `true` flag presumably asks the base reactor to
+        // send `response` before finishing - confirm in ServerBidiReactorBase.
+        return std::make_unique<ServerBidiReactorStatus>(
+            grpc::Status::OK, true);
+      }
+      this->s3Path = std::make_unique<database::S3Path>(
+          generateS3Path(BLOB_BUCKET_NAME, this->blobHash));
+      this->blobItem =
+          std::make_shared<database::BlobItem>(this->blobHash, *this->s3Path);
+      response->set_dataexists(false);
+      std::cout << "data does not exist" << std::endl;
+      return nullptr;
+    }
+    // Step 3: data chunks.
+    if (request.datachunk().empty()) {
+      return std::make_unique<ServerBidiReactorStatus>(grpc::Status(
+          grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected"));
+    }
+    if (this->uploader == nullptr) {
+      std::cout << "initialize uploader" << std::endl;
+      this->uploader = std::make_unique<MultiPartUploader>(
+          getS3Client(), BLOB_BUCKET_NAME, this->s3Path->getObjectName());
+    }
+    std::cout << "adding chunk to current chunk" << std::endl;
+    this->currentChunk += request.datachunk();
+    if (this->currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
+      std::cout << "adding chunk to uploader" << std::endl;
+      this->uploader->addPart(this->currentChunk);
+      this->currentChunk.clear();
+    }
+    return nullptr;
+  }
+
+  /**
+   * Runs when the stream is finished and writes the database records.
+   *
+   * @throws std::runtime_error with the status message when the stream ended
+   * with an error and either reading was not aborted or no upload was started
+   */
+  void doneCallback() override {
+    std::cout << "done callback " << this->status.status.error_code() << "/"
+              << this->status.status.error_message() << std::endl;
+    if (!this->status.status.error_code()) {
+      // Within this class an OK status is returned only for a blob that is
+      // already stored. Nothing has to be uploaded, but the holder still has
+      // to be linked to the blob; previously this record was never written.
+      if (this->dataExists) {
+        this->putHolderRecord();
+      }
+      return;
+    }
+    // NOTE(review): a non-OK status together with `readingAborted` is treated
+    // as the regular end of an upload; presumably the base reactor reports the
+    // end of the client's stream that way - confirm in ServerBidiReactorBase.
+    if (!this->readingAborted || this->uploader == nullptr) {
+      throw std::runtime_error(this->status.status.error_message());
+    }
+    std::cout << "finalizing uploader" << std::endl;
+    // Flush what is left in the buffer as the last part.
+    if (!this->currentChunk.empty()) {
+      this->uploader->addPart(this->currentChunk);
+    }
+    this->uploader->finishUpload();
+    std::cout << "adding records to the database" << std::endl;
+    database::DatabaseManager::getInstance().putBlobItem(*this->blobItem);
+    this->putHolderRecord();
+  }
+};
+
+} // namespace reactor
+} // namespace network
+} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 6:23 AM (21 h, 53 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2593051
Default Alt Text
D3521.id10731.diff (3 KB)

Event Timeline