Page MenuHomePhabricator

D3786.id12039.diff
No OneTemporary

D3786.id12039.diff

diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -7,36 +9,33 @@
namespace reactor {
template <class Request, class Response>
-class ClientBidiReactorBase
- : public grpc::ClientBidiReactor<Request, Response> {
+class ClientBidiReactorBase : public grpc::ClientBidiReactor<Request, Response>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility;
std::shared_ptr<Response> response = nullptr;
- bool terminated = false;
- bool done = false;
- bool initialized = 0;
-
void nextWrite();
protected:
Request request;
- grpc::Status status = grpc::Status::OK;
public:
grpc::ClientContext context;
void start();
- void terminate(const grpc::Status &status);
- bool isTerminated();
- bool isDone();
+
+ void validate() override{};
+ void doneCallback() override{};
+ void terminateCallback() override{};
+
void OnWriteDone(bool ok) override;
void OnReadDone(bool ok) override;
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
+ std::shared_ptr<ReactorUtility> getUtility() override;
virtual std::unique_ptr<grpc::Status> prepareRequest(
Request &request,
std::shared_ptr<Response> previousResponse) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
@@ -54,47 +53,16 @@
return;
}
this->StartWrite(&this->request);
- if (!this->initialized) {
- this->StartCall();
- this->initialized = true;
- }
}
template <class Request, class Response>
void ClientBidiReactorBase<Request, Response>::start() {
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->status.ok()) {
- this->status = status;
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->terminated) {
+ if (this->utility->state != ReactorState::NONE) {
return;
}
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- this->StartWritesDone();
- this->terminated = true;
-}
-
-template <class Request, class Response>
-bool ClientBidiReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
-}
-
-template <class Request, class Response>
-bool ClientBidiReactorBase<Request, Response>::isDone() {
- return this->done;
+ this->utility->state = ReactorState::RUNNING;
+ this->nextWrite();
+ this->StartCall();
}
template <class Request, class Response>
@@ -117,14 +85,44 @@
this->nextWrite();
}
+template <class Request, class Response>
+void ClientBidiReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ if (this->utility->getStatus().ok()) {
+ this->utility->setStatus(status);
+ }
+ if (!this->utility->getStatus().ok()) {
+ std::cout << "error: " << this->utility->getStatus().error_message()
+ << std::endl;
+ }
+ if (this->utility->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->terminateCallback();
+ try {
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->utility->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+ }
+ this->StartWritesDone();
+ this->utility->state = ReactorState::TERMINATED;
+}
+
template <class Request, class Response>
void ClientBidiReactorBase<Request, Response>::OnDone(
const grpc::Status &status) {
+ this->utility->state = ReactorState::DONE;
this->terminate(status);
- this->done = true;
this->doneCallback();
}
+template <class Request, class Response>
+std::shared_ptr<ReactorUtility>
+ClientBidiReactorBase<Request, Response>::getUtility() {
+ return this->utility;
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -7,60 +9,38 @@
namespace reactor {
template <class Request, class Response>
-class ClientReadReactorBase : public grpc::ClientReadReactor<Response> {
+class ClientReadReactorBase : public grpc::ClientReadReactor<Response>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility;
Response response;
- bool done = false;
- bool terminated = false;
- bool initialized = false;
-
- void terminate(const grpc::Status &status);
-
-protected:
- grpc::Status status = grpc::Status::OK;
public:
Request request;
grpc::ClientContext context;
void start();
+
+ void validate() override{};
+ void doneCallback() override{};
+ void terminateCallback() override{};
+
void OnReadDone(bool ok) override;
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
- bool isDone();
- bool isTerminated();
+ std::shared_ptr<ReactorUtility> getUtility() override;
virtual std::unique_ptr<grpc::Status> readResponse(Response &response) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->status.ok()) {
- this->status = status;
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->terminated) {
+void ClientReadReactorBase<Request, Response>::start() {
+ if (this->utility->state != ReactorState::NONE) {
return;
}
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- this->terminated = true;
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::start() {
this->StartRead(&this->response);
- if (!this->initialized) {
+ if (this->utility->state != ReactorState::RUNNING) {
this->StartCall();
- this->initialized = true;
+ this->utility->state = ReactorState::RUNNING;
}
}
@@ -86,21 +66,40 @@
}
template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnDone(
+void ClientReadReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
- this->terminated = true;
- this->terminate(status);
- this->doneCallback();
+ if (this->utility->getStatus().ok()) {
+ this->utility->setStatus(status);
+ }
+ if (!this->utility->getStatus().ok()) {
+ std::cout << "error: " << this->utility->getStatus().error_message()
+ << std::endl;
+ }
+ if (this->utility->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->terminateCallback();
+ try {
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->utility->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+ }
+ this->utility->state = ReactorState::TERMINATED;
}
template <class Request, class Response>
-bool ClientReadReactorBase<Request, Response>::isDone() {
- return this->done;
+void ClientReadReactorBase<Request, Response>::OnDone(
+ const grpc::Status &status) {
+ this->utility->state = ReactorState::DONE;
+ this->terminate(status);
+ this->doneCallback();
}
template <class Request, class Response>
-bool ClientReadReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
+std::shared_ptr<ReactorUtility>
+ClientReadReactorBase<Request, Response>::getUtility() {
+ return this->utility;
}
} // namespace reactor
diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -7,11 +9,9 @@
namespace reactor {
template <class Request, class Response>
-class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request> {
- grpc::Status status = grpc::Status::OK;
- bool done = false;
- bool terminated = false;
- bool initialized = 0;
+class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility;
Request request;
void nextWrite();
@@ -20,16 +20,18 @@
Response response;
grpc::ClientContext context;
+ void start();
+
+ void validate() override{};
+ void doneCallback() override{};
+ void terminateCallback() override{};
+
void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status);
- bool isDone();
- bool isTerminated();
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
+ std::shared_ptr<ReactorUtility> getUtility() override;
virtual std::unique_ptr<grpc::Status> prepareRequest(Request &request) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
@@ -53,6 +55,10 @@
template <class Request, class Response>
void ClientWriteReactorBase<Request, Response>::start() {
+ if (this->start != ReactorState::NONE) {
+ return;
+ }
+ this->utility->state = ReactorState::RUNNING;
this->nextWrite();
}
@@ -68,43 +74,41 @@
template <class Request, class Response>
void ClientWriteReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
- if (this->status.ok()) {
- this->status = status;
+ if (this->utility->getStatus().ok()) {
+ this->utility->setStatus(status);
}
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
+ if (!this->utility->getStatus().ok()) {
+ std::cout << "error: " << this->utility->getStatus().error_message()
+ << std::endl;
}
- if (this->terminated) {
+ if (this->utility->state != ReactorState::RUNNING) {
return;
}
this->terminateCallback();
try {
this->validate();
} catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ this->utility->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
}
- this->terminated = true;
+ this->utility->state = ReactorState::TERMINATED;
this->StartWritesDone();
}
-template <class Request, class Response>
-bool ClientWriteReactorBase<Request, Response>::isDone() {
- return this->done;
-}
-
-template <class Request, class Response>
-bool ClientWriteReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
-}
-
template <class Request, class Response>
void ClientWriteReactorBase<Request, Response>::OnDone(
const grpc::Status &status) {
+ this->utility->state = ReactorState::DONE;
this->terminate(status);
- this->done = true;
this->doneCallback();
}
+template <class Request, class Response>
+std::shared_ptr<ReactorUtility>
+ClientWriteReactorBase<Request, Response>::getUtility() {
+ return this->utility;
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
@@ -28,7 +28,6 @@
std::unique_ptr<grpc::Status>
readResponse(blob::GetResponse &response) override;
void doneCallback() override;
- grpc::Status getStatus() const;
};
} // namespace reactor
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
@@ -23,10 +23,6 @@
this->dataChunks->write("");
}
-grpc::Status BlobGetClientReactor::getStatus() const {
- return this->status;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -46,7 +46,6 @@
blob::PutRequest &request,
std::shared_ptr<blob::PutResponse> previousResponse) override;
void doneCallback() override;
- grpc::Status getStatus() const;
};
} // namespace reactor
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
@@ -53,10 +53,6 @@
this->terminationNotifier->notify_one();
}
-grpc::Status BlobPutClientReactor::getStatus() const {
- return this->status;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -68,10 +68,16 @@
}
this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (!this->putReactor->isDone()) {
+ if (this->putReactor->getUtility()->state == ReactorState::DONE &&
+ !this->putReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->putReactor->getUtility()->getStatus().error_message());
+ }
+ if (this->putReactor->getUtility()->state != ReactorState::DONE) {
this->blobPutDoneCV.wait(lock2);
- } else if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
+ } else if (!this->putReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->putReactor->getUtility()->getStatus().error_message());
}
try {
// TODO add recovery data
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -67,8 +67,9 @@
throw std::runtime_error(
"dangling data discovered after reading compaction");
}
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
+ if (!this->getReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->getReactor->getUtility()->getStatus().error_message());
}
this->state = State::LOGS;
}
@@ -116,8 +117,9 @@
// get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
+ if (!this->getReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->getReactor->getUtility()->getStatus().error_message());
}
// if we get an empty chunk, we reset the currentLog so we can read the next
// one from the logs collection.
@@ -135,8 +137,9 @@
}
void PullBackupReactor::terminateCallback() {
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
+ if (!this->getReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->getReactor->getUtility()->getStatus().error_message());
}
}
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -133,10 +133,11 @@
}
this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (!this->putReactor->isDone()) {
+ if (this->putReactor->getUtility()->state != ReactorState::DONE) {
this->blobPutDoneCV.wait(lockPut);
- } else if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
+ } else if (!this->putReactor->getUtility()->getStatus().ok()) {
+ throw std::runtime_error(
+ this->putReactor->getUtility()->getStatus().error_message());
}
// store in db only when we successfully upload chunks
this->storeInDatabase();

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 26, 5:36 PM (10 h, 45 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2707764
Default Alt Text
D3786.id12039.diff (17 KB)

Event Timeline