diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -102,12 +102,7 @@ this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), request.mutable_logdata()->c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); return nullptr; } @@ -124,23 +119,13 @@ this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->blobHolder.c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); put_client_write_cxx( this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->hash.c_str()); - rust::String responseStr = put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = + put_client_blocking_read_cxx(this->blobHolder.c_str()); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique(grpc::Status::OK); @@ -149,12 +134,7 @@ this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(this->value).c_str()); - put_client_blocking_read_cxx( - this->blobHolder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->blobHolder.c_str()); this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -1,13 +1,15 @@ #pragma once #include "BaseReactor.h" +#include "Worker.h" -#include #include +#include #include #include #include +#include namespace comm { namespace network { @@ -66,38 +68,46 @@ this->terminate(grpc::Status::OK); return; } - try { - std::unique_ptr status = this->readRequest(this->request); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::exception &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - return; - } - this->StartRead(&this->request); + Worker::getInstance().schedule( + [this]() { + std::unique_ptr status = this->readRequest(this->request); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartRead(&this->request); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + }); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + + Worker::getInstance().schedule( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->statusHolder->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + if (!this->statusHolder->getStatus().ok()) { + LOG(ERROR) << this->statusHolder->getStatus().error_message(); + } + if (this->statusHolder->state != ReactorState::RUNNING) { + return; + } + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; + }); } template