diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp --- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp +++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp @@ -6,6 +6,10 @@ #include "MultiPartUploader.h" #include "Tools.h" +#include "BidiReactorBase.h" + +#include + #include #include @@ -48,7 +52,93 @@ grpc::ServerBidiReactor * BlobServiceImpl::Put(grpc::CallbackServerContext *context) { - return nullptr; + /* Reactor created for each Put call. handleRequest() decides what an incoming message must carry from what has already been stored on the reactor: first the holder, then the blob hash, then data chunks. */ class PutReactor + : public BidiReactorBase { + std::string holder; + std::string blobHash; + /* Data received so far that has not yet been handed to the uploader. */ std::string currentChunk; + std::unique_ptr s3Path; + std::shared_ptr blobItem; + /* Created lazily, on the first non-empty data chunk. */ std::unique_ptr uploader; + + public: + /* Handles one incoming PutRequest. Returns nullptr or a status object; presumably nullptr tells the base class to keep the stream open and a status ends it - confirm in BidiReactorBase. Throws std::runtime_error when the expected holder or blob hash is empty. */ std::unique_ptr handleRequest( + blob::PutRequest request, + blob::PutResponse *response) override { + /* Step 1: no holder stored yet, so this message must provide one. */ if (this->holder.empty()) { + if (request.holder().empty()) { + throw std::runtime_error("holder has not been provided"); + } + this->holder = request.holder(); + return nullptr; + } + /* Step 2: no blob hash stored yet, so this message must provide one. */ if (this->blobHash.empty()) { + if (request.blobhash().empty()) { + throw std::runtime_error("blob hash has not been provided"); + } + this->blobHash = request.blobhash(); + this->blobItem = database::DatabaseManager::getInstance().findBlobItem( + this->blobHash); + /* A blob item with this hash was found: report dataexists = true and return an OK status. sendLastResponse switches BidiReactorBase::terminate from Finish to StartWriteAndFinish (see the header hunk of this patch). */ if (this->blobItem != nullptr) { + this->s3Path = + std::make_unique(this->blobItem->getS3Path()); + response->set_dataexists(true); + std::cout << "data does exist" << std::endl; + this->sendLastResponse = true; + return std::make_unique(grpc::Status::OK); + } + /* No item found: generate a path for the hash, build a new blob item in memory (it is stored in the database only in doneCallback) and report dataexists = false. */ this->s3Path = std::make_unique( + generateS3Path(BLOB_BUCKET_NAME, this->blobHash)); + this->blobItem = + std::make_shared(this->blobHash, *s3Path); + response->set_dataexists(false); + std::cout << "data does not exist" << std::endl; + return nullptr; + } + /* Step 3: holder and hash are known, so a data chunk is expected. */ if (!request.datachunk().empty()) { + if (this->uploader == nullptr) { + std::cout << "initialize uploader" << std::endl; + this->uploader = 
std::make_unique( + getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName()); + } + std::cout << "adding chunk to current chunk" << std::endl; + /* Accumulate the data; a part is handed to the uploader only once the buffer is strictly larger than AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, after which the buffer is cleared. */ this->currentChunk += request.datachunk(); + if (this->currentChunk.size() > + AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { + std::cout << "adding chunk to uploader" << std::endl; + this->uploader->addPart(this->currentChunk); + this->currentChunk.clear(); + } + return nullptr; + } + /* Holder and hash are already set and the data chunk is empty: reject the message. */ return std::make_unique(grpc::Status( + grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected")); + } + + /* Logs the final status, then either completes the upload and writes the database records, or throws. */ void doneCallback() override { + std::cout << "done callback " << this->status.error_code() << "/" + << this->status.error_message() << std::endl; + /* NOTE(review): everything below is gated on a non-zero error code, so with an OK status (e.g. after the data-exists reply) nothing runs here and no reverse index item is written for the holder. Presumably the base class ends a finished client stream with a non-OK status and readingAborted = true - confirm in BidiReactorBase. */ if (this->status.error_code()) { + if (this->readingAborted && this->uploader != nullptr) { + std::cout << "finalizing uploader" << std::endl; + /* Hand over whatever is still buffered before finishing the upload. */ if (!currentChunk.empty()) { + this->uploader->addPart(this->currentChunk); + } + this->uploader->finishUpload(); + std::cout << "adding records to the database" << std::endl; + database::DatabaseManager::getInstance().putBlobItem(*this->blobItem); + const database::ReverseIndexItem reverseIndexItem( + holder, this->blobHash); + database::DatabaseManager::getInstance().putReverseIndexItem( + reverseIndexItem); + } else { + /* NOTE(review): any other non-OK status, or a missing uploader, is raised as an exception from this callback - verify that the caller of doneCallback can handle it. */ throw std::runtime_error(this->status.error_message()); + } + } + } + }; + + return new PutReactor(); } grpc::ServerWriteReactor *BlobServiceImpl::Get( diff --git a/services/blob/docker-server/contents/server/src/Reactors/BidiReactorBase.h b/services/blob/docker-server/contents/server/src/Reactors/BidiReactorBase.h --- a/services/blob/docker-server/contents/server/src/Reactors/BidiReactorBase.h +++ b/services/blob/docker-server/contents/server/src/Reactors/BidiReactorBase.h @@ -16,6 +16,7 @@ protected: grpc::Status status; bool readingAborted = false; + /* When true, terminate() calls StartWriteAndFinish with this->response instead of Finish. */ bool sendLastResponse = false; public: BidiReactorBase(); @@ -47,7 +48,11 @@ template void BidiReactorBase::terminate(grpc::Status status) { this->status = status; - 
this->Finish(status); + /* The status is stored above; then either this->response is written together with the final status, or the call is finished with the status only. */ if (this->sendLastResponse) { + this->StartWriteAndFinish(&this->response, grpc::WriteOptions(), status); + } else { + this->Finish(status); + } } template