diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -9,8 +9,29 @@
 // 400KiB limit (in docs there is KB but they mean KiB) -
 // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
 // This includes both attribute names' and values' lengths
+//
+// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to
+// recognize whether we may receive multiple chunks or just one. If it were
+// larger than the chunk limit, then after receiving an amount of data equal
+// to that limit we wouldn't know whether to put it in the database right
+// away or wait for more data.
 pub const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400;
 
+// 4MB limit
+// WARNING: when using this, keep in mind that gRPC adds its own headers
+// to messages: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+// The message actually sent over the network looks like this:
+// [Compressed-Flag] [Message-Length] [Message]
+// [Compressed-Flag] 1 byte  - added by gRPC
+// [Message-Length]  4 bytes - added by gRPC
+// [Message]         N bytes - actual data
+// so for every message we get 5 additional bytes of data.
+// As mentioned in
+// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
+// a gRPC stream may contain more than one message.
+pub const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
+pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;
+
 // Configuration defaults
 pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,11 +1,14 @@
 use async_stream::try_stream;
 use tokio_stream::{Stream, StreamExt};
 use tonic::Status;
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
 
 use super::handle_db_error;
 use super::proto::{self, PullBackupResponse};
-use crate::database::{BackupItem, DatabaseClient, LogItem};
+use crate::{
+  constants::{GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE},
+  database::{BackupItem, DatabaseClient, LogItem},
+};
 
 pub struct PullBackupHandler {
   backup_item: BackupItem,
@@ -102,3 +105,55 @@
     yield Err(Status::unimplemented("Not implemented yet"));
   }
 }
+
+/// A utility structure that buffers downloaded data and allows retrieving it
+/// as chunks of arbitrary size, no greater than the provided `limit`.
+struct ResponseBuffer {
+  buf: Vec<u8>,
+  limit: usize,
+}
+
+impl Default for ResponseBuffer {
+  /// Buffer size defaults to the max usable gRPC message size
+  fn default() -> Self {
+    ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
+  }
+}
+
+impl ResponseBuffer {
+  pub fn new(limit: usize) -> Self {
+    ResponseBuffer {
+      buf: Vec::new(),
+      limit,
+    }
+  }
+
+  pub fn put(&mut self, data: Vec<u8>) {
+    if data.len() > self.limit {
+      warn!("Data saved to buffer is larger than chunk size.");
+    }
+
+    self.buf.extend(data);
+  }
+
+  /// Gets a chunk of size `limit - padding` and leaves the remainder in buffer
+  pub fn get(&mut self, padding: usize) -> Vec<u8> {
+    let mut chunk = std::mem::take(&mut self.buf);
+
+    let target_size = self.limit - padding;
+    if chunk.len() > target_size {
+      // after this operation: chunk = 0..target_size, buf = target_size..end
+      self.buf = chunk.split_off(target_size);
+    }
+    chunk
+  }
+
+  /// Checks whether the buffered data has reached the chunk limit
+  pub fn is_saturated(&self) -> bool {
+    self.buf.len() >= self.limit
+  }
+
+  pub fn is_empty(&self) -> bool {
+    self.buf.is_empty()
+  }
+}
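
A side note on the two framing constants: they are meant to be used together, since a message body can only occupy `GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE` bytes once the 1-byte compressed flag and 4-byte length prefix are accounted for (this is exactly what the `Default` impl computes). A minimal sketch of that arithmetic; `framed_size` and `main` are illustrative and not part of this patch:

```rust
// Sketch of the gRPC framing overhead described in the constants.rs comments.
// `framed_size` is a hypothetical helper, not part of this diff.
const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;

/// On-the-wire size: [Compressed-Flag (1)] [Message-Length (4)] [Message (n)]
fn framed_size(message_len: usize) -> usize {
  GRPC_METADATA_SIZE_PER_MESSAGE + message_len
}

fn main() {
  // The largest payload that still fits in a single 4MB gRPC message.
  let max_payload = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
  assert_eq!(framed_size(max_payload), GRPC_CHUNK_SIZE_LIMIT);
}
```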
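
And a hypothetical unit test (not included in the diff) that pins down the chunking semantics of `ResponseBuffer`: `put` accumulates data, `get(padding)` hands out at most `limit - padding` bytes and keeps the rest, and `is_saturated` reports whether a full chunk is still available. The limit of 10 bytes is made up for readability:

```rust
#[cfg(test)]
mod response_buffer_tests {
  use super::*;

  #[test]
  fn chunking_respects_limit_and_padding() {
    // Made-up limit of 10 bytes; production code uses the gRPC-derived default.
    let mut buffer = ResponseBuffer::new(10);
    buffer.put(vec![0u8; 25]);
    assert!(buffer.is_saturated());

    // Reserving 4 bytes of padding shrinks the chunk to limit - padding = 6.
    assert_eq!(buffer.get(4).len(), 6);

    // 19 bytes remain, so a full 10-byte chunk is still available.
    assert_eq!(buffer.get(0).len(), 10);

    // The last 9 bytes no longer saturate the buffer; `get` drains them all.
    assert!(!buffer.is_saturated());
    assert_eq!(buffer.get(0).len(), 9);
    assert!(buffer.is_empty());
  }
}
```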