diff --git a/services/backup/src/DatabaseManager.cpp b/services/backup/src/DatabaseManager.cpp index dc828f452..5481ac32c 100644 --- a/services/backup/src/DatabaseManager.cpp +++ b/services/backup/src/DatabaseManager.cpp @@ -1,175 +1,173 @@ #include "DatabaseManager.h" #include "Constants.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include #include -#include - namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } void DatabaseManager::putBackupItem(const BackupItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BackupItem::tableName); request.AddItem( BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(item.getUserID())); request.AddItem( BackupItem::FIELD_CREATED, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); request.AddItem( BackupItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); request.AddItem( BackupItem::FIELD_RECOVERY_DATA, Aws::DynamoDB::Model::AttributeValue(item.getRecoveryData())); request.AddItem( BackupItem::FIELD_COMPACTION_HOLDER, Aws::DynamoDB::Model::AttributeValue(item.getCompactionHolder())); if (!item.getAttachmentHolders().empty()) { request.AddItem( BackupItem::FIELD_ATTACHMENT_HOLDERS, Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); } this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findBackupItem( const std::string &userID, const std::string &backupID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(userID)); request.AddKey( BackupItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(backupID)); return this->innerFindItem(request); } std::shared_ptr DatabaseManager::findLastBackupItem(const std::string &userID) { std::shared_ptr item = createItemByType(); Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(BackupItem::tableName); req.SetKeyConditionExpression(BackupItem::FIELD_USER_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", userID); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName("userID-created-index"); req.SetLimit(1); req.SetScanIndexForward(false); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); if (items.empty()) { return nullptr; } return std::make_shared(items[0]); } void DatabaseManager::removeBackupItem(std::shared_ptr item) { if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putLogItem(const LogItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(LogItem::tableName); request.AddItem( LogItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); request.AddItem( LogItem::FIELD_LOG_ID, Aws::DynamoDB::Model::AttributeValue(item.getLogID())); request.AddItem( LogItem::FIELD_PERSISTED_IN_BLOB, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getPersistedInBlob()))); request.AddItem( LogItem::FIELD_VALUE, Aws::DynamoDB::Model::AttributeValue(item.getValue())); if (!item.getAttachmentHolders().empty()) { request.AddItem( LogItem::FIELD_ATTACHMENT_HOLDERS, Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); } request.AddItem( LogItem::FIELD_DATA_HASH, Aws::DynamoDB::Model::AttributeValue(item.getDataHash())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findLogItem( const std::string &backupID, const std::string &logID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( LogItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(backupID)); request.AddKey( LogItem::FIELD_LOG_ID, Aws::DynamoDB::Model::AttributeValue(logID)); return this->innerFindItem(request); } std::vector> DatabaseManager::findLogItemsForBackup(const std::string &backupID) { std::vector> result; std::shared_ptr item = createItemByType(); Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(LogItem::tableName); req.SetKeyConditionExpression(LogItem::FIELD_BACKUP_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", backupID); req.SetExpressionAttributeValues(attributeValues); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeLogItem(std::shared_ptr item) { if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp index c33b4321a..c3a020f7a 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -1,58 +1,56 @@ #include "BlobPutClientReactor.h" -#include - namespace comm { namespace network { namespace reactor { BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier) : holder(holder), hash(hash), dataChunks(folly::MPMCQueue(100)), terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk( std::unique_ptr dataChunk) { if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } void BlobPutClientReactor::doneCallback() { this->terminationNotifier->notify_one(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp index de86eda94..1f9093153 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,233 +1,231 @@ #include "PullBackupReactor.h" #include "DatabaseManager.h" -#include - namespace comm { namespace network { namespace reactor { PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } this->getReactor.reset(new reactor::BlobGetClientReactor( holder, this->dataChunks, &this->blobGetDoneCV)); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); response->set_attachmentholders(""); response->set_backupid(""); size_t extraBytesNeeded = 0; if (this->state == State::COMPACTION) { response->set_backupid(this->backupItem->getBackupID()); extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); extraBytesNeeded += this->backupItem->getBackupID().size(); if (this->getReactor == nullptr) { extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { this->dataChunks->blockingRead(dataChunk); } if (!dataChunk.empty() || this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } this->state = State::LOGS; if (!this->internalBuffer.empty()) { response->set_compactionchunk(std::move(this->internalBuffer)); return nullptr; } } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with // the compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { // we reached the end of the logs collection so we just want to // terminate either we terminate with an error if we have some dangling // data or with success if we don't if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } if (!this->internalBuffer.empty()) { response->set_logid(this->previousLogID); response->set_logchunk(std::move(this->internalBuffer)); return nullptr; } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); response->set_attachmentholders(this->currentLog->getAttachmentHolders()); extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor // and proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logid(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); this->nextLog(); return nullptr; } } else { extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); } response->set_backupid(this->currentLog->getBackupID()); response->set_logid(this->currentLog->getLogID()); // we want to read the chunks from the blob through the get client until // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { this->dataChunks->blockingRead(dataChunk); } this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { this->nextLog(); return nullptr; } else { dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::nextLog() { ++this->currentLogIndex; this->previousLogID = this->currentLog->getLogID(); this->currentLog = nullptr; this->endOfQueue = false; } std::string PullBackupReactor::prepareDataChunkWithPadding( const std::string &dataChunk, size_t padding) { if (dataChunk.size() > this->chunkLimit) { throw std::runtime_error("received data chunk bigger than the chunk limit"); } std::string chunk = std::move(this->internalBuffer) + dataChunk; const size_t realSize = chunk.size() + padding; if (realSize <= this->chunkLimit) { return chunk; } const size_t bytesToStash = realSize - this->chunkLimit; this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end()); chunk.resize(chunk.size() - bytesToStash); if (chunk.size() > this->chunkLimit) { throw std::runtime_error("new data chunk incorrectly calculated"); } return chunk; } void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); std::unique_lock lockGet(this->blobGetDoneCVMutex); if (this->getReactor != nullptr) { if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobGetDoneCV.wait(lockGet); } if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { throw std::runtime_error("get reactor has not been terminated properly"); } if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } } if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/RecoverBackupKeyReactor.h b/services/backup/src/Reactors/server/RecoverBackupKeyReactor.h index cc5dc0634..9dee9def2 100644 --- a/services/backup/src/Reactors/server/RecoverBackupKeyReactor.h +++ b/services/backup/src/Reactors/server/RecoverBackupKeyReactor.h @@ -1,34 +1,33 @@ #pragma once #include "ServerBidiReactorBase.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" -#include #include #include namespace comm { namespace network { namespace reactor { class RecoverBackupKeyReactor : public ServerBidiReactorBase< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> { public: std::unique_ptr handleRequest( backup::RecoverBackupKeyRequest request, backup::RecoverBackupKeyResponse *response); }; std::unique_ptr RecoverBackupKeyReactor::handleRequest( backup::RecoverBackupKeyRequest request, backup::RecoverBackupKeyResponse *response) { // TODO handle request return std::make_unique( grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/grpc-client/ServiceBlobClient.h b/services/backup/src/grpc-client/ServiceBlobClient.h index 08869a329..36f64b390 100644 --- a/services/backup/src/grpc-client/ServiceBlobClient.h +++ b/services/backup/src/grpc-client/ServiceBlobClient.h @@ -1,52 +1,51 @@ #pragma once #include "BlobGetClientReactor.h" #include "BlobPutClientReactor.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include -#include #include #include namespace comm { namespace network { class ServiceBlobClient { std::unique_ptr stub; public: ServiceBlobClient() { // todo handle different types of connection(e.g. load balancer) std::string targetStr = "blob-server:50051"; std::shared_ptr channel = grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); this->stub = blob::BlobService::NewStub(channel); } void put(std::shared_ptr putReactor) { if (putReactor == nullptr) { throw std::runtime_error( "put reactor is being used but has not been initialized"); } this->stub->async()->Put(&putReactor->context, &(*putReactor)); putReactor->start(); } void get(std::shared_ptr getReactor) { if (getReactor == nullptr) { throw std::runtime_error( "get reactor is being used but has not been initialized"); } this->stub->async()->Get( &getReactor->context, &getReactor->request, &(*getReactor)); getReactor->start(); } // void remove(const std::string &holder); }; } // namespace network } // namespace comm diff --git a/services/backup/test/DatabaseManagerTest.cpp b/services/backup/test/DatabaseManagerTest.cpp index 61d155cd9..3787294b0 100644 --- a/services/backup/test/DatabaseManagerTest.cpp +++ b/services/backup/test/DatabaseManagerTest.cpp @@ -1,133 +1,131 @@ #include #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" -#include - #include #include #include using namespace comm::network::database; class DatabaseManagerTest : public testing::Test { protected: virtual void SetUp() { Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateName(const std::string prefix = "") { return prefix + "-" + std::to_string(comm::network::tools::getCurrentTimestamp()); } BackupItem generateBackupItem(const std::string &userID, const std::string &backupID) { return BackupItem( userID, backupID, comm::network::tools::getCurrentTimestamp(), "xxx", "xxx", {""}); } LogItem generateLogItem(const std::string &backupID, const std::string &logID) { return LogItem(backupID, logID, false, "xxx", {""}); } TEST_F(DatabaseManagerTest, TestOperationsOnBackupItems) { const std::string userID = generateName("user001"); std::vector backupIDs = {"backup001", "backup002", "backup003"}; for (const std::string &backupID : backupIDs) { DatabaseManager::getInstance().putBackupItem( generateBackupItem(userID, backupID)); } std::shared_ptr item; while (!backupIDs.empty()) { item = DatabaseManager::getInstance().findLastBackupItem(userID); EXPECT_NE(item, nullptr); EXPECT_EQ(item->getBackupID(), backupIDs.back()); backupIDs.pop_back(); DatabaseManager::getInstance().removeBackupItem(item); }; EXPECT_EQ(DatabaseManager::getInstance().findLastBackupItem(userID), nullptr); } TEST_F(DatabaseManagerTest, TestOperationsOnLogItems) { const std::string backupID1 = generateName("backup001"); const std::string backupID2 = generateName("backup002"); std::vector logIDs1 = {"log001", "log002", "log003"}; for (const std::string &logID : logIDs1) { LogItem generatedItem = generateLogItem(backupID1, logID); DatabaseManager::getInstance().putLogItem(generatedItem); std::shared_ptr foundItem = DatabaseManager::getInstance().findLogItem(backupID1, logID); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(foundItem->getBackupID(), generatedItem.getBackupID()); EXPECT_EQ(foundItem->getLogID(), generatedItem.getLogID()); EXPECT_EQ( foundItem->getPersistedInBlob(), generatedItem.getPersistedInBlob()); EXPECT_EQ(foundItem->getValue(), generatedItem.getValue()); EXPECT_EQ( foundItem->getAttachmentHolders(), generatedItem.getAttachmentHolders()); } std::vector logIDs2 = {"log021", "log022"}; for (const std::string &logID : logIDs2) { LogItem generatedItem = generateLogItem(backupID2, logID); DatabaseManager::getInstance().putLogItem(generatedItem); std::shared_ptr foundItem = DatabaseManager::getInstance().findLogItem(backupID2, logID); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(foundItem->getBackupID(), generatedItem.getBackupID()); EXPECT_EQ(foundItem->getLogID(), generatedItem.getLogID()); EXPECT_EQ( foundItem->getPersistedInBlob(), generatedItem.getPersistedInBlob()); EXPECT_EQ(foundItem->getValue(), generatedItem.getValue()); EXPECT_EQ( foundItem->getAttachmentHolders(), generatedItem.getAttachmentHolders()); } std::vector> items1 = DatabaseManager::getInstance().findLogItemsForBackup(backupID1); std::vector> items2 = DatabaseManager::getInstance().findLogItemsForBackup(backupID2); EXPECT_EQ(items1.size(), 3); EXPECT_EQ(items2.size(), 2); for (size_t i = 0; i < items1.size(); ++i) { EXPECT_EQ(logIDs1.at(i), items1.at(i)->getLogID()); DatabaseManager::getInstance().removeLogItem(items1.at(i)); EXPECT_EQ( DatabaseManager::getInstance().findLogItem(backupID1, logIDs1.at(i)), nullptr); } EXPECT_EQ( DatabaseManager::getInstance().findLogItemsForBackup(backupID1).size(), 0); for (size_t i = 0; i < items2.size(); ++i) { EXPECT_EQ(logIDs2.at(i), items2.at(i)->getLogID()); DatabaseManager::getInstance().removeLogItem(items2.at(i)); EXPECT_EQ( DatabaseManager::getInstance().findLogItem(backupID2, logIDs2.at(i)), nullptr); } EXPECT_EQ( DatabaseManager::getInstance().findLogItemsForBackup(backupID2).size(), 0); }