Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3491740
D6242.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
D6242.diff
View Options
diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -8,8 +8,29 @@
// 400KiB limit (in docs there is KB but they mean KiB) -
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
// This includes both attribute names' and values' lengths
+//
+// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to
+// recognize if we may receive multiple chunks or just one. If it was larger
+// than the chunk limit, once we get the amount of data of size equal to the
+// limit, we wouldn't know if we should put this in the database right away or
+// wait for more data.
pub const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400;
+// 4MB limit
+// WARNING: use keeping in mind that grpc adds its own headers to messages
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+// so the message that actually is being sent over the network looks like this
+// [Compressed-Flag] [Message-Length] [Message]
+// [Compressed-Flag] 1 byte - added by grpc
+// [Message-Length] 4 bytes - added by grpc
+// [Message] N bytes - actual data
+// so for every message we get 5 additional bytes of data
+// as mentioned here
+// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
+// grpc stream may contain more than one message
+pub const GRPC_CHUNK_SIZE_LIMIT: usize = 4 * 1024 * 1024;
+pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;
+
// Configuration defaults
pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,11 +1,14 @@
use async_stream::try_stream;
use tokio_stream::{Stream, StreamExt};
use tonic::Status;
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
use super::handle_db_error;
use super::proto::{self, PullBackupResponse};
-use crate::database::{BackupItem, DatabaseClient, LogItem};
+use crate::{
+ constants::{GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE},
+ database::{BackupItem, DatabaseClient, LogItem},
+};
pub struct PullBackupHandler {
backup_item: BackupItem,
@@ -102,3 +105,95 @@
yield Err(Status::unimplemented("Not implemented yet"));
}
}
+
+/// A utility structure that buffers downloaded data and allows to retrieve it
+/// as chunks of arbitrary size, not greater than provided `limit`.
+struct ResponseBuffer {
+ buf: Vec<u8>,
+ limit: usize,
+}
+
+impl Default for ResponseBuffer {
+ /// Buffer size defaults to max usable gRPC message size
+ fn default() -> Self {
+ ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
+ }
+}
+
+impl ResponseBuffer {
+ pub fn new(limit: usize) -> Self {
+ ResponseBuffer {
+ buf: Vec::new(),
+ limit,
+ }
+ }
+
+ pub fn put(&mut self, data: Vec<u8>) {
+ if data.len() > self.limit {
+ warn!("Data saved to buffer is larger than chunk limit.");
+ }
+
+ self.buf.extend(data);
+ }
+
+ /// Gets chunk of size `limit - padding` and leaves remainder in buffer
+ pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
+ let mut chunk = std::mem::take(&mut self.buf);
+
+ let target_size = self.limit - padding;
+ if chunk.len() > target_size {
+ // after this operation, chunk=0..target_size, self.buf=target_size..end
+ self.buf = chunk.split_off(target_size);
+ }
+ return chunk;
+ }
+
+ /// Does buffer length exceed given limit
+ pub fn is_saturated(&self) -> bool {
+ self.buf.len() >= self.limit
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.buf.is_empty()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const LIMIT: usize = 100;
+
+ #[test]
+ fn test_response_buffer() {
+ let mut buffer = ResponseBuffer::new(LIMIT);
+ assert_eq!(buffer.is_empty(), true);
+
+ // put 80 bytes of data
+ buffer.put(vec![0u8; 80]);
+ assert_eq!(buffer.is_empty(), false);
+ assert_eq!(buffer.is_saturated(), false);
+
+ // put next 80 bytes, should be saturated as 160 > 100
+ buffer.put(vec![0u8; 80]);
+ let buf_size = buffer.buf.len();
+ assert_eq!(buffer.is_saturated(), true);
+ assert_eq!(buf_size, 160);
+
+ // get one chunk
+ let padding: usize = 10;
+ let expected_chunk_size = LIMIT - padding;
+ let chunk = buffer.get_chunk(padding);
+ assert_eq!(chunk.len(), expected_chunk_size); // 90
+
+ // buffer should not be saturated now (160 - 90 < 100)
+ let remaining_buf_size = buffer.buf.len();
+ assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
+ assert_eq!(buffer.is_saturated(), false);
+
+ // get last chunk
+ let chunk = buffer.get_chunk(padding);
+ assert_eq!(chunk.len(), remaining_buf_size);
+ assert_eq!(buffer.is_empty(), true);
+ }
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 8:12 PM (18 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678749
Default Alt Text
D6242.diff (4 KB)
Attached To
Mode
D6242: [services][backup] PullBackup 3/5 - response buffer utility
Attached
Detach File
Event Timeline
Log In to Comment