Page MenuHomePhabricator

D3471.id10547.diff
No OneTemporary

D3471.id10547.diff

diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.h b/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
--- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
+++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
@@ -14,7 +14,7 @@
namespace comm {
namespace network {
-class BlobServiceImpl final : public blob::BlobService::Service {
+class BlobServiceImpl final : public blob::BlobService::CallbackService {
void verifyBlobHash(
const std::string &expectedBlobHash,
const database::S3Path &s3Path);
@@ -27,16 +27,13 @@
BlobServiceImpl();
virtual ~BlobServiceImpl();
- grpc::Status
- Put(grpc::ServerContext *context,
- grpc::ServerReaderWriter<blob::PutResponse, blob::PutRequest> *stream)
- override;
- grpc::Status
- Get(grpc::ServerContext *context,
- const blob::GetRequest *request,
- grpc::ServerWriter<blob::GetResponse> *writer) override;
- grpc::Status Remove(
- grpc::ServerContext *context,
+ grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
+ Put(grpc::CallbackServerContext *context) override;
+ grpc::ServerWriteReactor<blob::GetResponse> *
+ Get(grpc::CallbackServerContext *context,
+ const blob::GetRequest *request) override;
+ grpc::ServerUnaryReactor *Remove(
+ grpc::CallbackServerContext *context,
const blob::RemoveRequest *request,
google::protobuf::Empty *response) override;
};
diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
--- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
+++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
@@ -6,6 +6,9 @@
#include "MultiPartUploader.h"
#include "Tools.h"
+#include "GetReactor.h"
+#include "PutReactor.h"
+
#include <iostream>
#include <memory>
@@ -46,112 +49,25 @@
lvalue = rvalue;
}
-grpc::Status BlobServiceImpl::Put(
- grpc::ServerContext *context,
- grpc::ServerReaderWriter<blob::PutResponse, blob::PutRequest> *stream) {
- blob::PutRequest request;
- std::string holder;
- std::string receivedBlobHash;
- std::unique_ptr<database::S3Path> s3Path;
- std::shared_ptr<database::BlobItem> blobItem;
- std::unique_ptr<MultiPartUploader> uploader;
- std::string currentChunk;
- blob::PutResponse response;
- try {
- while (stream->Read(&request)) {
- const std::string requestHolder = request.holder();
- const std::string requestBlobHash = request.blobhash();
- const std::string receivedDataChunk = request.datachunk();
- if (requestHolder.size()) {
- assignVariableIfEmpty("holder", holder, requestHolder);
- } else if (requestBlobHash.size()) {
- assignVariableIfEmpty("blob hash", receivedBlobHash, requestBlobHash);
- } else if (receivedDataChunk.size()) {
- if (s3Path == nullptr) {
- throw std::runtime_error(
- "S3 path or/and MPU has not been created but data "
- "chunks are being pushed");
- }
- if (uploader == nullptr) {
- uploader = std::make_unique<MultiPartUploader>(
- getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName());
- }
- currentChunk += receivedDataChunk;
- if (currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- uploader->addPart(currentChunk);
- currentChunk.clear();
- }
- }
- if (holder.size() && receivedBlobHash.size() && s3Path == nullptr) {
- blobItem = database::DatabaseManager::getInstance().findBlobItem(
- receivedBlobHash);
- if (blobItem != nullptr) {
- s3Path = std::make_unique<database::S3Path>(blobItem->getS3Path());
- response.set_dataexists(true);
- stream->Write(response);
- break;
- }
- s3Path = std::make_unique<database::S3Path>(
- generateS3Path(BLOB_BUCKET_NAME, receivedBlobHash));
- response.set_dataexists(false);
- stream->Write(response);
- }
- }
- if (!currentChunk.empty()) {
- uploader->addPart(currentChunk);
- }
- if (blobItem == nullptr) {
- uploader->finishUpload();
- }
- this->verifyBlobHash(receivedBlobHash, *s3Path);
- if (blobItem == nullptr) {
- blobItem =
- std::make_shared<database::BlobItem>(receivedBlobHash, *s3Path);
- database::DatabaseManager::getInstance().putBlobItem(*blobItem);
- }
- const database::ReverseIndexItem reverseIndexItem(holder, receivedBlobHash);
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- return grpc::Status::OK;
+grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
+BlobServiceImpl::Put(grpc::CallbackServerContext *context) {
+ return new reactor::PutReactor();
}
-grpc::Status BlobServiceImpl::Get(
- grpc::ServerContext *context,
- const blob::GetRequest *request,
- grpc::ServerWriter<blob::GetResponse> *writer) {
- const std::string holder = request->holder();
- try {
- database::S3Path s3Path = findS3Path(holder);
-
- AwsS3Bucket bucket = getBucket(s3Path.getBucketName());
- blob::GetResponse response;
- std::function<void(const std::string &)> callback =
- [&response, &writer](const std::string &chunk) {
- response.set_datachunk(chunk);
- if (!writer->Write(response)) {
- throw std::runtime_error("writer interrupted sending data");
- }
- };
+grpc::ServerWriteReactor<blob::GetResponse> *BlobServiceImpl::Get(
+ grpc::CallbackServerContext *context,
+ const blob::GetRequest *request) {
- bucket.getObjectDataChunks(
- s3Path.getObjectName(),
- callback,
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- return grpc::Status::OK;
+ reactor::GetReactor *gr = new reactor::GetReactor(request);
+ gr->NextWrite();
+ return gr;
}
-grpc::Status BlobServiceImpl::Remove(
- grpc::ServerContext *context,
+grpc::ServerUnaryReactor *BlobServiceImpl::Remove(
+ grpc::CallbackServerContext *context,
const blob::RemoveRequest *request,
google::protobuf::Empty *response) {
+ grpc::Status status = grpc::Status::OK;
const std::string holder = request->holder();
try {
std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
@@ -180,9 +96,11 @@
}
} catch (std::runtime_error &e) {
std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
}
- return grpc::Status::OK;
+ auto *reactor = context->DefaultReactor();
+ reactor->Finish(status);
+ return reactor;
}
} // namespace network

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 1, 9:35 PM (22 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2402059
Default Alt Text
D3471.id10547.diff (7 KB)

Event Timeline