Page MenuHomePhabricator

D6242.diff
No OneTemporary

D6242.diff

diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -8,8 +8,29 @@
// 400KiB limit (in docs there is KB but they mean KiB) -
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
// This includes both attribute names' and values' lengths
+//
+// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT so that we can tell
+// whether we may receive multiple chunks or just one. If it were larger than
+// the chunk limit, then after receiving an amount of data exactly equal to
+// that limit we wouldn't know whether to put it in the database right away or
+// to wait for more data.
pub const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400;
+// 4MiB limit
+// WARNING: keep in mind that gRPC adds its own headers to messages
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+// so the message that actually is being sent over the network looks like this
+// [Compressed-Flag] [Message-Length] [Message]
+// [Compressed-Flag] 1 byte - added by grpc
+// [Message-Length] 4 bytes - added by grpc
+// [Message] N bytes - actual data
+// so for every message we get 5 additional bytes of data.
+// Also, as mentioned here:
+// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
+// a gRPC stream may contain more than one message.
+pub const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
+pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;
+
// Configuration defaults
pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,11 +1,14 @@
use async_stream::try_stream;
use tokio_stream::{Stream, StreamExt};
use tonic::Status;
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
use super::handle_db_error;
use super::proto::{self, PullBackupResponse};
-use crate::database::{BackupItem, DatabaseClient, LogItem};
+use crate::{
+ constants::{GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE},
+ database::{BackupItem, DatabaseClient, LogItem},
+};
pub struct PullBackupHandler {
backup_item: BackupItem,
@@ -102,3 +105,95 @@
yield Err(Status::unimplemented("Not implemented yet"));
}
}
+
+/// A utility structure that buffers downloaded data and allows retrieving it
+/// as chunks of arbitrary size, not greater than the provided `limit`.
+struct ResponseBuffer {
+ buf: Vec<u8>, // bytes added via `put` and not yet handed out by `get_chunk`
+ limit: usize, // upper bound on the size of a chunk returned by `get_chunk`
+}
+
+impl Default for ResponseBuffer {
+  /// Builds a buffer whose limit is the gRPC chunk size limit minus the
+  /// per-message metadata size.
+  fn default() -> Self {
+    let max_payload = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+    Self::new(max_payload)
+  }
+}
+
+impl ResponseBuffer {
+  /// Creates an empty buffer that hands out chunks of at most `limit` bytes.
+  pub fn new(limit: usize) -> Self {
+    ResponseBuffer {
+      buf: Vec::new(),
+      limit,
+    }
+  }
+
+  /// Appends `data` to the end of the buffer.
+  ///
+  /// Data longer than `limit` is still stored in full; a warning is logged
+  /// and the bytes come back out split across several `get_chunk` calls.
+  pub fn put(&mut self, data: Vec<u8>) {
+    if data.len() > self.limit {
+      warn!("Data saved to buffer is larger than chunk limit.");
+    }
+
+    self.buf.extend(data);
+  }
+
+  /// Removes and returns the next chunk, leaving any remainder in the buffer.
+  ///
+  /// The chunk holds at most `limit - padding` bytes, or everything that is
+  /// buffered when less than that is available. If `padding` is equal to or
+  /// greater than `limit`, there is no room left for data: an empty chunk is
+  /// returned and the buffer content is kept untouched.
+  pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
+    let mut chunk = std::mem::take(&mut self.buf);
+
+    // A plain `limit - padding` would panic in debug builds and wrap around
+    // in release builds when `padding > limit`, which would hand out the
+    // whole buffer regardless of the limit. Saturate at zero instead.
+    let target_size = self.limit.saturating_sub(padding);
+    if chunk.len() > target_size {
+      // after this operation, chunk=0..target_size, self.buf=target_size..end
+      self.buf = chunk.split_off(target_size);
+    }
+    chunk
+  }
+
+  /// Returns `true` when the buffered data has reached or exceeded `limit`.
+  pub fn is_saturated(&self) -> bool {
+    self.buf.len() >= self.limit
+  }
+
+  /// Returns `true` when no data is waiting in the buffer.
+  pub fn is_empty(&self) -> bool {
+    self.buf.is_empty()
+  }
+}
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+
+  const LIMIT: usize = 100;
+
+  /// Walks a buffer through filling, saturation and draining in two chunks.
+  #[test]
+  fn test_response_buffer() {
+    let mut buffer = ResponseBuffer::new(LIMIT);
+    assert!(buffer.is_empty());
+
+    // put 80 bytes of data
+    buffer.put(vec![0u8; 80]);
+    assert!(!buffer.is_empty());
+    assert!(!buffer.is_saturated());
+
+    // put next 80 bytes, should be saturated as 160 >= 100
+    buffer.put(vec![0u8; 80]);
+    let buf_size = buffer.buf.len();
+    assert!(buffer.is_saturated());
+    assert_eq!(buf_size, 160);
+
+    // get one chunk
+    let padding: usize = 10;
+    let expected_chunk_size = LIMIT - padding;
+    let chunk = buffer.get_chunk(padding);
+    assert_eq!(chunk.len(), expected_chunk_size); // 90
+
+    // buffer should not be saturated now (160 - 90 < 100)
+    let remaining_buf_size = buffer.buf.len();
+    assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
+    assert!(!buffer.is_saturated());
+
+    // get last chunk
+    let chunk = buffer.get_chunk(padding);
+    assert_eq!(chunk.len(), remaining_buf_size);
+    assert!(buffer.is_empty());
+  }
+
+  /// Saturation starts exactly at `LIMIT` bytes, not one byte past it.
+  #[test]
+  fn test_response_buffer_saturation_boundary() {
+    let mut buffer = ResponseBuffer::new(LIMIT);
+
+    // one byte below the limit is not saturated yet
+    buffer.put(vec![0u8; LIMIT - 1]);
+    assert!(!buffer.is_saturated());
+
+    // reaching the limit exactly counts as saturated
+    buffer.put(vec![0u8; 1]);
+    assert!(buffer.is_saturated());
+
+    // without padding the whole limit-sized content fits in one chunk
+    let chunk = buffer.get_chunk(0);
+    assert_eq!(chunk.len(), LIMIT);
+    assert!(buffer.is_empty());
+  }
+}

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 8:12 PM (18 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678749
Default Alt Text
D6242.diff (4 KB)

Event Timeline