diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt index 699d79ed4..e51aca828 100644 --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -1,142 +1,137 @@ PROJECT(backup CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) # For C++17 on MacOS, we must set minimum target to 10.14+ set(CMAKE_OSX_DEPLOYMENT_TARGET 10.14) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) find_package(glog REQUIRED) find_package(protobuf REQUIRED) find_package(gRPC REQUIRED) find_package(Folly REQUIRED) find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) find_package(Boost 1.40 COMPONENTS program_options context filesystem regex system thread REQUIRED ) # Rust integration include(cmake-components/corrosion-cxx.cmake) find_package(Corrosion REQUIRED) if(${CMAKE_CURRENT_SOURCE_DIR} MATCHES "^\/transferred.*") # Inside the docker build contex set(_proto_path "grpc") else() # Inside repo set(_proto_path "../../native/cpp/CommonCpp/grpc") endif() # Shared Comm protos add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_proto_path} ${CMAKE_CURRENT_BINARY_DIR}/protos EXCLUDE_FROM_ALL ) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/../lib/src ${CMAKE_CURRENT_BINARY_DIR}/lib/src EXCLUDE_FROM_ALL ) file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp") # SERVER add_executable( backup ${GENERATED_CODE} ${FOLLY_SOURCES} ${SOURCE_CODE} ) add_library_rust(PATH rust_lib NAMESPACE backup) target_include_directories( backup PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src - ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc-client ${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/blob - ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/client/base-reactors ${Boost_INCLUDE_DIR} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} glog::glog Folly::folly gRPC::grpc++ comm-blob-grpc comm-backup-grpc comm-services-common - comm-client-base-reactors comm-server-base-reactors backup::rust_lib ) target_link_libraries( backup ${LIBS} ) install( TARGETS backup RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp deleted file mode 100644 index fc3a749a9..000000000 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ /dev/null @@ -1,32 +0,0 @@ - -#include "BlobGetClientReactor.h" - -namespace comm { -namespace network { -namespace reactor { - -BlobGetClientReactor::BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks, - std::condition_variable *terminationNotifier) - : holder(holder), - dataChunks(dataChunks), - terminationNotifier(terminationNotifier) { -} - -std::unique_ptr -BlobGetClientReactor::readResponse(blob::GetResponse &response) { - if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error("error reading data from the blob service"); - } - return nullptr; -} - -void BlobGetClientReactor::doneCallback() { - this->dataChunks->write(""); - this->terminationNotifier->notify_one(); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h deleted file mode 100644 index d98ab64e2..000000000 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include -#include - -#include - -#include -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class BlobGetClientReactor - : public ClientReadReactorBase { - std::string holder; - std::shared_ptr> dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks, - std::condition_variable *terminationNotifier); - - std::unique_ptr - readResponse(blob::GetResponse &response) override; - void doneCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp deleted file mode 100644 index c3a020f7a..000000000 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "BlobPutClientReactor.h" - -namespace comm { -namespace network { -namespace reactor { - -BlobPutClientReactor::BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier) - : holder(holder), - hash(hash), - dataChunks(folly::MPMCQueue(100)), - terminationNotifier(terminationNotifier) { -} - -void BlobPutClientReactor::scheduleSendingDataChunk( - std::unique_ptr dataChunk) { - if (!this->dataChunks.write(std::move(*dataChunk))) { - throw std::runtime_error( - "Error scheduling sending a data chunk to send to the blob service"); - } -} - -std::unique_ptr BlobPutClientReactor::prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) { - if (this->state == State::SEND_HOLDER) { - this->request.set_holder(this->holder); - this->state = State::SEND_HASH; - return nullptr; - } - if (this->state == State::SEND_HASH) { - request.set_blobhash(this->hash); - this->state = State::SEND_CHUNKS; - return nullptr; - } - if (previousResponse->dataexists()) { - return std::make_unique(grpc::Status::OK); - } - std::string dataChunk; - this->dataChunks.blockingRead(dataChunk); - if (dataChunk.empty()) { - return std::make_unique(grpc::Status::OK); - } - request.set_datachunk(dataChunk); - return nullptr; -} - -void BlobPutClientReactor::doneCallback() { - this->terminationNotifier->notify_one(); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h deleted file mode 100644 index 7bc1b0237..000000000 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include "Constants.h" -#include "GlobalConstants.h" - -#include "blob.grpc.pb.h" -#include "blob.pb.h" - -#include "ClientBidiReactorBase.h" - -#include -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -class BlobPutClientReactor - : public ClientBidiReactorBase { - - enum class State { - SEND_HOLDER = 0, - SEND_HASH = 1, - SEND_CHUNKS = 2, - }; - - State state = State::SEND_HOLDER; - const std::string hash; - const std::string holder; - size_t currentDataSize = 0; - const size_t chunkSize = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - folly::MPMCQueue dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier); - void scheduleSendingDataChunk(std::unique_ptr dataChunk); - std::unique_ptr prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) override; - void doneCallback() override; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp index 2e3c7586e..0281d0daf 100644 --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -1,97 +1,84 @@ #include "AddAttachmentsUtility.h" #include #include "BackupItem.h" -#include "BlobPutClientReactor.h" #include "Constants.h" #include "DatabaseManager.h" -#include "ServiceBlobClient.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { grpc::Status AddAttachmentsUtility::processRequest( const backup::AddAttachmentsRequest *request) { grpc::Status status = grpc::Status::OK; std::string userID = request->userid(); std::string backupID = request->backupid(); std::string logID = request->logid(); const std::string holders = request->holders(); try { if (userID.empty()) { throw std::runtime_error("user id required but not provided"); } if (backupID.empty()) { throw std::runtime_error("backup id required but not provided"); } if (holders.empty()) { throw std::runtime_error("holders required but not provided"); } if (logID.empty()) { // add these attachments to backup std::shared_ptr backupItem = database::DatabaseManager::getInstance().findBackupItem( userID, backupID); backupItem->addAttachmentHolders(holders); database::DatabaseManager::getInstance().putBackupItem(*backupItem); } else { // add these attachments to log std::shared_ptr logItem = database::DatabaseManager::getInstance().findLogItem(backupID, logID); logItem->addAttachmentHolders(holders); if (!logItem->getPersistedInBlob() && database::LogItem::getItemSize(logItem.get()) > LOG_DATA_SIZE_DATABASE_LIMIT) { bool old = logItem->getPersistedInBlob(); logItem = this->moveToS3(logItem); } database::DatabaseManager::getInstance().putLogItem(*logItem); } } catch (std::runtime_error &e) { LOG(ERROR) << e.what(); status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return status; } std::shared_ptr AddAttachmentsUtility::moveToS3(std::shared_ptr logItem) { std::string holder = tools::generateHolder( logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID()); std::string data = std::move(logItem->getValue()); std::shared_ptr newLogItem = std::make_shared( logItem->getBackupID(), logItem->getLogID(), true, holder, logItem->getAttachmentHolders(), logItem->getDataHash()); // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - std::shared_ptr putReactor = - std::make_shared( - holder, newLogItem->getDataHash(), &blobPutDoneCV); - ServiceBlobClient().put(putReactor); - std::unique_lock lockPut(blobPutDoneCVMutex); - putReactor->scheduleSendingDataChunk( - std::make_unique(std::move(data))); - putReactor->scheduleSendingDataChunk(std::make_unique("")); - if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) { - blobPutDoneCV.wait(lockPut); - } - if (!putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put + // todo:blob perform put:add chunk (std::move(data)) + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion return newLogItem; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index 73d29c219..b23e62675 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,112 +1,98 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { if (this->deviceID.empty()) { throw std::runtime_error( "trying to generate a backup ID with an empty device ID"); } return this->deviceID + std::to_string(tools::getCurrentTimestamp()); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::DEVICE_ID; return nullptr; } case State::DEVICE_ID: { if (!request.has_deviceid()) { throw std::runtime_error("device id expected but not received"); } this->deviceID = request.deviceid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; this->backupID = this->generateBackupID(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) != nullptr) { throw std::runtime_error( "Backup with id [" + this->backupID + "] for user [" + this->userID + "] already exists, creation aborted"); } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); + // todo:blob perform put:initialize return nullptr; } case State::DATA_CHUNKS: { - this->putReactor->scheduleSendingDataChunk(std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); + // todo:blob perform put:add chunk + // (std::move(*request.mutable_newcompactionchunk()) return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - if (this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobPutDoneCV.wait(lock2); - } - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - throw std::runtime_error("put reactor has not been terminated properly"); - } - if (!this->putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion + // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, tools::getCurrentTimestamp(), tools::generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h index 7a7e03736..739403e41 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,56 +1,52 @@ #pragma once -#include "ServiceBlobClient.h" - #include "backup.grpc.pb.h" #include "backup.pb.h" #include "ServerBidiReactorBase.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { USER_ID = 1, DEVICE_ID = 2, KEY_ENTROPY = 3, DATA_HASH = 4, DATA_CHUNKS = 5, }; State state = State::USER_ID; std::string userID; std::string deviceID; std::string keyEntropy; std::string dataHash; std::string holder; std::string backupID; - std::shared_ptr putReactor; - ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp index 26dd0953b..2ceb36c17 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,228 +1,196 @@ #include "PullBackupReactor.h" #include "DatabaseManager.h" namespace comm { namespace network { namespace reactor { PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, - backup::PullBackupResponse>(request), - dataChunks(std::make_shared>(100)) { + backup::PullBackupResponse>(request) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - this->getReactor.reset(new reactor::BlobGetClientReactor( - holder, this->dataChunks, &this->blobGetDoneCV)); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); + // todo:blob perform get initialize + this->clientInitialized = true; } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); response->set_attachmentholders(""); response->set_backupid(""); size_t extraBytesNeeded = 0; if (this->state == State::COMPACTION) { response->set_backupid(this->backupItem->getBackupID()); extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); extraBytesNeeded += this->backupItem->getBackupID().size(); - if (this->getReactor == nullptr) { + if (!this->clientInitialized) { extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { - this->dataChunks->blockingRead(dataChunk); + // todo:blob perform blocking read } if (!dataChunk.empty() || this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); response->set_compactionchunk(dataChunk); return nullptr; } - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } this->state = State::LOGS; if (!this->internalBuffer.empty()) { response->set_compactionchunk(std::move(this->internalBuffer)); return nullptr; } } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with // the compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { - // we reached the end of the logs collection so we just want to - // terminate either we terminate with an error if we have some dangling - // data or with success if we don't - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error("dangling data discovered after reading logs"); - } if (!this->internalBuffer.empty()) { response->set_logid(this->previousLogID); response->set_logchunk(std::move(this->internalBuffer)); return nullptr; } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); response->set_attachmentholders(this->currentLog->getAttachmentHolders()); extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor // and proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logid(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); this->nextLog(); return nullptr; } } else { extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); } response->set_backupid(this->currentLog->getBackupID()); response->set_logid(this->currentLog->getLogID()); // we want to read the chunks from the blob through the get client until // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { - this->dataChunks->blockingRead(dataChunk); + // todo:blob perform blocking read } this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } // if we get an empty chunk, we reset the currentLog so we can read the // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { this->nextLog(); } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::nextLog() { ++this->currentLogIndex; this->previousLogID = this->currentLog->getLogID(); this->currentLog = nullptr; this->endOfQueue = false; } std::string PullBackupReactor::prepareDataChunkWithPadding( const std::string &dataChunk, size_t padding) { if (dataChunk.size() > this->chunkLimit) { throw std::runtime_error("received data chunk bigger than the chunk limit"); } std::string chunk = std::move(this->internalBuffer) + dataChunk; const size_t realSize = chunk.size() + padding; if (realSize <= this->chunkLimit) { return chunk; } const size_t bytesToStash = realSize - this->chunkLimit; this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end()); chunk.resize(chunk.size() - bytesToStash); if (chunk.size() > this->chunkLimit) { throw std::runtime_error("new data chunk incorrectly calculated"); } return chunk; } void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - std::unique_lock lockGet(this->blobGetDoneCVMutex); - if (this->getReactor != nullptr) { - if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobGetDoneCV.wait(lockGet); - } - if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { - throw std::runtime_error("get reactor has not been terminated properly"); - } - if (!this->getReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->getReactor->getStatusHolder()->getStatus().error_message()); - } - } + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h index a6e9fe7b6..4334781b8 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -1,71 +1,65 @@ #pragma once #include "BackupItem.h" -#include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" +#include "GlobalConstants.h" #include "LogItem.h" -#include "ServiceBlobClient.h" #include #include #include "ServerWriteReactorBase.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class PullBackupReactor : public ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { enum class State { COMPACTION = 1, COMPACTION_ATTACHMENTS = 2, LOGS = 3, LOG_ATTACHMENTS = 4, }; std::shared_ptr backupItem; - std::shared_ptr getReactor; std::mutex reactorStateMutex; - std::shared_ptr> dataChunks; - ServiceBlobClient blobClient; State state = State::COMPACTION; std::vector> logs; size_t currentLogIndex = 0; std::shared_ptr currentLog; std::string internalBuffer; std::string previousLogID; bool endOfQueue = false; - - std::condition_variable blobGetDoneCV; - std::mutex blobGetDoneCVMutex; + bool clientInitialized = false; const size_t chunkLimit = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; void initializeGetReactor(const std::string &holder); void nextLog(); std::string prepareDataChunkWithPadding(const std::string &dataChunk, size_t padding); public: PullBackupReactor(const backup::PullBackupRequest *request); void initialize() override; std::unique_ptr writeResponse(backup::PullBackupResponse *response) override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index c8fc8883e..95280e15c 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,173 +1,156 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB; database::LogItem logItem( this->backupID, this->logID, storedInBlob, storedInBlob ? this->blobHolder : this->value, {}, this->hash); if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { throw std::runtime_error( "trying to put into the database an item with size " + std::to_string(database::LogItem::getItemSize(&logItem)) + " that exceeds the limit " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT)); } database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutReactor() { if (this->blobHolder.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty blob holder"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - if (this->putReactor == nullptr) { - this->putReactor = std::make_shared( - this->blobHolder, this->hash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - } + // todo:blob perform put:initialize } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } if (this->persistenceMethod == PersistenceMethod::DB) { throw std::runtime_error( "please do not send multiple tiny chunks (less than " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + "), merge them into bigger parts instead"); } if (this->persistenceMethod == PersistenceMethod::BLOB) { - if (this->putReactor == nullptr) { - throw std::runtime_error( - "put reactor is being used but has not been initialized"); - } - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); + // todo:blob perform put:add chunk (std::move(chunk)) return nullptr; } this->value += std::move(*chunk); database::LogItem logItem = database::LogItem( this->backupID, this->logID, true, this->value, "", this->hash); if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { this->persistenceMethod = PersistenceMethod::BLOB; this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk( - std::make_unique(this->value)); + // todo:blob perform put:add chunk (this->value) this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } if (this->persistenceMethod != PersistenceMethod::BLOB && this->persistenceMethod != PersistenceMethod::DB) { throw std::runtime_error("Invalid persistence method detected"); } - if (this->persistenceMethod == PersistenceMethod::DB || - this->putReactor == nullptr) { + if (this->persistenceMethod == PersistenceMethod::DB) { this->storeInDatabase(); return; } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { - this->blobPutDoneCV.wait(lockPut); - } - if (!this->putReactor->getStatusHolder()->getStatus().ok()) { - throw std::runtime_error( - this->putReactor->getStatusHolder()->getStatus().error_message()); - } + // todo:blob perform put:add chunk ("") + // todo:blob perform put:wait for completion // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h index 886ba6b5d..75e2992ff 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,65 +1,61 @@ #pragma once #include "LogItem.h" #include "ServerReadReactorBase.h" -#include "ServiceBlobClient.h" #include "backup.grpc.pb.h" #include "backup.pb.h" #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, backup::SendLogResponse> { enum class State { USER_ID = 1, BACKUP_ID = 2, LOG_HASH = 3, LOG_CHUNK = 4, }; enum class PersistenceMethod { UNKNOWN = 0, DB = 1, BLOB = 2, }; State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; std::string logID; std::string backupID; std::string hash; std::string blobHolder; std::string value; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - std::shared_ptr putReactor; - ServiceBlobClient blobClient; - void storeInDatabase(); std::string generateLogID(const std::string &backupID); void initializePutReactor(); public: using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/grpc-client/ServiceBlobClient.h b/services/backup/src/grpc-client/ServiceBlobClient.h deleted file mode 100644 index 04908f32f..000000000 --- a/services/backup/src/grpc-client/ServiceBlobClient.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include "BlobGetClientReactor.h" -#include "BlobPutClientReactor.h" - -#include -#include - -#include - -#include -#include - -namespace comm { -namespace network { - -class ServiceBlobClient { - std::unique_ptr stub; - -public: - ServiceBlobClient() { - // todo handle different types of connection(e.g. load balancer) - std::string targetStr = "blob-server:50051"; - std::shared_ptr channel = - grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); - this->stub = blob::BlobService::NewStub(channel); - } - - void put(std::shared_ptr putReactor) { - if (putReactor == nullptr) { - throw std::runtime_error( - "put reactor is being used but has not been initialized"); - } - this->stub->async()->Put(&putReactor->context, &(*putReactor)); - putReactor->start(); - } - - void get(std::shared_ptr getReactor) { - if (getReactor == nullptr) { - throw std::runtime_error( - "get reactor is being used but has not been initialized"); - } - this->stub->async()->Get( - &getReactor->context, &getReactor->request, &(*getReactor)); - getReactor->start(); - } - // void remove(const std::string &holder); -}; - -} // namespace network -} // namespace comm