diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs index 1740ef2b5..17ceb67ab 100644 --- a/services/backup/src/main.rs +++ b/services/backup/src/main.rs @@ -1,52 +1,57 @@ use anyhow::Result; use std::net::SocketAddr; use tonic::transport::Server; use tracing::{info, Level}; use tracing_subscriber::EnvFilter; +use crate::blob::BlobClient; use crate::service::{BackupServiceServer, MyBackupService}; pub mod blob; pub mod config; pub mod constants; pub mod database; pub mod service; pub mod utils; // re-export this to be available as crate::CONFIG pub use config::CONFIG; fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; Ok(()) } -async fn run_grpc_server(db: database::DatabaseClient) -> Result<()> { +async fn run_grpc_server( + db: database::DatabaseClient, + blob_client: BlobClient, +) -> Result<()> { let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?; - let backup_service = MyBackupService::new(db); + let backup_service = MyBackupService::new(db, blob_client); info!("Starting gRPC server listening at {}", addr.to_string()); Server::builder() .add_service(BackupServiceServer::new(backup_service)) .serve(addr) .await?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { config::parse_cmdline_args(); configure_logging()?; let aws_config = config::load_aws_config().await; let db = database::DatabaseClient::new(&aws_config); + let blob_client = blob::init_blob_client(); - run_grpc_server(db).await + run_grpc_server(db, blob_client).await } diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs index fa038cff5..64bab98ff 100644 --- a/services/backup/src/service/handlers/add_attachments.rs +++ b/services/backup/src/service/handlers/add_attachments.rs @@ -1,199 +1,204 @@ use tonic::Status; use tracing::debug; use tracing::error; use super::handle_db_error; use super::proto; use crate::{ + blob::BlobClient, constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, database::{DatabaseClient, LogItem}, }; pub async fn handle_add_attachments( db: &DatabaseClient, + blob_client: &BlobClient, request: proto::AddAttachmentsRequest, ) -> Result<(), Status> { let proto::AddAttachmentsRequest { user_id, backup_id, log_id, holders, } = request; if user_id.is_empty() { return Err(Status::invalid_argument( "user id required but not provided", )); } if backup_id.is_empty() { return Err(Status::invalid_argument( "backup id required but not provided", )); } if holders.is_empty() { return Err(Status::invalid_argument( "holders required but not provided", )); } if log_id.is_empty() { let backup_item_result = db .find_backup_item(&user_id, &backup_id) .await .map_err(handle_db_error)?; let mut backup_item = backup_item_result.ok_or_else(|| { debug!("Backup item not found"); Status::not_found("Backup item not found") })?; add_new_attachments(&mut backup_item.attachment_holders, &holders); db.put_backup_item(backup_item) .await .map_err(handle_db_error)?; } else { let log_item_result = db .find_log_item(&backup_id, &log_id) .await .map_err(handle_db_error)?; let mut log_item = log_item_result.ok_or_else(|| { debug!("Log item not found"); Status::not_found("Log item not found") })?; add_new_attachments(&mut 
log_item.attachment_holders, &holders);

    // log item too large for database, move it to blob-service storage
    if !log_item.persisted_in_blob
      && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
    {
      debug!("Log item too large. Persisting in blob service...");
-      log_item = move_to_blob(log_item).await?;
+      log_item = move_to_blob(log_item, blob_client).await?;
    }
    db.put_log_item(log_item).await.map_err(handle_db_error)?;
  }

  Ok(())
}

-async fn move_to_blob(log_item: LogItem) -> Result<LogItem, Status> {
+async fn move_to_blob(
+  log_item: LogItem,
+  blob_client: &BlobClient,
+) -> Result<LogItem, Status> {
  let holder = crate::utils::generate_blob_holder(
    &log_item.data_hash,
    &log_item.backup_id,
    Some(&log_item.log_id),
  );

  if let Some(mut uploader) =
    crate::blob::start_simple_uploader(&holder, &log_item.data_hash).await?
  {
    let blob_chunk = log_item.value.into_bytes();
    uploader.put_data(blob_chunk).await.map_err(|err| {
      error!("Failed to upload data chunk: {:?}", err);
      Status::aborted("Internal error")
    })?;

    uploader.terminate().await.map_err(|err| {
      error!("Put client task closed with error: {:?}", err);
      Status::aborted("Internal error")
    })?;
  } else {
    debug!("Blob holder for log ID={} already exists", &log_item.log_id);
  }

  Ok(LogItem {
    persisted_in_blob: true,
    value: holder,
    ..log_item
  })
}

/// Modifies the [`current_holders_str`] by appending attachment holders
/// contained in [`new_holders`]. Removes duplicates. Both arguments
/// are expected to be [`ATTACHMENT_HOLDER_SEPARATOR`] separated strings.
fn add_new_attachments(current_holders_str: &mut String, new_holders: &str) {
  let mut current_holders = parse_attachment_holders(current_holders_str);
  let new_holders = parse_attachment_holders(new_holders);
  current_holders.extend(new_holders);

  *current_holders_str = current_holders
    .into_iter()
    .collect::<Vec<_>>()
    .join(ATTACHMENT_HOLDER_SEPARATOR);
}

/// Parses an [`ATTACHMENT_HOLDER_SEPARATOR`] separated string into a `HashSet`
/// of attachment holder string slices.
fn parse_attachment_holders(
  holders_str: &str,
) -> std::collections::HashSet<&str> {
  holders_str
    .split(ATTACHMENT_HOLDER_SEPARATOR)
    .filter(|holder| !holder.is_empty())
    .collect()
}

#[cfg(test)]
mod tests {
  use super::*;
  use std::collections::HashSet;

  #[test]
  fn test_parse_attachments() {
    let holders = "h1;h2;h3";
    let expected = HashSet::from(["h1", "h2", "h3"]);
    assert_eq!(parse_attachment_holders(holders), expected);
  }

  #[test]
  fn test_empty_attachments() {
    let actual = parse_attachment_holders("");
    let expected = HashSet::new();
    assert_eq!(actual, expected);
  }

  #[test]
  fn test_add_attachments() {
    let mut current_holders = "holder1;holder2".to_string();
    let new_holders = "holder3;holder4";
    add_new_attachments(&mut current_holders, new_holders);

    assert_eq!(
      parse_attachment_holders(&current_holders),
      HashSet::from(["holder1", "holder2", "holder3", "holder4"])
    );
  }

  #[test]
  fn test_add_to_empty() {
    let mut current_holders = String::new();
    let new_holders = "holder3;holder4";
    add_new_attachments(&mut current_holders, new_holders);

    assert_eq!(
      parse_attachment_holders(&current_holders),
      HashSet::from(["holder3", "holder4"])
    );
  }

  #[test]
  fn test_add_none() {
    let mut current_holders = "holder1;holder2".to_string();
    let new_holders = "";
    add_new_attachments(&mut current_holders, new_holders);

    assert_eq!(
      parse_attachment_holders(&current_holders),
      HashSet::from(["holder1", "holder2"])
    );
  }

  #[test]
  fn test_remove_duplicates() {
    let mut current_holders = "holder1;holder2".to_string();
    let new_holders = "holder2;holder3";
    add_new_attachments(&mut current_holders, new_holders);

    assert_eq!(
      parse_attachment_holders(&current_holders),
      HashSet::from(["holder1", "holder2", "holder3"])
    );
  }
}
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
index fb154bff5..1fcc7371a 100644
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -1,230 +1,232 @@
use tonic::Status;
use tracing::{debug, error, trace, warn};

use crate::{
-  blob::{start_simple_uploader, BlobUploader},
+  blob::{start_simple_uploader, BlobClient, BlobUploader},
  database::{BackupItem, DatabaseClient},
  service::proto,
};

use super::handle_db_error;

type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;

enum HandlerState {
  /// Initial state.
Handler is receiving non-data inputs ReceivingParams, /// Handler is receiving data chunks ReceivingData { uploader: BlobUploader }, /// A special case when Blob service claims that a blob with given /// [`CreateBackupHandler::data_hash`] already exists DataAlreadyExists, } pub struct CreateBackupHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, device_id: Option, key_entropy: Option>, data_hash: Option, // client instances db: DatabaseClient, + blob_client: BlobClient, // internal state state: HandlerState, backup_id: String, holder: Option, } impl CreateBackupHandler { - pub fn new(db: &DatabaseClient) -> Self { + pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self { CreateBackupHandler { should_close_stream: false, user_id: None, device_id: None, key_entropy: None, data_hash: None, - db: db.clone(), + db, + blob_client, state: HandlerState::ReceivingParams, backup_id: String::new(), holder: None, } } pub async fn handle_user_id( &mut self, user_id: String, ) -> CreateBackupResult { if self.user_id.is_some() { warn!("user ID already provided"); return Err(Status::invalid_argument("User ID already provided")); } self.user_id = Some(user_id); self.handle_internal().await } pub async fn handle_device_id( &mut self, device_id: String, ) -> CreateBackupResult { if self.device_id.is_some() { warn!("Device ID already provided"); return Err(Status::invalid_argument("Device ID already provided")); } tracing::Span::current().record("device_id", &device_id); self.device_id = Some(device_id); self.handle_internal().await } pub async fn handle_key_entropy( &mut self, key_entropy: Vec, ) -> CreateBackupResult { if self.key_entropy.is_some() { warn!("Key entropy already provided"); return Err(Status::invalid_argument("Key entropy already provided")); } self.key_entropy = Some(key_entropy); self.handle_internal().await } pub async fn handle_data_hash( &mut self, data_hash: Vec, ) -> CreateBackupResult { if self.data_hash.is_some() { warn!("Data hash already provided"); return Err(Status::invalid_argument("Data hash already provided")); } let hash_str = String::from_utf8(data_hash).map_err(|err| { error!("Failed to convert data_hash into string: {:?}", err); Status::aborted("Unexpected error") })?; tracing::Span::current().record("data_hash", &hash_str); self.data_hash = Some(hash_str); self.handle_internal().await } pub async fn handle_data_chunk( &mut self, data_chunk: Vec, ) -> CreateBackupResult { let HandlerState::ReceivingData { ref mut uploader } = self.state else { self.should_close_stream = true; error!("Data chunk sent before other inputs"); return Err(Status::invalid_argument( "Data chunk sent before other inputs", )); }; // empty chunk ends transmission if data_chunk.is_empty() { self.should_close_stream = true; return Ok(proto::CreateNewBackupResponse { backup_id: self.backup_id.clone(), }); } trace!("Received {} bytes of data", data_chunk.len()); uploader.put_data(data_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; Ok(proto::CreateNewBackupResponse { // actual Backup ID should be sent only once, the time it is generated // see handle_internal() backup_id: String::new(), }) } /// This function should be called after the input stream is finished. pub async fn finish(self) -> Result<(), Status> { match self.state { HandlerState::ReceivingParams => { // client probably aborted early trace!("Nothing to store in database. 
Finishing early"); return Ok(()); } HandlerState::ReceivingData { uploader } => { uploader.terminate().await.map_err(|err| { error!("Uploader task closed with error: {:?}", err); Status::aborted("Internal error") })?; } HandlerState::DataAlreadyExists => (), } let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else { error!("Holder / UserID absent in data mode. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; if self.backup_id.is_empty() { error!("Backup ID was not generated. This should never happen!"); return Err(Status::failed_precondition("Internal error")); } let backup_item = BackupItem::new(user_id, self.backup_id, holder); self .db .put_backup_item(backup_item) .await .map_err(handle_db_error)?; Ok(()) } // internal param handler helper async fn handle_internal(&mut self) -> CreateBackupResult { if !matches!(self.state, HandlerState::ReceivingParams) { error!("CreateBackupHandler already received all non-data params."); return Err(Status::failed_precondition("Backup data chunk expected")); } // all non-data inputs must be set before receiving backup data chunks let (Some(data_hash), Some(device_id), Some(_), Some(_)) = ( self.data_hash.as_ref(), self.device_id.as_ref(), self.user_id.as_ref(), self.key_entropy.as_ref(), ) else { // return empty backup ID when inputs are incomplete return Ok(proto::CreateNewBackupResponse { backup_id: "".to_string(), }); }; let backup_id = generate_backup_id(device_id); let holder = crate::utils::generate_blob_holder(data_hash, &backup_id, None); self.backup_id = backup_id.clone(); self.holder = Some(holder.clone()); tracing::Span::current().record("backup_id", &backup_id); tracing::Span::current().record("blob_holder", &holder); match start_simple_uploader(&holder, data_hash).await? 
{ Some(uploader) => { self.state = HandlerState::ReceivingData { uploader }; trace!("Everything prepared, waiting for data..."); } None => { // Blob with given data_hash already exists debug!("Blob already exists, finishing"); self.should_close_stream = true; self.state = HandlerState::DataAlreadyExists; } }; Ok(proto::CreateNewBackupResponse { backup_id }) } } /// Generates ID for a new backup fn generate_backup_id(device_id: &str) -> String { format!( "{device_id}_{timestamp}", device_id = device_id, timestamp = chrono::Utc::now().timestamp_millis() ) } diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs index 10e645360..550edee89 100644 --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -1,328 +1,335 @@ use async_stream::try_stream; use tokio_stream::{Stream, StreamExt}; use tonic::Status; use tracing::{debug, error, trace, warn}; use tracing_futures::Instrument; use super::handle_db_error; use super::proto::{self, PullBackupResponse}; use crate::{ - blob::BlobDownloader, + blob::{BlobClient, BlobDownloader}, constants::{ BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_LOG_ID, }, database::{BackupItem, DatabaseClient, LogItem}, }; pub struct PullBackupHandler { + blob_client: BlobClient, backup_item: BackupItem, logs: Vec, } impl PullBackupHandler { pub async fn new( db: &DatabaseClient, + blob_client: &BlobClient, request: proto::PullBackupRequest, ) -> Result { let proto::PullBackupRequest { user_id, backup_id } = request; let backup_item = db .find_backup_item(&user_id, &backup_id) .await .map_err(handle_db_error)? .ok_or_else(|| { debug!("Backup item not found"); Status::not_found("Backup item not found") })?; let backup_id = backup_item.backup_id.as_str(); let logs = db .find_log_items_for_backup(backup_id) .await .map_err(handle_db_error)?; - Ok(PullBackupHandler { backup_item, logs }) + Ok(PullBackupHandler { + backup_item, + logs, + blob_client: blob_client.clone(), + }) } /// Consumes the handler and provides a response `Stream`. The stream will /// produce the following in order: /// - Backup compaction data chunks /// - Backup logs /// - Whole log, if stored in db /// - Log chunks, if stored in blob pub fn into_response_stream( self, ) -> impl Stream> { use proto::pull_backup_response::*; try_stream! { debug!("Pulling backup..."); { - let compaction_stream = data_stream(&self.backup_item); + let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone()); tokio::pin!(compaction_stream); while let Some(response) = compaction_stream.try_next().await? { yield response; } } trace!("Backup data pull complete."); if self.logs.is_empty() { debug!("No logs to pull. Finishing"); return; } debug!("Pulling logs..."); for log in self.logs { trace!("Pulling log ID={}", &log.log_id); let span = tracing::trace_span!("log", log_id = &log.log_id); if log.persisted_in_blob { trace!(parent: &span, "Log persisted in blob"); - let log_data_stream = data_stream(&log).instrument(span); + let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span); tokio::pin!(log_data_stream); while let Some(response) = log_data_stream.try_next().await? 
{
            yield response;
          }
        } else {
          trace!(parent: &span, "Log persisted in database");
          yield proto::PullBackupResponse {
            attachment_holders: Some(log.attachment_holders),
            id: Some(Id::LogId(log.log_id)),
            data: Some(Data::LogChunk(log.value.into_bytes())),
          };
        }
      }
      trace!("Pulled all logs, done");
    }
  }
}

/// Downloads a blob-stored [`BlobStoredItem`] and streams its content into a
/// stream of [`PullBackupResponse`] objects, handling gRPC message size details.
fn data_stream<Item>(
  item: &Item,
+  blob_client: BlobClient,
) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
where
  Item: BlobStoredItem,
{
  try_stream! {
    let mut buffer = ResponseBuffer::default();
    let mut downloader = BlobDownloader::start(item.get_holder().to_string())
      .await
      .map_err(|err| {
        error!("Failed to start blob downloader: {:?}", err);
        Status::aborted("Internal error")
      })?;

    let mut is_first_chunk = true;
    loop {
      if !buffer.is_saturated() {
        if let Some(data) = downloader.next_chunk().await {
          buffer.put(data);
        }
      }
      if buffer.is_empty() {
        break;
      }

      // get data chunk, shortened by length of metadata
      let padding = item.metadata_size(is_first_chunk);
      let chunk = buffer.get_chunk(padding);
      trace!(
        with_attachments = is_first_chunk,
        data_size = chunk.len(),
        "Sending data chunk"
      );
      yield item.to_response(chunk, is_first_chunk);
      is_first_chunk = false;
    }

    downloader.terminate().await.map_err(|err| {
      error!("Blob downloader failed: {:?}", err);
      Status::aborted("Internal error")
    })?;
  }
}

/// Represents a downloadable item stored in the Blob service
trait BlobStoredItem {
  // Blob holder representing this item
  fn get_holder(&self) -> &str;

  /// Generates a gRPC response for a given `data_chunk`.
  /// The response may be in an extended version, with `include_extra_info`,
  /// usually sent with the first chunk
  fn to_response(
    &self,
    data_chunk: Vec<u8>,
    include_extra_info: bool,
  ) -> proto::PullBackupResponse;

  /// Size in bytes of non-data fields contained in the response message.
fn metadata_size(&self, include_extra_info: bool) -> usize; } impl BlobStoredItem for BackupItem { fn get_holder(&self) -> &str { &self.compaction_holder } fn to_response( &self, data_chunk: Vec, include_extra_info: bool, ) -> proto::PullBackupResponse { use proto::pull_backup_response::*; let attachment_holders = if include_extra_info { Some(self.attachment_holders.clone()) } else { None }; proto::PullBackupResponse { id: Some(Id::BackupId(self.backup_id.clone())), data: Some(Data::CompactionChunk(data_chunk)), attachment_holders, } } fn metadata_size(&self, include_extra_info: bool) -> usize { let mut extra_bytes: usize = 0; extra_bytes += BACKUP_TABLE_FIELD_BACKUP_ID.as_bytes().len(); extra_bytes += self.backup_id.as_bytes().len(); if include_extra_info { extra_bytes += BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len(); extra_bytes += self.attachment_holders.as_bytes().len(); } extra_bytes } } impl BlobStoredItem for LogItem { fn get_holder(&self) -> &str { &self.value } fn to_response( &self, data_chunk: Vec, include_extra_info: bool, ) -> proto::PullBackupResponse { use proto::pull_backup_response::*; let attachment_holders = if include_extra_info { Some(self.attachment_holders.clone()) } else { None }; proto::PullBackupResponse { id: Some(Id::LogId(self.log_id.clone())), data: Some(Data::LogChunk(data_chunk)), attachment_holders, } } fn metadata_size(&self, include_extra_info: bool) -> usize { let mut extra_bytes: usize = 0; extra_bytes += LOG_TABLE_FIELD_LOG_ID.as_bytes().len(); extra_bytes += self.log_id.as_bytes().len(); if include_extra_info { extra_bytes += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len(); extra_bytes += self.attachment_holders.as_bytes().len(); } extra_bytes } } /// A utility structure that buffers downloaded data and allows to retrieve it /// as chunks of arbitrary size, not greater than provided `limit`. 
struct ResponseBuffer { buf: Vec, limit: usize, } impl Default for ResponseBuffer { /// Buffer size defaults to max usable gRPC message size fn default() -> Self { ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE) } } impl ResponseBuffer { pub fn new(limit: usize) -> Self { ResponseBuffer { buf: Vec::new(), limit, } } pub fn put(&mut self, data: Vec) { if data.len() > self.limit { warn!("Data saved to buffer is larger than chunk limit."); } self.buf.extend(data); } /// Gets chunk of size `limit - padding` and leaves remainder in buffer pub fn get_chunk(&mut self, padding: usize) -> Vec { let mut chunk = std::mem::take(&mut self.buf); let target_size = self.limit - padding; if chunk.len() > target_size { // after this operation, chunk=0..target_size, self.buf=target_size..end self.buf = chunk.split_off(target_size); } return chunk; } /// Does buffer length exceed given limit pub fn is_saturated(&self) -> bool { self.buf.len() >= self.limit } pub fn is_empty(&self) -> bool { self.buf.is_empty() } } #[cfg(test)] mod tests { use super::*; const LIMIT: usize = 100; #[test] fn test_response_buffer() { let mut buffer = ResponseBuffer::new(LIMIT); assert_eq!(buffer.is_empty(), true); // put 80 bytes of data buffer.put(vec![0u8; 80]); assert_eq!(buffer.is_empty(), false); assert_eq!(buffer.is_saturated(), false); // put next 80 bytes, should be saturated as 160 > 100 buffer.put(vec![0u8; 80]); let buf_size = buffer.buf.len(); assert_eq!(buffer.is_saturated(), true); assert_eq!(buf_size, 160); // get one chunk let padding: usize = 10; let expected_chunk_size = LIMIT - padding; let chunk = buffer.get_chunk(padding); assert_eq!(chunk.len(), expected_chunk_size); // 90 // buffer should not be saturated now (160 - 90 < 100) let remaining_buf_size = buffer.buf.len(); assert_eq!(remaining_buf_size, buf_size - expected_chunk_size); assert_eq!(buffer.is_saturated(), false); // get last chunk let chunk = buffer.get_chunk(padding); assert_eq!(chunk.len(), remaining_buf_size); assert_eq!(buffer.is_empty(), true); } } diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs index b6c16537d..bdbb3dfd4 100644 --- a/services/backup/src/service/handlers/send_log.rs +++ b/services/backup/src/service/handlers/send_log.rs @@ -1,264 +1,266 @@ use tonic::Status; use tracing::{debug, error, trace, warn}; use uuid::Uuid; use super::handle_db_error; use crate::{ - blob::BlobUploader, + blob::{BlobClient, BlobUploader}, constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, database::{DatabaseClient, LogItem}, service::proto::SendLogResponse, }; enum LogPersistence { /// Log entirely stored in DynamoDB database DB, /// Log contents stored with Blob service BLOB { holder: String }, } pub struct SendLogHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, backup_id: Option, log_hash: Option, // internal state log_id: Option, log_buffer: Vec, persistence_method: LogPersistence, should_receive_data: bool, // client instances db: DatabaseClient, + blob_client: BlobClient, uploader: Option, } impl SendLogHandler { - pub fn new(db: &DatabaseClient) -> Self { + pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self { SendLogHandler { db: db.clone(), + blob_client: blob_client.clone(), uploader: None, user_id: None, backup_id: None, log_hash: None, log_id: None, log_buffer: Vec::new(), persistence_method: LogPersistence::DB, should_receive_data: false, should_close_stream: false, } } pub async fn 
handle_user_id( &mut self, user_id: String, ) -> Result<(), Status> { if self.user_id.is_some() { warn!("user ID already provided"); return Err(Status::invalid_argument("User ID already provided")); } self.user_id = Some(user_id); self.handle_internal().await } pub async fn handle_backup_id( &mut self, backup_id: String, ) -> Result<(), Status> { if self.backup_id.is_some() { warn!("backup ID already provided"); return Err(Status::invalid_argument("Backup ID already provided")); } tracing::Span::current().record("backup_id", &backup_id); self.backup_id = Some(backup_id); self.handle_internal().await } pub async fn handle_log_hash( &mut self, log_hash: Vec, ) -> Result<(), Status> { if self.log_hash.is_some() { warn!("Log hash already provided"); return Err(Status::invalid_argument("Log hash already provided")); } let hash_str = String::from_utf8(log_hash).map_err(|err| { error!("Failed to convert data_hash into string: {:?}", err); Status::aborted("Unexpected error") })?; tracing::Span::current().record("log_hash", &hash_str); self.log_hash = Some(hash_str); self.handle_internal().await } pub async fn handle_log_data( &mut self, data_chunk: Vec, ) -> Result<(), Status> { if !self.should_receive_data || self.log_id.is_none() { self.should_close_stream = true; error!("Data chunk sent before other inputs"); return Err(Status::invalid_argument( "Data chunk sent before other inputs", )); } // empty chunk ends transmission if data_chunk.is_empty() { self.should_close_stream = true; return Ok(()); } match self.persistence_method { LogPersistence::DB => { self.log_buffer.extend(data_chunk); self.ensure_size_constraints().await?; } LogPersistence::BLOB { .. } => { let Some(client) = self.uploader.as_mut() else { self.should_close_stream = true; error!("Put client uninitialized. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; client.put_data(data_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; } } Ok(()) } pub async fn finish(self) -> Result { if let Some(client) = self.uploader { client.terminate().await.map_err(|err| { error!("Put client task closed with error: {:?}", err); Status::aborted("Internal error") })?; } else { trace!("No uploader initialized. Skipping termination"); } if !self.should_receive_data { // client probably aborted early trace!("Nothing to store in database. Finishing early"); return Ok(SendLogResponse { log_checkpoint: "".to_string(), }); } let (Some(backup_id), Some(log_id), Some(data_hash)) = ( self.backup_id, self.log_id, self.log_hash ) else { error!("Log info absent in data mode. 
This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; let (log_value, persisted_in_blob) = match self.persistence_method { LogPersistence::BLOB { holder } => (holder, true), LogPersistence::DB => { let contents = String::from_utf8(self.log_buffer).map_err(|err| { error!("Failed to convert log contents data into string: {:?}", err); Status::aborted("Unexpected error") })?; (contents, false) } }; let log_item = LogItem { backup_id, log_id: log_id.clone(), persisted_in_blob, value: log_value, attachment_holders: String::new(), data_hash, }; self .db .put_log_item(log_item) .await .map_err(handle_db_error)?; Ok(SendLogResponse { log_checkpoint: log_id, }) } // internal param handler helper async fn handle_internal(&mut self) -> Result<(), Status> { if self.should_receive_data { error!("SendLogHandler is already expecting data chunks"); return Err(Status::failed_precondition("Log data chunk expected")); } // all non-data inputs must be set before receiving log contents let (Some(backup_id), Some(_), Some(_)) = ( self.backup_id.as_ref(), self.user_id.as_ref(), self.log_hash.as_ref() ) else { return Ok(()); }; let log_id = generate_log_id(backup_id); tracing::Span::current().record("log_id", &log_id); self.log_id = Some(log_id); trace!("Everything prepared, waiting for data..."); self.should_receive_data = true; Ok(()) } /// Ensures log fits db size constraints. If not, it is moved to blob /// persistence async fn ensure_size_constraints(&mut self) -> Result<(), Status> { let (Some(backup_id), Some(log_id), Some(log_hash)) = ( self.backup_id.as_ref(), self.log_id.as_ref(), self.log_hash.as_ref() ) else { self.should_close_stream = true; error!("Log info absent in data mode. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; let log_size = LogItem::size_from_components( backup_id, log_id, log_hash, &self.log_buffer, ); if log_size > LOG_DATA_SIZE_DATABASE_LIMIT { debug!("Log too large, switching persistence to Blob"); let holder = crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id)); match crate::blob::start_simple_uploader(&holder, &log_hash).await? 
{ Some(mut uploader) => { let blob_chunk = std::mem::take(&mut self.log_buffer); uploader.put_data(blob_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; self.uploader = Some(uploader); } None => { debug!("Log hash already exists"); self.should_close_stream = true; } } self.persistence_method = LogPersistence::BLOB { holder }; } Ok(()) } } fn generate_log_id(backup_id: &str) -> String { format!( "{backup_id}{sep}{uuid}", backup_id = backup_id, sep = ID_SEPARATOR, uuid = Uuid::new_v4() ) } diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs index b1f1ce0ee..3b3fc1922 100644 --- a/services/backup/src/service/mod.rs +++ b/services/backup/src/service/mod.rs @@ -1,233 +1,241 @@ use aws_sdk_dynamodb::Error as DynamoDBError; use proto::backup_service_server::BackupService; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, instrument, trace, warn}; use tracing_futures::Instrument; use crate::{ + blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::{DatabaseClient, Error as DBError}, }; mod proto { tonic::include_proto!("backup"); } pub use proto::backup_service_server::BackupServiceServer; /// submodule containing gRPC endpoint handler implementations mod handlers { pub(super) mod add_attachments; pub(super) mod create_backup; pub(super) mod pull_backup; pub(super) mod send_log; // re-exports for convenient usage in handlers pub(self) use super::handle_db_error; pub(self) use super::proto; } use self::handlers::create_backup::CreateBackupHandler; use self::handlers::pull_backup::PullBackupHandler; use self::handlers::send_log::SendLogHandler; pub struct MyBackupService { db: DatabaseClient, + blob_client: BlobClient, } impl MyBackupService { - pub fn new(db_client: DatabaseClient) -> Self { - MyBackupService { db: db_client } + pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self { + MyBackupService { + db: db_client, + blob_client, + } } } // gRPC implementation #[tonic::async_trait] impl BackupService for MyBackupService { type CreateNewBackupStream = Pin< Box< dyn Stream> + Send, >, >; #[instrument(skip_all, fields(device_id, data_hash, backup_id, blob_holder))] async fn create_new_backup( &self, request: Request>, ) -> Result, Status> { use proto::create_new_backup_request::Data::*; info!("CreateNewBackup request: {:?}", request); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db = self.db.clone(); + let blob_client = self.blob_client.clone(); let worker = async move { - let mut handler = CreateBackupHandler::new(&db); + let mut handler = CreateBackupHandler::new(db, blob_client); while let Some(message) = in_stream.next().await { let response = match message { Ok(proto::CreateNewBackupRequest { data: Some(UserId(user_id)), }) => handler.handle_user_id(user_id).await, Ok(proto::CreateNewBackupRequest { data: Some(DeviceId(device_id)), }) => handler.handle_device_id(device_id).await, Ok(proto::CreateNewBackupRequest { data: Some(KeyEntropy(key_entropy)), }) => handler.handle_key_entropy(key_entropy).await, Ok(proto::CreateNewBackupRequest { data: Some(NewCompactionHash(hash)), }) => handler.handle_data_hash(hash).await, Ok(proto::CreateNewBackupRequest { data: Some(NewCompactionChunk(chunk)), }) => handler.handle_data_chunk(chunk).await, unexpected => { error!("Received an 
unexpected request: {:?}", unexpected); Err(Status::unknown("unknown error")) } }; trace!("Sending response: {:?}", response); if let Err(e) = tx.send(response).await { error!("Response was dropped: {}", e); break; } if handler.should_close_stream { trace!("Handler requested to close stream"); break; } } if let Err(status) = handler.finish().await { trace!("Sending error response: {:?}", status); let _ = tx.send(Err(status)).await; } debug!("Request finished processing"); }; tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::CreateNewBackupStream )) } #[instrument(skip_all, fields(backup_id, log_hash, log_id))] async fn send_log( &self, request: Request>, ) -> Result, Status> { use proto::send_log_request::Data::*; info!("SendLog request: {:?}", request); - let mut handler = SendLogHandler::new(&self.db); + let mut handler = SendLogHandler::new(&self.db, &self.blob_client); let mut in_stream = request.into_inner(); while let Some(message) = in_stream.next().await { let result = match message { Ok(proto::SendLogRequest { data: Some(UserId(user_id)), }) => handler.handle_user_id(user_id).await, Ok(proto::SendLogRequest { data: Some(BackupId(backup_id)), }) => handler.handle_backup_id(backup_id).await, Ok(proto::SendLogRequest { data: Some(LogHash(log_hash)), }) => handler.handle_log_hash(log_hash).await, Ok(proto::SendLogRequest { data: Some(LogData(chunk)), }) => handler.handle_log_data(chunk).await, unexpected => { error!("Received an unexpected request: {:?}", unexpected); Err(Status::unknown("unknown error")) } }; if let Err(err) = result { error!("An error occurred when processing request: {:?}", err); return Err(err); } if handler.should_close_stream { trace!("Handler requested to close request stream"); break; } } let response = handler.finish().await; debug!("Finished. Sending response: {:?}", response); response.map(|response_body| Response::new(response_body)) } type RecoverBackupKeyStream = Pin< Box< dyn Stream> + Send, >, >; #[instrument(skip(self))] async fn recover_backup_key( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } type PullBackupStream = Pin< Box> + Send>, >; #[instrument(skip_all, fields(backup_id = &request.get_ref().backup_id))] async fn pull_backup( &self, request: Request, ) -> Result, Status> { info!("PullBackup request: {:?}", request); let handler = - PullBackupHandler::new(&self.db, request.into_inner()).await?; + PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner()) + .await?; let stream = handler.into_response_stream().in_current_span(); Ok(Response::new(Box::pin(stream) as Self::PullBackupStream)) } #[instrument(skip_all, fields( backup_id = &request.get_ref().backup_id, log_id = &request.get_ref().log_id) )] async fn add_attachments( &self, request: Request, ) -> Result, Status> { info!( "AddAttachment request. 
New holders: {}", &request.get_ref().holders ); handlers::add_attachments::handle_add_attachments( &self.db, + &self.blob_client, request.into_inner(), ) .await?; info!("Request processed successfully"); Ok(Response::new(())) } } /// A helper converting our Database errors into gRPC responses fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { warn!("AWS transient error occurred"); Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } }
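Note: the `BlobClient` type and `blob::init_blob_client()` used throughout this diff come from `services/backup/src/blob.rs`, which is not part of the hunks shown here. A minimal sketch of the pattern these changes assume (a handle constructed once in `main()` and cheaply cloned into each gRPC handler) might look like the following; the struct field and address are illustrative assumptions, not the real module contents:

use std::sync::Arc;

// Hypothetical sketch only -- not the actual services/backup/src/blob.rs.
// Cloning is cheap because the inner data sits behind an Arc.
#[derive(Clone)]
pub struct BlobClient {
  // assumed field: address of the blob service this client talks to
  blob_service_url: Arc<String>,
}

pub fn init_blob_client() -> BlobClient {
  BlobClient {
    // placeholder address -- an assumption for illustration
    blob_service_url: Arc::new("http://localhost:50053".to_string()),
  }
}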