Page MenuHomePhabricator

D3912.id12406.diff
No OneTemporary

D3912.id12406.diff

diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -29,6 +29,7 @@
include_directories(
./lib_src
+ ./lib_src/server-base-reactors
./src
./src/grpc-client
./src/DatabaseEntities
diff --git a/services/blob/CMakeLists.txt b/services/blob/CMakeLists.txt
--- a/services/blob/CMakeLists.txt
+++ b/services/blob/CMakeLists.txt
@@ -30,6 +30,7 @@
include_directories(
./lib_src
+ ./lib_src/server-base-reactors
./src
./src/DatabaseEntities
./src/Reactors/
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
+++ /dev/null
@@ -1,157 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-struct ServerBidiReactorStatus {
- grpc::Status status;
- bool sendLastResponse;
- ServerBidiReactorStatus(
- grpc::Status status = grpc::Status::OK,
- bool sendLastResponse = false)
- : status(status), sendLastResponse(sendLastResponse) {
- }
-};
-
-template <class Request, class Response>
-class ServerBidiReactorBase : public grpc::ServerBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> utility;
- Request request;
- Response response;
-
-protected:
- ServerBidiReactorStatus status;
- bool readingAborted = false;
-
-public:
- ServerBidiReactorBase();
-
- void terminate(const grpc::Status &status) override;
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- void OnDone() override;
- void OnReadDone(bool ok) override;
- void OnWriteDone(bool ok) override;
- std::shared_ptr<ReactorStatusHolder> getUtility() override;
-
- void terminate(ServerBidiReactorStatus status);
- ServerBidiReactorStatus getStatus() const;
- void setStatus(const ServerBidiReactorStatus &status);
-
- virtual std::unique_ptr<ServerBidiReactorStatus>
- handleRequest(Request request, Response *response) = 0;
-};
-
-template <class Request, class Response>
-ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->utility->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->terminate(ServerBidiReactorStatus(status));
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnDone() {
- this->utility->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- ServerBidiReactorStatus status) {
- this->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->setStatus(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- if (this->getStatus().sendLastResponse) {
- this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->getStatus().status);
- } else {
- this->Finish(this->getStatus().status);
- }
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-ServerBidiReactorStatus
-ServerBidiReactorBase<Request, Response>::getStatus() const {
- return this->status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::setStatus(
- const ServerBidiReactorStatus &status) {
- this->status = status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- this->readingAborted = true;
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
- return;
- }
- try {
- this->response = Response();
- std::unique_ptr<ServerBidiReactorStatus> status =
- this->handleRequest(this->request, &this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::runtime_error &e) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::ABORTED, "write failed")));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerBidiReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> utility;
- Request request;
-
-protected:
- Response *response;
-
-public:
- ServerReadReactorBase(Response *response);
-
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
- std::shared_ptr<ReactorStatusHolder> getUtility() override;
-
- virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
-};
-
-template <class Request, class Response>
-ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
- Response *response)
- : response(response) {
- this->utility->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- try {
- std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->utility->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->utility->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->utility->getStatus().ok()) {
- std::cout << "error: " << this->utility->getStatus().error_message()
- << std::endl;
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->utility->getStatus());
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->utility->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerReadReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
+++ /dev/null
@@ -1,131 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> utility;
- Response response;
- bool initialized = false;
-
- void nextWrite();
-
-protected:
- // this is a const ref since it's not meant to be modified
- const Request &request;
-
-public:
- ServerWriteReactorBase(const Request *request);
-
- void start();
-
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- virtual void initialize(){};
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status);
- void OnDone() override;
- std::shared_ptr<ReactorStatusHolder> getUtility() override;
-
- virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
-};
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->utility->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->utility->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->utility->getStatus().ok()) {
- std::cout << "error: " << this->utility->getStatus().error_message()
- << std::endl;
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->utility->getStatus());
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-ServerWriteReactorBase<Request, Response>::ServerWriteReactorBase(
- const Request *request)
- : request(*request) {
- // we cannot call this->start() here because it's going to call it on
- // the base class, not derived leading to the runtime error of calling
- // a pure virtual function
- // start has to be exposed as a public function and called explicitly
- // to initialize writing
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::nextWrite() {
- try {
- if (!this->initialized) {
- this->initialize();
- this->initialized = true;
- }
- this->response = Response();
- std::unique_ptr<grpc::Status> status = this->writeResponse(&this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::start() {
- this->utility->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnDone() {
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerWriteReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
- return;
- }
- this->nextWrite();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
rename to services/lib/src/server-base-reactors/ServerBidiReactorBase.h
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h
rename to services/lib/src/server-base-reactors/ServerReadReactorBase.h
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
rename to services/lib/src/server-base-reactors/ServerWriteReactorBase.h

File Metadata

Mime Type
text/plain
Expires
Sun, Sep 29, 2:25 PM (21 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2201034
Default Alt Text
D3912.id12406.diff (13 KB)

Event Timeline