Page Menu | Home | Phabricator

D5701.id18916.diff
No One | Temporary

D5701.id18916.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,10 +1,15 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
use blob::blob_service_server::BlobService;
use std::{pin::Pin, sync::Arc};
-use tokio_stream::Stream;
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{Request, Response, Status};
use crate::{
+ constants::{
+ GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+ MPSC_CHANNEL_BUFFER_CAPACITY,
+ },
database::{BlobItem, DatabaseClient, ReverseIndexItem},
s3::S3Path,
};
@@ -68,9 +73,73 @@
Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
async fn get(
&self,
- _request: Request<blob::GetRequest>,
+ request: Request<blob::GetRequest>,
) -> Result<Response<Self::GetStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let message: blob::GetRequest = request.into_inner();
+ let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
+
+ let object_metadata = self
+ .s3
+ .head_object()
+ .bucket(s3_path.bucket_name.clone())
+ .key(s3_path.object_name.clone())
+ .send()
+ .await
+ .map_err(|_| Status::aborted("server error"))?;
+
+ let file_size: u64 = object_metadata
+ .content_length()
+ .try_into()
+ .map_err(|_| Status::aborted("server error"))?;
+ let chunk_size: u64 =
+ GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let s3 = self.s3.clone();
+
+ tokio::spawn(async move {
+ let mut offset: u64 = 0;
+ while offset < file_size {
+ let next_size = std::cmp::min(chunk_size, file_size - offset);
+ let range = format!("bytes={}-{}", offset, offset + next_size - 1);
+
+ let data = match s3
+ .get_object()
+ .bucket(&s3_path.bucket_name)
+ .key(&s3_path.object_name)
+ .range(range)
+ .send()
+ .await
+ .context("Failed to retrieve object data")
+ {
+ Ok(part) => {
+ part.body.collect().await.context("Failed to collect bytes")
+ }
+ Err(e) => Err(e),
+ };
+
+ let response = match data {
+ Ok(data) => Ok(blob::GetResponse {
+ data_chunk: data.into_bytes().to_vec(),
+ }),
+ Err(_) => Err(Status::aborted("download failed")),
+ };
+
+ let should_abort = response.is_err();
+ if let Err(e) = tx.send(response).await {
+ println!("Response was dropped: {}", e);
+ break;
+ }
+ if should_abort {
+ break;
+ }
+
+ offset += chunk_size;
+ }
+ });
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
}
async fn remove(

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 26, 7:27 AM (11 h, 27 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2703015
Default Alt Text
D5701.id18916.diff (2 KB)

Event Timeline