diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -71,24 +71,14 @@ this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->holder.c_str()); - put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->holder.c_str()); put_client_write_cxx( this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->dataHash.c_str()); - rust::String responseStr = put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = + put_client_blocking_read_cxx(this->holder.c_str()); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( @@ -105,12 +95,7 @@ tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::string(std::move(*request.mutable_newcompactionchunk())) .c_str()); - put_client_blocking_read_cxx( - this->holder.c_str()); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx(this->holder.c_str()); return nullptr; } diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -1,12 +1,14 @@ #pragma once #include "BaseReactor.h" +#include "ThreadPool.h" #include #include #include #include +#include namespace comm { namespace network { @@ -95,23 +97,27 @@ void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->setStatus(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - if (this->getStatus().sendLastResponse) { - this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->getStatus().status); - } else { - this->Finish(this->getStatus().status); - } - this->statusHolder->state = ReactorState::TERMINATED; + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->setStatus(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err)))); + } + if (this->statusHolder->state != ReactorState::RUNNING) { + return; + } + if (this->getStatus().sendLastResponse) { + this->StartWriteAndFinish( + &this->response, grpc::WriteOptions(), this->getStatus().status); + } else { + this->Finish(this->getStatus().status); + } + this->statusHolder->state = ReactorState::TERMINATED; + }); } template @@ -136,19 +142,23 @@ this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } - try { - this->response = Response(); - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::exception &e) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->response = Response(); + std::unique_ptr status = + this->handleRequest(this->request, &this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err))); + } + }); } template