diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp index 58648a942..975182f30 100644 --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp @@ -1,51 +1,51 @@ #include "BackupServiceImpl.h" #include "CreateNewBackupReactor.h" #include "PullBackupReactor.h" #include "RecoverBackupKeyReactor.h" #include "SendLogReactor.h" #include namespace comm { namespace network { BackupServiceImpl::BackupServiceImpl() { Aws::InitAPI({}); } BackupServiceImpl::~BackupServiceImpl() { Aws::ShutdownAPI({}); } grpc::ServerBidiReactor< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { return new reactor::CreateNewBackupReactor(); } grpc::ServerReadReactor *BackupServiceImpl::SendLog( grpc::CallbackServerContext *context, google::protobuf::Empty *response) { return new reactor::SendLogReactor(response); } grpc::ServerBidiReactor< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { return new reactor::RecoverBackupKeyReactor(); } grpc::ServerWriteReactor * BackupServiceImpl::PullBackup( grpc::CallbackServerContext *context, const backup::PullBackupRequest *request) { reactor::PullBackupReactor *reactor = new reactor::PullBackupReactor(request); - reactor->NextWrite(); + reactor->start(); return reactor; } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h index 9bb380d5e..32d396029 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,121 +1,128 @@ #include namespace comm { namespace network { namespace reactor { template class ClientBidiReactorBase : public grpc::ClientBidiReactor { std::shared_ptr response = nullptr; bool terminated = false; bool done = false; bool initialized = 0; + void nextWrite(); + protected: Request request; grpc::Status status = grpc::Status::OK; public: grpc::ClientContext context; - void nextWrite(); + void start(); void terminate(const grpc::Status &status); bool isTerminated(); bool isDone(); void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } +template +void ClientBidiReactorBase::start() { + this->nextWrite(); +} + template void ClientBidiReactorBase::terminate( const grpc::Status &status) { if (this->status.ok()) { this->status = status; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } if (this->terminated) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } this->StartWritesDone(); this->terminated = true; } template bool ClientBidiReactorBase::isTerminated() { return this->terminated; } template bool ClientBidiReactorBase::isDone() { return this->done; } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); this->done = true; this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h index 3792ae359..de0757b39 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,102 +1,108 @@ #include namespace comm { namespace network { namespace reactor { template class ClientWriteReactorBase : public grpc::ClientWriteReactor { grpc::Status status = grpc::Status::OK; bool done = false; bool terminated = false; bool initialized = 0; Request request; + void nextWrite(); + public: Response response; grpc::ClientContext context; - void nextWrite(); void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); bool isDone(); bool isTerminated(); void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest(Request &request) = 0; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } +template +void ClientWriteReactorBase::start() { + this->nextWrite(); +} + template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->status.ok()) { this->status = status; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } if (this->terminated) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } this->terminated = true; this->StartWritesDone(); } template bool ClientWriteReactorBase::isDone() { return this->done; } template bool ClientWriteReactorBase::isTerminated() { return this->terminated; } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); this->done = true; this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index 8496afa52..2f17bc25d 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,110 +1,116 @@ #pragma once #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor { Response response; bool initialized = false; std::atomic finished = false; void terminate(grpc::Status status); + void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; grpc::Status status; public: ServerWriteReactorBase(const Request *request); - virtual void NextWrite(); + void start(); void OnDone() override; void OnWriteDone(bool ok) override; virtual std::unique_ptr writeResponse(Response *response) = 0; virtual void initialize(){}; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ServerWriteReactorBase::terminate(grpc::Status status) { this->status = status; try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } if (this->finished) { return; } this->Finish(this->status); this->finished = true; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { - // we cannot call this->NextWrite() here because it's going to call it on + // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function - // NextWrite has to be exposed as a public function and called explicitly + // start has to be exposed as a public function and called explicitly // to initialize writing } template -void ServerWriteReactorBase::NextWrite() { +void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } +template +void ServerWriteReactorBase::start() { + this->nextWrite(); +} + template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } - this->NextWrite(); + this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/grpc-client/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/grpc-client/ServiceBlobClient.h index b53d7f93a..08869a329 100644 --- a/services/backup/docker-server/contents/server/src/grpc-client/ServiceBlobClient.h +++ b/services/backup/docker-server/contents/server/src/grpc-client/ServiceBlobClient.h @@ -1,52 +1,52 @@ #pragma once #include "BlobGetClientReactor.h" #include "BlobPutClientReactor.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { class ServiceBlobClient { std::unique_ptr stub; public: ServiceBlobClient() { // todo handle different types of connection(e.g. load balancer) std::string targetStr = "blob-server:50051"; std::shared_ptr channel = grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); this->stub = blob::BlobService::NewStub(channel); } void put(std::shared_ptr putReactor) { if (putReactor == nullptr) { throw std::runtime_error( "put reactor is being used but has not been initialized"); } this->stub->async()->Put(&putReactor->context, &(*putReactor)); - putReactor->nextWrite(); + putReactor->start(); } void get(std::shared_ptr getReactor) { if (getReactor == nullptr) { throw std::runtime_error( "get reactor is being used but has not been initialized"); } this->stub->async()->Get( &getReactor->context, &getReactor->request, &(*getReactor)); getReactor->start(); } // void remove(const std::string &holder); }; } // namespace network } // namespace comm diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp index 3bc78389f..7c0321aa6 100644 --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -1,107 +1,107 @@ #include "BlobServiceImpl.h" #include "AwsTools.h" #include "Constants.h" #include "DatabaseManager.h" #include "MultiPartUploader.h" #include "Tools.h" #include "GetReactor.h" #include "PutReactor.h" #include #include namespace comm { namespace network { BlobServiceImpl::BlobServiceImpl() { Aws::InitAPI({}); if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) { throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available"); } } BlobServiceImpl::~BlobServiceImpl() { Aws::ShutdownAPI({}); } void BlobServiceImpl::verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path) { const std::string computedBlobHash = computeHashForFile(s3Path); if (expectedBlobHash != computedBlobHash) { throw std::runtime_error( "blob hash mismatch, expected: [" + expectedBlobHash + "], computed: [" + computedBlobHash + "]"); } } void BlobServiceImpl::assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue) { if (!lvalue.empty()) { throw std::runtime_error( "multiple assignment for variable " + label + " is not allowed"); } lvalue = rvalue; } grpc::ServerBidiReactor * BlobServiceImpl::Put(grpc::CallbackServerContext *context) { return new reactor::PutReactor(); } grpc::ServerWriteReactor *BlobServiceImpl::Get( grpc::CallbackServerContext *context, const blob::GetRequest *request) { reactor::GetReactor *gr = new reactor::GetReactor(request); - gr->NextWrite(); + gr->start(); return gr; } grpc::ServerUnaryReactor *BlobServiceImpl::Remove( grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error("no item found for holder: " + holder); } // TODO handle cleanup here properly // for now the object's being removed right away const std::string blobHash = reverseIndexItem->getBlobHash(); if (!database::DatabaseManager::getInstance().removeReverseIndexItem( holder)) { throw std::runtime_error( "could not remove an item for holder " + holder + "(probably does not exist)"); } if (database::DatabaseManager::getInstance() .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash()) .size() == 0) { database::S3Path s3Path = findS3Path(*reverseIndexItem); AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); bucket.removeObject(s3Path.getObjectName()); database::DatabaseManager::getInstance().removeBlobItem(blobHash); } } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index 8496afa52..2f17bc25d 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,110 +1,116 @@ #pragma once #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor { Response response; bool initialized = false; std::atomic finished = false; void terminate(grpc::Status status); + void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; grpc::Status status; public: ServerWriteReactorBase(const Request *request); - virtual void NextWrite(); + void start(); void OnDone() override; void OnWriteDone(bool ok) override; virtual std::unique_ptr writeResponse(Response *response) = 0; virtual void initialize(){}; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ServerWriteReactorBase::terminate(grpc::Status status) { this->status = status; try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } if (this->finished) { return; } this->Finish(this->status); this->finished = true; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { - // we cannot call this->NextWrite() here because it's going to call it on + // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function - // NextWrite has to be exposed as a public function and called explicitly + // start has to be exposed as a public function and called explicitly // to initialize writing } template -void ServerWriteReactorBase::NextWrite() { +void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } +template +void ServerWriteReactorBase::start() { + this->nextWrite(); +} + template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } - this->NextWrite(); + this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm