diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h index a563ba298..e15fbc3ea 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,59 +1,58 @@ #pragma once #include "ClientReadReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobGetClientReactor : public ClientReadReactorBase { std::string holder; std::shared_ptr> dataChunks; public: BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks); std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; grpc::Status getStatus() const; }; BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) : holder(holder), dataChunks(dataChunks) { } std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error( - "error reading compaction data from the blob service"); + throw std::runtime_error("error reading data from the blob service"); } return nullptr; } void BlobGetClientReactor::doneCallback() { this->dataChunks->write(""); } grpc::Status BlobGetClientReactor::getStatus() const { return this->status; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h index 71a289b45..2b5173e1d 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -1,159 +1,188 @@ #pragma once #include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" #include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { class PullBackupReactor : public ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { enum class State { COMPACTION = 1, LOGS = 2, }; std::shared_ptr backupItem; std::shared_ptr getReactor; std::mutex reactorStateMutex; std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; std::vector> logs; size_t currentLogIndex = 0; std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); public: PullBackupReactor(const backup::PullBackupRequest *request); void initialize() override; std::unique_ptr writeResponse(backup::PullBackupResponse *response) override; void terminateCallback() override; }; PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - if (this->getReactor == nullptr) { - this->getReactor = std::make_shared( - holder, this->dataChunks); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); - } + this->getReactor.reset( + new reactor::BlobGetClientReactor(holder, this->dataChunks)); + this->getReactor->request.set_holder(holder); + this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); + if (this->getReactor == nullptr) { + this->initializeGetReactor(this->backupItem->getCompactionHolder()); + } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } - this->getReactor = nullptr; + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); + } this->state = State::LOGS; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { + // this means that there are no logs at all so we just terminate with the + // compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { + // we reached the end of the logs collection so we just want to terminate + // either we terminate with an error if we have some dangling data + // or with success if we don't + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error("dangling data discovered after reading logs"); + } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { + // we went out of the scope of the logs collection, this should never + // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } + // this means that we're not reading anything between invocations of + // writeResponse + // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { + // if the item is stored in the blob, we initialize the get reactor and + // proceed this->initializeGetReactor(this->currentLog->getValue()); } else { + // if the item is persisted in the database, we just take it, send the + // data to the client and reset currentLog so the next invocation of + // writeResponse will take another one from the collection response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } } + // we want to read the chunks from the blob through the get client until we + // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); + } + // if we get an empty chunk, we reset the currentLog so we can read the next + // one from the logs collection. + // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; + return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::terminateCallback() { if (!this->getReactor->getStatus().ok()) { throw std::runtime_error(this->getReactor->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h index 94de6e6a7..2370dbeb2 100644 --- a/services/blob/src/Reactors/server/GetReactor.h +++ b/services/blob/src/Reactors/server/GetReactor.h @@ -1,87 +1,87 @@ #pragma once #include "ServerWriteReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class GetReactor : public ServerWriteReactorBase { size_t offset = 0; size_t fileSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; database::S3Path s3Path; Aws::S3::Model::GetObjectRequest getRequest; public: using ServerWriteReactorBase:: ServerWriteReactorBase; std::unique_ptr writeResponse(blob::GetResponse *response) override { if (this->offset >= this->fileSize) { return std::make_unique(grpc::Status::OK); } const size_t nextSize = std::min(this->chunkSize, this->fileSize - this->offset); std::string range = "bytes=" + std::to_string(this->offset) + "-" + - std::to_string(this->offset + nextSize); + std::to_string(this->offset + nextSize - 1); this->getRequest.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(this->getRequest); if (!getOutcome.IsSuccess()) { return std::make_unique( grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); response->set_datachunk(result); this->offset += nextSize; return nullptr; } void initialize() override { this->s3Path = findS3Path(this->request.holder()); this->fileSize = getBucket(s3Path.getBucketName()).getObjectSize(s3Path.getObjectName()); this->getRequest.SetBucket(this->s3Path.getBucketName()); this->getRequest.SetKey(this->s3Path.getObjectName()); AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName()); if (!bucket.isAvailable()) { throw std::runtime_error( "bucket [" + this->s3Path.getBucketName() + "] not available"); } const size_t fileSize = bucket.getObjectSize(this->s3Path.getObjectName()); if (this->fileSize == 0) { throw std::runtime_error("object empty"); } }; void doneCallback() override{}; }; } // namespace reactor } // namespace network } // namespace comm