diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs --- a/services/blob/src/main.rs +++ b/services/blob/src/main.rs @@ -7,6 +7,7 @@ use anyhow::Result; use aws_sdk_dynamodb::{Endpoint, Region}; use database::DatabaseClient; +use s3::S3Client; use service::{blob::blob_service_server::BlobServiceServer, MyBlobService}; use std::net::SocketAddr; use tonic::transport::{Server, Uri}; @@ -40,7 +41,7 @@ async fn run_grpc_server( db_client: DatabaseClient, - s3_client: aws_sdk_s3::Client, + s3_client: S3Client, ) -> Result<()> { let addr: SocketAddr = format!("[::]:{}", constants::GRPC_SERVER_DEFAULT_PORT).parse()?; @@ -61,7 +62,7 @@ let aws_config = get_aws_config().await; let db = database::DatabaseClient::new(&aws_config); - let s3 = aws_sdk_s3::Client::new(&aws_config); + let s3 = s3::S3Client::new(&aws_config); run_grpc_server(db, s3).await } diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs --- a/services/blob/src/s3.rs +++ b/services/blob/src/s3.rs @@ -140,7 +140,8 @@ impl MultiPartUploadSession { /// Starts a new upload session and returns its instance - pub async fn start( + /// Don't call this directly, use [`S3Client::start_upload_session()`] instead + async fn start( client: &Arc, s3_path: &S3Path, ) -> Result { diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,7 +1,7 @@ -use anyhow::{Context, Result}; +use anyhow::Result; use blob::blob_service_server::BlobService; use chrono::Utc; -use std::{pin::Pin, sync::Arc}; +use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; @@ -13,7 +13,7 @@ MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }, database::{BlobItem, DatabaseClient, ReverseIndexItem}, - s3::{MultiPartUploadSession, S3Path}, + s3::{MultiPartUploadSession, S3Client, S3Path}, }; pub mod blob { @@ -22,14 +22,14 @@ pub struct MyBlobService { db: DatabaseClient, - s3: Arc, + s3: S3Client, } impl MyBlobService { - pub fn new(db_client: DatabaseClient, s3_client: aws_sdk_s3::Client) -> Self { + pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self { MyBlobService { db: db_client, - s3: Arc::new(s3_client), + s3: s3_client, } } @@ -141,14 +141,8 @@ let s3_path = self.find_s3_path_by_holder(&message.holder).await?; tracing::Span::current().record("s3_path", s3_path.to_full_path()); - let object_metadata = self - .s3 - .head_object() - .bucket(s3_path.bucket_name.clone()) - .key(s3_path.object_name.clone()) - .send() - .await - .map_err(|err| { + let object_metadata = + self.s3.get_object_metadata(&s3_path).await.map_err(|err| { error!("Failed to get S3 object metadata: {:?}", err); Status::aborted("server error") })?; @@ -168,28 +162,11 @@ let mut offset: u64 = 0; while offset < file_size { let next_size = std::cmp::min(chunk_size, file_size - offset); - let range = format!("bytes={}-{}", offset, offset + next_size - 1); - trace!(range, "Getting {} bytes of data", next_size); - - let data = match s3 - .get_object() - .bucket(&s3_path.bucket_name) - .key(&s3_path.object_name) - .range(range) - .send() - .await - .context("Failed to retrieve object data") - { - Ok(part) => { - part.body.collect().await.context("Failed to collect bytes") - } - Err(e) => Err(e), - }; + let range = offset..(offset + next_size); + trace!(?range, "Getting {} bytes of data", next_size); - let response = match data { - Ok(data) => Ok(blob::GetResponse { - data_chunk: data.into_bytes().to_vec(), - }), + let response = match s3.get_object_bytes(&s3_path, range).await { + Ok(data) => Ok(blob::GetResponse { data_chunk: data }), Err(err) => { error!("Failed to download data chunk: {:?}", err); Err(Status::aborted("download failed")) @@ -258,17 +235,10 @@ .find_s3_path_by_reverse_index(&reverse_index_item) .await?; - self - .s3 - .delete_object() - .bucket(&s3_path.bucket_name) - .key(&s3_path.object_name) - .send() - .await - .map_err(|err| { - error!("Failed to delete S3 object: {:?}", err); - Status::aborted("Internal error") - })?; + self.s3.delete_object(&s3_path).await.map_err(|err| { + error!("Failed to delete S3 object: {:?}", err); + Status::aborted("Internal error") + })?; if let Err(err) = self.db.remove_blob_item(blob_hash).await { error!("Failed to remove blob item from database: {:?}", err); @@ -299,11 +269,11 @@ uploader: Option, db: DatabaseClient, - s3: Arc, + s3: S3Client, } impl PutHandler { - fn new(db: &DatabaseClient, s3: &Arc) -> Self { + fn new(db: &DatabaseClient, s3: &S3Client) -> Self { PutHandler { should_close_stream: false, action: None, @@ -407,8 +377,7 @@ if self.uploader.is_none() { debug!("Uploader doesn't exist, starting new session"); self.uploader = - match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await - { + match self.s3.start_upload_session(&blob_item.s3_path).await { Ok(session) => Some(session), Err(err) => { self.should_close_stream = true;