diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,10 +1,15 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use blob::blob_service_server::BlobService; use std::{pin::Pin, sync::Arc}; -use tokio_stream::Stream; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Request, Response, Status}; use crate::{ + constants::{ + GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, + MPSC_CHANNEL_BUFFER_CAPACITY, + }, database::{BlobItem, DatabaseClient, ReverseIndexItem}, s3::S3Path, }; @@ -69,9 +74,74 @@ Pin> + Send>>; async fn get( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + /* Streams the object referenced by `message.holder`: resolves its S3 path, reads the object size with head_object, then a spawned task fetches the object in ranged chunks and sends each one over an mpsc channel whose receiver is returned as the response stream. */ let message: blob::GetRequest = request.into_inner(); + let s3_path = self + .find_s3_path_by_holder(&message.holder) + .await + .context("Failed to find S3 path") + .map_err(|err| Status::not_found(err.to_string()))?; /* every lookup failure is reported to the client as NOT_FOUND */ + + let chunk_size: u64 = + GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; /* bytes of object data requested per message: the chunk size limit minus the per-message metadata constant */ + let file_size = self + .s3 + .head_object() + .bucket(s3_path.bucket_name.clone()) + .key(s3_path.object_name.clone()) + .send() + .await + .context("Failed to get object metadata") + .map(|metadata| metadata.content_length().unsigned_abs()) /* NOTE(review): unsigned_abs() would turn a negative content length into its magnitude rather than reject it; confirm that is acceptable */ + .map_err(|err| Status::unknown(err.to_string()))?; + + let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let s3 = self.s3.clone(); /* cloned so the spawned task owns its own handle instead of borrowing self */ + + tokio::spawn(async move { + let mut offset: u64 = 0; + while offset < file_size { + let next_size = std::cmp::min(chunk_size, file_size - offset); /* the final chunk may be shorter than chunk_size */ + let range = format!("bytes={}-{}", offset, offset + next_size - 1); /* the last byte position in a Range header is inclusive, hence the minus one */ + + let data = match s3 + .get_object() + .bucket(&s3_path.bucket_name) + .key(&s3_path.object_name) + .range(range) + .send() + .await + .context("Failed to retrieve object data") + { + Ok(part) => { + part.body.collect().await.context("Failed to collect bytes") /* gathers the whole body of this ranged response before it is sent on */ + } + Err(e) =>
Err(e), + }; + + let response = match data { + Ok(data) => Ok(blob::GetResponse { + data_chunk: data.into_bytes().to_vec(), + }), + Err(e) => { + let response = Err(Status::unknown(e.to_string())); + let _ = tx.send(response).await; /* best effort: the result of delivering the error is deliberately ignored */ + break; /* no further chunks are fetched after a failure */ + } + }; + + if let Err(e) = tx.send(response).await { + println!("Response was dropped: {}", e); /* a failed send means the receiving half is gone, so fetching stops */ + break; + } + + offset += chunk_size; /* may step past file_size after the final short chunk, which ends the loop */ + } + }); + + let out_stream = ReceiverStream::new(rx); /* the handler returns right away; chunks arrive as the spawned task produces them */ + Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) } async fn remove(