diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp index 725fd4d36..3bc78389f 100644 --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -1,189 +1,107 @@ #include "BlobServiceImpl.h" #include "AwsTools.h" #include "Constants.h" #include "DatabaseManager.h" #include "MultiPartUploader.h" #include "Tools.h" +#include "GetReactor.h" +#include "PutReactor.h" + #include #include namespace comm { namespace network { BlobServiceImpl::BlobServiceImpl() { Aws::InitAPI({}); if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) { throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available"); } } BlobServiceImpl::~BlobServiceImpl() { Aws::ShutdownAPI({}); } void BlobServiceImpl::verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path) { const std::string computedBlobHash = computeHashForFile(s3Path); if (expectedBlobHash != computedBlobHash) { throw std::runtime_error( "blob hash mismatch, expected: [" + expectedBlobHash + "], computed: [" + computedBlobHash + "]"); } } void BlobServiceImpl::assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue) { if (!lvalue.empty()) { throw std::runtime_error( "multiple assignment for variable " + label + " is not allowed"); } lvalue = rvalue; } -grpc::Status BlobServiceImpl::Put( - grpc::ServerContext *context, - grpc::ServerReaderWriter *stream) { - blob::PutRequest request; - std::string holder; - std::string receivedBlobHash; - std::unique_ptr s3Path; - std::shared_ptr blobItem; - std::unique_ptr uploader; - std::string currentChunk; - blob::PutResponse response; - try { - while (stream->Read(&request)) { - const std::string requestHolder = request.holder(); - const std::string requestBlobHash = request.blobhash(); - const std::string receivedDataChunk = request.datachunk(); - if (requestHolder.size()) { - assignVariableIfEmpty("holder", holder, requestHolder); - } else if (requestBlobHash.size()) { - assignVariableIfEmpty("blob hash", receivedBlobHash, requestBlobHash); - } else if (receivedDataChunk.size()) { - if (s3Path == nullptr) { - throw std::runtime_error( - "S3 path or/and MPU has not been created but data " - "chunks are being pushed"); - } - if (uploader == nullptr) { - uploader = std::make_unique( - getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName()); - } - currentChunk += receivedDataChunk; - if (currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { - uploader->addPart(currentChunk); - currentChunk.clear(); - } - } - if (holder.size() && receivedBlobHash.size() && s3Path == nullptr) { - blobItem = database::DatabaseManager::getInstance().findBlobItem( - receivedBlobHash); - if (blobItem != nullptr) { - s3Path = std::make_unique(blobItem->getS3Path()); - response.set_dataexists(true); - stream->Write(response); - break; - } - s3Path = std::make_unique( - generateS3Path(BLOB_BUCKET_NAME, receivedBlobHash)); - response.set_dataexists(false); - stream->Write(response); - } - } - if (!currentChunk.empty()) { - uploader->addPart(currentChunk); - } - if (blobItem == nullptr) { - uploader->finishUpload(); - } - this->verifyBlobHash(receivedBlobHash, *s3Path); - if (blobItem == nullptr) { - blobItem = - std::make_shared(receivedBlobHash, *s3Path); - database::DatabaseManager::getInstance().putBlobItem(*blobItem); - } - const database::ReverseIndexItem reverseIndexItem(holder, receivedBlobHash); - database::DatabaseManager::getInstance().putReverseIndexItem( - reverseIndexItem); - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - return grpc::Status::OK; +grpc::ServerBidiReactor * +BlobServiceImpl::Put(grpc::CallbackServerContext *context) { + return new reactor::PutReactor(); } -grpc::Status BlobServiceImpl::Get( - grpc::ServerContext *context, - const blob::GetRequest *request, - grpc::ServerWriter *writer) { - const std::string holder = request->holder(); - try { - database::S3Path s3Path = findS3Path(holder); - - AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); - blob::GetResponse response; - std::function callback = - [&response, &writer](const std::string &chunk) { - response.set_datachunk(chunk); - if (!writer->Write(response)) { - throw std::runtime_error("writer interrupted sending data"); - } - }; +grpc::ServerWriteReactor *BlobServiceImpl::Get( + grpc::CallbackServerContext *context, + const blob::GetRequest *request) { - bucket.getObjectDataChunks( - s3Path.getObjectName(), - callback, - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE); - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - return grpc::Status::OK; + reactor::GetReactor *gr = new reactor::GetReactor(request); + gr->NextWrite(); + return gr; } -grpc::Status BlobServiceImpl::Remove( - grpc::ServerContext *context, +grpc::ServerUnaryReactor *BlobServiceImpl::Remove( + grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { + grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error("no item found for holder: " + holder); } // TODO handle cleanup here properly // for now the object's being removed right away const std::string blobHash = reverseIndexItem->getBlobHash(); if (!database::DatabaseManager::getInstance().removeReverseIndexItem( holder)) { throw std::runtime_error( "could not remove an item for holder " + holder + "(probably does not exist)"); } if (database::DatabaseManager::getInstance() .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash()) .size() == 0) { database::S3Path s3Path = findS3Path(*reverseIndexItem); AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); bucket.removeObject(s3Path.getObjectName()); database::DatabaseManager::getInstance().removeBlobItem(blobHash); } } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } - return grpc::Status::OK; + auto *reactor = context->DefaultReactor(); + reactor->Finish(status); + return reactor; } } // namespace network } // namespace comm diff --git a/services/blob/src/BlobServiceImpl.h b/services/blob/src/BlobServiceImpl.h index 76ceecd46..3f1bcefde 100644 --- a/services/blob/src/BlobServiceImpl.h +++ b/services/blob/src/BlobServiceImpl.h @@ -1,45 +1,42 @@ #pragma once #include "S3Path.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include namespace comm { namespace network { -class BlobServiceImpl final : public blob::BlobService::Service { +class BlobServiceImpl final : public blob::BlobService::CallbackService { void verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path); void assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue); public: BlobServiceImpl(); virtual ~BlobServiceImpl(); - grpc::Status - Put(grpc::ServerContext *context, - grpc::ServerReaderWriter *stream) - override; - grpc::Status - Get(grpc::ServerContext *context, - const blob::GetRequest *request, - grpc::ServerWriter *writer) override; - grpc::Status Remove( - grpc::ServerContext *context, + grpc::ServerBidiReactor * + Put(grpc::CallbackServerContext *context) override; + grpc::ServerWriteReactor * + Get(grpc::CallbackServerContext *context, + const blob::GetRequest *request) override; + grpc::ServerUnaryReactor *Remove( + grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) override; }; } // namespace network } // namespace comm