diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp --- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp +++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp @@ -7,6 +7,7 @@ #include "Tools.h" #include "BidiReactorBase.h" +#include "WriteReactorBase.h" #include @@ -144,7 +145,76 @@ grpc::ServerWriteReactor *BlobServiceImpl::Get( grpc::CallbackServerContext *context, const blob::GetRequest *request) { - return nullptr; + class GetReactor + : public WriteReactorBase { + size_t offset = 0; + size_t fileSize = 0; + const size_t chunkSize = + GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; + database::S3Path s3Path; + Aws::S3::Model::GetObjectRequest getRequest; + + public: + using WriteReactorBase:: + WriteReactorBase; + + std::unique_ptr + writeResponse(blob::GetResponse *response) override { + if (this->offset >= this->fileSize) { + return std::make_unique(grpc::Status::OK); + } + + const size_t nextSize = + std::min(this->chunkSize, this->fileSize - this->offset); + + std::string range = "bytes=" + std::to_string(this->offset) + "-" + + std::to_string(this->offset + nextSize); + this->getRequest.SetRange(range); + + Aws::S3::Model::GetObjectOutcome getOutcome = + getS3Client()->GetObject(this->getRequest); + if (!getOutcome.IsSuccess()) { + return std::make_unique( + grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage()); + } + + Aws::IOStream &retrievedFile = + getOutcome.GetResultWithOwnership().GetBody(); + std::string result; + result.resize(nextSize); + retrievedFile.get((char *)result.data(), nextSize + 1); + response->set_datachunk(result); + + this->offset += nextSize; + return nullptr; + } + + void initialize() override { + this->s3Path = findS3Path(this->request.holder()); + this->fileSize = getBucket(s3Path.getBucketName()) + .getObjectSize(s3Path.getObjectName()); + + this->getRequest.SetBucket(this->s3Path.getBucketName()); + this->getRequest.SetKey(this->s3Path.getObjectName()); + + AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName()); + if (!bucket.isAvailable()) { + throw std::runtime_error( + "bucket [" + this->s3Path.getBucketName() + "] not available"); + } + const size_t fileSize = + bucket.getObjectSize(this->s3Path.getObjectName()); + if (this->fileSize == 0) { + throw std::runtime_error("object empty"); + } + }; + + void doneCallback() override{}; + }; + + GetReactor *gr = new GetReactor(request); + gr->NextWrite(); + return gr; } grpc::ServerUnaryReactor *BlobServiceImpl::Remove(