diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs
index b3cba635e..c6e9b7868 100644
--- a/services/backup/src/blob/downloader.rs
+++ b/services/backup/src/blob/downloader.rs
@@ -1,79 +1,82 @@
 use anyhow::{bail, Result};
 use tokio::{
   sync::mpsc::{self, Receiver},
   task::JoinHandle,
 };
-use tracing::{instrument, Instrument};
+use tracing::{error, instrument, Instrument};
 
 use super::{proto, BlobClient};
 use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
 
 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};
 
 /// The BlobDownloader instance holds a handle to a Tokio task running the
 /// actual blob client instance. The communication is done via an MPSC channel
 /// and is one-sided - the data is transmitted from the client task to the
 /// caller. Blob chunks received in the response stream wait for the channel
 /// to have capacity, so it is recommended to read them quickly to make room
 /// for more.
 /// The client task can be stopped and awaited for result via the `terminate()`
 /// method.
 pub struct BlobDownloader {
   rx: Receiver<Vec<u8>>,
   handle: JoinHandle<Result<()>>,
 }
 
 impl BlobDownloader {
   /// Connects to the Blob service and keeps the client connection open
   /// in a separate Tokio task.
   #[instrument(name = "blob_downloader")]
-  pub async fn start(
-    holder: String,
-    mut blob_client: BlobClient,
-  ) -> Result<Self> {
+  pub fn start(holder: String, mut blob_client: BlobClient) -> Self {
     let (blob_res_tx, blob_res_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
 
     let client_thread = async move {
-      let response = blob_client.get(proto::GetRequest { holder }).await?;
+      let response = blob_client
+        .get(proto::GetRequest { holder })
+        .await
+        .map_err(|err| {
+          error!("Get request failed: {:?}", err);
+          err
+        })?;
       let mut inner_response = response.into_inner();
       loop {
         match inner_response.message().await? {
           Some(data) => {
             let data: Vec<u8> = data.data_chunk;
             if let Err(err) = blob_res_tx.send(data).await {
               bail!(err);
             }
           }
           // Response stream was closed
           None => break,
         }
       }
       Ok(())
     };
     let handle = tokio::spawn(client_thread.in_current_span());
 
-    Ok(BlobDownloader {
+    BlobDownloader {
       rx: blob_res_rx,
       handle,
-    })
+    }
   }
 
   /// Receives the next chunk of blob data if ready or sleeps
   /// until the data is available.
   ///
   /// Returns `None` when the transmission is finished, but this doesn't
   /// determine if it was successful. After receiving `None`, the client
   /// should be consumed by calling [`BlobDownloader::terminate`] to handle
   /// possible errors.
   pub async fn next_chunk(&mut self) -> Option<Vec<u8>> {
     self.rx.recv().await
   }
 
   /// Stops receiving messages, awaits the client thread to exit
   /// and returns its status.
   pub async fn terminate(mut self) -> Result<()> {
     self.rx.close();
     self.handle.await?
   }
 }
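With `start` no longer async or fallible, a call site reduces to a plain constructor call followed by a receive loop. Below is a minimal usage sketch; the `download_blob` helper, its arguments, and the import path are hypothetical, and a connected `BlobClient` is assumed:

```rust
use crate::blob::{BlobClient, BlobDownloader}; // import path assumed

// Hypothetical helper showing the intended read-quickly pattern.
async fn download_blob(
  client: BlobClient,
  holder: String,
) -> anyhow::Result<Vec<u8>> {
  // start() only spawns the client task now; errors surface later.
  let mut downloader = BlobDownloader::start(holder, client);

  let mut blob = Vec::new();
  // Drain chunks promptly so the bounded MPSC channel keeps capacity
  // for the client task to push more data.
  while let Some(chunk) = downloader.next_chunk().await {
    blob.extend(chunk);
  }

  // `None` only means the stream ended; terminate() reports whether the
  // spawned task failed (e.g. the Get request error logged above).
  downloader.terminate().await?;
  Ok(blob)
}
```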
diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs
index b11f9ef44..28a265543 100644
--- a/services/backup/src/blob/uploader.rs
+++ b/services/backup/src/blob/uploader.rs
@@ -1,170 +1,168 @@
 use anyhow::{anyhow, bail, Result};
 use tokio::{
   sync::mpsc::{self, Receiver, Sender},
   task::JoinHandle,
 };
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
 use tracing::{error, instrument, Instrument};
 
 use super::{proto, BlobClient};
 use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
 
 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};
 
 pub struct BlobUploader {
   req_tx: Sender<PutRequest>,
   res_rx: Receiver<PutResponse>,
   handle: JoinHandle<Result<()>>,
 }
 
 /// The BlobUploader instance holds a handle to a Tokio task running the
 /// actual blob client instance. The communication is done via two MPSC
 /// channels - one for sending requests to the client task, and another for
 /// sending responses back to the caller. These messages should go in pairs
 /// - one request for one response.
 /// The client task can be stopped and awaited for result via the `terminate()`
 /// method.
 impl BlobUploader {
   /// Connects to the Blob service and keeps the client connection open
   /// in a separate Tokio task.
   #[instrument(name = "blob_uploader")]
-  pub async fn start(mut blob_client: BlobClient) -> Result<Self> {
+  pub fn start(mut blob_client: BlobClient) -> Self {
     let (blob_req_tx, blob_req_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let (blob_res_tx, blob_res_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
 
     let client_thread = async move {
       match blob_client
         .put(tonic::Request::new(ReceiverStream::new(blob_req_rx)))
         .await
       {
         Ok(response) => {
           let mut response_stream = response.into_inner();
           loop {
             match response_stream.message().await? {
               Some(response_message) => {
                 // warning: this will produce an error if there's more unread
                 // responses than MPSC_CHANNEL_BUFFER_CAPACITY
                 // so you should always read the response MPSC channel
                 // right after sending a request to dequeue the responses
                 // and make room for more.
                 // The BlobUploader::put() function should take care of that
                 if let Err(err) = blob_res_tx.try_send(response_message) {
                   bail!(err);
                 }
               }
               // Response stream was closed
               None => break,
             }
           }
         }
         Err(err) => {
+          error!("Put request failed: {:?}", err);
           bail!(err.to_string());
         }
       };
       Ok(())
     };
     let handle = tokio::spawn(client_thread.in_current_span());
 
-    Ok(BlobUploader {
+    BlobUploader {
       req_tx: blob_req_tx,
       res_rx: blob_res_rx,
       handle,
-    })
+    }
   }
 
   /// Sends a [`PutRequest`] to the stream and waits for the blob service
   /// to send a response. After all data is sent, [`BlobUploader::terminate`]
   /// should be called to end the transmission and handle possible errors.
   pub async fn put(&mut self, req: PutRequest) -> Result<PutResponse> {
     self.req_tx.try_send(req)?;
     self
       .res_rx
       .recv()
       .await
       .ok_or_else(|| anyhow!("Blob client channel closed"))
   }
 
   /// Convenience wrapper for
   /// ```
   /// BlobUploader::put(PutRequest {
   ///   data: Some(PutRequestData::DataChunk(data))
   /// })
   /// ```
   pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
     self
       .put(PutRequest {
         data: Some(PutRequestData::DataChunk(data)),
       })
       .await
   }
 
   /// Closes the connection and awaits the blob client task to finish.
   pub async fn terminate(self) -> Result<()> {
     drop(self.req_tx);
     let thread_result = self.handle.await?;
     thread_result
   }
 }
 
 /// Starts a put client instance. Fulfills the request with blob hash and
 /// holder.
 ///
 /// `None` is returned if the given `holder` already exists.
 ///
 /// ## Example
 /// ```
 /// if let Some(mut client) =
 ///   start_simple_uploader("my_holder", "my_hash", blob_client).await? {
 ///   let my_data = vec![1,2,3,4];
 ///   let _ = client.put_data(my_data).await;
 ///
 ///   let status = client.terminate().await;
 /// }
 /// ```
 pub async fn start_simple_uploader(
   holder: &str,
   blob_hash: &str,
   blob_client: BlobClient,
 ) -> Result<Option<BlobUploader>, Status> {
-  // start client
-  let mut uploader = BlobUploader::start(blob_client).await.map_err(|err| {
-    error!("Failed to instantiate uploader: {:?}", err);
-    Status::aborted("Internal error")
-  })?;
+  // start upload request
+  let mut uploader = BlobUploader::start(blob_client);
 
   // send holder
   uploader
     .put(PutRequest {
       data: Some(PutRequestData::Holder(holder.to_string())),
     })
     .await
     .map_err(|err| {
       error!("Failed to set blob holder: {:?}", err);
       Status::aborted("Internal error")
     })?;
 
   // send hash
   let PutResponse { data_exists } = uploader
     .put(PutRequest {
       data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
     })
     .await
     .map_err(|err| {
       error!("Failed to set blob hash: {:?}", err);
       Status::aborted("Internal error")
     })?;
 
   // Blob with the given holder already exists, nothing to do
   if data_exists {
     // the connection is already terminated by the server,
     // but it's good to await it anyway
     uploader.terminate().await.map_err(|err| {
       error!("Put client task closed with error: {:?}", err);
       Status::aborted("Internal error")
     })?;
     return Ok(None);
   }
   Ok(Some(uploader))
 }
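Since the error is now logged inside the client task and `start` is infallible, the request/response pairing becomes the caller's only obligation. A sketch of a call site (the `upload_blob` helper, its arguments, the chunking, and the import path are hypothetical; it mirrors what `start_simple_uploader` above does, plus data chunks):

```rust
// Import path assumed; these names are re-exported by the uploader module.
use crate::blob::{
  BlobClient, BlobUploader, PutRequest, PutRequestData, PutResponse,
};

// Hypothetical helper; assumes a connected BlobClient.
async fn upload_blob(
  client: BlobClient,
  holder: &str,
  blob_hash: &str,
  chunks: Vec<Vec<u8>>,
) -> anyhow::Result<()> {
  let mut uploader = BlobUploader::start(client);

  // Each put() awaits its paired response, which keeps the bounded
  // response channel drained as the try_send() warning above requires.
  uploader
    .put(PutRequest {
      data: Some(PutRequestData::Holder(holder.to_string())),
    })
    .await?;
  let PutResponse { data_exists } = uploader
    .put(PutRequest {
      data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
    })
    .await?;

  if !data_exists {
    for chunk in chunks {
      uploader.put_data(chunk).await?;
    }
  }
  // Dropping the request sender inside terminate() ends the stream;
  // the client task's exit status is surfaced here.
  uploader.terminate().await
}
```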
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
index 5126dbe5a..67835cce9 100644
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,335 +1,330 @@
 use async_stream::try_stream;
 use tokio_stream::{Stream, StreamExt};
 use tonic::Status;
 use tracing::{debug, error, trace, warn};
 use tracing_futures::Instrument;
 
 use super::handle_db_error;
 use super::proto::{self, PullBackupResponse};
 use crate::{
   blob::{BlobClient, BlobDownloader},
   constants::{
     BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
     GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
     LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_LOG_ID,
   },
   database::{BackupItem, DatabaseClient, LogItem},
 };
 
 pub struct PullBackupHandler {
   blob_client: BlobClient,
   backup_item: BackupItem,
   logs: Vec<LogItem>,
 }
 
 impl PullBackupHandler {
   pub async fn new(
     db: &DatabaseClient,
     blob_client: &BlobClient,
     request: proto::PullBackupRequest,
   ) -> Result<Self, Status> {
     let proto::PullBackupRequest { user_id, backup_id } = request;
     let backup_item = db
       .find_backup_item(&user_id, &backup_id)
       .await
       .map_err(handle_db_error)?
       .ok_or_else(|| {
         debug!("Backup item not found");
         Status::not_found("Backup item not found")
       })?;
 
     let backup_id = backup_item.backup_id.as_str();
     let logs = db
       .find_log_items_for_backup(backup_id)
       .await
       .map_err(handle_db_error)?;
 
     Ok(PullBackupHandler {
       backup_item,
       logs,
       blob_client: blob_client.clone(),
     })
   }
 
   /// Consumes the handler and provides a response `Stream`. The stream will
   /// produce the following in order:
   /// - Backup compaction data chunks
   /// - Backup logs
   ///   - Whole log, if stored in db
   ///   - Log chunks, if stored in blob
   pub fn into_response_stream(
     self,
   ) -> impl Stream<Item = Result<PullBackupResponse, Status>> {
     use proto::pull_backup_response::*;
 
     try_stream! {
       debug!("Pulling backup...");
       {
         let compaction_stream =
           data_stream(&self.backup_item, self.blob_client.clone());
         tokio::pin!(compaction_stream);
         while let Some(response) = compaction_stream.try_next().await? {
           yield response;
         }
       }
       trace!("Backup data pull complete.");
 
       if self.logs.is_empty() {
         debug!("No logs to pull. Finishing");
         return;
       }
 
       debug!("Pulling logs...");
       for log in self.logs {
         trace!("Pulling log ID={}", &log.log_id);
         let span = tracing::trace_span!("log", log_id = &log.log_id);
 
         if log.persisted_in_blob {
           trace!(parent: &span, "Log persisted in blob");
           let log_data_stream =
             data_stream(&log, self.blob_client.clone()).instrument(span);
           tokio::pin!(log_data_stream);
           while let Some(response) = log_data_stream.try_next().await? {
             yield response;
           }
         } else {
           trace!(parent: &span, "Log persisted in database");
           yield proto::PullBackupResponse {
             attachment_holders: Some(log.attachment_holders),
             id: Some(Id::LogId(log.log_id)),
             data: Some(Data::LogChunk(log.value.into_bytes())),
           };
         }
       }
       trace!("Pulled all logs, done");
     }
   }
 }
 
 /// Downloads a blob-stored [`BlobStoredItem`] and streams its content as a
 /// stream of [`PullBackupResponse`] objects, handling gRPC message size
 /// details.
 fn data_stream<Item>(
   item: &Item,
   blob_client: BlobClient,
 ) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
 where
   Item: BlobStoredItem,
 {
   try_stream! {
     let mut buffer = ResponseBuffer::default();
     let mut downloader =
-      BlobDownloader::start(item.get_holder().to_string(), blob_client).await.map_err(|err| {
-        error!(
-          "Failed to start blob downloader: {:?}", err
-        );
-        Status::aborted("Internal error")
-      })?;
+      BlobDownloader::start(item.get_holder().to_string(), blob_client);
 
     let mut is_first_chunk = true;
     loop {
       if !buffer.is_saturated() {
         if let Some(data) = downloader.next_chunk().await {
           buffer.put(data);
         }
       }
       if buffer.is_empty() {
         break;
       }
 
       // get data chunk, shortened by length of metadata
       let padding = item.metadata_size(is_first_chunk);
       let chunk = buffer.get_chunk(padding);
 
       trace!(
         with_attachments = is_first_chunk,
         data_size = chunk.len(),
         "Sending data chunk"
       );
       yield item.to_response(chunk, is_first_chunk);
       is_first_chunk = false;
     }
 
     downloader.terminate().await.map_err(|err| {
       error!("Blob downloader failed: {:?}", err);
       Status::aborted("Internal error")
     })?;
   }
 }
 
 /// Represents a downloadable item stored in the Blob service
 trait BlobStoredItem {
   // Blob holder representing this item
   fn get_holder(&self) -> &str;
 
   /// Generates a gRPC response for the given `data_chunk`.
   /// The response may be an extended version, with `include_extra_info`,
   /// usually sent with the first chunk
   fn to_response(
     &self,
     data_chunk: Vec<u8>,
     include_extra_info: bool,
   ) -> proto::PullBackupResponse;
 
   /// Size in bytes of non-data fields contained in the response message.
   fn metadata_size(&self, include_extra_info: bool) -> usize;
 }
 
 impl BlobStoredItem for BackupItem {
   fn get_holder(&self) -> &str {
     &self.compaction_holder
   }
 
   fn to_response(
     &self,
     data_chunk: Vec<u8>,
     include_extra_info: bool,
   ) -> proto::PullBackupResponse {
     use proto::pull_backup_response::*;
     let attachment_holders = if include_extra_info {
       Some(self.attachment_holders.clone())
     } else {
       None
     };
     proto::PullBackupResponse {
       id: Some(Id::BackupId(self.backup_id.clone())),
       data: Some(Data::CompactionChunk(data_chunk)),
       attachment_holders,
     }
   }
 
   fn metadata_size(&self, include_extra_info: bool) -> usize {
     let mut extra_bytes: usize = 0;
     extra_bytes += BACKUP_TABLE_FIELD_BACKUP_ID.as_bytes().len();
     extra_bytes += self.backup_id.as_bytes().len();
     if include_extra_info {
       extra_bytes += BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
       extra_bytes += self.attachment_holders.as_bytes().len();
     }
     extra_bytes
   }
 }
 
 impl BlobStoredItem for LogItem {
   fn get_holder(&self) -> &str {
     &self.value
   }
 
   fn to_response(
     &self,
     data_chunk: Vec<u8>,
     include_extra_info: bool,
   ) -> proto::PullBackupResponse {
     use proto::pull_backup_response::*;
     let attachment_holders = if include_extra_info {
       Some(self.attachment_holders.clone())
     } else {
       None
     };
     proto::PullBackupResponse {
       id: Some(Id::LogId(self.log_id.clone())),
       data: Some(Data::LogChunk(data_chunk)),
       attachment_holders,
     }
   }
 
   fn metadata_size(&self, include_extra_info: bool) -> usize {
     let mut extra_bytes: usize = 0;
     extra_bytes += LOG_TABLE_FIELD_LOG_ID.as_bytes().len();
     extra_bytes += self.log_id.as_bytes().len();
     if include_extra_info {
       extra_bytes += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
       extra_bytes += self.attachment_holders.as_bytes().len();
     }
     extra_bytes
   }
 }
 
 /// A utility structure that buffers downloaded data and allows retrieving it
 /// as chunks of arbitrary size, not greater than the provided `limit`.
 struct ResponseBuffer {
   buf: Vec<u8>,
   limit: usize,
 }
 
 impl Default for ResponseBuffer {
   /// Buffer size defaults to the max usable gRPC message size
   fn default() -> Self {
     ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
   }
 }
 
 impl ResponseBuffer {
   pub fn new(limit: usize) -> Self {
     ResponseBuffer {
       buf: Vec::new(),
       limit,
     }
   }
 
   pub fn put(&mut self, data: Vec<u8>) {
     if data.len() > self.limit {
       warn!("Data saved to buffer is larger than chunk limit.");
     }
     self.buf.extend(data);
   }
 
   /// Gets a chunk of size `limit - padding` and leaves the remainder in the
   /// buffer
   pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
     let mut chunk = std::mem::take(&mut self.buf);
 
     let target_size = self.limit - padding;
     if chunk.len() > target_size {
       // after this operation, chunk=0..target_size, self.buf=target_size..end
       self.buf = chunk.split_off(target_size);
     }
     return chunk;
   }
 
   /// Checks whether the buffer length has reached the given limit
   pub fn is_saturated(&self) -> bool {
     self.buf.len() >= self.limit
   }
 
   pub fn is_empty(&self) -> bool {
     self.buf.is_empty()
   }
 }
 
 #[cfg(test)]
 mod tests {
   use super::*;
 
   const LIMIT: usize = 100;
 
   #[test]
   fn test_response_buffer() {
     let mut buffer = ResponseBuffer::new(LIMIT);
     assert_eq!(buffer.is_empty(), true);
 
     // put 80 bytes of data
     buffer.put(vec![0u8; 80]);
     assert_eq!(buffer.is_empty(), false);
     assert_eq!(buffer.is_saturated(), false);
 
     // put next 80 bytes, should be saturated as 160 > 100
     buffer.put(vec![0u8; 80]);
     let buf_size = buffer.buf.len();
     assert_eq!(buffer.is_saturated(), true);
     assert_eq!(buf_size, 160);
 
     // get one chunk
     let padding: usize = 10;
     let expected_chunk_size = LIMIT - padding;
     let chunk = buffer.get_chunk(padding);
     assert_eq!(chunk.len(), expected_chunk_size); // 90
 
     // buffer should not be saturated now (160 - 90 < 100)
     let remaining_buf_size = buffer.buf.len();
     assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
     assert_eq!(buffer.is_saturated(), false);
 
     // get last chunk
     let chunk = buffer.get_chunk(padding);
     assert_eq!(chunk.len(), remaining_buf_size);
     assert_eq!(buffer.is_empty(), true);
   }
 }
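The subtle part of `data_stream` is how `metadata_size` padding interacts with the buffer limit: every yielded chunk is shortened so that the data plus the response's non-data fields stay within the per-message gRPC budget. A sketch with illustrative numbers (not the real `GRPC_CHUNK_SIZE_LIMIT`, and the padding values stand in for hypothetical `metadata_size` results):

```rust
#[test]
fn test_padding_shrinks_chunks() {
  // 100-byte budget per message, 150 bytes of blob data to send.
  let mut buffer = ResponseBuffer::new(100);
  buffer.put(vec![0u8; 150]);

  // The first response usually carries extra info (attachment holders),
  // so its padding is larger and the data chunk correspondingly smaller.
  let first = buffer.get_chunk(30); // e.g. metadata_size(true) == 30
  assert_eq!(first.len(), 70);

  // Follow-up responses only reserve space for the ID field.
  let next = buffer.get_chunk(10); // e.g. metadata_size(false) == 10
  assert_eq!(next.len(), 80);
  assert!(buffer.is_empty());
}
```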