diff --git a/services/backup/src/DatabaseEntities/BackupItem.cpp b/services/backup/src/DatabaseEntities/BackupItem.cpp
--- a/services/backup/src/DatabaseEntities/BackupItem.cpp
+++ b/services/backup/src/DatabaseEntities/BackupItem.cpp
@@ -3,6 +3,8 @@
 #include "Constants.h"
 #include "Tools.h"

+#include <glog/logging.h>
+
 namespace comm {
 namespace network {
 namespace database {
@@ -37,6 +39,7 @@
 }

 void BackupItem::validate() const {
+  LOG(INFO) << "[BackupItem::validate]";
   if (!this->userID.size()) {
     throw std::runtime_error("userID empty");
   }
@@ -52,9 +55,14 @@
 }

 void BackupItem::assignItemFromDatabase(const AttributeValues &itemFromDB) {
+  LOG(INFO) << "[BackupItem::assignItemFromDatabase]";
   try {
     this->userID = itemFromDB.at(BackupItem::FIELD_USER_ID).GetS();
+    LOG(INFO) << "[BackupItem::assignItemFromDatabase] user id "
+              << this->userID;
     this->backupID = itemFromDB.at(BackupItem::FIELD_BACKUP_ID).GetS();
+    LOG(INFO) << "[BackupItem::assignItemFromDatabase] backup id "
+              << this->backupID;
     this->created = std::stoll(
         std::string(itemFromDB.at(BackupItem::FIELD_CREATED).GetS()).c_str());
     this->recoveryData = itemFromDB.at(BackupItem::FIELD_RECOVERY_DATA).GetS();
@@ -113,6 +121,8 @@
 }

 void BackupItem::addAttachmentHolders(const std::string &attachmentHolders) {
+  LOG(INFO) << "[BackupItem::addAttachmentHolders] attachment holders "
+            << attachmentHolders;
   this->attachmentHolders +=
       tools::validateAttachmentHolders(attachmentHolders);
 }
diff --git a/services/backup/src/DatabaseEntities/LogItem.cpp b/services/backup/src/DatabaseEntities/LogItem.cpp
--- a/services/backup/src/DatabaseEntities/LogItem.cpp
+++ b/services/backup/src/DatabaseEntities/LogItem.cpp
@@ -3,6 +3,7 @@
 #include "Constants.h"
 #include "Tools.h"

+#include <glog/logging.h>
 #include

 namespace comm {
@@ -39,6 +40,7 @@
 }

 void LogItem::validate() const {
+  LOG(INFO) << "[LogItem::validate]";
   if (!this->backupID.size()) {
     throw std::runtime_error("backupID empty");
   }
@@ -62,9 +64,13 @@
 }

 void LogItem::assignItemFromDatabase(const AttributeValues &itemFromDB) {
+  LOG(INFO) << "[LogItem::assignItemFromDatabase]";
   try {
     this->backupID = itemFromDB.at(LogItem::FIELD_BACKUP_ID).GetS();
+    LOG(INFO) << "[LogItem::assignItemFromDatabase] backup id "
+              << this->backupID;
     this->logID = itemFromDB.at(LogItem::FIELD_LOG_ID).GetS();
+    LOG(INFO) << "[LogItem::assignItemFromDatabase] log id " << this->logID;
     this->persistedInBlob = std::stoi(
         std::string(itemFromDB.at(LogItem::FIELD_PERSISTED_IN_BLOB).GetS())
             .c_str());
@@ -119,11 +125,14 @@
 }

 void LogItem::addAttachmentHolders(const std::string &attachmentHolders) {
+  LOG(INFO) << "[LogItem::addAttachmentHolders] attachment holders "
+            << attachmentHolders;
   this->attachmentHolders +=
       tools::validateAttachmentHolders(attachmentHolders);
 }

 size_t LogItem::getItemSize(const LogItem *item) {
+  LOG(INFO) << "[LogItem::getItemSize]";
   size_t size = 0;

   size += LogItem::FIELD_BACKUP_ID.size();
@@ -140,6 +149,7 @@
   size += item->getAttachmentHolders().size();
   size += item->getDataHash().size();

+  LOG(INFO) << "[LogItem::getItemSize] size " << size;
   return size;
 }
diff --git a/services/backup/src/DatabaseManager.cpp b/services/backup/src/DatabaseManager.cpp
--- a/services/backup/src/DatabaseManager.cpp
+++ b/services/backup/src/DatabaseManager.cpp
@@ -9,6 +9,8 @@
 #include
 #include

+#include <glog/logging.h>
+
 namespace comm {
 namespace network {
 namespace database {
@@ -19,6 +21,9 @@
 }

 void DatabaseManager::putBackupItem(const BackupItem &item) {
+  LOG(INFO) << "[DatabaseManager::putBackupItem] user id " << item.getUserID();
"[DatabaseManager::putBackupItem] backup id " + << item.getBackupID(); Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BackupItem::tableName); request.AddItem( @@ -49,6 +54,8 @@ std::shared_ptr DatabaseManager::findBackupItem( const std::string &userID, const std::string &backupID) { + LOG(INFO) << "[DatabaseManager::findBackupItem] user id " << userID; + LOG(INFO) << "[DatabaseManager::findBackupItem] backup id " << backupID; Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(userID)); @@ -61,6 +68,7 @@ std::shared_ptr DatabaseManager::findLastBackupItem(const std::string &userID) { + LOG(INFO) << "[DatabaseManager::findLastBackupItem] user id " << userID; std::shared_ptr item = createItemByType(); Aws::DynamoDB::Model::QueryRequest req; @@ -83,8 +91,10 @@ } const Aws::Vector &items = outcome.GetResult().GetItems(); if (items.empty()) { + LOG(INFO) << "[DatabaseManager::findLastBackupItem] not found"; return nullptr; } + LOG(INFO) << "[DatabaseManager::findLastBackupItem] found"; return std::make_shared(items[0]); } @@ -92,10 +102,13 @@ if (item == nullptr) { return; } + LOG(INFO) << "[DatabaseManager::removeBackupItem]"; this->innerRemoveItem(*item); } void DatabaseManager::putLogItem(const LogItem &item) { + LOG(INFO) << "[DatabaseManager::putLogItem] backup id " << item.getBackupID(); + LOG(INFO) << "[DatabaseManager::putLogItem] log id " << item.getLogID(); Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(LogItem::tableName); request.AddItem( @@ -125,6 +138,8 @@ std::shared_ptr DatabaseManager::findLogItem( const std::string &backupID, const std::string &logID) { + LOG(INFO) << "[DatabaseManager::findLogItem] backup id " << backupID; + LOG(INFO) << "[DatabaseManager::findLogItem] log id " << logID; Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( LogItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(backupID)); @@ -136,6 +151,8 @@ std::vector> DatabaseManager::findLogItemsForBackup(const std::string &backupID) { + LOG(INFO) << "[DatabaseManager::findLogItemsForBackup] backup id " + << backupID; std::vector> result; std::shared_ptr item = createItemByType(); @@ -158,6 +175,8 @@ result.push_back(std::make_shared(item)); } + LOG(INFO) << "[DatabaseManager::findLogItemsForBackup] result size " + << result.size(); return result; } @@ -165,6 +184,7 @@ if (item == nullptr) { return; } + LOG(INFO) << "[DatabaseManager::removeLogItem]"; this->innerRemoveItem(*item); } diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -16,6 +16,8 @@ std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { + LOG(INFO) << "[BlobGetClientReactor::readResponse] data chunk size " + << response.datachunk().size(); if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { throw std::runtime_error("error reading data from the blob service"); } @@ -23,6 +25,7 @@ } void BlobGetClientReactor::doneCallback() { + LOG(INFO) << "[BlobGetClientReactor::doneCallback]"; this->dataChunks->write(""); this->terminationNotifier->notify_one(); } diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp --- 
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
@@ -16,6 +16,9 @@

 void BlobPutClientReactor::scheduleSendingDataChunk(
     std::unique_ptr<std::string> dataChunk) {
+  LOG(INFO)
+      << "[BlobPutClientReactor::scheduleSendingDataChunk] data chunk size "
+      << dataChunk->size();
   if (!this->dataChunks.write(std::move(*dataChunk))) {
     throw std::runtime_error(
         "Error scheduling sending a data chunk to send to the blob service");
@@ -25,21 +28,30 @@

 std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
     blob::PutRequest &request,
     std::shared_ptr<blob::PutResponse> previousResponse) {
+  LOG(INFO) << "[BlobPutClientReactor::prepareRequest]";
   if (this->state == State::SEND_HOLDER) {
     this->request.set_holder(this->holder);
+    LOG(INFO) << "[BlobPutClientReactor::prepareRequest] holder "
+              << this->holder;
     this->state = State::SEND_HASH;
     return nullptr;
   }
   if (this->state == State::SEND_HASH) {
     request.set_blobhash(this->hash);
+    LOG(INFO) << "[BlobPutClientReactor::prepareRequest] hash " << this->hash;
     this->state = State::SEND_CHUNKS;
     return nullptr;
   }
+  LOG(INFO) << "[BlobPutClientReactor::prepareRequest] data exists "
+            << previousResponse->dataexists();
   if (previousResponse->dataexists()) {
     return std::make_unique<grpc::Status>(grpc::Status::OK);
   }
   std::string dataChunk;
+  LOG(INFO) << "[BlobPutClientReactor::prepareRequest] reading data chunk";
   this->dataChunks.blockingRead(dataChunk);
+  LOG(INFO) << "[BlobPutClientReactor::prepareRequest] read data chunk "
+            << dataChunk.size();
   if (dataChunk.empty()) {
     return std::make_unique<grpc::Status>(grpc::Status::OK);
   }
@@ -48,6 +60,7 @@
 }

 void BlobPutClientReactor::doneCallback() {
+  LOG(INFO) << "[BlobPutClientReactor::doneCallback]";
   this->terminationNotifier->notify_one();
 }
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -20,6 +20,9 @@
   std::string backupID = request->backupid();
   std::string logID = request->logid();
   const std::string holders = request->holders();
+  LOG(INFO) << "[AddAttachmentsUtility::processRequest] user id/backup id/log "
+               "id/holders "
+            << userID << "/" << backupID << "/" << logID << "/" << holders;
   try {
     if (userID.empty()) {
       throw std::runtime_error("user id required but not provided");
@@ -33,6 +36,8 @@

     if (logID.empty()) {
       // add these attachments to backup
+      LOG(INFO) << "[AddAttachmentsUtility::processRequest] adding attachments "
+                   "to backup";
       std::shared_ptr<database::BackupItem> backupItem =
           database::DatabaseManager::getInstance().findBackupItem(
               userID, backupID);
@@ -40,6 +45,8 @@
       database::DatabaseManager::getInstance().putBackupItem(*backupItem);
     } else {
       // add these attachments to log
+      LOG(INFO) << "[AddAttachmentsUtility::processRequest] adding attachments "
+                   "to log";
       std::shared_ptr<database::LogItem> logItem =
           database::DatabaseManager::getInstance().findLogItem(backupID, logID);
       logItem->addAttachmentHolders(holders);
@@ -49,6 +56,7 @@
         bool old = logItem->getPersistedInBlob();
         logItem = this->moveToS3(logItem);
       }
+      LOG(INFO) << "[AddAttachmentsUtility::processRequest] update log item";
       database::DatabaseManager::getInstance().putLogItem(*logItem);
     }
   } catch (std::runtime_error &e) {
@@ -60,9 +68,13 @@

 std::shared_ptr<database::LogItem>
 AddAttachmentsUtility::moveToS3(std::shared_ptr<database::LogItem> logItem) {
+  LOG(INFO) << "[AddAttachmentsUtility::moveToS3] backup id/log id "
+            << logItem->getBackupID() << "/" << logItem->getLogID();
   std::string holder = tools::generateHolder(
       logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID());
   std::string data = std::move(logItem->getValue());
+  LOG(INFO) << "[AddAttachmentsUtility::moveToS3] holder/data size " << holder
+            << "/" << data.size();
   std::shared_ptr<database::LogItem> newLogItem = std::make_shared<database::LogItem>(
       logItem->getBackupID(),
@@ -81,6 +93,7 @@
   std::unique_lock<std::mutex> lockPut(blobPutDoneCVMutex);
   putReactor->scheduleSendingDataChunk(
       std::make_unique<std::string>(std::move(data)));
+  LOG(INFO) << "[AddAttachmentsUtility::moveToS3] schedule empty chunk";
   putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
   if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) {
     blobPutDoneCV.wait(lockPut);
@@ -89,6 +102,7 @@
     throw std::runtime_error(
         putReactor->getStatusHolder()->getStatus().error_message());
   }
+  LOG(INFO) << "[AddAttachmentsUtility::moveToS3] terminating";
   return newLogItem;
 }
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -19,6 +19,7 @@
 std::unique_ptr<grpc::Status> CreateNewBackupReactor::handleRequest(
     backup::CreateNewBackupRequest request,
     backup::CreateNewBackupResponse *response) {
+  LOG(INFO) << "[CreateNewBackupReactor::handleRequest]";
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
@@ -28,6 +29,8 @@
       throw std::runtime_error("user id expected but not received");
     }
     this->userID = request.userid();
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] user id "
+              << this->userID;
     this->state = State::DEVICE_ID;
     return nullptr;
   }
@@ -36,6 +39,8 @@
       throw std::runtime_error("device id expected but not received");
     }
     this->deviceID = request.deviceid();
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] device id "
+              << this->deviceID;
     this->state = State::KEY_ENTROPY;
     return nullptr;
   }
@@ -45,6 +50,8 @@
           "backup key entropy expected but not received");
     }
     this->keyEntropy = request.keyentropy();
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] key entropy "
+              << this->keyEntropy;
     this->state = State::DATA_HASH;
     return nullptr;
   }
@@ -53,9 +60,13 @@
       throw std::runtime_error("data hash expected but not received");
     }
     this->dataHash = request.newcompactionhash();
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] data hash "
+              << this->dataHash;
     this->state = State::DATA_CHUNKS;

     this->backupID = this->generateBackupID();
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] backup id "
+              << this->backupID;
     if (database::DatabaseManager::getInstance().findBackupItem(
             this->userID, this->backupID) != nullptr) {
       throw std::runtime_error(
@@ -64,12 +75,16 @@
     }
     response->set_backupid(this->backupID);
     this->holder = tools::generateHolder(this->dataHash, this->backupID);
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] holder "
+              << this->holder;
     this->putReactor = std::make_shared<BlobPutClientReactor>(
         this->holder, this->dataHash, &this->blobPutDoneCV);
     this->blobClient.put(this->putReactor);
     return nullptr;
   }
   case State::DATA_CHUNKS: {
+    LOG(INFO) << "[CreateNewBackupReactor::handleRequest] data chunk size "
+              << request.newcompactionchunk().size();
     this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
         std::move(*request.mutable_newcompactionchunk())));
     return nullptr;
@@ -79,13 +94,20 @@
 }

 void CreateNewBackupReactor::terminateCallback() {
+  LOG(INFO)
+      << "[CreateNewBackupReactor::terminateCallback] put reactor present "
+      << (this->putReactor != nullptr);
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
   if (this->putReactor == nullptr) {
     return;
   }
+  LOG(INFO) << "[CreateNewBackupReactor::terminateCallback] scheduling empty "
+               "data chunk";
   this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
   std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
   if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
+    LOG(INFO) << "[CreateNewBackupReactor::terminateCallback] waiting for put "
+                 "reactor";
     this->blobPutDoneCV.wait(lock2);
   }
   if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
@@ -95,6 +117,8 @@
     throw std::runtime_error(
         this->putReactor->getStatusHolder()->getStatus().error_message());
   }
+  LOG(INFO)
+      << "[CreateNewBackupReactor::terminateCallback] putting backup item";
   // TODO add recovery data
   // TODO handle attachments holders
   database::BackupItem backupItem(
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -14,6 +14,7 @@
 }

 void PullBackupReactor::initializeGetReactor(const std::string &holder) {
+  LOG(INFO) << "[PullBackupReactor::initializeGetReactor] holder " << holder;
   if (this->backupItem == nullptr) {
     throw std::runtime_error(
         "get reactor cannot be initialized when backup item is missing");
@@ -25,6 +26,7 @@
 }

 void PullBackupReactor::initialize() {
+  LOG(INFO) << "[PullBackupReactor::initialize]";
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
@@ -44,10 +46,13 @@
   }
   this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup(
       this->request.backupid());
+  LOG(INFO) << "[PullBackupReactor::initialize] logs size "
+            << this->logs.size();
 }

 std::unique_ptr<grpc::Status>
 PullBackupReactor::writeResponse(backup::PullBackupResponse *response) {
+  LOG(INFO) << "[PullBackupReactor::writeResponse]";
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
@@ -55,6 +60,7 @@
   response->set_backupid("");
   size_t extraBytesNeeded = 0;
   if (this->state == State::COMPACTION) {
+    LOG(INFO) << "[PullBackupReactor::writeResponse][compaction]";
     response->set_backupid(this->backupItem->getBackupID());
     extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
     extraBytesNeeded += this->backupItem->getBackupID().size();
@@ -66,14 +72,28 @@
       this->initializeGetReactor(this->backupItem->getCompactionHolder());
     }
     std::string dataChunk;
+    LOG(INFO)
+        << "[PullBackupReactor::writeResponse][compaction] internal buffer "
+           "size/chunk limit "
+        << this->internalBuffer.size() << "/" << this->chunkLimit;
     if (this->internalBuffer.size() < this->chunkLimit) {
+      LOG(INFO)
+          << "[PullBackupReactor::writeResponse][compaction] blocking read";
       this->dataChunks->blockingRead(dataChunk);
     }
+    LOG(INFO) << "[PullBackupReactor::writeResponse][compaction] data chunk "
+                 "size/extra "
+                 "bytes needed/chunk limit "
+              << dataChunk.size() << "/" << extraBytesNeeded << "/"
+              << this->chunkLimit;
     if (!dataChunk.empty() ||
         this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) {
       dataChunk =
           this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
       response->set_compactionchunk(dataChunk);
+      LOG(INFO) << "[PullBackupReactor::writeResponse][compaction] sending "
+                   "data chunk "
+                << dataChunk.size();
       return nullptr;
     }
     if (!this->dataChunks->isEmpty()) {
@@ -85,18 +105,25 @@
           this->getReactor->getStatusHolder()->getStatus().error_message());
     }
     this->state = State::LOGS;
+    LOG(INFO) << "[PullBackupReactor::writeResponse][compaction] internal "
+                 "buffer size "
+              << this->internalBuffer.size();
     if (!this->internalBuffer.empty()) {
       response->set_compactionchunk(std::move(this->internalBuffer));
       return nullptr;
     }
   }
   if (this->state == State::LOGS) {
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] logs size "
+              << logs.size();
     // TODO make sure logs are received in correct order regardless their size
     if (this->logs.empty()) {
       // this means that there are no logs at all so we just terminate with
       // the compaction
       return std::make_unique<grpc::Status>(grpc::Status::OK);
     }
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] current log index "
+              << this->currentLogIndex;
     if (this->currentLogIndex == this->logs.size()) {
       // we reached the end of the logs collection so we just want to
       // terminate either we terminate with an error if we have some dangling
@@ -104,11 +131,17 @@
       if (!this->dataChunks->isEmpty()) {
         throw std::runtime_error("dangling data discovered after reading logs");
       }
+      LOG(INFO)
+          << "[PullBackupReactor::writeResponse][logs] internal buffer size "
+          << this->internalBuffer.size();
       if (!this->internalBuffer.empty()) {
         response->set_logid(this->previousLogID);
+        LOG(INFO) << "[PullBackupReactor::writeResponse][logs] log id "
+                  << this->previousLogID;
         response->set_logchunk(std::move(this->internalBuffer));
         return nullptr;
       }
+      LOG(INFO) << "[PullBackupReactor::writeResponse][logs] terminating";
       return std::make_unique<grpc::Status>(grpc::Status::OK);
     }
     if (this->currentLogIndex > this->logs.size()) {
@@ -119,7 +152,11 @@
     // this means that we're not reading anything between invocations of
     // writeResponse
     // it is only not null when we read data in chunks
+    LOG(INFO)
+        << "[PullBackupReactor::writeResponse][logs] is current log present"
+        << (this->currentLog != nullptr);
     if (this->currentLog == nullptr) {
+      LOG(INFO) << "[PullBackupReactor::writeResponse][logs] new current log";
       this->currentLog = this->logs.at(this->currentLogIndex);
       extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
       extraBytesNeeded += this->currentLog->getLogID().size();
@@ -128,6 +165,9 @@
       extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size();
       extraBytesNeeded += this->currentLog->getAttachmentHolders().size();

+      LOG(INFO) << "[PullBackupReactor::writeResponse][logs] is current log "
+                   "persisted in blob "
+                << this->currentLog->getPersistedInBlob();
       if (this->currentLog->getPersistedInBlob()) {
         // if the item is stored in the blob, we initialize the get reactor
         // and proceed
@@ -147,14 +187,28 @@
     }
     response->set_backupid(this->currentLog->getBackupID());
     response->set_logid(this->currentLog->getLogID());
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] backup id/log id "
+              << this->currentLog->getBackupID() << "/"
+              << this->currentLog->getLogID();
     // we want to read the chunks from the blob through the get client until
     // we get an empty chunk - a sign of "end of chunks"
     std::string dataChunk;
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] internal buffer "
+                 "size/chunk limit/end of queue "
+              << this->internalBuffer.size() << "/" << this->chunkLimit << "/"
+              << this->endOfQueue;
     if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
+      LOG(INFO) << "[PullBackupReactor::writeResponse][logs] blocking read";
       this->dataChunks->blockingRead(dataChunk);
     }
     this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] new end of queue "
+              << this->endOfQueue;
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] extra bytes needed "
+              << extraBytesNeeded;
     dataChunk =
         this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
+    LOG(INFO) << "[PullBackupReactor::writeResponse][logs] data chunk size "
+              << dataChunk.size();
     if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
       throw std::runtime_error(
           this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -168,6 +222,8 @@
     } else {
       dataChunk =
           this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
+      LOG(INFO) << "[PullBackupReactor::writeResponse][logs] data chunk size2 "
+                << dataChunk.size();
       response->set_logchunk(dataChunk);
     }
     return nullptr;
@@ -181,11 +237,16 @@
   this->previousLogID = this->currentLog->getLogID();
   this->currentLog = nullptr;
   this->endOfQueue = false;
+  LOG(INFO) << "[PullBackupReactor::nextLog] current log index/previous log id "
+            << this->currentLogIndex << "/" << this->previousLogID;
 }

 std::string PullBackupReactor::prepareDataChunkWithPadding(
     const std::string &dataChunk,
     size_t padding) {
+  LOG(INFO) << "[PullBackupReactor::prepareDataChunkWithPadding] data chunk "
+               "size/padding "
+            << dataChunk.size() << "/" << padding;
   if (dataChunk.size() > this->chunkLimit) {
     throw std::runtime_error("received data chunk bigger than the chunk limit");
   }
@@ -202,14 +263,21 @@
     throw std::runtime_error("new data chunk incorrectly calculated");
   }

+  LOG(INFO)
+      << "[PullBackupReactor::prepareDataChunkWithPadding] new chunk size "
+      << chunk.size();
   return chunk;
 }

 void PullBackupReactor::terminateCallback() {
+  LOG(INFO) << "[PullBackupReactor::terminateCallback] get reactor present "
+            << (this->getReactor != nullptr);
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
   std::unique_lock<std::mutex> lockGet(this->blobGetDoneCVMutex);
   if (this->getReactor != nullptr) {
     if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
+      LOG(INFO)
+          << "[PullBackupReactor::terminateCallback] waiting for get reactor";
       this->blobGetDoneCV.wait(lockGet);
     }
     if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) {
@@ -224,6 +292,7 @@
     throw std::runtime_error(
         this->getStatusHolder()->getStatus().error_message());
   }
+  LOG(INFO) << "[PullBackupReactor::terminateCallback] terminating";
 }

 } // namespace reactor
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -10,6 +10,7 @@
 namespace reactor {

 void SendLogReactor::storeInDatabase() {
+  LOG(INFO) << "[SendLogReactor::storeInDatabase]";
   bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB;
   database::LogItem logItem(
       this->backupID,
@@ -29,11 +30,13 @@
 }

 std::string SendLogReactor::generateLogID(const std::string &backupID) {
+  LOG(INFO) << "[SendLogReactor::generateLogID] backup id " << backupID;
   return backupID + tools::ID_SEPARATOR +
       std::to_string(tools::getCurrentTimestamp());
 }

 void SendLogReactor::initializePutReactor() {
"[SendLogReactor::initializePutReactor]"; if (this->blobHolder.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty blob holder"); @@ -51,6 +54,8 @@ std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { + LOG(INFO) << "[SendLogReactor::readRequest] persistence method " + << (int)this->persistenceMethod; // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); @@ -60,6 +65,7 @@ throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); + LOG(INFO) << "[SendLogReactor::readRequest] user id " << this->userID; this->state = State::BACKUP_ID; return nullptr; }; @@ -68,12 +74,14 @@ throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); + LOG(INFO) << "[SendLogReactor::readRequest] backup id " << this->backupID; if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); + LOG(INFO) << "[SendLogReactor::readRequest] log id " << this->logID; this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; @@ -83,6 +91,7 @@ throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); + LOG(INFO) << "[SendLogReactor::readRequest] log hash " << this->hash; this->state = State::LOG_CHUNK; return nullptr; }; @@ -92,6 +101,8 @@ } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); + LOG(INFO) << "[SendLogReactor::readRequest] log chunk size " + << chunk->size(); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } @@ -110,13 +121,20 @@ return nullptr; } this->value += std::move(*chunk); + LOG(INFO) << "[SendLogReactor::readRequest] new value size " + << this->value.size(); database::LogItem logItem = database::LogItem( this->backupID, this->logID, true, this->value, "", this->hash); + LOG(INFO) << "[SendLogReactor::readRequest] log item size " + << database::LogItem::getItemSize(&logItem) << "/" + << LOG_DATA_SIZE_DATABASE_LIMIT; if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { this->persistenceMethod = PersistenceMethod::BLOB; this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); + LOG(INFO) << "[SendLogReactor::readRequest] new holder " + << this->blobHolder; this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk( std::make_unique(this->value)); @@ -131,6 +149,7 @@ } void SendLogReactor::terminateCallback() { + LOG(INFO) << "[SendLogReactor::terminateCallback]"; const std::lock_guard lock(this->reactorStateMutex); if (!this->getStatusHolder()->getStatus().ok()) { @@ -151,17 +170,21 @@ this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { + LOG(INFO) + << "[SendLogReactor::terminateCallback] waiting for the put reactor"; this->blobPutDoneCV.wait(lockPut); } if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } + LOG(INFO) << "[SendLogReactor::terminateCallback] finalizing"; // store in db only when we successfully upload chunks 
   this->storeInDatabase();
 }

 void SendLogReactor::doneCallback() {
+  LOG(INFO) << "[SendLogReactor::doneCallback]";
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
diff --git a/services/backup/src/Tools.cpp b/services/backup/src/Tools.cpp
--- a/services/backup/src/Tools.cpp
+++ b/services/backup/src/Tools.cpp
@@ -3,6 +3,8 @@
 #include "GlobalConstants.h"
 #include "GlobalTools.h"

+#include <glog/logging.h>
+
 #include
 #include
 #include
@@ -14,6 +16,7 @@
 namespace tools {

 std::string generateRandomString(std::size_t length) {
+  LOG(INFO) << "[generateRandomString] length " << length;
   const std::string CHARACTERS =
       "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
   thread_local std::random_device generator;
@@ -29,11 +32,15 @@
     const std::string &blobHash,
     const std::string &backupID,
     const std::string &resourceID) {
+  LOG(INFO) << "[generateHolder] blob hash " << blobHash;
+  LOG(INFO) << "[generateHolder] backup id " << backupID;
+  LOG(INFO) << "[generateHolder] resource id " << resourceID;
   return backupID + ID_SEPARATOR + resourceID + ID_SEPARATOR + blobHash +
       ID_SEPARATOR + tools::generateUUID();
 }

 std::string validateAttachmentHolders(const std::string &holders) {
+  LOG(INFO) << "[validateAttachmentHolders] holders " << holders;
   std::stringstream stream(holders);
   std::string item;
   std::string result;
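
Editor's note (not part of the diff): the LOG(INFO) statements added above are the glog macros pulled in via <glog/logging.h>. They assume glog is initialized once at process startup; the entry point of the backup service is not shown in this diff, so the main() below is a hypothetical, minimal sketch of what that initialization might look like rather than the service's actual code.

// Hypothetical startup sketch (assumption, not taken from this diff):
// glog should be initialized once before any of the LOG(INFO) calls above run.
#include <glog/logging.h>

int main(int argc, char **argv) {
  google::InitGoogleLogging(argv[0]);    // register the program name with glog
  google::InstallFailureSignalHandler(); // optional: stack traces on fatal signals
  LOG(INFO) << "backup service starting";
  // ... construct and start the gRPC server here ...
  return 0;
}

By default glog writes log files under /tmp; setting FLAGS_logtostderr = true in code (or exporting GLOG_logtostderr=1) redirects output to stderr, which is usually preferable for a containerized service.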