// Patch summary: moves BlobServiceImpl from the synchronous gRPC service base
// class to the callback (reactor) based one.
//
// services/blob/src/BlobServiceImpl.h
//   * The base class changes from blob::BlobService::Service to
//     blob::BlobService::CallbackService.
//   * Put, Get and Remove now take a grpc::CallbackServerContext* and return a
//     reactor pointer (ServerBidiReactor, ServerWriteReactor and
//     ServerUnaryReactor respectively) instead of a grpc::Status. Put no
//     longer takes the stream argument and Get no longer takes the writer.
//
// services/blob/src/BlobServiceImpl.cpp
//   * Adds includes of "GetReactor.h" and "PutReactor.h".
//   * Put: the inline stream->Read() loop is removed (holder / blob hash
//     collection, multipart upload of data chunks, blob hash verification and
//     the putBlobItem / putReverseIndexItem writes). The method now only
//     returns `new reactor::PutReactor()`.
//   * Get: the inline findS3Path + getObjectDataChunks + writer->Write() logic
//     is removed. The method now allocates a reactor::GetReactor from the
//     request, calls NextWrite() on it once, and returns it.
//   * Remove: the try/catch is kept, but a caught std::runtime_error is now
//     stored in a local `status` (initialised to grpc::Status::OK) rather than
//     returned directly. The call is completed through
//     context->DefaultReactor() and Finish(status).
//
// NOTE(review): the reactors returned by Put and Get are created with a raw
// `new` and nothing in this patch deletes them -- presumably PutReactor /
// GetReactor release themselves when the RPC is done; confirm in
// PutReactor.h / GetReactor.h.
// NOTE(review): the logic that used to live in Put/Get is presumably moved
// into those reactor classes; they are not part of this patch, so verify
// there that no step (e.g. blob hash verification) was dropped.
// NOTE(review): the middle of Remove lies between two hunks and is not shown
// here, so only its first and last statements are covered by this summary.
// NOTE(review): line breaks and the contents of <...> (include targets,
// template arguments) appear to have been lost from this copy of the patch;
// restore them from the original before trying to apply it.
diff --git a/services/blob/src/BlobServiceImpl.h b/services/blob/src/BlobServiceImpl.h --- a/services/blob/src/BlobServiceImpl.h +++ b/services/blob/src/BlobServiceImpl.h @@ -14,7 +14,7 @@ namespace comm { namespace network { -class BlobServiceImpl final : public blob::BlobService::Service { +class BlobServiceImpl final : public blob::BlobService::CallbackService { void verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path); @@ -27,16 +27,13 @@ BlobServiceImpl(); virtual ~BlobServiceImpl(); - grpc::Status - Put(grpc::ServerContext *context, - grpc::ServerReaderWriter *stream) - override; - grpc::Status - Get(grpc::ServerContext *context, - const blob::GetRequest *request, - grpc::ServerWriter *writer) override; - grpc::Status Remove( - grpc::ServerContext *context, + grpc::ServerBidiReactor * + Put(grpc::CallbackServerContext *context) override; + grpc::ServerWriteReactor * + Get(grpc::CallbackServerContext *context, + const blob::GetRequest *request) override; + grpc::ServerUnaryReactor *Remove( + grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) override; }; diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -6,6 +6,9 @@ #include "MultiPartUploader.h" #include "Tools.h" +#include "GetReactor.h" +#include "PutReactor.h" + #include #include @@ -46,112 +49,25 @@ lvalue = rvalue; } -grpc::Status BlobServiceImpl::Put( - grpc::ServerContext *context, - grpc::ServerReaderWriter *stream) { - blob::PutRequest request; - std::string holder; - std::string receivedBlobHash; - std::unique_ptr s3Path; - std::shared_ptr blobItem; - std::unique_ptr uploader; - std::string currentChunk; - blob::PutResponse response; - try { - while (stream->Read(&request)) { - const std::string requestHolder = request.holder(); - const std::string requestBlobHash = 
request.blobhash(); - const std::string receivedDataChunk = request.datachunk(); - if (requestHolder.size()) { - assignVariableIfEmpty("holder", holder, requestHolder); - } else if (requestBlobHash.size()) { - assignVariableIfEmpty("blob hash", receivedBlobHash, requestBlobHash); - } else if (receivedDataChunk.size()) { - if (s3Path == nullptr) { - throw std::runtime_error( - "S3 path or/and MPU has not been created but data " - "chunks are being pushed"); - } - if (uploader == nullptr) { - uploader = std::make_unique( - getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName()); - } - currentChunk += receivedDataChunk; - if (currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { - uploader->addPart(currentChunk); - currentChunk.clear(); - } - } - if (holder.size() && receivedBlobHash.size() && s3Path == nullptr) { - blobItem = database::DatabaseManager::getInstance().findBlobItem( - receivedBlobHash); - if (blobItem != nullptr) { - s3Path = std::make_unique(blobItem->getS3Path()); - response.set_dataexists(true); - stream->Write(response); - break; - } - s3Path = std::make_unique( - generateS3Path(BLOB_BUCKET_NAME, receivedBlobHash)); - response.set_dataexists(false); - stream->Write(response); - } - } - if (!currentChunk.empty()) { - uploader->addPart(currentChunk); - } - if (blobItem == nullptr) { - uploader->finishUpload(); - } - this->verifyBlobHash(receivedBlobHash, *s3Path); - if (blobItem == nullptr) { - blobItem = - std::make_shared(receivedBlobHash, *s3Path); - database::DatabaseManager::getInstance().putBlobItem(*blobItem); - } - const database::ReverseIndexItem reverseIndexItem(holder, receivedBlobHash); - database::DatabaseManager::getInstance().putReverseIndexItem( - reverseIndexItem); - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - return grpc::Status::OK; +grpc::ServerBidiReactor * +BlobServiceImpl::Put(grpc::CallbackServerContext 
*context) { + return new reactor::PutReactor(); } -grpc::Status BlobServiceImpl::Get( - grpc::ServerContext *context, - const blob::GetRequest *request, - grpc::ServerWriter *writer) { - const std::string holder = request->holder(); - try { - database::S3Path s3Path = findS3Path(holder); - - AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); - blob::GetResponse response; - std::function callback = - [&response, &writer](const std::string &chunk) { - response.set_datachunk(chunk); - if (!writer->Write(response)) { - throw std::runtime_error("writer interrupted sending data"); - } - }; +grpc::ServerWriteReactor *BlobServiceImpl::Get( + grpc::CallbackServerContext *context, + const blob::GetRequest *request) { - bucket.getObjectDataChunks( - s3Path.getObjectName(), - callback, - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE); - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - return grpc::Status::OK; + reactor::GetReactor *gr = new reactor::GetReactor(request); + gr->NextWrite(); + return gr; } -grpc::Status BlobServiceImpl::Remove( - grpc::ServerContext *context, +grpc::ServerUnaryReactor *BlobServiceImpl::Remove( + grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { + grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = @@ -180,9 +96,11 @@ } } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } - return grpc::Status::OK; + auto *reactor = context->DefaultReactor(); + reactor->Finish(status); + return reactor; } } // namespace network