diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,10 +1,15 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
 use blob::blob_service_server::BlobService;
 use std::{pin::Pin, sync::Arc};
-use tokio_stream::Stream;
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream};
 use tonic::{Request, Response, Status};
 
 use crate::{
+  constants::{
+    GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+    MPSC_CHANNEL_BUFFER_CAPACITY,
+  },
   database::{BlobItem, DatabaseClient, ReverseIndexItem},
   s3::S3Path,
 };
@@ -68,9 +73,90 @@
     Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
   async fn get(
     &self,
-    _request: Request<blob::GetRequest>,
+    request: Request<blob::GetRequest>,
   ) -> Result<Response<Self::GetStream>, Status> {
-    Err(Status::unimplemented("Not implemented yet"))
+    // Streams the blob identified by the request's `holder` to the caller.
+    // The object is read from S3 with ranged requests in a background task
+    // and each chunk is forwarded through a bounded channel that backs the
+    // returned stream.
+    let message: blob::GetRequest = request.into_inner();
+    let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
+
+    // HEAD request: only the object's size is needed up front.
+    let object_metadata = self
+      .s3
+      .head_object()
+      .bucket(s3_path.bucket_name.clone())
+      .key(s3_path.object_name.clone())
+      .send()
+      .await
+      .map_err(|_| Status::aborted("server error"))?;
+
+    // A negative length is treated as a server error rather than being
+    // reinterpreted as a (huge) positive size.
+    let file_size: u64 = match object_metadata.content_length() {
+      length if length >= 0 => length.unsigned_abs(),
+      _ => return Err(Status::aborted("server error")),
+    };
+    // Payload budget per message: the gRPC limit minus message metadata.
+    let chunk_size: u64 =
+      GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+
+    let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+    let s3 = self.s3.clone();
+
+    tokio::spawn(async move {
+      let mut offset: u64 = 0;
+      while offset < file_size {
+        let next_size = std::cmp::min(chunk_size, file_size - offset);
+        // The range header is inclusive on both ends, hence the `- 1`.
+        let range = format!("bytes={}-{}", offset, offset + next_size - 1);
+
+        let data = match s3
+          .get_object()
+          .bucket(&s3_path.bucket_name)
+          .key(&s3_path.object_name)
+          .range(range)
+          .send()
+          .await
+          .context("Failed to retrieve object data")
+        {
+          Ok(part) => {
+            part.body.collect().await.context("Failed to collect bytes")
+          }
+          Err(e) => Err(e),
+        };
+
+        let response = match data {
+          Ok(data) => Ok(blob::GetResponse {
+            data_chunk: data.into_bytes().to_vec(),
+          }),
+          Err(e) => {
+            // Keep the cause in the server log; the client only sees a
+            // generic status.
+            println!("Download failed: {:?}", e);
+            Err(Status::aborted("download failed"))
+          }
+        };
+
+        // An error is still forwarded to the client before the task stops.
+        let should_abort = response.is_err();
+        if let Err(e) = tx.send(response).await {
+          // The receiving side is gone, so nobody is left to read chunks.
+          println!("Response was dropped: {}", e);
+          break;
+        }
+        if should_abort {
+          break;
+        }
+
+        // Advance by what was actually requested for this chunk.
+        offset += next_size;
+      }
+    });
+
+    let out_stream = ReceiverStream::new(rx);
+    Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
   }
 
   async fn remove(