diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs index fcd932095..3ce8e0681 100644 --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -1,143 +1,137 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use crate::RUNTIME; use lazy_static::lazy_static; -use libc; -use libc::c_char; use std::collections::HashMap; use std::sync::Mutex; use tokio::sync::mpsc; use tokio::task::JoinHandle; struct ReadClient { rx: mpsc::Receiver>, rx_handle: JoinHandle>, } lazy_static! { // todo: we should consider limiting the clients size, // if every client is able to allocate up to 4MB data at a time static ref CLIENTS: Mutex> = Mutex::new(HashMap::new()); static ref ERROR_MESSAGES: Mutex> = Mutex::new(Vec::new()); } fn is_initialized(holder: &str) -> anyhow::Result { if let Ok(clients) = CLIENTS.lock() { return Ok(clients.contains_key(holder)); } bail!("couldn't access client"); } pub fn get_client_initialize_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if is_initialized(&holder)? { - get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + get_client_terminate_cxx(holder.clone())?; } // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); - let cloned_holder = holder.clone(); + let holder_string = holder.to_string(); let rx_handle = RUNTIME.spawn(async move { let response = grpc_client .get(GetRequest { - holder: cloned_holder, + holder: holder_string, }) .await?; let mut inner_response = response.into_inner(); loop { let maybe_data = inner_response.message().await?; let mut result = false; if let Some(data) = maybe_data { let data: Vec = data.data_chunk; result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { bail!(err); } } } if !result { break; } } Ok(()) }); if let Ok(mut clients) = CLIENTS.lock() { let client = ReadClient { rx_handle, rx: response_thread_rx, }; - (*clients).insert(holder, client); + (*clients).insert(holder.to_string(), client); return Ok(()); } bail!("could not access client"); } bail!("could not successfully connect to the blob server") } pub fn get_client_blocking_read_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result, anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { - if let Some(client) = clients.get_mut(&holder) { + if let Some(client) = clients.get_mut(&holder.to_string()) { let maybe_data = client.rx.recv().await; return Ok(maybe_data.unwrap_or_else(|| vec![])); } else { bail!(format!("no client present for {}", holder)); } } else { bail!("couldn't access client"); } })?) } pub fn get_client_terminate_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { - match clients.remove(&holder) { + match clients.remove(&holder.to_string()) { Some(client) => { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { bail!(format!("awaiting for the client {} failed", holder)); } Ok(()) })?; } None => { bail!(format!("no client foudn for {}", holder)); } } } else { bail!("couldn't access client"); } if is_initialized(&holder)? { bail!("client transmitter handler released properly"); } Ok(()) } diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs index cf1c7ae98..0fc42700b 100644 --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -1,51 +1,50 @@ mod constants; mod get_client; mod put_client; -mod tools; use lazy_static::lazy_static; use tokio::runtime; use put_client::{ put_client_blocking_read_cxx, put_client_initialize_cxx, put_client_terminate_cxx, put_client_write_cxx, }; use get_client::{ get_client_blocking_read_cxx, get_client_initialize_cxx, get_client_terminate_cxx, }; lazy_static! { static ref RUNTIME: runtime::Runtime = runtime::Runtime::new() .expect("Unable to create tokio runtime"); } #[cxx::bridge] mod ffi { extern "Rust" { - unsafe fn put_client_initialize_cxx( - holder_char: *const c_char, + fn put_client_initialize_cxx( + holder_char: &str, ) -> Result<()>; unsafe fn put_client_write_cxx( - holder_char: *const c_char, + holder_char: &str, field_index: usize, data: *const c_char, ) -> Result<()>; - unsafe fn put_client_blocking_read_cxx( - holder_char: *const c_char, + fn put_client_blocking_read_cxx( + holder_char: &str, ) -> Result; - unsafe fn put_client_terminate_cxx( - holder_char: *const c_char, + fn put_client_terminate_cxx( + holder_char: &str, ) -> Result<()>; - unsafe fn get_client_initialize_cxx( - holder_char: *const c_char, + fn get_client_initialize_cxx( + holder_char: &str, ) -> Result<()>; - unsafe fn get_client_blocking_read_cxx( - holder_char: *const c_char, + fn get_client_blocking_read_cxx( + holder_char: &str, ) -> Result>; - unsafe fn get_client_terminate_cxx( - holder_char: *const c_char, + fn get_client_terminate_cxx( + holder_char: &str, ) -> Result<()>; } } diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs index e7232dcaa..5a8aa5d90 100644 --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -1,255 +1,250 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::put_request; use proto::put_request::Data::*; use proto::PutRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use crate::RUNTIME; use lazy_static::lazy_static; use libc; use libc::c_char; use std::collections::HashMap; use std::ffi::CStr; use std::sync::Mutex; use tokio::sync::mpsc; use tokio::task::JoinHandle; #[derive(Debug)] struct PutRequestData { field_index: usize, data: Vec, } struct BidiClient { tx: mpsc::Sender, rx: mpsc::Receiver, rx_handle: JoinHandle>, } lazy_static! { // todo: we should consider limiting the clients size, // if every client is able to allocate up to 4MB data at a time static ref CLIENTS: Mutex> = Mutex::new(HashMap::new()); static ref ERROR_MESSAGES: Mutex> = Mutex::new(Vec::new()); } fn is_initialized(holder: &str) -> anyhow::Result { match CLIENTS.lock() { Ok(clients) => Ok(clients.contains_key(holder)), _ => bail!("couldn't access client") } } pub fn put_client_initialize_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if is_initialized(&holder)? { - put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + put_client_terminate_cxx(&holder.to_string())?; } if is_initialized(&holder)? { bail!("client cannot be initialized twice"); } // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { let (request_thread_tx, mut request_thread_rx) = mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let outbound = async_stream::stream! { let mut maybe_error: Option = None; while let Some(data) = request_thread_rx.recv().await { let request_data: Option = match data.field_index { 1 => { match String::from_utf8(data.data) { Ok(utf8_data) => Some(Holder(utf8_data)), _ => { maybe_error = Some("invalid utf-8".to_string()); break; }, } } 2 => { match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { maybe_error = Some("invalid utf-8".to_string()); break; }, } } 3 => { Some(DataChunk(data.data)) } _ => { maybe_error = Some(format!("invalid field index value {}", data.field_index)); break; } }; if let Some (unpacked_data) = request_data { let request = PutRequest { data: Some(unpacked_data), }; yield request; } else { maybe_error = Some("an error occured, aborting connection".to_string()); break; } } if let Some(error) = maybe_error { // todo consider handling this differently println!("an error occured in the stream: {}", error); } }; // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let rx_handle = RUNTIME.spawn(async move { match grpc_client.put(tonic::Request::new(outbound)).await { Ok(response) => { let mut inner_response = response.into_inner(); loop { let maybe_response_message = inner_response.message().await?; let mut result = false; if let Some(response_message) = maybe_response_message { // warning: this will produce an error if there's more unread // responses than MPSC_CHANNEL_BUFFER_CAPACITY // you should then use put_client_blocking_read_cxx in order // to dequeue the responses in c++ and make room for more if let Ok(_) = response_thread_tx .try_send((response_message.data_exists as i32).to_string()) { result = true; } else { bail!("response queue full"); } } if !result { break; } } } Err(err) => { bail!(err.to_string()); } }; Ok(()) }); if is_initialized(&holder)? { bail!(format!( "client initialization overlapped for holder {}", holder )); } if let Ok(mut clients) = CLIENTS.lock() { let client = BidiClient { tx: request_thread_tx, rx: response_thread_rx, rx_handle, }; - (*clients).insert(holder, client); + (*clients).insert(holder.to_string(), client); return Ok(()); } bail!(format!("could not access client for holder {}", holder)); } bail!("could not successfully connect to the blob server"); } pub fn put_client_blocking_read_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result { - let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.get_mut(&holder); + let maybe_client = clients.get_mut(holder); if let Some(client) = maybe_client { if let Some(data) = client.rx.recv().await { return Ok(data); } else { bail!("couldn't receive data via client's receiver"); } } else { bail!(format!( "no client detected for {} in blocking read", holder )); } } else { bail!("couldn't access clients"); } })?) } /** * field index: * 1 - holder (utf8 string) * 2 - blob hash (utf8 string) * 3 - data chunk (bytes) */ pub fn put_client_write_cxx( - holder_char: *const c_char, + holder: &str, field_index: usize, data: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); RUNTIME.block_on(async { if let Ok(clients) = CLIENTS.lock() { - let maybe_client = clients.get(&holder); + let maybe_client = clients.get(&holder.to_string()); if let Some(client) = maybe_client { client .tx .send(PutRequestData { field_index, data: data_bytes, }) .await?; return Ok(()); } bail!(format!("no client detected for {} in write", holder)); } else { bail!("couldn't access clients"); } })?; Ok(()) } pub fn put_client_terminate_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.remove(&holder); + let maybe_client = clients.remove(&holder.to_string()); if let Some(client) = maybe_client { drop(client.tx); RUNTIME.block_on(async { client.rx_handle.await? })?; } else { bail!("no client detected in terminate"); } } else { bail!("couldn't access client"); } if is_initialized(&holder)? { bail!("client transmitter handler released properly"); } Ok(()) } diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs deleted file mode 100644 index efd1d4305..000000000 --- a/services/backup/blob_client/src/tools.rs +++ /dev/null @@ -1,15 +0,0 @@ -use libc::c_char; -use std::ffi::{CStr, CString}; - -pub fn c_char_pointer_to_string( - c_char_pointer: *const c_char, -) -> anyhow::Result { - let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) }; - Ok(holder_cstr.to_str()?.to_owned()) -} - -pub fn string_to_c_char_pointer( - signs: &String, -) -> anyhow::Result<*const c_char, anyhow::Error> { - Ok(CString::new((&signs).as_bytes())?.as_ptr()) -} diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp index 89e9c87ad..2e1062a6e 100644 --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -1,102 +1,102 @@ #include "AddAttachmentsUtility.h" #include "blob_client/src/lib.rs.h" #include #include "BackupItem.h" #include "Constants.h" #include "DatabaseManager.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { grpc::Status AddAttachmentsUtility::processRequest( const backup::AddAttachmentsRequest *request) { grpc::Status status = grpc::Status::OK; std::string userID = request->userid(); std::string backupID = request->backupid(); std::string logID = request->logid(); const std::string holders = request->holders(); try { if (userID.empty()) { throw std::runtime_error("user id required but not provided"); } if (backupID.empty()) { throw std::runtime_error("backup id required but not provided"); } if (holders.empty()) { throw std::runtime_error("holders required but not provided"); } if (logID.empty()) { // add these attachments to backup std::shared_ptr backupItem = database::DatabaseManager::getInstance().findBackupItem( userID, backupID); backupItem->addAttachmentHolders(holders); database::DatabaseManager::getInstance().putBackupItem(*backupItem); } else { // add these attachments to log std::shared_ptr logItem = database::DatabaseManager::getInstance().findLogItem(backupID, logID); logItem->addAttachmentHolders(holders); if (!logItem->getPersistedInBlob() && database::LogItem::getItemSize(logItem.get()) > LOG_DATA_SIZE_DATABASE_LIMIT) { bool old = logItem->getPersistedInBlob(); logItem = this->moveToS3(logItem); } database::DatabaseManager::getInstance().putLogItem(*logItem); } } catch (std::exception &e) { LOG(ERROR) << e.what(); status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return status; } std::shared_ptr AddAttachmentsUtility::moveToS3(std::shared_ptr logItem) { std::string holder = tools::generateHolder( logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID()); std::string data = std::move(logItem->getValue()); std::shared_ptr newLogItem = std::make_shared( logItem->getBackupID(), logItem->getLogID(), true, holder, logItem->getAttachmentHolders(), logItem->getDataHash()); // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - put_client_initialize_cxx(holder.c_str()); + put_client_initialize_cxx(rust::String(holder)); put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), holder.c_str()); - put_client_blocking_read_cxx(holder.c_str()); + put_client_blocking_read_cxx(rust::String(holder)); put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), newLogItem->getDataHash().c_str()); - rust::String responseStr = put_client_blocking_read_cxx(holder.c_str()); + rust::String responseStr = put_client_blocking_read_cxx(rust::String(holder)); // data exists? if (!(bool)tools::charPtrToInt(responseStr.c_str())) { put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(data).c_str()); - put_client_blocking_read_cxx(holder.c_str()); + put_client_blocking_read_cxx(rust::String(holder)); } - put_client_terminate_cxx(holder.c_str()); + put_client_terminate_cxx(rust::String(holder)); return newLogItem; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index 5114fa18f..46c9fce22 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,124 +1,124 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include "blob_client/src/lib.rs.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { if (this->deviceID.empty()) { throw std::runtime_error( "trying to generate a backup ID with an empty device ID"); } return this->deviceID + std::to_string(tools::getCurrentTimestamp()); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::DEVICE_ID; return nullptr; } case State::DEVICE_ID: { if (!request.has_deviceid()) { throw std::runtime_error("device id expected but not received"); } this->deviceID = request.deviceid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; this->backupID = this->generateBackupID(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) != nullptr) { throw std::runtime_error( "Backup with id [" + this->backupID + "] for user [" + this->userID + "] already exists, creation aborted"); } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - put_client_initialize_cxx(this->holder.c_str()); + put_client_initialize_cxx(rust::String(this->holder)); put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->holder.c_str()); - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->dataHash.c_str()); rust::String responseStr = - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( grpc::Status::OK, true); } return nullptr; } case State::DATA_CHUNKS: { if (request.mutable_newcompactionchunk()->empty()) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::string(std::move(*request.mutable_newcompactionchunk())) .c_str()); - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(this->holder.c_str()); + put_client_terminate_cxx(rust::String(this->holder)); // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, tools::getCurrentTimestamp(), tools::generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp index 18dd5bcf1..ef4dd48a8 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,216 +1,216 @@ #include "PullBackupReactor.h" #include "blob_client/src/lib.rs.h" #include "DatabaseManager.h" namespace comm { namespace network { namespace reactor { PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - get_client_initialize_cxx(holder.c_str()); + get_client_initialize_cxx(rust::String(holder)); this->clientInitialized = true; } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); response->set_attachmentholders(""); response->set_backupid(""); size_t extraBytesNeeded = 0; if (this->state == State::COMPACTION) { response->set_backupid(this->backupItem->getBackupID()); extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); extraBytesNeeded += this->backupItem->getBackupID().size(); if (!this->clientInitialized) { extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { rust::Vec responseVec = get_client_blocking_read_cxx( this->backupItem->getCompactionHolder().c_str()); dataChunk = (responseVec.empty()) ? "" : std::string(reinterpret_cast(responseVec.data())); dataChunk.resize(responseVec.size()); } if (!dataChunk.empty() || this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); response->set_compactionchunk(dataChunk); return nullptr; } this->state = State::LOGS; if (!this->internalBuffer.empty()) { response->set_compactionchunk(std::move(this->internalBuffer)); return nullptr; } } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with // the compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { if (!this->internalBuffer.empty()) { response->set_logid(this->previousLogID); response->set_logchunk(std::move(this->internalBuffer)); return nullptr; } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); response->set_attachmentholders(this->currentLog->getAttachmentHolders()); extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor // and proceed this->initializeGetReactor(this->currentLog->getValue()); this->currentLogHolder = std::make_unique(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logid(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); this->nextLog(); return nullptr; } } else { extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); extraBytesNeeded += this->currentLog->getLogID().size(); } response->set_backupid(this->currentLog->getBackupID()); response->set_logid(this->currentLog->getLogID()); // we want to read the chunks from the blob through the get client until // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { rust::Vec responseVec = get_client_blocking_read_cxx(this->currentLogHolder->c_str()); dataChunk = (responseVec.empty()) ? "" : std::string(reinterpret_cast(responseVec.data())); dataChunk.resize(responseVec.size()); } this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); // if we get an empty chunk, we reset the currentLog so we can read the // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { this->nextLog(); } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::nextLog() { ++this->currentLogIndex; this->previousLogID = this->currentLog->getLogID(); this->currentLog = nullptr; this->endOfQueue = false; if (this->currentLogHolder != nullptr) { get_client_terminate_cxx(this->currentLogHolder->c_str()); this->currentLogHolder = nullptr; } } std::string PullBackupReactor::prepareDataChunkWithPadding( const std::string &dataChunk, size_t padding) { if (dataChunk.size() > this->chunkLimit) { throw std::runtime_error(std::string( "received data chunk bigger than the chunk limit: " + std::to_string(dataChunk.size()) + "/" + std::to_string(this->chunkLimit))); } std::string chunk = std::move(this->internalBuffer) + dataChunk; const size_t realSize = chunk.size() + padding; if (realSize <= this->chunkLimit) { return chunk; } const size_t bytesToStash = realSize - this->chunkLimit; this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end()); chunk.resize(chunk.size() - bytesToStash); if (chunk.size() > this->chunkLimit) { throw std::runtime_error("new data chunk incorrectly calculated"); } return chunk; } void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); get_client_terminate_cxx(this->backupItem->getCompactionHolder().c_str()); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 49a69cb65..144e5c066 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,178 +1,178 @@ #include "SendLogReactor.h" #include "blob_client/src/lib.rs.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB; database::LogItem logItem( this->backupID, this->logID, storedInBlob, storedInBlob ? this->blobHolder : this->value, {}, this->hash); if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { throw std::runtime_error( "trying to put into the database an item with size " + std::to_string(database::LogItem::getItemSize(&logItem)) + " that exceeds the limit " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT)); } database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + tools::generateUUID(); } void SendLogReactor::initializePutClient() { if (this->blobHolder.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty blob holder"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - put_client_initialize_cxx(this->blobHolder.c_str()); + put_client_initialize_cxx(rust::String(this->blobHolder)); } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } if (request.mutable_logdata()->size() == 0) { return std::make_unique(grpc::Status::OK); } if (this->persistenceMethod == PersistenceMethod::DB) { throw std::runtime_error( "please do not send multiple tiny chunks (less than " + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + "), merge them into bigger parts instead"); } if (this->persistenceMethod == PersistenceMethod::BLOB) { put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), request.mutable_logdata()->c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); return nullptr; } this->value += std::move(*request.mutable_logdata()); database::LogItem logItem = database::LogItem( this->backupID, this->logID, true, this->value, "", this->hash); if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { this->persistenceMethod = PersistenceMethod::BLOB; this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutClient(); put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->blobHolder.c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->hash.c_str()); rust::String responseStr = put_client_blocking_read_cxx(this->blobHolder.c_str()); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique(grpc::Status::OK); } put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(this->value).c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(this->blobHolder.c_str()); + put_client_terminate_cxx(rust::String(this->blobHolder)); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } if (this->persistenceMethod != PersistenceMethod::BLOB && this->persistenceMethod != PersistenceMethod::DB) { throw std::runtime_error("Invalid persistence method detected"); } if (this->persistenceMethod == PersistenceMethod::DB) { this->storeInDatabase(); return; } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement } } // namespace reactor } // namespace network } // namespace comm