Page MenuHomePhabricator

D5367.id18265.diff
No OneTemporary

D5367.id18265.diff

diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
@@ -34,9 +34,15 @@
public BaseReactor {
std::shared_ptr<ReactorStatusHolder> statusHolder =
std::make_shared<ReactorStatusHolder>();
+
+ std::atomic<int> ongoingPoolTaskCounter{0};
+
Request request;
Response response;
+ void beginPoolTask();
+ void finishPoolTask();
+
protected:
ServerBidiReactorStatus status;
bool readingAborted = false;
@@ -86,17 +92,20 @@
template <class Request, class Response>
void ServerBidiReactorBase<Request, Response>::OnDone() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
+ this->beginPoolTask();
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ this->statusHolder->state = ReactorState::DONE;
+ this->doneCallback();
+ },
+ [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
}
template <class Request, class Response>
void ServerBidiReactorBase<Request, Response>::terminate(
ServerBidiReactorStatus status) {
this->setStatus(status);
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
this->terminateCallback();
@@ -108,6 +117,7 @@
grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err))));
}
if (this->statusHolder->state != ReactorState::RUNNING) {
+ this->finishPoolTask();
return;
}
if (this->getStatus().sendLastResponse) {
@@ -117,6 +127,7 @@
this->Finish(this->getStatus().status);
}
this->statusHolder->state = ReactorState::TERMINATED;
+ this->finishPoolTask();
});
}
@@ -142,6 +153,7 @@
this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
return;
}
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
this->response = Response();
@@ -158,6 +170,7 @@
this->terminate(ServerBidiReactorStatus(
grpc::Status(grpc::StatusCode::INTERNAL, *err)));
}
+ this->finishPoolTask();
});
}
@@ -177,6 +190,23 @@
return this->statusHolder;
}
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::beginPoolTask() {
+ this->ongoingPoolTaskCounter++;
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::finishPoolTask() {
+ this->ongoingPoolTaskCounter--;
+ if (!this->ongoingPoolTaskCounter.load() &&
+ this->statusHolder->state == ReactorState::DONE) {
+ // This looks weird but apparently it is okay to do this. More
+ // information:
+ // https://phab.comm.dev/D3246#87890
+ delete this;
+ }
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
@@ -24,8 +24,13 @@
public BaseReactor {
std::shared_ptr<ReactorStatusHolder> statusHolder =
std::make_shared<ReactorStatusHolder>();
+
+ std::atomic<int> ongoingPoolTaskCounter{0};
Request request;
+ void beginPoolTask();
+ void finishPoolTask();
+
protected:
Response *response;
@@ -68,6 +73,7 @@
this->terminate(grpc::Status::OK);
return;
}
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
@@ -81,6 +87,7 @@
if (err != nullptr) {
this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
}
+ this->finishPoolTask();
});
}
@@ -88,7 +95,7 @@
void ServerReadReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
this->statusHolder->setStatus(status);
-
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
this->terminateCallback();
@@ -102,21 +109,23 @@
if (!this->statusHolder->getStatus().ok()) {
LOG(ERROR) << this->statusHolder->getStatus().error_message();
}
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
+ if (this->statusHolder->state == ReactorState::RUNNING) {
+ this->Finish(this->statusHolder->getStatus());
+ this->statusHolder->state = ReactorState::TERMINATED;
}
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
+ this->finishPoolTask();
});
}
template <class Request, class Response>
void ServerReadReactorBase<Request, Response>::OnDone() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
+ this->beginPoolTask();
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ this->statusHolder->state = ReactorState::DONE;
+ this->doneCallback();
+ },
+ [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
}
template <class Request, class Response>
@@ -125,6 +134,23 @@
return this->statusHolder;
}
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::beginPoolTask() {
+ this->ongoingPoolTaskCounter++;
+}
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::finishPoolTask() {
+ this->ongoingPoolTaskCounter--;
+ if (!this->ongoingPoolTaskCounter.load() &&
+ this->statusHolder->state == ReactorState::DONE) {
+ // This looks weird but apparently it is okay to do this. More
+ // information:
+ // https://phab.comm.dev/D3246#87890
+ delete this;
+ }
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
@@ -24,10 +24,14 @@
public BaseReactor {
std::shared_ptr<ReactorStatusHolder> statusHolder =
std::make_shared<ReactorStatusHolder>();
+
+ std::atomic<int> ongoingPoolTaskCounter{0};
Response response;
bool initialized = false;
void nextWrite();
+ void beginPoolTask();
+ void finishPoolTask();
protected:
// this is a const ref since it's not meant to be modified
@@ -64,6 +68,7 @@
void ServerWriteReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
this->statusHolder->setStatus(status);
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
this->terminateCallback();
@@ -77,11 +82,11 @@
if (!this->statusHolder->getStatus().ok()) {
LOG(ERROR) << this->statusHolder->getStatus().error_message();
}
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
+ if (this->statusHolder->state == ReactorState::RUNNING) {
+ this->Finish(this->statusHolder->getStatus());
+ this->statusHolder->state = ReactorState::TERMINATED;
}
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
+ this->finishPoolTask();
});
}
@@ -98,6 +103,7 @@
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::nextWrite() {
+ this->beginPoolTask();
ThreadPool::getInstance().scheduleWithCallback(
[this]() {
if (!this->initialized) {
@@ -117,6 +123,7 @@
if (err != nullptr) {
this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
}
+ this->finishPoolTask();
});
}
@@ -128,10 +135,10 @@
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::OnDone() {
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
+ this->beginPoolTask();
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() { this->doneCallback(); },
+ [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
}
template <class Request, class Response>
@@ -149,6 +156,23 @@
this->nextWrite();
}
+template <class Request, class Response>
+void ServerWriteReactorBase<Request, Response>::beginPoolTask() {
+ this->ongoingPoolTaskCounter++;
+}
+
+template <class Request, class Response>
+void ServerWriteReactorBase<Request, Response>::finishPoolTask() {
+ this->ongoingPoolTaskCounter--;
+ if (!this->ongoingPoolTaskCounter.load() &&
+ this->statusHolder->state == ReactorState::DONE) {
+ // This looks weird but apparently it is okay to do this. More
+ // information:
+ // https://phab.comm.dev/D3246#87890
+ delete this;
+ }
+}
+
} // namespace reactor
} // namespace network
} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Mon, Dec 2, 5:32 PM (19 h, 31 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2608373
Default Alt Text
D5367.id18265.diff (9 KB)

Event Timeline