Page MenuHomePhorge

D3738.1765255486.diff
No OneTemporary

Size
5 KB
Referenced Files
None
Subscribers
None

D3738.1765255486.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h
@@ -40,8 +40,7 @@
std::unique_ptr<grpc::Status>
BlobGetClientReactor::readResponse(blob::GetResponse &response) {
if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) {
- throw std::runtime_error(
- "error reading compaction data from the blob service");
+ throw std::runtime_error("error reading data from the blob service");
}
return nullptr;
}
diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
@@ -63,12 +63,10 @@
throw std::runtime_error(
"get reactor cannot be initialized when backup item is missing");
}
- if (this->getReactor == nullptr) {
- this->getReactor = std::make_shared<reactor::BlobGetClientReactor>(
- holder, this->dataChunks);
- this->getReactor->request.set_holder(holder);
- this->blobClient.get(this->getReactor);
- }
+ this->getReactor.reset(
+ new reactor::BlobGetClientReactor(holder, this->dataChunks));
+ this->getReactor->request.set_holder(holder);
+ this->blobClient.get(this->getReactor);
}
void PullBackupReactor::initialize() {
@@ -99,7 +97,9 @@
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
if (this->state == State::COMPACTION) {
- this->initializeGetReactor(this->backupItem->getCompactionHolder());
+ if (this->getReactor == nullptr) {
+ this->initializeGetReactor(this->backupItem->getCompactionHolder());
+ }
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
if (!dataChunk.empty()) {
@@ -110,36 +110,65 @@
throw std::runtime_error(
"dangling data discovered after reading compaction");
}
- this->getReactor = nullptr;
+ if (!this->getReactor->getStatus().ok()) {
+ throw std::runtime_error(this->getReactor->getStatus().error_message());
+ }
this->state = State::LOGS;
}
if (this->state == State::LOGS) {
// TODO make sure logs are received in correct order regardless their size
if (this->logs.empty()) {
+ // this means that there are no logs at all so we just terminate with the
+ // compaction
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex == this->logs.size()) {
+ // we reached the end of the logs collection so we just want to terminate
+ // either we terminate with an error if we have some dangling data
+ // or with success if we don't
+ if (!this->dataChunks->isEmpty()) {
+ throw std::runtime_error("dangling data discovered after reading logs");
+ }
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex > this->logs.size()) {
+ // we went out of the scope of the logs collection, this should never
+ // happen and should be perceived as an error
throw std::runtime_error("log index out of bound");
}
+ // this means that we're not reading anything between invocations of
+ // writeResponse
+ // it is only not null when we read data in chunks
if (this->currentLog == nullptr) {
this->currentLog = this->logs.at(this->currentLogIndex);
if (this->currentLog->getPersistedInBlob()) {
+ // if the item is stored in the blob, we initialize the get reactor and
+ // proceed
this->initializeGetReactor(this->currentLog->getValue());
} else {
+ // if the item is persisted in the database, we just take it, send the
+ // data to the client and reset currentLog so the next invocation of
+ // writeResponse will take another one from the collection
response->set_logchunk(this->currentLog->getValue());
++this->currentLogIndex;
this->currentLog = nullptr;
return nullptr;
}
}
+ // we want to read the chunks from the blob through the get client until we
+ // get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
+ if (!this->getReactor->getStatus().ok()) {
+ throw std::runtime_error(this->getReactor->getStatus().error_message());
+ }
+ // if we get an empty chunk, we reset the currentLog so we can read the next
+ // one from the logs collection.
+ // If there's data inside, we write it to the client and proceed.
if (dataChunk.empty()) {
++this->currentLogIndex;
this->currentLog = nullptr;
+ return nullptr;
} else {
response->set_logchunk(dataChunk);
}
diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h
--- a/services/blob/src/Reactors/server/GetReactor.h
+++ b/services/blob/src/Reactors/server/GetReactor.h
@@ -38,7 +38,7 @@
std::min(this->chunkSize, this->fileSize - this->offset);
std::string range = "bytes=" + std::to_string(this->offset) + "-" +
- std::to_string(this->offset + nextSize);
+ std::to_string(this->offset + nextSize - 1);
this->getRequest.SetRange(range);
Aws::S3::Model::GetObjectOutcome getOutcome =

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 9, 4:44 AM (2 h, 38 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5852626
Default Alt Text
D3738.1765255486.diff (5 KB)

Event Timeline