diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -66,14 +66,10 @@ backup PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src - ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc-client ${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/blob - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/base-reactors ${Boost_INCLUDE_DIR} ) @@ -91,7 +87,6 @@ comm-blob-grpc comm-backup-grpc comm-services-common - comm-client-base-reactors comm-server-base-reactors backup::rust_lib ) diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h deleted file mode 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include -#include - -#include - -#include -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class BlobGetClientReactor - : public ClientReadReactorBase { - std::string holder; - std::shared_ptr> dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks, - std::condition_variable *terminationNotifier); - - std::unique_ptr - readResponse(blob::GetResponse &response) override; - void doneCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp deleted file mode 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ /dev/null @@ -1,32 +0,0 @@ - 
-#include "BlobGetClientReactor.h" - -namespace comm { -namespace network { -namespace reactor { - -BlobGetClientReactor::BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks, - std::condition_variable *terminationNotifier) - : holder(holder), - dataChunks(dataChunks), - terminationNotifier(terminationNotifier) { -} - -std::unique_ptr -BlobGetClientReactor::readResponse(blob::GetResponse &response) { - if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error("error reading data from the blob service"); - } - return nullptr; -} - -void BlobGetClientReactor::doneCallback() { - this->dataChunks->write(""); - this->terminationNotifier->notify_one(); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h deleted file mode 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include "Constants.h" -#include "GlobalConstants.h" - -#include "blob.grpc.pb.h" -#include "blob.pb.h" - -#include "ClientBidiReactorBase.h" - -#include -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class BlobPutClientReactor - : public ClientBidiReactorBase { - - enum class State { - SEND_HOLDER = 0, - SEND_HASH = 1, - SEND_CHUNKS = 2, - }; - - State state = State::SEND_HOLDER; - const std::string hash; - const std::string holder; - size_t currentDataSize = 0; - const size_t chunkSize = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - folly::MPMCQueue dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier); - void scheduleSendingDataChunk(std::unique_ptr dataChunk); - std::unique_ptr 
prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) override; - void doneCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp deleted file mode 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "BlobPutClientReactor.h" - -namespace comm { -namespace network { -namespace reactor { - -BlobPutClientReactor::BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier) - : holder(holder), - hash(hash), - dataChunks(folly::MPMCQueue(100)), - terminationNotifier(terminationNotifier) { -} - -void BlobPutClientReactor::scheduleSendingDataChunk( - std::unique_ptr dataChunk) { - if (!this->dataChunks.write(std::move(*dataChunk))) { - throw std::runtime_error( - "Error scheduling sending a data chunk to send to the blob service"); - } -} - -std::unique_ptr BlobPutClientReactor::prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) { - if (this->state == State::SEND_HOLDER) { - this->request.set_holder(this->holder); - this->state = State::SEND_HASH; - return nullptr; - } - if (this->state == State::SEND_HASH) { - request.set_blobhash(this->hash); - this->state = State::SEND_CHUNKS; - return nullptr; - } - if (previousResponse->dataexists()) { - return std::make_unique(grpc::Status::OK); - } - std::string dataChunk; - this->dataChunks.blockingRead(dataChunk); - if (dataChunk.empty()) { - return std::make_unique(grpc::Status::OK); - } - request.set_datachunk(dataChunk); - return nullptr; -} - -void BlobPutClientReactor::doneCallback() { - this->terminationNotifier->notify_one(); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git 
a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -3,10 +3,8 @@ #include #include "BackupItem.h" -#include "BlobPutClientReactor.h" #include "Constants.h" #include "DatabaseManager.h" -#include "ServiceBlobClient.h" #include "Tools.h" namespace comm { @@ -74,21 +72,10 @@ // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - std::shared_ptr putReactor = - std::make_shared( - holder, newLogItem->getDataHash(), &blobPutDoneCV); - ServiceBlobClient().put(putReactor); - std::unique_lock lockPut(blobPutDoneCVMutex); - putReactor->scheduleSendingDataChunk( - std::make_unique(std::move(data))); - putReactor->scheduleSendingDataChunk(std::make_unique("")); - if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) { - blobPutDoneCV.wait(lockPut); - } - if (!putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put + // todo:blob perform put:add chunk (std::move(data)) + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion return newLogItem; } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,7 +1,5 @@ #pragma once -#include "ServiceBlobClient.h" - #include "backup.grpc.pb.h" #include "backup.pb.h" @@ -34,9 +32,7 @@ std::string dataHash; std::string holder; std::string backupID; - std::shared_ptr putReactor; - ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; diff --git 
a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -64,14 +64,12 @@ } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); + // todo:blob perform put:initialize return nullptr; } case State::DATA_CHUNKS: { - this->putReactor->scheduleSendingDataChunk(std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); + // todo:blob perform put:add chunk + // (std::move(*request.mutable_newcompactionchunk())) return nullptr; } } @@ -80,21 +78,9 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - if (this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobPutDoneCV.wait(lock2); - } - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - throw std::runtime_error("put reactor has not been terminated properly"); - } - if (!this->putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion + // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ 
b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -1,10 +1,9 @@ #pragma once #include "BackupItem.h" -#include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" +#include "GlobalConstants.h" #include "LogItem.h" -#include "ServiceBlobClient.h" #include #include @@ -33,10 +32,7 @@ }; std::shared_ptr backupItem; - std::shared_ptr getReactor; std::mutex reactorStateMutex; - std::shared_ptr> dataChunks; - ServiceBlobClient blobClient; State state = State::COMPACTION; std::vector> logs; size_t currentLogIndex = 0; @@ -44,9 +40,7 @@ std::string internalBuffer; std::string previousLogID; bool endOfQueue = false; - - std::condition_variable blobGetDoneCV; - std::mutex blobGetDoneCVMutex; + bool clientInitialized = false; const size_t chunkLimit = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -9,8 +9,7 @@ PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, - backup::PullBackupResponse>(request), - dataChunks(std::make_shared>(100)) { + backup::PullBackupResponse>(request) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { @@ -18,10 +17,8 @@ throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - this->getReactor.reset(new reactor::BlobGetClientReactor( - holder, this->dataChunks, &this->blobGetDoneCV)); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); + // todo:blob perform get initialize + this->clientInitialized = true; } void PullBackupReactor::initialize() { @@ -59,7 +56,7 @@ extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); extraBytesNeeded += 
this->backupItem->getBackupID().size(); - if (this->getReactor == nullptr) { + if (!this->clientInitialized) { extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); response->set_attachmentholders(this->backupItem->getAttachmentHolders()); @@ -67,7 +64,7 @@ } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { - this->dataChunks->blockingRead(dataChunk); + // todo:blob perform blocking read } if (!dataChunk.empty() || this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { @@ -76,14 +73,6 @@ response->set_compactionchunk(dataChunk); return nullptr; } - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } this->state = State::LOGS; if (!this->internalBuffer.empty()) { response->set_compactionchunk(std::move(this->internalBuffer)); @@ -98,12 +87,6 @@ return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { - // we reached the end of the logs collection so we just want to - // terminate either we terminate with an error if we have some dangling - // data or with success if we don't - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error("dangling data discovered after reading logs"); - } if (!this->internalBuffer.empty()) { response->set_logid(this->previousLogID); response->set_logchunk(std::move(this->internalBuffer)); @@ -151,14 +134,10 @@ // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { - this->dataChunks->blockingRead(dataChunk); + // todo:blob perform blocking read } this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); dataChunk = 
this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } // if we get an empty chunk, we reset the currentLog so we can read the // next one from the logs collection. // If there's data inside, we write it to the client and proceed. @@ -204,19 +183,8 @@ void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - std::unique_lock lockGet(this->blobGetDoneCVMutex); - if (this->getReactor != nullptr) { - if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobGetDoneCV.wait(lockGet); - } - if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { - throw std::runtime_error("get reactor has not been terminated properly"); - } - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } - } + // todo:blob perform get:wait for completion + // todo:blob perform get:check final status if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -2,7 +2,6 @@ #include "LogItem.h" #include "ServerReadReactorBase.h" -#include "ServiceBlobClient.h" #include "backup.grpc.pb.h" #include "backup.pb.h" @@ -43,9 +42,6 @@ std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - std::shared_ptr putReactor; - ServiceBlobClient blobClient; - void storeInDatabase(); std::string generateLogID(const std::string &backupID); void initializePutReactor(); diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp 
b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -42,11 +42,7 @@ throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - if (this->putReactor == nullptr) { - this->putReactor = std::make_shared( - this->blobHolder, this->hash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - } + // todo:blob perform put:initialize } std::unique_ptr @@ -102,11 +98,7 @@ "), merge them into bigger parts instead"); } if (this->persistenceMethod == PersistenceMethod::BLOB) { - if (this->putReactor == nullptr) { - throw std::runtime_error( - "put reactor is being used but has not been initialized"); - } - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); + // todo:blob perform put:add chunk (std::move(chunk)) return nullptr; } this->value += std::move(*chunk); @@ -118,8 +110,7 @@ this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk( - std::make_unique(this->value)); + // todo:blob perform put:add chunk (this->value) this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; @@ -143,20 +134,12 @@ throw std::runtime_error("Invalid persistence method detected"); } - if (this->persistenceMethod == PersistenceMethod::DB || - this->putReactor == nullptr) { + if (this->persistenceMethod == PersistenceMethod::DB) { this->storeInDatabase(); return; } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobPutDoneCV.wait(lockPut); - } - if (!this->putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put:add chunk ("") + 
// todo:blob perform put:wait for completion // store in db only when we successfully upload chunks this->storeInDatabase(); } diff --git a/services/backup/src/grpc-client/ServiceBlobClient.h b/services/backup/src/grpc-client/ServiceBlobClient.h deleted file mode 100644 --- a/services/backup/src/grpc-client/ServiceBlobClient.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include "BlobGetClientReactor.h" -#include "BlobPutClientReactor.h" - -#include -#include - -#include - -#include -#include - -namespace comm { -namespace network { - -class ServiceBlobClient { - std::unique_ptr stub; - -public: - ServiceBlobClient() { - // todo handle different types of connection(e.g. load balancer) - std::string targetStr = "blob-server:50051"; - std::shared_ptr channel = - grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); - this->stub = blob::BlobService::NewStub(channel); - } - - void put(std::shared_ptr putReactor) { - if (putReactor == nullptr) { - throw std::runtime_error( - "put reactor is being used but has not been initialized"); - } - this->stub->async()->Put(&putReactor->context, &(*putReactor)); - putReactor->start(); - } - - void get(std::shared_ptr getReactor) { - if (getReactor == nullptr) { - throw std::runtime_error( - "get reactor is being used but has not been initialized"); - } - this->stub->async()->Get( - &getReactor->context, &getReactor->request, &(*getReactor)); - getReactor->start(); - } - // void remove(const std::string &holder); -}; - -} // namespace network -} // namespace comm