diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index 84f686e25..5114fa18f 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,139 +1,124 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include "blob_client/src/lib.rs.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { if (this->deviceID.empty()) { throw std::runtime_error( "trying to generate a backup ID with an empty device ID"); } return this->deviceID + std::to_string(tools::getCurrentTimestamp()); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::DEVICE_ID; return nullptr; } case State::DEVICE_ID: { if (!request.has_deviceid()) { throw std::runtime_error("device id expected but not received"); } this->deviceID = request.deviceid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; this->backupID = this->generateBackupID(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) != nullptr) { throw std::runtime_error( "Backup with id [" + this->backupID + "] for user [" + this->userID + "] already exists, creation aborted"); } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); put_client_initialize_cxx(this->holder.c_str()); put_client_write_cxx( this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->holder.c_str()); - put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->holder.c_str()); put_client_write_cxx( this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->dataHash.c_str()); - rust::String responseStr = put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = + put_client_blocking_read_cxx(this->holder.c_str()); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( grpc::Status::OK, true); } return nullptr; } case State::DATA_CHUNKS: { if (request.mutable_newcompactionchunk()->empty()) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::string(std::move(*request.mutable_newcompactionchunk())) .c_str()); - put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->holder.c_str()); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); put_client_terminate_cxx(this->holder.c_str()); // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, tools::getCurrentTimestamp(), tools::generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h index b3bcf9bd4..14a675787 100644 --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -1,172 +1,182 @@ #pragma once #include "BaseReactor.h" +#include "ThreadPool.h" #include #include #include #include +#include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; // This is how this type of reactor works: // - repeat: // - read a request from the client // - write a response to the client // - terminate the connection template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); // these methods come from the BaseReactor(go there for more information) void terminate(const grpc::Status &status) override; void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; void setStatus(const ServerBidiReactorStatus &status); // - argument request - request that was sent by the client and received by // the server in the current cycle // - argument response - response that will be sent to the client in the // current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::terminate( const grpc::Status &status) { this->terminate(ServerBidiReactorStatus(status)); } template void ServerBidiReactorBase::OnDone() { this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->setStatus(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - if (this->getStatus().sendLastResponse) { - this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->getStatus().status); - } else { - this->Finish(this->getStatus().status); - } - this->statusHolder->state = ReactorState::TERMINATED; + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->setStatus(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err)))); + } + if (this->statusHolder->state != ReactorState::RUNNING) { + return; + } + if (this->getStatus().sendLastResponse) { + this->StartWriteAndFinish( + &this->response, grpc::WriteOptions(), this->getStatus().status); + } else { + this->Finish(this->getStatus().status); + } + this->statusHolder->state = ReactorState::TERMINATED; + }); } template ServerBidiReactorStatus ServerBidiReactorBase::getStatus() const { return this->status; } template void ServerBidiReactorBase::setStatus( const ServerBidiReactorStatus &status) { this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } - try { - this->response = Response(); - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::exception &e) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->response = Response(); + std::unique_ptr status = + this->handleRequest(this->request, &this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err))); + } + }); } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } template std::shared_ptr ServerBidiReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm