Page MenuHomePhabricator

D4885.id15783.diff
No OneTemporary

D4885.id15783.diff

diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -66,14 +66,10 @@
backup
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/src
- ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc-client
${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/blob
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/base-reactors
${Boost_INCLUDE_DIR}
)
@@ -91,7 +87,6 @@
comm-blob-grpc
comm-backup-grpc
comm-services-common
- comm-client-base-reactors
comm-server-base-reactors
backup::rust_lib
)
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <ClientReadReactorBase.h>
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BlobGetClientReactor
- : public ClientReadReactorBase<blob::GetRequest, blob::GetResponse> {
- std::string holder;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- std::condition_variable *terminationNotifier;
-
-public:
- BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks,
- std::condition_variable *terminationNotifier);
-
- std::unique_ptr<grpc::Status>
- readResponse(blob::GetResponse &response) override;
- void doneCallback() override;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#include "BlobGetClientReactor.h"
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-BlobGetClientReactor::BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- dataChunks(dataChunks),
- terminationNotifier(terminationNotifier) {
-}
-
-std::unique_ptr<grpc::Status>
-BlobGetClientReactor::readResponse(blob::GetResponse &response) {
- if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) {
- throw std::runtime_error("error reading data from the blob service");
- }
- return nullptr;
-}
-
-void BlobGetClientReactor::doneCallback() {
- this->dataChunks->write("");
- this->terminationNotifier->notify_one();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include "Constants.h"
-#include "GlobalConstants.h"
-
-#include "blob.grpc.pb.h"
-#include "blob.pb.h"
-
-#include "ClientBidiReactorBase.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BlobPutClientReactor
- : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
-
- enum class State {
- SEND_HOLDER = 0,
- SEND_HASH = 1,
- SEND_CHUNKS = 2,
- };
-
- State state = State::SEND_HOLDER;
- const std::string hash;
- const std::string holder;
- size_t currentDataSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- folly::MPMCQueue<std::string> dataChunks;
- std::condition_variable *terminationNotifier;
-
-public:
- BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier);
- void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
- std::unique_ptr<grpc::Status> prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) override;
- void doneCallback() override;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-#include "BlobPutClientReactor.h"
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-BlobPutClientReactor::BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)),
- terminationNotifier(terminationNotifier) {
-}
-
-void BlobPutClientReactor::scheduleSendingDataChunk(
- std::unique_ptr<std::string> dataChunk) {
- if (!this->dataChunks.write(std::move(*dataChunk))) {
- throw std::runtime_error(
- "Error scheduling sending a data chunk to send to the blob service");
- }
-}
-
-std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) {
- if (this->state == State::SEND_HOLDER) {
- this->request.set_holder(this->holder);
- this->state = State::SEND_HASH;
- return nullptr;
- }
- if (this->state == State::SEND_HASH) {
- request.set_blobhash(this->hash);
- this->state = State::SEND_CHUNKS;
- return nullptr;
- }
- if (previousResponse->dataexists()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- std::string dataChunk;
- this->dataChunks.blockingRead(dataChunk);
- if (dataChunk.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- request.set_datachunk(dataChunk);
- return nullptr;
-}
-
-void BlobPutClientReactor::doneCallback() {
- this->terminationNotifier->notify_one();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -3,10 +3,8 @@
#include <glog/logging.h>
#include "BackupItem.h"
-#include "BlobPutClientReactor.h"
#include "Constants.h"
#include "DatabaseManager.h"
-#include "ServiceBlobClient.h"
#include "Tools.h"
namespace comm {
@@ -74,21 +72,10 @@
// put into S3
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor =
- std::make_shared<reactor::BlobPutClientReactor>(
- holder, newLogItem->getDataHash(), &blobPutDoneCV);
- ServiceBlobClient().put(putReactor);
- std::unique_lock<std::mutex> lockPut(blobPutDoneCVMutex);
- putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(std::move(data)));
- putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) {
- blobPutDoneCV.wait(lockPut);
- }
- if (!putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put
+ // todo:blob perform put:add chunk (std::move(data))
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
return newLogItem;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
@@ -1,7 +1,5 @@
#pragma once
-#include "ServiceBlobClient.h"
-
#include "backup.grpc.pb.h"
#include "backup.pb.h"
@@ -34,9 +32,7 @@
std::string dataHash;
std::string holder;
std::string backupID;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- ServiceBlobClient blobClient;
std::mutex reactorStateMutex;
std::condition_variable blobPutDoneCV;
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -64,14 +64,12 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->holder, this->dataHash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
+ // todo:blob perform put:initialize
return nullptr;
}
case State::DATA_CHUNKS: {
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
- std::move(*request.mutable_newcompactionchunk())));
+ // todo:blob perform put:add chunk
+ // (std::move(*request.mutable_newcompactionchunk())
return nullptr;
}
}
@@ -80,21 +78,9 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobPutDoneCV.wait(lock2);
- }
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- throw std::runtime_error("put reactor has not been terminated properly");
- }
- if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
+
// TODO add recovery data
// TODO handle attachments holders
database::BackupItem backupItem(
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -1,10 +1,9 @@
#pragma once
#include "BackupItem.h"
-#include "BlobGetClientReactor.h"
#include "DatabaseEntitiesTools.h"
+#include "GlobalConstants.h"
#include "LogItem.h"
-#include "ServiceBlobClient.h"
#include <backup.grpc.pb.h>
#include <backup.pb.h>
@@ -33,10 +32,7 @@
};
std::shared_ptr<database::BackupItem> backupItem;
- std::shared_ptr<reactor::BlobGetClientReactor> getReactor;
std::mutex reactorStateMutex;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- ServiceBlobClient blobClient;
State state = State::COMPACTION;
std::vector<std::shared_ptr<database::LogItem>> logs;
size_t currentLogIndex = 0;
@@ -44,9 +40,7 @@
std::string internalBuffer;
std::string previousLogID;
bool endOfQueue = false;
-
- std::condition_variable blobGetDoneCV;
- std::mutex blobGetDoneCVMutex;
+ bool clientInitialized = false;
const size_t chunkLimit =
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -9,8 +9,7 @@
PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
: ServerWriteReactorBase<
backup::PullBackupRequest,
- backup::PullBackupResponse>(request),
- dataChunks(std::make_shared<folly::MPMCQueue<std::string>>(100)) {
+ backup::PullBackupResponse>(request) {
}
void PullBackupReactor::initializeGetReactor(const std::string &holder) {
@@ -18,10 +17,8 @@
throw std::runtime_error(
"get reactor cannot be initialized when backup item is missing");
}
- this->getReactor.reset(new reactor::BlobGetClientReactor(
- holder, this->dataChunks, &this->blobGetDoneCV));
- this->getReactor->request.set_holder(holder);
- this->blobClient.get(this->getReactor);
+ // todo:blob perform get initialize
+ this->clientInitialized = true;
}
void PullBackupReactor::initialize() {
@@ -59,7 +56,7 @@
extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
extraBytesNeeded += this->backupItem->getBackupID().size();
- if (this->getReactor == nullptr) {
+ if (!this->clientInitialized) {
extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size();
extraBytesNeeded += this->backupItem->getAttachmentHolders().size();
response->set_attachmentholders(this->backupItem->getAttachmentHolders());
@@ -67,7 +64,7 @@
}
std::string dataChunk;
if (this->internalBuffer.size() < this->chunkLimit) {
- this->dataChunks->blockingRead(dataChunk);
+ // todo:blob perform blocking read
}
if (!dataChunk.empty() ||
this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) {
@@ -76,14 +73,6 @@
response->set_compactionchunk(dataChunk);
return nullptr;
}
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading compaction");
- }
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
this->state = State::LOGS;
if (!this->internalBuffer.empty()) {
response->set_compactionchunk(std::move(this->internalBuffer));
@@ -98,12 +87,6 @@
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex == this->logs.size()) {
- // we reached the end of the logs collection so we just want to
- // terminate either we terminate with an error if we have some dangling
- // data or with success if we don't
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error("dangling data discovered after reading logs");
- }
if (!this->internalBuffer.empty()) {
response->set_logid(this->previousLogID);
response->set_logchunk(std::move(this->internalBuffer));
@@ -151,14 +134,10 @@
// we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
- this->dataChunks->blockingRead(dataChunk);
+ // todo:blob perform blocking read
}
this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
// if we get an empty chunk, we reset the currentLog so we can read the
// next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
@@ -204,19 +183,8 @@
void PullBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- std::unique_lock<std::mutex> lockGet(this->blobGetDoneCVMutex);
- if (this->getReactor != nullptr) {
- if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobGetDoneCV.wait(lockGet);
- }
- if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
- throw std::runtime_error("get reactor has not been terminated properly");
- }
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getStatusHolder()->getStatus().error_message());
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -2,7 +2,6 @@
#include "LogItem.h"
#include "ServerReadReactorBase.h"
-#include "ServiceBlobClient.h"
#include "backup.grpc.pb.h"
#include "backup.pb.h"
@@ -43,9 +42,6 @@
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- ServiceBlobClient blobClient;
-
void storeInDatabase();
std::string generateLogID(const std::string &backupID);
void initializePutReactor();
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -42,11 +42,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- if (this->putReactor == nullptr) {
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->blobHolder, this->hash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- }
+ // todo:blob perform put:initialize
}
std::unique_ptr<grpc::Status>
@@ -102,11 +98,7 @@
"), merge them into bigger parts instead");
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
- if (this->putReactor == nullptr) {
- throw std::runtime_error(
- "put reactor is being used but has not been initialized");
- }
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
+ // todo:blob perform put:add chunk (std::move(chunk))
return nullptr;
}
this->value += std::move(*chunk);
@@ -118,8 +110,7 @@
this->blobHolder =
tools::generateHolder(this->hash, this->backupID, this->logID);
this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(this->value));
+ // todo:blob perform put:add chunk (this->value)
this->value = "";
} else {
this->persistenceMethod = PersistenceMethod::DB;
@@ -143,20 +134,12 @@
throw std::runtime_error("Invalid persistence method detected");
}
- if (this->persistenceMethod == PersistenceMethod::DB ||
- this->putReactor == nullptr) {
+ if (this->persistenceMethod == PersistenceMethod::DB) {
this->storeInDatabase();
return;
}
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobPutDoneCV.wait(lockPut);
- }
- if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
// store in db only when we successfully upload chunks
this->storeInDatabase();
}
diff --git a/services/backup/src/grpc-client/ServiceBlobClient.h b/services/backup/src/grpc-client/ServiceBlobClient.h
deleted file mode 100644
--- a/services/backup/src/grpc-client/ServiceBlobClient.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#pragma once
-
-#include "BlobGetClientReactor.h"
-#include "BlobPutClientReactor.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-class ServiceBlobClient {
- std::unique_ptr<blob::BlobService::Stub> stub;
-
-public:
- ServiceBlobClient() {
- // todo handle different types of connection(e.g. load balancer)
- std::string targetStr = "blob-server:50051";
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
- this->stub = blob::BlobService::NewStub(channel);
- }
-
- void put(std::shared_ptr<reactor::BlobPutClientReactor> putReactor) {
- if (putReactor == nullptr) {
- throw std::runtime_error(
- "put reactor is being used but has not been initialized");
- }
- this->stub->async()->Put(&putReactor->context, &(*putReactor));
- putReactor->start();
- }
-
- void get(std::shared_ptr<reactor::BlobGetClientReactor> getReactor) {
- if (getReactor == nullptr) {
- throw std::runtime_error(
- "get reactor is being used but has not been initialized");
- }
- this->stub->async()->Get(
- &getReactor->context, &getReactor->request, &(*getReactor));
- getReactor->start();
- }
- // void remove(const std::string &holder);
-};
-
-} // namespace network
-} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 17, 3:42 AM (21 h, 51 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2512517
Default Alt Text
D4885.id15783.diff (21 KB)

Event Timeline