diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -34,9 +34,15 @@ public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); + + std::atomic ongoingPoolTaskCounter{0}; + Request request; Response response; + void beginPoolTask(); + void finishPoolTask(); + protected: ServerBidiReactorStatus status; bool readingAborted = false; @@ -86,17 +92,20 @@ template void ServerBidiReactorBase::OnDone() { - this->statusHolder->state = ReactorState::DONE; - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; + this->beginPoolTask(); + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->statusHolder->state = ReactorState::DONE; + this->doneCallback(); + }, + [this](std::unique_ptr err) { this->finishPoolTask(); }); } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { this->terminateCallback(); @@ -108,6 +117,7 @@ grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err)))); } if (this->statusHolder->state != ReactorState::RUNNING) { + this->finishPoolTask(); return; } if (this->getStatus().sendLastResponse) { @@ -117,6 +127,7 @@ this->Finish(this->getStatus().status); } this->statusHolder->state = ReactorState::TERMINATED; + this->finishPoolTask(); }); } @@ -142,6 +153,7 @@ this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { this->response = Response(); @@ -158,6 +170,7 @@ this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, *err))); } + this->finishPoolTask(); }); } @@ -177,6 +190,23 @@ return this->statusHolder; } +template +void ServerBidiReactorBase::beginPoolTask() { + this->ongoingPoolTaskCounter++; +} + +template +void ServerBidiReactorBase::finishPoolTask() { + this->ongoingPoolTaskCounter--; + if (!this->ongoingPoolTaskCounter.load() && + this->statusHolder->state == ReactorState::DONE) { + // This looks weird but apparently it is okay to do this. More + // information: + // https://phab.comm.dev/D3246#87890 + delete this; + } +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -24,8 +24,13 @@ public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); + + std::atomic ongoingPoolTaskCounter{0}; Request request; + void beginPoolTask(); + void finishPoolTask(); + protected: Response *response; @@ -68,6 +73,7 @@ this->terminate(grpc::Status::OK); return; } + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { std::unique_ptr status = this->readRequest(this->request); @@ -81,6 +87,7 @@ if (err != nullptr) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); } + this->finishPoolTask(); }); } @@ -88,7 +95,7 @@ void ServerReadReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); - + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { this->terminateCallback(); @@ -102,21 +109,23 @@ if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; + if (this->statusHolder->state == ReactorState::RUNNING) { + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + this->finishPoolTask(); }); } template void ServerReadReactorBase::OnDone() { - this->statusHolder->state = ReactorState::DONE; - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; + this->beginPoolTask(); + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->statusHolder->state = ReactorState::DONE; + this->doneCallback(); + }, + [this](std::unique_ptr err) { this->finishPoolTask(); }); } template @@ -125,6 +134,23 @@ return this->statusHolder; } +template +void ServerReadReactorBase::beginPoolTask() { + this->ongoingPoolTaskCounter++; +} + +template +void ServerReadReactorBase::finishPoolTask() { + this->ongoingPoolTaskCounter--; + if (!this->ongoingPoolTaskCounter.load() && + this->statusHolder->state == ReactorState::DONE) { + // This looks weird but apparently it is okay to do this. More + // information: + // https://phab.comm.dev/D3246#87890 + delete this; + } +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -24,10 +24,14 @@ public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); + + std::atomic ongoingPoolTaskCounter{0}; Response response; bool initialized = false; void nextWrite(); + void beginPoolTask(); + void finishPoolTask(); protected: // this is a const ref since it's not meant to be modified @@ -64,6 +68,7 @@ void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { this->terminateCallback(); @@ -77,11 +82,11 @@ if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; + if (this->statusHolder->state == ReactorState::RUNNING) { + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + this->finishPoolTask(); }); } @@ -98,6 +103,7 @@ template void ServerWriteReactorBase::nextWrite() { + this->beginPoolTask(); ThreadPool::getInstance().scheduleWithCallback( [this]() { if (!this->initialized) { @@ -117,6 +123,7 @@ if (err != nullptr) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); } + this->finishPoolTask(); }); } @@ -128,10 +135,10 @@ template void ServerWriteReactorBase::OnDone() { - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; + this->beginPoolTask(); + ThreadPool::getInstance().scheduleWithCallback( + [this]() { this->doneCallback(); }, + [this](std::unique_ptr err) { this->finishPoolTask(); }); } template @@ -149,6 +156,23 @@ this->nextWrite(); } +template +void ServerWriteReactorBase::beginPoolTask() { + this->ongoingPoolTaskCounter++; +} + +template +void ServerWriteReactorBase::finishPoolTask() { + this->ongoingPoolTaskCounter--; + if (!this->ongoingPoolTaskCounter.load() && + this->statusHolder->state == ReactorState::DONE) { + // This looks weird but apparently it is okay to do this. More + // information: + // https://phab.comm.dev/D3246#87890 + delete this; + } +} + } // namespace reactor } // namespace network } // namespace comm