diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -74,23 +74,26 @@ // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - put_client_initialize_cxx(); + put_client_initialize_cxx(holder.c_str()); put_client_write_cxx( + holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), holder.c_str()); - put_client_blocking_read_cxx(); + put_client_blocking_read_cxx(holder.c_str()); put_client_write_cxx( + holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), newLogItem->getDataHash().c_str()); - rust::String responseStr = put_client_blocking_read_cxx(); + rust::String responseStr = put_client_blocking_read_cxx(holder.c_str()); // data exists? if (!(bool)tools::charPtrToInt(responseStr.c_str())) { put_client_write_cxx( + holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(data).c_str()); - put_client_blocking_read_cxx(); + put_client_blocking_read_cxx(holder.c_str()); } - put_client_terminate_cxx(); + put_client_terminate_cxx(holder.c_str()); return newLogItem; } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -66,26 +66,29 @@ } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - put_client_initialize_cxx(); + put_client_initialize_cxx(this->holder.c_str()); put_client_write_cxx( + this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->holder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors put_client_write_cxx( + this->holder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->dataHash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( @@ -97,21 +100,18 @@ if (request.mutable_newcompactionchunk()->empty()) { return std::make_unique(grpc::Status::OK); } - try { - put_client_write_cxx( - tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), - std::string(std::move(*request.mutable_newcompactionchunk())) - .c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors - } catch (std::exception &e) { - throw std::runtime_error( - e.what()); // todo in base reactors we can just handle std exception - // instead of keep rethrowing here - } + put_client_write_cxx( + this->holder.c_str(), + tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), + std::string(std::move(*request.mutable_newcompactionchunk())) + .c_str()); + put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + return nullptr; } } @@ -120,7 +120,7 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(); + put_client_terminate_cxx(this->holder.c_str()); // TODO add recovery data // TODO handle attachments holders diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -44,7 +44,7 @@ throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - put_client_initialize_cxx(); + put_client_initialize_cxx(this->blobHolder.c_str()); } std::unique_ptr @@ -99,13 +99,15 @@ } if (this->persistenceMethod == PersistenceMethod::BLOB) { put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), request.mutable_logdata()->c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors return nullptr; } @@ -119,35 +121,40 @@ tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutClient(); put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->blobHolder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->hash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(this->value).c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; @@ -160,7 +167,7 @@ void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(); + put_client_terminate_cxx(this->blobHolder.c_str()); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error(