diff --git a/services/blob/src/Reactors/server/PutReactor.h b/services/blob/src/Reactors/server/PutReactor.h new file mode 100644 --- /dev/null +++ b/services/blob/src/Reactors/server/PutReactor.h @@ -0,0 +1,92 @@ +#pragma once + +#include "ServerBidiReactorBase.h" + +#include "../_generated/blob.grpc.pb.h" +#include "../_generated/blob.pb.h" + +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +class PutReactor + : public ServerBidiReactorBase { + std::string holder; + std::string blobHash; + std::string currentChunk; + std::unique_ptr s3Path; + std::shared_ptr blobItem; + std::unique_ptr uploader; + +public: + std::unique_ptr handleRequest( + blob::PutRequest request, + blob::PutResponse *response) override { + if (this->holder.empty()) { + if (request.holder().empty()) { + throw std::runtime_error("holder has not been provided"); + } + this->holder = request.holder(); + return nullptr; + } + if (this->blobHash.empty()) { + if (request.blobhash().empty()) { + throw std::runtime_error("blob hash has not been provided"); + } + this->blobHash = request.blobhash(); + this->blobItem = + database::DatabaseManager::getInstance().findBlobItem(this->blobHash); + if (this->blobItem != nullptr) { + this->s3Path = + std::make_unique(this->blobItem->getS3Path()); + response->set_dataexists(true); + return std::make_unique( + grpc::Status::OK, true); + } + this->s3Path = std::make_unique( + generateS3Path(BLOB_BUCKET_NAME, this->blobHash)); + this->blobItem = + std::make_shared(this->blobHash, *s3Path); + response->set_dataexists(false); + return nullptr; + } + if (request.datachunk().empty()) { + return std::make_unique(grpc::Status( + grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected")); + } + if (this->uploader == nullptr) { + this->uploader = std::make_unique( + getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName()); + } + this->currentChunk += request.datachunk(); + if (this->currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { + this->uploader->addPart(this->currentChunk); + this->currentChunk.clear(); + } + return nullptr; + } + + void terminateCallback() override { + if (!this->status.status.ok()) { + return; + } + if (!this->readingAborted || this->uploader == nullptr) { + throw std::runtime_error(this->status.status.error_message()); + } + if (!currentChunk.empty()) { + this->uploader->addPart(this->currentChunk); + } + this->uploader->finishUpload(); + database::DatabaseManager::getInstance().putBlobItem(*this->blobItem); + const database::ReverseIndexItem reverseIndexItem(holder, this->blobHash); + database::DatabaseManager::getInstance().putReverseIndexItem( + reverseIndexItem); + } +}; + +} // namespace reactor +} // namespace network +} // namespace comm