// REVIEW NOTE: this span is a unified diff whose line breaks were collapsed,
// not plain source. It holds the patch for SendLogReactor.cpp followed by the
// header of the patch for ServerReadReactorBase.h. The `+`/`-` diff markers and
// the original `//` comments therefore appear inline. Some tokens that would
// sit in angle brackets look to be missing (e.g. `std::unique_ptr` and
// `std::make_unique` with no template argument) - presumably lost in
// extraction; compare against the real file before applying.
//
// What the patch changes in SendLogReactor.cpp: only the trailing
// "todo this should be avoided (blocking)..." comments after the four
// put_client_blocking_read_cxx calls are removed; the calls themselves stay.
//
// Functions visible in this span:
// - storeInDatabase(): builds a database::LogItem from backupID, logID and
//   either blobHolder (when persistenceMethod is BLOB) or value (otherwise),
//   throws std::runtime_error if LogItem::getItemSize exceeds
//   LOG_DATA_SIZE_DATABASE_LIMIT, then calls DatabaseManager::putLogItem.
// - generateLogID(backupID): returns backupID + tools::ID_SEPARATOR + the
//   current timestamp as a string.
// - initializePutClient(): throws std::runtime_error when blobHolder or hash
//   is empty, then calls put_client_initialize_cxx with blobHolder.
// - readRequest(request): takes reactorStateMutex and advances a state
//   machine USER_ID -> BACKUP_ID -> LOG_HASH -> LOG_CHUNK. Each state throws
//   std::runtime_error when the expected field is absent. BACKUP_ID also
//   throws when findBackupItem returns nullptr, and stores the generated log
//   ID in the response. In LOG_CHUNK:
//     * empty log data returns grpc::Status::OK;
//     * persistenceMethod == DB throws ("please do not send multiple tiny
//       chunks ...");
//     * persistenceMethod == BLOB writes the chunk to the put client, does a
//       blocking read and returns nullptr;
//     * otherwise the chunk is appended to value; if the resulting LogItem is
//       over the size limit the reactor switches to BLOB, generates a holder,
//       initializes the put client, sends holder and hash, returns OK when
//       the hash response converts to a non-zero int ("data exists?"), and
//       otherwise sends the accumulated value and clears it; if not over the
//       limit it sets persistenceMethod to DB.
//   Falling out of the switch throws "send log - invalid state".
// - terminateCallback(): takes the mutex, calls put_client_terminate_cxx,
//   throws if the held status is not ok or the persistence method is neither
//   BLOB nor DB, then calls storeInDatabase() (both branches do the same).
// - doneCallback(): only takes the mutex; the body is still a TODO.
//
// NOTE(review): terminateCallback calls put_client_terminate_cxx even when the
// DB path was taken and blobHolder was never set - confirm the callee accepts
// an empty holder.
// NOTE(review): `std::move(this->value).c_str()` is followed by
// `this->value = ""`; the std::move looks like it has no effect on what
// c_str() returns - confirm intent.
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index e6a6c0745..11a498a29 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,199 +1,179 @@ #include "SendLogReactor.h" #include "blob_client/src/lib.rs.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB; database::LogItem logItem( this->backupID, this->logID, storedInBlob, storedInBlob ? this->blobHolder : this->value, {}, this->hash); if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { throw std::runtime_error( "trying to put into the database an item with size " + std::to_string(database::LogItem::getItemSize(&logItem)) + " that exceeds the limit " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT)); } database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutClient() { if (this->blobHolder.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty blob holder"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } put_client_initialize_cxx(this->blobHolder.c_str()); } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id 
expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } if (request.mutable_logdata()->size() == 0) { return std::make_unique(grpc::Status::OK); } if (this->persistenceMethod == PersistenceMethod::DB) { throw std::runtime_error( "please do not send multiple tiny chunks (less than " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + "), merge them into bigger parts instead"); } if (this->persistenceMethod == PersistenceMethod::BLOB) { put_client_write_cxx( this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), request.mutable_logdata()->c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); return nullptr; } this->value += std::move(*request.mutable_logdata()); database::LogItem logItem = database::LogItem( this->backupID, this->logID, true, this->value, "", this->hash); if (database::LogItem::getItemSize(&logItem) > 
LOG_DATA_SIZE_DATABASE_LIMIT) { this->persistenceMethod = PersistenceMethod::BLOB; this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutClient(); put_client_write_cxx( this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->blobHolder.c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); put_client_write_cxx( this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->hash.c_str()); - rust::String responseStr = put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = + put_client_blocking_read_cxx(this->blobHolder.c_str()); // data exists? 
if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(this->value).c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); put_client_terminate_cxx(this->blobHolder.c_str()); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } if (this->persistenceMethod != PersistenceMethod::BLOB && this->persistenceMethod != PersistenceMethod::DB) { throw std::runtime_error("Invalid persistence method detected"); } if (this->persistenceMethod == PersistenceMethod::DB) { this->storeInDatabase(); return; } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h index 8169db7d9..604bacb3f 100644 --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -1,120 +1,130 @@ #pragma once 
// REVIEW NOTE: this span is the body of a collapsed unified diff for
// ServerReadReactorBase.h; `+`/`-` markers and `//` comments appear inline.
// Tokens that would sit in angle brackets look to be missing (`#include` with
// no header name, `template class`, `std::unique_ptr status`) - presumably
// lost in extraction; compare against the real header before applying.
//
// What the patch changes:
// - adds `#include "ThreadPool.h"` and reshuffles the other includes.
// - OnReadDone(ok): the `!ok` path is unchanged and terminates with
//   grpc::Status::OK. The readRequest -> terminate / StartRead sequence that
//   used to run inline inside try/catch is now the first lambda handed to
//   ThreadPool::getInstance().scheduleWithCallback; the second lambda
//   terminates with StatusCode::INTERNAL when it receives a non-null error.
// - terminate(status): setStatus(status) still runs directly. The calls to
//   terminateCallback() and validate() move into a scheduled task, and the
//   rest (recording an error as INTERNAL, logging a non-ok status, calling
//   Finish only while state is RUNNING, then setting TERMINATED) moves into
//   the task's callback.
//
// Unchanged context visible here:
// - the constructor stores `response`, sets state to RUNNING and issues the
//   first StartRead.
// - readRequest is pure virtual; per its comment it returns nullptr to keep
//   the connection going and any other value to terminate with that status.
// - OnDone() sets state to DONE, calls doneCallback() and then `delete this`.
// - getStatusHolder() returns the shared statusHolder.
//
// NOTE(review): every scheduled lambda captures `this`, and OnDone() deletes
// the reactor - confirm no pool task or callback can still run after OnDone.
// NOTE(review): if scheduleWithCallback is asynchronous (not visible here),
// terminate() now returns before Finish is called and statusHolder->state is
// read and written from pool threads - confirm the state field is safe for
// that.
#include "BaseReactor.h" +#include "ThreadPool.h" -#include #include +#include #include #include #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read N requests from the client // - write a final response to the client (may be empty) // - terminate the connection template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; // - argument request - data read from the client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. 
We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } - try { - std::unique_ptr status = this->readRequest(this->request); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::exception &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - return; - } - this->StartRead(&this->request); + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + std::unique_ptr status = this->readRequest(this->request); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartRead(&this->request); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + }); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->statusHolder->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + if (!this->statusHolder->getStatus().ok()) { + LOG(ERROR) << this->statusHolder->getStatus().error_message(); + } + if (this->statusHolder->state != ReactorState::RUNNING) { + return; + } + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; + }); } template void ServerReadReactorBase::OnDone() { this->statusHolder->state = 
ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerReadReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm