diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -3,6 +3,26 @@
 pub const GRPC_SERVER_DEFAULT_PORT: u64 = 50051;
 pub const AWS_REGION: &str = "us-east-2";
 pub const LOCALSTACK_URL: &str = "http://localhost:4566";
+pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
+
+/// 4 MiB limit
+///
+/// WARNING: keep in mind that gRPC adds its own headers to messages, see
+/// <https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md>,
+/// so the message that is actually sent over the network looks like this:
+/// ```text
+/// [Compressed-Flag] [Message-Length] [Message]
+/// [Compressed-Flag]   1 byte  - added by gRPC
+/// [Message-Length]    4 bytes - added by gRPC
+/// [Message]           N bytes - actual data
+/// ```
+/// so every message carries 5 additional bytes of data. As
+/// [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671),
+/// a gRPC stream may contain more than one message.
+pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
+
+/// Bytes gRPC adds to every message; see [`GRPC_CHUNK_SIZE_LIMIT`] for details.
+pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;
 
 // DynamoDB constants
 
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -4,7 +4,10 @@
 use tokio_stream::Stream;
 use tonic::{Request, Response, Status};
 
-use crate::database::DatabaseClient;
+use crate::{
+    database::{BlobItem, DatabaseClient, ReverseIndexItem},
+    s3::S3Path,
+};
 
 pub mod blob {
     tonic::include_proto!("blob");
@@ -22,6 +25,59 @@
             s3: Arc::new(s3_client),
         }
     }
+
+    /// Returns the S3 path of the blob referenced by `reverse_index_item`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Status::not_found` when the database holds no blob item for
+    /// the blob hash, and `Status::internal` when the lookup itself fails.
+    async fn find_s3_path_by_reverse_index(
+        &self,
+        reverse_index_item: &ReverseIndexItem,
+    ) -> Result<S3Path, Status> {
+        let blob_hash = &reverse_index_item.blob_hash;
+        match self.db.find_blob_item(blob_hash).await {
+            Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
+            Ok(None) => Err(Status::not_found(format!(
+                "No blob found for blobHash: [{}]",
+                blob_hash
+            ))),
+            Err(e) => Err(Status::internal(format!(
+                "Could not load S3 path: {}",
+                e
+            ))),
+        }
+    }
+
+    /// Returns the S3 path of the blob associated with `holder`.
+    ///
+    /// The holder is first resolved to its reverse index item, which is then
+    /// resolved to an S3 path by `find_s3_path_by_reverse_index`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Status::not_found` when no reverse index item exists for
+    /// `holder` (or no blob item exists for its blob hash), and
+    /// `Status::internal` when a database lookup fails.
+    async fn find_s3_path_by_holder(
+        &self,
+        holder: &str,
+    ) -> Result<S3Path, Status> {
+        match self.db.find_reverse_index_by_holder(holder).await {
+            Ok(Some(reverse_index)) => {
+                self.find_s3_path_by_reverse_index(&reverse_index).await
+            }
+            Ok(None) => Err(Status::not_found(format!(
+                "No blob found for holder: [{}]",
+                holder
+            ))),
+            Err(e) => Err(Status::internal(format!(
+                "Could not load S3 path: {}",
+                e
+            ))),
+        }
+    }
 }
 
 // gRPC implementation