diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs index 0a957ea16..b8c59a6fb 100644 --- a/services/blob/src/constants.rs +++ b/services/blob/src/constants.rs @@ -1,25 +1,45 @@ // Assorted constants pub const GRPC_SERVER_DEFAULT_PORT: u64 = 50051; pub const AWS_REGION: &str = "us-east-2"; pub const LOCALSTACK_URL: &str = "http://localhost:4566"; +pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; + +/// 4MB limit +/// +/// WARNING: use keeping in mind that grpc adds its own headers to messages +/// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md +/// so the message that actually is being sent over the network looks like this +/// ``` +/// [Compressed-Flag] [Message-Length] [Message] +/// [Compressed-Flag] 1 byte - added by grpc +/// [Message-Length] 4 bytes - added by grpc +/// [Message] N bytes - actual data +/// ``` +/// so for every message we get 5 additional bytes of data +/// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671), +/// gRPC stream may contain more than one message +pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024; + +/// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details +pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5; // DynamoDB constants pub const BLOB_TABLE_NAME: &str = "blob-service-blob"; pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path"; pub const BLOB_TABLE_CREATED_FIELD: &str = "created"; pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index"; pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder"; pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index"; // Environment variables pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; // S3 constants pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob"; diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs index f71bde8f8..959262eb3 100644 --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,54 +1,82 @@ use anyhow::Result; use blob::blob_service_server::BlobService; use std::{pin::Pin, sync::Arc}; use tokio_stream::Stream; use tonic::{Request, Response, Status}; -use crate::database::DatabaseClient; +use crate::{ + database::{BlobItem, DatabaseClient, ReverseIndexItem}, + s3::S3Path, +}; pub mod blob { tonic::include_proto!("blob"); } pub struct MyBlobService { db: DatabaseClient, s3: Arc, } impl MyBlobService { pub fn new(db_client: DatabaseClient, s3_client: aws_sdk_s3::Client) -> Self { MyBlobService { db: db_client, s3: Arc::new(s3_client), } } + + async fn find_s3_path_by_reverse_index( + &self, + reverse_index_item: &ReverseIndexItem, + ) -> Result { + let blob_hash = &reverse_index_item.blob_hash; + match self.db.find_blob_item(&blob_hash).await { + Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path), + Ok(None) => Err(Status::not_found("blob not found")), + Err(_) => Err(Status::aborted("internal error")), + } + } + + async fn find_s3_path_by_holder( + &self, + holder: &str, + ) -> Result { + match self.db.find_reverse_index_by_holder(holder).await { + Ok(Some(reverse_index)) => { + self.find_s3_path_by_reverse_index(&reverse_index).await + } + Ok(None) => Err(Status::not_found("blob not found")), + Err(_) => Err(Status::aborted("internal error")), + } + } } // gRPC implementation #[tonic::async_trait] impl BlobService for MyBlobService { type PutStream = Pin> + Send>>; async fn put( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("Not implemented yet")) } type GetStream = Pin> + Send>>; async fn get( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("Not implemented yet")) } async fn remove( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("Not implemented yet")) } }