Page MenuHomePhabricator

D3786.id11711.diff
No OneTemporary

D3786.id11711.diff

diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -8,12 +10,8 @@
template <class Request, class Response>
class ClientBidiReactorBase
- : public grpc::ClientBidiReactor<Request, Response> {
+ : public grpc::ClientBidiReactor<Request, Response>, public BaseReactor {
std::shared_ptr<Response> response = nullptr;
- bool terminated = false;
- bool done = false;
- bool initialized = 0;
-
void nextWrite();
protected:
@@ -24,19 +22,14 @@
grpc::ClientContext context;
void start();
- void terminate(const grpc::Status &status);
- bool isTerminated();
- bool isDone();
void OnWriteDone(bool ok) override;
void OnReadDone(bool ok) override;
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
virtual std::unique_ptr<grpc::Status> prepareRequest(
Request &request,
std::shared_ptr<Response> previousResponse) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
@@ -54,47 +47,16 @@
return;
}
this->StartWrite(&this->request);
- if (!this->initialized) {
- this->StartCall();
- this->initialized = true;
- }
}
template <class Request, class Response>
void ClientBidiReactorBase<Request, Response>::start() {
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->status.ok()) {
- this->status = status;
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->terminated) {
+ if (this->state != ReactorState::NONE) {
return;
}
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- this->StartWritesDone();
- this->terminated = true;
-}
-
-template <class Request, class Response>
-bool ClientBidiReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
-}
-
-template <class Request, class Response>
-bool ClientBidiReactorBase<Request, Response>::isDone() {
- return this->done;
+ this->state = ReactorState::RUNNING;
+ this->nextWrite();
+ this->StartCall();
}
template <class Request, class Response>
@@ -117,11 +79,33 @@
this->nextWrite();
}
+template <class Request, class Response>
+void ClientBidiReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ if (this->status.ok()) {
+ this->status = status;
+ }
+ if (!this->status.ok()) {
+ std::cout << "error: " << this->status.error_message() << std::endl;
+ }
+ if (this->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->terminateCallback();
+ try {
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ }
+ this->StartWritesDone();
+ this->state = ReactorState::TERMINATED;
+}
+
template <class Request, class Response>
void ClientBidiReactorBase<Request, Response>::OnDone(
const grpc::Status &status) {
+ this->state = ReactorState::DONE; // set before terminate(): its RUNNING guard then returns before terminateCallback()/validate()/StartWritesDone()
this->terminate(status); // with state == DONE this only records the status (if the stored one is still ok) and logs a non-ok status
- this->done = true;
this->doneCallback(); // invoked on every OnDone, after the status has been recorded
}
diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -7,13 +9,9 @@
namespace reactor {
template <class Request, class Response>
-class ClientReadReactorBase : public grpc::ClientReadReactor<Response> {
+class ClientReadReactorBase : public grpc::ClientReadReactor<Response>,
+ public BaseReactor {
Response response;
- bool done = false;
- bool terminated = false;
- bool initialized = false;
-
- void terminate(const grpc::Status &status);
protected:
grpc::Status status = grpc::Status::OK;
@@ -24,43 +22,21 @@
void start();
void OnReadDone(bool ok) override;
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
- bool isDone();
- bool isTerminated();
virtual std::unique_ptr<grpc::Status> readResponse(Response &response) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->status.ok()) {
- this->status = status;
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->terminated) {
+void ClientReadReactorBase<Request, Response>::start() {
+ if (this->state != ReactorState::NONE) {
return;
}
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- this->terminated = true;
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::start() {
this->StartRead(&this->response);
- if (!this->initialized) {
+ if (this->state != ReactorState::RUNNING) {
this->StartCall();
- this->initialized = true;
+ this->state = ReactorState::RUNNING;
}
}
@@ -86,21 +62,32 @@
}
template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnDone(
+void ClientReadReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
- this->terminated = true;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-bool ClientReadReactorBase<Request, Response>::isDone() {
- return this->done;
+ if (this->status.ok()) {
+ this->status = status;
+ }
+ if (!this->status.ok()) {
+ std::cout << "error: " << this->status.error_message() << std::endl;
+ }
+ if (this->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->terminateCallback();
+ try {
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ }
+ this->state = ReactorState::TERMINATED;
}
template <class Request, class Response>
-bool ClientReadReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
+void ClientReadReactorBase<Request, Response>::OnDone(
+ const grpc::Status &status) {
+ this->state = ReactorState::DONE;
+ this->terminate(status);
+ this->doneCallback();
}
} // namespace reactor
diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
--- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
+++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
namespace comm {
@@ -7,29 +9,22 @@
namespace reactor {
template <class Request, class Response>
-class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request> {
+class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request>,
+ public BaseReactor {
grpc::Status status = grpc::Status::OK;
- bool done = false;
- bool terminated = false;
- bool initialized = 0;
Request request;
void nextWrite();
-
public:
Response response;
grpc::ClientContext context;
+ void start();
void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status);
- bool isDone();
- bool isTerminated();
+ void terminate(const grpc::Status &status) override;
void OnDone(const grpc::Status &status) override;
virtual std::unique_ptr<grpc::Status> prepareRequest(Request &request) = 0;
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
@@ -53,6 +48,10 @@
template <class Request, class Response>
void ClientWriteReactorBase<Request, Response>::start() {
+ if (this->state != ReactorState::NONE) { // check the state field, not the start() member; anything but NONE means already started
+ return;
+ }
+ this->state = ReactorState::RUNNING; // mark as running before the first write is issued
this->nextWrite();
}
@@ -74,7 +73,7 @@
if (!this->status.ok()) {
std::cout << "error: " << this->status.error_message() << std::endl;
}
- if (this->terminated) {
+ if (this->state != ReactorState::RUNNING) {
return;
}
this->terminateCallback();
@@ -83,25 +82,15 @@
} catch (std::runtime_error &e) {
this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
}
- this->terminated = true;
+ this->state = ReactorState::TERMINATED;
this->StartWritesDone();
}
-template <class Request, class Response>
-bool ClientWriteReactorBase<Request, Response>::isDone() {
- return this->done;
-}
-
-template <class Request, class Response>
-bool ClientWriteReactorBase<Request, Response>::isTerminated() {
- return this->terminated;
-}
-
template <class Request, class Response>
void ClientWriteReactorBase<Request, Response>::OnDone(
const grpc::Status &status) {
+ this->state = ReactorState::DONE; // set before terminate(): its RUNNING guard then returns before terminateCallback() and StartWritesDone()
this->terminate(status); // NOTE(review): with state == DONE this appears to only update/log the status; the top of terminate() is outside this hunk, confirm
- this->done = true;
this->doneCallback(); // invoked on every OnDone, after terminate() has returned
}
diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
@@ -3,6 +3,8 @@
#include "../_generated/blob.grpc.pb.h"
#include "../_generated/blob.pb.h"
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <memory>
@@ -13,8 +15,8 @@
namespace network {
namespace reactor {
-class BlobAppendHolderClientReactor : public grpc::ClientUnaryReactor {
- bool done = false;
+class BlobAppendHolderClientReactor : public grpc::ClientUnaryReactor,
+ public BaseReactor {
grpc::Status status = grpc::Status::OK;
std::condition_variable *terminationNotifier;
@@ -28,7 +30,6 @@
const std::string &hash,
std::condition_variable *terminationNotifier);
void OnDone(const grpc::Status &status);
- bool isDone() const;
grpc::Status getStatus() const;
};
diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp
+++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp
@@ -15,14 +15,10 @@
void BlobAppendHolderClientReactor::OnDone(const grpc::Status &status) {
this->status = status;
- this->done = true;
+ this->state = ReactorState::DONE;
this->terminationNotifier->notify_one();
}
-bool BlobAppendHolderClientReactor::isDone() const {
- return this->done;
-}
-
grpc::Status BlobAppendHolderClientReactor::getStatus() const {
return this->status;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -85,21 +85,22 @@
}
this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (this->putReactor->isDone() && !this->putReactor->getStatus().ok()) {
+ if (this->putReactor->getState() == ReactorState::DONE &&
+ !this->putReactor->getStatus().ok()) {
throw std::runtime_error(this->putReactor->getStatus().error_message());
}
- if (!this->putReactor->isDone()) {
+ if (this->putReactor->getState() != ReactorState::DONE) {
this->blobPutDoneCV.wait(lock2);
}
if (this->putReactor->getDataExists()) {
this->initializeHolderReactor();
std::unique_lock<std::mutex> lockHolder(this->blobAppendHolderDoneCVMutex);
- if (this->holderReactor->isDone() &&
+ if (this->holderReactor->getState() == ReactorState::DONE &&
!this->holderReactor->getStatus().ok()) {
throw std::runtime_error(
this->holderReactor->getStatus().error_message());
}
- if (!this->holderReactor->isDone()) {
+ if (this->holderReactor->getState() != ReactorState::DONE) {
this->blobAppendHolderDoneCV.wait(lockHolder);
}
}
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -150,7 +150,7 @@
}
this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (!this->putReactor->isDone()) {
+ if (this->putReactor->getState() != ReactorState::DONE) {
this->blobPutDoneCV.wait(lockPut);
} else if (!this->putReactor->getStatus().ok()) {
throw std::runtime_error(this->putReactor->getStatus().error_message());
@@ -158,15 +158,15 @@
if (this->putReactor->getDataExists()) {
this->initializeHolderReactor();
std::unique_lock<std::mutex> lockHolder(this->blobAppendHolderDoneCVMutex);
- if (!this->holderReactor->isDone()) {
+ if (this->holderReactor->getState() != ReactorState::DONE) {
this->blobAppendHolderDoneCV.wait(lockHolder);
} else if (!this->holderReactor->getStatus().ok()) {
throw std::runtime_error(
this->holderReactor->getStatus().error_message());
}
+ // store in db only when we successfully upload chunks
+ this->storeInDatabase();
}
- // store in db only when we successfully upload chunks
- this->storeInDatabase();
}
void SendLogReactor::doneCallback() {

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 26, 5:26 PM (11 h, 28 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2707728
Default Alt Text
D3786.id11711.diff (14 KB)

Event Timeline