diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -69,35 +69,41 @@ // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - put_client_initialize_cxx(); + put_client_initialize_cxx(holder.c_str()); put_client_write_cxx( - tools::getBlobPutField(tools::BlobPutField::HOLDER), holder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + holder.c_str(), + tools::getBlobPutField(tools::BlobPutField::HOLDER), + holder.c_str()); + put_client_blocking_read_cxx( + holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors put_client_write_cxx( + holder.c_str(), tools::getBlobPutField(tools::BlobPutField::HASH), newLogItem->getDataHash().c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + rust::String responseStr = put_client_blocking_read_cxx( + holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors // data exists? if (!(bool)tools::charPtrToInt(responseStr.c_str())) { put_client_write_cxx( + holder.c_str(), tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), std::move(data).c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors } - put_client_terminate_cxx(); + put_client_terminate_cxx(holder.c_str()); return newLogItem; } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -67,26 +67,29 @@ response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); bool dataExists = false; - put_client_initialize_cxx(); + put_client_initialize_cxx(this->holder.c_str()); put_client_write_cxx( + this->holder.c_str(), tools::getBlobPutField(tools::BlobPutField::HOLDER), this->holder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors put_client_write_cxx( + this->holder.c_str(), tools::getBlobPutField(tools::BlobPutField::HASH), this->dataHash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( @@ -99,14 +102,16 @@ return std::make_unique(grpc::Status::OK); } put_client_write_cxx( + this->holder.c_str(), tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), std::string(std::move(*request.mutable_newcompactionchunk())) .c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->holder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors return nullptr; } @@ -116,7 +121,7 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(); + put_client_terminate_cxx(this->holder.c_str()); // TODO add recovery data // TODO handle attachments holders diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -44,7 +44,7 @@ throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - put_client_initialize_cxx(); + put_client_initialize_cxx(this->blobHolder.c_str()); } std::unique_ptr @@ -99,13 +99,15 @@ } if (this->persistenceMethod == PersistenceMethod::BLOB) { put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), request.mutable_logdata()->c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors return nullptr; } @@ -119,35 +121,40 @@ tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutClient(); put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(tools::BlobPutField::HOLDER), this->blobHolder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(tools::BlobPutField::HASH), this->hash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors + rust::String responseStr = put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( + this->blobHolder.c_str(), tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), std::move(this->value).c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors + put_client_blocking_read_cxx( + this->blobHolder.c_str()); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; @@ -160,7 +167,7 @@ void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(); + put_client_terminate_cxx(this->blobHolder.c_str()); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error(