Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/service.rs
use anyhow::Result; | use anyhow::{Context, Result}; | ||||
use blob::blob_service_server::BlobService; | use blob::blob_service_server::BlobService; | ||||
use std::{pin::Pin, sync::Arc}; | use std::{pin::Pin, sync::Arc}; | ||||
use tokio_stream::Stream; | use tokio::sync::mpsc; | ||||
use tokio_stream::{wrappers::ReceiverStream, Stream}; | |||||
use tonic::{Request, Response, Status}; | use tonic::{Request, Response, Status}; | ||||
use crate::{ | use crate::{ | ||||
constants::{ | |||||
GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, | |||||
MPSC_CHANNEL_BUFFER_CAPACITY, | |||||
}, | |||||
database::{BlobItem, DatabaseClient, ReverseIndexItem}, | database::{BlobItem, DatabaseClient, ReverseIndexItem}, | ||||
s3::S3Path, | s3::S3Path, | ||||
}; | }; | ||||
pub mod blob { | pub mod blob { | ||||
tonic::include_proto!("blob"); | tonic::include_proto!("blob"); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | impl BlobService for MyBlobService { | ||||
) -> Result<Response<Self::PutStream>, Status> { | ) -> Result<Response<Self::PutStream>, Status> { | ||||
Err(Status::unimplemented("Not implemented yet")) | Err(Status::unimplemented("Not implemented yet")) | ||||
} | } | ||||
type GetStream = | type GetStream = | ||||
Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>; | Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>; | ||||
async fn get( | async fn get( | ||||
&self, | &self, | ||||
_request: Request<blob::GetRequest>, | request: Request<blob::GetRequest>, | ||||
) -> Result<Response<Self::GetStream>, Status> { | ) -> Result<Response<Self::GetStream>, Status> { | ||||
Err(Status::unimplemented("Not implemented yet")) | let message: blob::GetRequest = request.into_inner(); | ||||
let s3_path = self.find_s3_path_by_holder(&message.holder).await?; | |||||
let object_metadata = self | |||||
.s3 | |||||
.head_object() | |||||
.bucket(s3_path.bucket_name.clone()) | |||||
.key(s3_path.object_name.clone()) | |||||
.send() | |||||
.await | |||||
.map_err(|_| Status::aborted("server error"))?; | |||||
let file_size: u64 = object_metadata | |||||
.content_length() | |||||
.try_into() | |||||
.map_err(|_| Status::aborted("server error"))?; | |||||
let chunk_size: u64 = | |||||
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; | |||||
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); | |||||
let s3 = self.s3.clone(); | |||||
tokio::spawn(async move { | |||||
let mut offset: u64 = 0; | |||||
while offset < file_size { | |||||
let next_size = std::cmp::min(chunk_size, file_size - offset); | |||||
let range = format!("bytes={}-{}", offset, offset + next_size - 1); | |||||
let data = match s3 | |||||
.get_object() | |||||
.bucket(&s3_path.bucket_name) | |||||
.key(&s3_path.object_name) | |||||
.range(range) | |||||
.send() | |||||
.await | |||||
.context("Failed to retrieve object data") | |||||
{ | |||||
Ok(part) => { | |||||
part.body.collect().await.context("Failed to collect bytes") | |||||
} | |||||
Err(e) => Err(e), | |||||
}; | |||||
let response = match data { | |||||
Ok(data) => Ok(blob::GetResponse { | |||||
data_chunk: data.into_bytes().to_vec(), | |||||
}), | |||||
Err(_) => Err(Status::aborted("download failed")), | |||||
}; | |||||
let should_abort = response.is_err(); | |||||
if let Err(e) = tx.send(response).await { | |||||
println!("Response was dropped: {}", e); | |||||
break; | |||||
} | |||||
if should_abort { | |||||
break; | |||||
} | |||||
offset += chunk_size; | |||||
} | |||||
}); | |||||
let out_stream = ReceiverStream::new(rx); | |||||
Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) | |||||
} | } | ||||
async fn remove( | async fn remove( | ||||
&self, | &self, | ||||
_request: Request<blob::RemoveRequest>, | _request: Request<blob::RemoveRequest>, | ||||
) -> Result<Response<()>, Status> { | ) -> Result<Response<()>, Status> { | ||||
Err(Status::unimplemented("Not implemented yet")) | Err(Status::unimplemented("Not implemented yet")) | ||||
} | } | ||||
} | } |