Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3530553
D3912.id12217.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Referenced Files
None
Subscribers
None
D3912.id12217.diff
View Options
diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -29,6 +29,7 @@
include_directories(
./lib_src
+ ./lib_src/server-base-reactors
./src
./src/grpc-client
./src/DatabaseEntities
diff --git a/services/blob/CMakeLists.txt b/services/blob/CMakeLists.txt
--- a/services/blob/CMakeLists.txt
+++ b/services/blob/CMakeLists.txt
@@ -30,6 +30,7 @@
include_directories(
./lib_src
+ ./lib_src/server-base-reactors
./src
./src/DatabaseEntities
./src/Reactors/
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
+++ /dev/null
@@ -1,157 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-struct ServerBidiReactorStatus {
- grpc::Status status;
- bool sendLastResponse;
- ServerBidiReactorStatus(
- grpc::Status status = grpc::Status::OK,
- bool sendLastResponse = false)
- : status(status), sendLastResponse(sendLastResponse) {
- }
-};
-
-template <class Request, class Response>
-class ServerBidiReactorBase : public grpc::ServerBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorUtility> utility;
- Request request;
- Response response;
-
-protected:
- ServerBidiReactorStatus status;
- bool readingAborted = false;
-
-public:
- ServerBidiReactorBase();
-
- void terminate(const grpc::Status &status) override;
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- void OnDone() override;
- void OnReadDone(bool ok) override;
- void OnWriteDone(bool ok) override;
- std::shared_ptr<ReactorUtility> getUtility() override;
-
- void terminate(ServerBidiReactorStatus status);
- ServerBidiReactorStatus getStatus() const;
- void setStatus(const ServerBidiReactorStatus &status);
-
- virtual std::unique_ptr<ServerBidiReactorStatus>
- handleRequest(Request request, Response *response) = 0;
-};
-
-template <class Request, class Response>
-ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->utility->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->terminate(ServerBidiReactorStatus(status));
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnDone() {
- this->utility->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- ServerBidiReactorStatus status) {
- this->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->setStatus(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- if (this->getStatus().sendLastResponse) {
- this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->getStatus().status);
- } else {
- this->Finish(this->getStatus().status);
- }
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-ServerBidiReactorStatus
-ServerBidiReactorBase<Request, Response>::getStatus() const {
- return this->status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::setStatus(
- const ServerBidiReactorStatus &status) {
- this->status = status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- this->readingAborted = true;
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
- return;
- }
- try {
- this->response = Response();
- std::unique_ptr<ServerBidiReactorStatus> status =
- this->handleRequest(this->request, &this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::runtime_error &e) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::ABORTED, "write failed")));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorUtility>
-ServerBidiReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorUtility> utility;
- Request request;
-
-protected:
- Response *response;
-
-public:
- ServerReadReactorBase(Response *response);
-
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
- std::shared_ptr<ReactorUtility> getUtility() override;
-
- virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
-};
-
-template <class Request, class Response>
-ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
- Response *response)
- : response(response) {
- this->utility->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- try {
- std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->utility->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->utility->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->utility->getStatus().ok()) {
- std::cout << "error: " << this->utility->getStatus().error_message()
- << std::endl;
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->utility->getStatus());
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->utility->state = ReactorState::DONE;
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorUtility>
-ServerReadReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
deleted file mode 100644
--- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
+++ /dev/null
@@ -1,131 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <iostream>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorUtility> utility;
- Response response;
- bool initialized = false;
-
- void nextWrite();
-
-protected:
- // this is a const ref since it's not meant to be modified
- const Request &request;
-
-public:
- ServerWriteReactorBase(const Request *request);
-
- void start();
-
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
-
- virtual void initialize(){};
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status);
- void OnDone() override;
- std::shared_ptr<ReactorUtility> getUtility() override;
-
- virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
-};
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->utility->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->utility->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->utility->getStatus().ok()) {
- std::cout << "error: " << this->utility->getStatus().error_message()
- << std::endl;
- }
- if (this->utility->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->utility->getStatus());
- this->utility->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-ServerWriteReactorBase<Request, Response>::ServerWriteReactorBase(
- const Request *request)
- : request(*request) {
- // we cannot call this->start() here because it's going to call it on
- // the base class, not derived leading to the runtime error of calling
- // a pure virtual function
- // start has to be exposed as a public function and called explicitly
- // to initialize writing
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::nextWrite() {
- try {
- if (!this->initialized) {
- this->initialize();
- this->initialized = true;
- }
- this->response = Response();
- std::unique_ptr<grpc::Status> status = this->writeResponse(&this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::start() {
- this->utility->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnDone() {
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorUtility>
-ServerWriteReactorBase<Request, Response>::getUtility() {
- return this->utility;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
- return;
- }
- this->nextWrite();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/lib/lib_src/server-base-reactors/ServerBidiReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
rename to services/lib/lib_src/server-base-reactors/ServerBidiReactorBase.h
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/lib/lib_src/server-base-reactors/ServerReadReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h
rename to services/lib/lib_src/server-base-reactors/ServerReadReactorBase.h
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/lib/lib_src/server-base-reactors/ServerWriteReactorBase.h
rename from services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
rename to services/lib/lib_src/server-base-reactors/ServerWriteReactorBase.h
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 26, 3:04 AM (9 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2703708
Default Alt Text
D3912.id12217.diff (13 KB)
Attached To
Mode
D3912: [services] Lib - Move server base reactors
Attached
Detach File
Event Timeline
Log In to Comment