diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
index e204c730a..3944b4da9 100644
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -1,42 +1,63 @@
 // Assorted constants
 
 pub const AWS_REGION: &str = "us-east-2";
 pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
 pub const ID_SEPARATOR: &str = ":";
 pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";";
 
 // 400KiB limit (in docs there is KB but they mean KiB) -
 // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
 // This includes both attribute names' and values' lengths
+//
+// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to
+// recognize whether we may receive multiple chunks or just one. If it were
+// larger than the chunk limit, then once we received an amount of data equal
+// to the limit, we wouldn't know whether to put it in the database right away
+// or wait for more data.
 pub const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400;
 
+// 4MiB limit
+// WARNING: keep in mind that gRPC adds its own headers to messages
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+// so the message that is actually sent over the network looks like this:
+// [Compressed-Flag] [Message-Length] [Message]
+// [Compressed-Flag] 1 byte - added by gRPC
+// [Message-Length] 4 bytes - added by gRPC
+// [Message] N bytes - actual data
+// so for every message we get 5 additional bytes of data,
+// as mentioned here:
+// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
+// a gRPC stream may contain more than one message
+pub const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
+pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;
+
 // Configuration defaults
 pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;
 pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566";
 pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053";
 
 // Environment variable names
 pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX";
 pub const LOG_LEVEL_ENV_VAR: &str =
   tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
 
 // DynamoDB constants
 pub const BACKUP_TABLE_NAME: &str = "backup-service-backup";
 pub const BACKUP_TABLE_FIELD_USER_ID: &str = "userID";
 pub const BACKUP_TABLE_FIELD_BACKUP_ID: &str = "backupID";
 pub const BACKUP_TABLE_FIELD_CREATED: &str = "created";
 pub const BACKUP_TABLE_FIELD_RECOVERY_DATA: &str = "recoveryData";
 pub const BACKUP_TABLE_FIELD_COMPACTION_HOLDER: &str = "compactionHolder";
 pub const BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders";
 pub const BACKUP_TABLE_INDEX_USERID_CREATED: &str = "userID-created-index";
 
 pub const LOG_TABLE_NAME: &str = "backup-service-log";
 pub const LOG_TABLE_FIELD_BACKUP_ID: &str = "backupID";
 pub const LOG_TABLE_FIELD_LOG_ID: &str = "logID";
 pub const LOG_TABLE_FIELD_PERSISTED_IN_BLOB: &str = "persistedInBlob";
 pub const LOG_TABLE_FIELD_VALUE: &str = "value";
 pub const LOG_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders";
 pub const LOG_TABLE_FIELD_DATA_HASH: &str = "dataHash";
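For context, here is a minimal, self-contained sketch of the size arithmetic these constants encode. The constant values mirror the ones added above; the `main` function exists only for illustration and is not part of this change.

// Minimal sketch; values copied from constants.rs above.
const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5; // 1-byte flag + 4-byte length
const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400;

fn main() {
  // Usable payload per gRPC message once the 5-byte framing is subtracted.
  let usable_payload = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
  // A log that fits within the DynamoDB item limit always fits in a single
  // chunk, so the service can decide between "store inline" and "wait for
  // more chunks" as soon as the first message arrives.
  assert!(LOG_DATA_SIZE_DATABASE_LIMIT < usable_payload);
  println!("usable payload per message: {usable_payload} bytes");
}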
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
index 8cf43aaf7..737bdd938 100644
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,104 +1,199 @@
 use async_stream::try_stream;
 use tokio_stream::{Stream, StreamExt};
 use tonic::Status;
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
 
 use super::handle_db_error;
 use super::proto::{self, PullBackupResponse};
-use crate::database::{BackupItem, DatabaseClient, LogItem};
+use crate::{
+  constants::{GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE},
+  database::{BackupItem, DatabaseClient, LogItem},
+};
 
 pub struct PullBackupHandler {
   backup_item: BackupItem,
   logs: Vec<LogItem>,
 }
 
 impl PullBackupHandler {
   pub async fn new(
     db: &DatabaseClient,
     request: proto::PullBackupRequest,
   ) -> Result<Self, Status> {
     let proto::PullBackupRequest { user_id, backup_id } = request;
     let backup_item = db
       .find_backup_item(&user_id, &backup_id)
       .await
       .map_err(handle_db_error)?
       .ok_or_else(|| {
         debug!("Backup item not found");
         Status::not_found("Backup item not found")
       })?;
 
     let backup_id = backup_item.backup_id.as_str();
     let logs = db
       .find_log_items_for_backup(backup_id)
       .await
       .map_err(handle_db_error)?;
 
     Ok(PullBackupHandler { backup_item, logs })
   }
 
   /// Consumes the handler and provides a response `Stream`. The stream will
   /// produce the following in order:
   /// - Backup compaction data chunks
   /// - Backup logs
   ///   - Whole log, if stored in db
   ///   - Log chunks, if stored in blob
   pub fn into_response_stream(
     self,
   ) -> impl Stream<Item = Result<PullBackupResponse, Status>> {
     use proto::pull_backup_response::*;
 
     try_stream! {
       debug!("Pulling backup...");
       {
         let compaction_stream = backup_compaction_stream(&self.backup_item);
         tokio::pin!(compaction_stream);
         while let Some(response) = compaction_stream.try_next().await? {
           yield response;
         }
       }
       trace!("Backup data pull complete.");
 
       if self.logs.is_empty() {
         debug!("No logs to pull. Finishing");
         return;
       }
 
       debug!("Pulling logs...");
       for log in self.logs {
         trace!("Pulling log ID={}", &log.log_id);
 
         if log.persisted_in_blob {
           trace!("Log persisted in blob");
           let log_data_stream = log_stream(&log);
           tokio::pin!(log_data_stream);
           while let Some(response) = log_data_stream.try_next().await? {
             yield response;
           }
         } else {
           yield proto::PullBackupResponse {
             attachment_holders: Some(log.attachment_holders),
             id: Some(Id::LogId(log.log_id)),
             data: Some(Data::LogChunk(log.value.into_bytes())),
           };
         }
       }
       trace!("Pulled all logs, done");
     }
   }
 }
 
 fn backup_compaction_stream(
   backup_item: &BackupItem,
 ) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_ {
   async_stream::stream! {
     yield Err(Status::unimplemented("Not implemented yet"));
   }
 }
 
 fn log_stream(
   log: &LogItem,
 ) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_ {
   async_stream::stream! {
     yield Err(Status::unimplemented("Not implemented yet"));
   }
 }
+
+/// A utility structure that buffers downloaded data and allows retrieving it
+/// as chunks of arbitrary size, no greater than the provided `limit`.
+struct ResponseBuffer {
+  buf: Vec<u8>,
+  limit: usize,
+}
+
+impl Default for ResponseBuffer {
+  /// Buffer size defaults to the max usable gRPC message size
+  fn default() -> Self {
+    ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
+  }
+}
+
+impl ResponseBuffer {
+  pub fn new(limit: usize) -> Self {
+    ResponseBuffer {
+      buf: Vec::new(),
+      limit,
+    }
+  }
+
+  pub fn put(&mut self, data: Vec<u8>) {
+    if data.len() > self.limit {
+      warn!("Data saved to buffer is larger than chunk limit.");
+    }
+
+    self.buf.extend(data);
+  }
+
+  /// Gets a chunk of size `limit - padding` and leaves the remainder in the buffer
+  pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
+    let mut chunk = std::mem::take(&mut self.buf);
+
+    let target_size = self.limit - padding;
+    if chunk.len() > target_size {
+      // after this operation: chunk = 0..target_size, self.buf = target_size..end
+      self.buf = chunk.split_off(target_size);
+    }
+    return chunk;
+  }
+
+  /// Checks whether the buffer holds at least `limit` bytes
+  pub fn is_saturated(&self) -> bool {
+    self.buf.len() >= self.limit
+  }
+
+  pub fn is_empty(&self) -> bool {
+    self.buf.is_empty()
+  }
+}
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+
+  const LIMIT: usize = 100;
+
+  #[test]
+  fn test_response_buffer() {
+    let mut buffer = ResponseBuffer::new(LIMIT);
+    assert_eq!(buffer.is_empty(), true);
+
+    // put 80 bytes of data
+    buffer.put(vec![0u8; 80]);
+    assert_eq!(buffer.is_empty(), false);
+    assert_eq!(buffer.is_saturated(), false);
+
+    // put the next 80 bytes; the buffer should be saturated as 160 > 100
+    buffer.put(vec![0u8; 80]);
+    let buf_size = buffer.buf.len();
+    assert_eq!(buffer.is_saturated(), true);
+    assert_eq!(buf_size, 160);
+
+    // get one chunk
+    let padding: usize = 10;
+    let expected_chunk_size = LIMIT - padding;
+    let chunk = buffer.get_chunk(padding);
+    assert_eq!(chunk.len(), expected_chunk_size); // 90
+
+    // the buffer should not be saturated now (160 - 90 < 100)
+    let remaining_buf_size = buffer.buf.len();
+    assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
+    assert_eq!(buffer.is_saturated(), false);
+
+    // get the last chunk
+    let chunk = buffer.get_chunk(padding);
+    assert_eq!(chunk.len(), remaining_buf_size);
+    assert_eq!(buffer.is_empty(), true);
+  }
+}
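The call site for `ResponseBuffer` is not part of this diff. As a rough sketch of the intended usage pattern, assuming it sits next to `ResponseBuffer` in pull_backup.rs and using a hypothetical `incoming_chunks` source and `send` sink, a caller could drain full-size chunks as data accumulates:

// Sketch only; `incoming_chunks` and `send` are hypothetical stand-ins for
// the blob-service download stream and the gRPC response sink.
fn drain_with_buffer(
  incoming_chunks: Vec<Vec<u8>>,
  mut send: impl FnMut(Vec<u8>),
) {
  // Padding would account for non-data fields serialized into the response.
  let padding = 0;
  let mut buffer = ResponseBuffer::default();

  for chunk in incoming_chunks {
    buffer.put(chunk);
    // Emit a full-size chunk whenever enough data has accumulated.
    while buffer.is_saturated() {
      send(buffer.get_chunk(padding));
    }
  }
  // Flush whatever is left once the source is exhausted.
  if !buffer.is_empty() {
    send(buffer.get_chunk(padding));
  }
}

Keeping the saturation check separate from `get_chunk` lets the caller pick a per-message `padding` at send time, for example when only some responses carry extra non-data fields.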