Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3530905
D4885.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Referenced Files
None
Subscribers
None
D4885.diff
View Options
diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -66,14 +66,10 @@
backup
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/src
- ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc-client
${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server
${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/blob
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/base-reactors
${Boost_INCLUDE_DIR}
)
@@ -91,7 +87,6 @@
comm-blob-grpc
comm-backup-grpc
comm-services-common
- comm-client-base-reactors
comm-server-base-reactors
backup::rust_lib
)
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <ClientReadReactorBase.h>
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BlobGetClientReactor
- : public ClientReadReactorBase<blob::GetRequest, blob::GetResponse> {
- std::string holder;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- std::condition_variable *terminationNotifier;
-
-public:
- BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks,
- std::condition_variable *terminationNotifier);
-
- std::unique_ptr<grpc::Status>
- readResponse(blob::GetResponse &response) override;
- void doneCallback() override;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#include "BlobGetClientReactor.h"
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-BlobGetClientReactor::BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- dataChunks(dataChunks),
- terminationNotifier(terminationNotifier) {
-}
-
-std::unique_ptr<grpc::Status>
-BlobGetClientReactor::readResponse(blob::GetResponse &response) {
- if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) {
- throw std::runtime_error("error reading data from the blob service");
- }
- return nullptr;
-}
-
-void BlobGetClientReactor::doneCallback() {
- this->dataChunks->write("");
- this->terminationNotifier->notify_one();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include "Constants.h"
-#include "GlobalConstants.h"
-
-#include "blob.grpc.pb.h"
-#include "blob.pb.h"
-
-#include "ClientBidiReactorBase.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BlobPutClientReactor
- : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
-
- enum class State {
- SEND_HOLDER = 0,
- SEND_HASH = 1,
- SEND_CHUNKS = 2,
- };
-
- State state = State::SEND_HOLDER;
- const std::string hash;
- const std::string holder;
- size_t currentDataSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- folly::MPMCQueue<std::string> dataChunks;
- std::condition_variable *terminationNotifier;
-
-public:
- BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier);
- void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
- std::unique_ptr<grpc::Status> prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) override;
- void doneCallback() override;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
deleted file mode 100644
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-#include "BlobPutClientReactor.h"
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-BlobPutClientReactor::BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)),
- terminationNotifier(terminationNotifier) {
-}
-
-void BlobPutClientReactor::scheduleSendingDataChunk(
- std::unique_ptr<std::string> dataChunk) {
- if (!this->dataChunks.write(std::move(*dataChunk))) {
- throw std::runtime_error(
- "Error scheduling sending a data chunk to send to the blob service");
- }
-}
-
-std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) {
- if (this->state == State::SEND_HOLDER) {
- this->request.set_holder(this->holder);
- this->state = State::SEND_HASH;
- return nullptr;
- }
- if (this->state == State::SEND_HASH) {
- request.set_blobhash(this->hash);
- this->state = State::SEND_CHUNKS;
- return nullptr;
- }
- if (previousResponse->dataexists()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- std::string dataChunk;
- this->dataChunks.blockingRead(dataChunk);
- if (dataChunk.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- request.set_datachunk(dataChunk);
- return nullptr;
-}
-
-void BlobPutClientReactor::doneCallback() {
- this->terminationNotifier->notify_one();
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -3,10 +3,8 @@
#include <glog/logging.h>
#include "BackupItem.h"
-#include "BlobPutClientReactor.h"
#include "Constants.h"
#include "DatabaseManager.h"
-#include "ServiceBlobClient.h"
#include "Tools.h"
namespace comm {
@@ -74,21 +72,10 @@
// put into S3
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor =
- std::make_shared<reactor::BlobPutClientReactor>(
- holder, newLogItem->getDataHash(), &blobPutDoneCV);
- ServiceBlobClient().put(putReactor);
- std::unique_lock<std::mutex> lockPut(blobPutDoneCVMutex);
- putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(std::move(data)));
- putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) {
- blobPutDoneCV.wait(lockPut);
- }
- if (!putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put
+ // todo:blob perform put:add chunk (std::move(data))
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
return newLogItem;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
@@ -1,7 +1,5 @@
#pragma once
-#include "ServiceBlobClient.h"
-
#include "backup.grpc.pb.h"
#include "backup.pb.h"
@@ -34,9 +32,7 @@
std::string dataHash;
std::string holder;
std::string backupID;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- ServiceBlobClient blobClient;
std::mutex reactorStateMutex;
std::condition_variable blobPutDoneCV;
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -64,14 +64,12 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->holder, this->dataHash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
+ // todo:blob perform put:initialize
return nullptr;
}
case State::DATA_CHUNKS: {
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
- std::move(*request.mutable_newcompactionchunk())));
+ // todo:blob perform put:add chunk
+ // (std::move(*request.mutable_newcompactionchunk())
return nullptr;
}
}
@@ -80,21 +78,9 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobPutDoneCV.wait(lock2);
- }
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- throw std::runtime_error("put reactor has not been terminated properly");
- }
- if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
+
// TODO add recovery data
// TODO handle attachments holders
database::BackupItem backupItem(
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -1,10 +1,9 @@
#pragma once
#include "BackupItem.h"
-#include "BlobGetClientReactor.h"
#include "DatabaseEntitiesTools.h"
+#include "GlobalConstants.h"
#include "LogItem.h"
-#include "ServiceBlobClient.h"
#include <backup.grpc.pb.h>
#include <backup.pb.h>
@@ -33,10 +32,7 @@
};
std::shared_ptr<database::BackupItem> backupItem;
- std::shared_ptr<reactor::BlobGetClientReactor> getReactor;
std::mutex reactorStateMutex;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- ServiceBlobClient blobClient;
State state = State::COMPACTION;
std::vector<std::shared_ptr<database::LogItem>> logs;
size_t currentLogIndex = 0;
@@ -44,9 +40,7 @@
std::string internalBuffer;
std::string previousLogID;
bool endOfQueue = false;
-
- std::condition_variable blobGetDoneCV;
- std::mutex blobGetDoneCVMutex;
+ bool clientInitialized = false;
const size_t chunkLimit =
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -9,8 +9,7 @@
PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
: ServerWriteReactorBase<
backup::PullBackupRequest,
- backup::PullBackupResponse>(request),
- dataChunks(std::make_shared<folly::MPMCQueue<std::string>>(100)) {
+ backup::PullBackupResponse>(request) {
}
void PullBackupReactor::initializeGetReactor(const std::string &holder) {
@@ -18,10 +17,8 @@
throw std::runtime_error(
"get reactor cannot be initialized when backup item is missing");
}
- this->getReactor.reset(new reactor::BlobGetClientReactor(
- holder, this->dataChunks, &this->blobGetDoneCV));
- this->getReactor->request.set_holder(holder);
- this->blobClient.get(this->getReactor);
+ // todo:blob perform get initialize
+ this->clientInitialized = true;
}
void PullBackupReactor::initialize() {
@@ -59,7 +56,7 @@
extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
extraBytesNeeded += this->backupItem->getBackupID().size();
- if (this->getReactor == nullptr) {
+ if (!this->clientInitialized) {
extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size();
extraBytesNeeded += this->backupItem->getAttachmentHolders().size();
response->set_attachmentholders(this->backupItem->getAttachmentHolders());
@@ -67,7 +64,7 @@
}
std::string dataChunk;
if (this->internalBuffer.size() < this->chunkLimit) {
- this->dataChunks->blockingRead(dataChunk);
+ // todo:blob perform blocking read
}
if (!dataChunk.empty() ||
this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) {
@@ -76,14 +73,6 @@
response->set_compactionchunk(dataChunk);
return nullptr;
}
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading compaction");
- }
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
this->state = State::LOGS;
if (!this->internalBuffer.empty()) {
response->set_compactionchunk(std::move(this->internalBuffer));
@@ -98,12 +87,6 @@
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex == this->logs.size()) {
- // we reached the end of the logs collection so we just want to
- // terminate either we terminate with an error if we have some dangling
- // data or with success if we don't
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error("dangling data discovered after reading logs");
- }
if (!this->internalBuffer.empty()) {
response->set_logid(this->previousLogID);
response->set_logchunk(std::move(this->internalBuffer));
@@ -151,14 +134,10 @@
// we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
- this->dataChunks->blockingRead(dataChunk);
+ // todo:blob perform blocking read
}
this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
// if we get an empty chunk, we reset the currentLog so we can read the
// next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
@@ -204,19 +183,8 @@
void PullBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- std::unique_lock<std::mutex> lockGet(this->blobGetDoneCVMutex);
- if (this->getReactor != nullptr) {
- if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobGetDoneCV.wait(lockGet);
- }
- if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
- throw std::runtime_error("get reactor has not been terminated properly");
- }
- if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->getReactor->getStatusHolder()->getStatus().error_message());
- }
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getStatusHolder()->getStatus().error_message());
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -2,7 +2,6 @@
#include "LogItem.h"
#include "ServerReadReactorBase.h"
-#include "ServiceBlobClient.h"
#include "backup.grpc.pb.h"
#include "backup.pb.h"
@@ -43,9 +42,6 @@
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- ServiceBlobClient blobClient;
-
void storeInDatabase();
std::string generateLogID(const std::string &backupID);
void initializePutReactor();
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -42,11 +42,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- if (this->putReactor == nullptr) {
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->blobHolder, this->hash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- }
+ // todo:blob perform put:initialize
}
std::unique_ptr<grpc::Status>
@@ -102,11 +98,7 @@
"), merge them into bigger parts instead");
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
- if (this->putReactor == nullptr) {
- throw std::runtime_error(
- "put reactor is being used but has not been initialized");
- }
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
+ // todo:blob perform put:add chunk (std::move(chunk))
return nullptr;
}
this->value += std::move(*chunk);
@@ -118,8 +110,7 @@
this->blobHolder =
tools::generateHolder(this->hash, this->backupID, this->logID);
this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(
- std::make_unique<std::string>(this->value));
+ // todo:blob perform put:add chunk (this->value)
this->value = "";
} else {
this->persistenceMethod = PersistenceMethod::DB;
@@ -143,20 +134,12 @@
throw std::runtime_error("Invalid persistence method detected");
}
- if (this->persistenceMethod == PersistenceMethod::DB ||
- this->putReactor == nullptr) {
+ if (this->persistenceMethod == PersistenceMethod::DB) {
this->storeInDatabase();
return;
}
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
- this->blobPutDoneCV.wait(lockPut);
- }
- if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
- throw std::runtime_error(
- this->putReactor->getStatusHolder()->getStatus().error_message());
- }
+ // todo:blob perform put:add chunk ("")
+ // todo:blob perform put:wait for completion
// store in db only when we successfully upload chunks
this->storeInDatabase();
}
diff --git a/services/backup/src/grpc-client/ServiceBlobClient.h b/services/backup/src/grpc-client/ServiceBlobClient.h
deleted file mode 100644
--- a/services/backup/src/grpc-client/ServiceBlobClient.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#pragma once
-
-#include "BlobGetClientReactor.h"
-#include "BlobPutClientReactor.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-class ServiceBlobClient {
- std::unique_ptr<blob::BlobService::Stub> stub;
-
-public:
- ServiceBlobClient() {
- // todo handle different types of connection(e.g. load balancer)
- std::string targetStr = "blob-server:50051";
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials());
- this->stub = blob::BlobService::NewStub(channel);
- }
-
- void put(std::shared_ptr<reactor::BlobPutClientReactor> putReactor) {
- if (putReactor == nullptr) {
- throw std::runtime_error(
- "put reactor is being used but has not been initialized");
- }
- this->stub->async()->Put(&putReactor->context, &(*putReactor));
- putReactor->start();
- }
-
- void get(std::shared_ptr<reactor::BlobGetClientReactor> getReactor) {
- if (getReactor == nullptr) {
- throw std::runtime_error(
- "get reactor is being used but has not been initialized");
- }
- this->stub->async()->Get(
- &getReactor->context, &getReactor->request, &(*getReactor));
- getReactor->start();
- }
- // void remove(const std::string &holder);
-};
-
-} // namespace network
-} // namespace comm
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 26, 5:09 AM (12 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2703911
Default Alt Text
D4885.diff (21 KB)
Attached To
Mode
D4885: [services] Backup - Connect to Blob - Remove c++ part
Attached
Detach File
Event Timeline
Log In to Comment