diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h @@ -40,8 +40,7 @@ std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error( - "error reading compaction data from the blob service"); + throw std::runtime_error("error reading data from the blob service"); } return nullptr; } diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -63,12 +63,10 @@ throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - if (this->getReactor == nullptr) { - this->getReactor = std::make_shared( - holder, this->dataChunks); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); - } + this->getReactor.reset( + new reactor::BlobGetClientReactor(holder, this->dataChunks)); + this->getReactor->request.set_holder(holder); + this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { @@ -99,27 +97,35 @@ // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { + if (this->getReactor == nullptr) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!dataChunk.empty()) { - response->set_compactionchunk(dataChunk); - return nullptr; - } else { - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - this->getReactor = nullptr; - this->state = State::LOGS; + } + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (!dataChunk.empty()) { + response->set_compactionchunk(dataChunk); + return nullptr; + } else { + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error( + "dangling data discovered after reading compaction"); + } + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); } + this->state = State::LOGS; } + } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error( + "dangling data discovered after reading logs"); + } return std::make_unique(grpc::Status::OK); } else if (this->currentLogIndex > this->logs.size()) { throw std::runtime_error("log index out of bound"); @@ -137,6 +143,9 @@ } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); + } if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h --- a/services/blob/src/Reactors/server/GetReactor.h +++ b/services/blob/src/Reactors/server/GetReactor.h @@ -38,7 +38,7 @@ std::min(this->chunkSize, this->fileSize - this->offset); std::string range = "bytes=" + std::to_string(this->offset) + "-" + - std::to_string(this->offset + nextSize); + std::to_string(this->offset + nextSize - 1); this->getRequest.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome =