Page Menu | Home | Phabricator

D5787.id19227.diff
No One | Temporary

D5787.id19227.diff

diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -7,6 +7,7 @@
use anyhow::Result;
use aws_sdk_dynamodb::{Endpoint, Region};
use database::DatabaseClient;
+use s3::S3Client;
use service::{blob::blob_service_server::BlobServiceServer, MyBlobService};
use std::net::SocketAddr;
use tonic::transport::{Server, Uri};
@@ -40,7 +41,7 @@
async fn run_grpc_server(
db_client: DatabaseClient,
- s3_client: aws_sdk_s3::Client,
+ s3_client: S3Client,
) -> Result<()> {
let addr: SocketAddr =
format!("[::]:{}", constants::GRPC_SERVER_DEFAULT_PORT).parse()?;
@@ -61,7 +62,7 @@
let aws_config = get_aws_config().await;
let db = database::DatabaseClient::new(&aws_config);
- let s3 = aws_sdk_s3::Client::new(&aws_config);
+ let s3 = s3::S3Client::new(&aws_config);
run_grpc_server(db, s3).await
}
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -140,7 +140,8 @@
impl MultiPartUploadSession {
/// Starts a new upload session and returns its instance
- pub async fn start(
+ /// Don't call this directly, use [`S3Client::start_upload_session()`] instead
+ async fn start(
client: &Arc<aws_sdk_s3::Client>,
s3_path: &S3Path,
) -> Result<Self> {
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,7 +1,7 @@
-use anyhow::{Context, Result};
+use anyhow::Result;
use blob::blob_service_server::BlobService;
use chrono::Utc;
-use std::{pin::Pin, sync::Arc};
+use std::pin::Pin;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
@@ -13,7 +13,7 @@
MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
},
database::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::{MultiPartUploadSession, S3Path},
+ s3::{MultiPartUploadSession, S3Client, S3Path},
};
pub mod blob {
@@ -22,14 +22,14 @@
pub struct MyBlobService {
db: DatabaseClient,
- s3: Arc<aws_sdk_s3::Client>,
+ s3: S3Client,
}
impl MyBlobService {
- pub fn new(db_client: DatabaseClient, s3_client: aws_sdk_s3::Client) -> Self {
+ pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self {
MyBlobService {
db: db_client,
- s3: Arc::new(s3_client),
+ s3: s3_client,
}
}
@@ -141,14 +141,8 @@
let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
tracing::Span::current().record("s3_path", s3_path.to_full_path());
- let object_metadata = self
- .s3
- .head_object()
- .bucket(s3_path.bucket_name.clone())
- .key(s3_path.object_name.clone())
- .send()
- .await
- .map_err(|err| {
+ let object_metadata =
+ self.s3.get_object_metadata(&s3_path).await.map_err(|err| {
error!("Failed to get S3 object metadata: {:?}", err);
Status::aborted("server error")
})?;
@@ -168,28 +162,11 @@
let mut offset: u64 = 0;
while offset < file_size {
let next_size = std::cmp::min(chunk_size, file_size - offset);
- let range = format!("bytes={}-{}", offset, offset + next_size - 1);
- trace!(range, "Getting {} bytes of data", next_size);
-
- let data = match s3
- .get_object()
- .bucket(&s3_path.bucket_name)
- .key(&s3_path.object_name)
- .range(range)
- .send()
- .await
- .context("Failed to retrieve object data")
- {
- Ok(part) => {
- part.body.collect().await.context("Failed to collect bytes")
- }
- Err(e) => Err(e),
- };
+ let range = offset..(offset + next_size);
+ trace!(?range, "Getting {} bytes of data", next_size);
- let response = match data {
- Ok(data) => Ok(blob::GetResponse {
- data_chunk: data.into_bytes().to_vec(),
- }),
+ let response = match s3.get_object_bytes(&s3_path, range).await {
+ Ok(data) => Ok(blob::GetResponse { data_chunk: data }),
Err(err) => {
error!("Failed to download data chunk: {:?}", err);
Err(Status::aborted("download failed"))
@@ -258,17 +235,10 @@
.find_s3_path_by_reverse_index(&reverse_index_item)
.await?;
- self
- .s3
- .delete_object()
- .bucket(&s3_path.bucket_name)
- .key(&s3_path.object_name)
- .send()
- .await
- .map_err(|err| {
- error!("Failed to delete S3 object: {:?}", err);
- Status::aborted("Internal error")
- })?;
+ self.s3.delete_object(&s3_path).await.map_err(|err| {
+ error!("Failed to delete S3 object: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
if let Err(err) = self.db.remove_blob_item(blob_hash).await {
error!("Failed to remove blob item from database: {:?}", err);
@@ -299,11 +269,11 @@
uploader: Option<MultiPartUploadSession>,
db: DatabaseClient,
- s3: Arc<aws_sdk_s3::Client>,
+ s3: S3Client,
}
impl PutHandler {
- fn new(db: &DatabaseClient, s3: &Arc<aws_sdk_s3::Client>) -> Self {
+ fn new(db: &DatabaseClient, s3: &S3Client) -> Self {
PutHandler {
should_close_stream: false,
action: None,
@@ -407,8 +377,7 @@
if self.uploader.is_none() {
debug!("Uploader doesn't exist, starting new session");
self.uploader =
- match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
- {
+ match self.s3.start_upload_session(&blob_item.s3_path).await {
Ok(session) => Some(session),
Err(err) => {
self.should_close_stream = true;

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 16, 3:23 PM (20 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2498119
Default Alt Text
D5787.id19227.diff (5 KB)

Event Timeline