Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3115707
D3471.id10547.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Referenced Files
None
Subscribers
None
D3471.id10547.diff
View Options
diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.h b/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
--- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
+++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.h
@@ -14,7 +14,7 @@
namespace comm {
namespace network {
-class BlobServiceImpl final : public blob::BlobService::Service {
+class BlobServiceImpl final : public blob::BlobService::CallbackService {
void verifyBlobHash(
const std::string &expectedBlobHash,
const database::S3Path &s3Path);
@@ -27,16 +27,13 @@
BlobServiceImpl();
virtual ~BlobServiceImpl();
- grpc::Status
- Put(grpc::ServerContext *context,
- grpc::ServerReaderWriter<blob::PutResponse, blob::PutRequest> *stream)
- override;
- grpc::Status
- Get(grpc::ServerContext *context,
- const blob::GetRequest *request,
- grpc::ServerWriter<blob::GetResponse> *writer) override;
- grpc::Status Remove(
- grpc::ServerContext *context,
+ grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
+ Put(grpc::CallbackServerContext *context) override;
+ grpc::ServerWriteReactor<blob::GetResponse> *
+ Get(grpc::CallbackServerContext *context,
+ const blob::GetRequest *request) override;
+ grpc::ServerUnaryReactor *Remove(
+ grpc::CallbackServerContext *context,
const blob::RemoveRequest *request,
google::protobuf::Empty *response) override;
};
diff --git a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
--- a/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
+++ b/services/blob/docker-server/contents/server/src/BlobServiceImpl.cpp
@@ -6,6 +6,9 @@
#include "MultiPartUploader.h"
#include "Tools.h"
+#include "GetReactor.h"
+#include "PutReactor.h"
+
#include <iostream>
#include <memory>
@@ -46,112 +49,25 @@
lvalue = rvalue;
}
-grpc::Status BlobServiceImpl::Put(
- grpc::ServerContext *context,
- grpc::ServerReaderWriter<blob::PutResponse, blob::PutRequest> *stream) {
- blob::PutRequest request;
- std::string holder;
- std::string receivedBlobHash;
- std::unique_ptr<database::S3Path> s3Path;
- std::shared_ptr<database::BlobItem> blobItem;
- std::unique_ptr<MultiPartUploader> uploader;
- std::string currentChunk;
- blob::PutResponse response;
- try {
- while (stream->Read(&request)) {
- const std::string requestHolder = request.holder();
- const std::string requestBlobHash = request.blobhash();
- const std::string receivedDataChunk = request.datachunk();
- if (requestHolder.size()) {
- assignVariableIfEmpty("holder", holder, requestHolder);
- } else if (requestBlobHash.size()) {
- assignVariableIfEmpty("blob hash", receivedBlobHash, requestBlobHash);
- } else if (receivedDataChunk.size()) {
- if (s3Path == nullptr) {
- throw std::runtime_error(
- "S3 path or/and MPU has not been created but data "
- "chunks are being pushed");
- }
- if (uploader == nullptr) {
- uploader = std::make_unique<MultiPartUploader>(
- getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName());
- }
- currentChunk += receivedDataChunk;
- if (currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- uploader->addPart(currentChunk);
- currentChunk.clear();
- }
- }
- if (holder.size() && receivedBlobHash.size() && s3Path == nullptr) {
- blobItem = database::DatabaseManager::getInstance().findBlobItem(
- receivedBlobHash);
- if (blobItem != nullptr) {
- s3Path = std::make_unique<database::S3Path>(blobItem->getS3Path());
- response.set_dataexists(true);
- stream->Write(response);
- break;
- }
- s3Path = std::make_unique<database::S3Path>(
- generateS3Path(BLOB_BUCKET_NAME, receivedBlobHash));
- response.set_dataexists(false);
- stream->Write(response);
- }
- }
- if (!currentChunk.empty()) {
- uploader->addPart(currentChunk);
- }
- if (blobItem == nullptr) {
- uploader->finishUpload();
- }
- this->verifyBlobHash(receivedBlobHash, *s3Path);
- if (blobItem == nullptr) {
- blobItem =
- std::make_shared<database::BlobItem>(receivedBlobHash, *s3Path);
- database::DatabaseManager::getInstance().putBlobItem(*blobItem);
- }
- const database::ReverseIndexItem reverseIndexItem(holder, receivedBlobHash);
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- return grpc::Status::OK;
+grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
+BlobServiceImpl::Put(grpc::CallbackServerContext *context) {
+ return new reactor::PutReactor();
}
-grpc::Status BlobServiceImpl::Get(
- grpc::ServerContext *context,
- const blob::GetRequest *request,
- grpc::ServerWriter<blob::GetResponse> *writer) {
- const std::string holder = request->holder();
- try {
- database::S3Path s3Path = findS3Path(holder);
-
- AwsS3Bucket bucket = getBucket(s3Path.getBucketName());
- blob::GetResponse response;
- std::function<void(const std::string &)> callback =
- [&response, &writer](const std::string &chunk) {
- response.set_datachunk(chunk);
- if (!writer->Write(response)) {
- throw std::runtime_error("writer interrupted sending data");
- }
- };
+grpc::ServerWriteReactor<blob::GetResponse> *BlobServiceImpl::Get(
+ grpc::CallbackServerContext *context,
+ const blob::GetRequest *request) {
- bucket.getObjectDataChunks(
- s3Path.getObjectName(),
- callback,
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- return grpc::Status::OK;
+ reactor::GetReactor *gr = new reactor::GetReactor(request);
+ gr->NextWrite();
+ return gr;
}
-grpc::Status BlobServiceImpl::Remove(
- grpc::ServerContext *context,
+grpc::ServerUnaryReactor *BlobServiceImpl::Remove(
+ grpc::CallbackServerContext *context,
const blob::RemoveRequest *request,
google::protobuf::Empty *response) {
+ grpc::Status status = grpc::Status::OK;
const std::string holder = request->holder();
try {
std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
@@ -180,9 +96,11 @@
}
} catch (std::runtime_error &e) {
std::cout << "error: " << e.what() << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
}
- return grpc::Status::OK;
+ auto *reactor = context->DefaultReactor();
+ reactor->Finish(status);
+ return reactor;
}
} // namespace network
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Nov 1, 9:35 PM (22 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2402059
Default Alt Text
D3471.id10547.diff (7 KB)
Attached To
Mode
D3471: [services] Blob - Update service implementation
Attached
Detach File
Event Timeline
Log In to Comment