D5787: [services][blob] Use S3 abstraction in service
D5787.id19227.diff
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -7,6 +7,7 @@
 use anyhow::Result;
 use aws_sdk_dynamodb::{Endpoint, Region};
 use database::DatabaseClient;
+use s3::S3Client;
 use service::{blob::blob_service_server::BlobServiceServer, MyBlobService};
 use std::net::SocketAddr;
 use tonic::transport::{Server, Uri};
@@ -40,7 +41,7 @@

 async fn run_grpc_server(
   db_client: DatabaseClient,
-  s3_client: aws_sdk_s3::Client,
+  s3_client: S3Client,
 ) -> Result<()> {
   let addr: SocketAddr =
     format!("[::]:{}", constants::GRPC_SERVER_DEFAULT_PORT).parse()?;
@@ -61,7 +62,7 @@
   let aws_config = get_aws_config().await;
   let db = database::DatabaseClient::new(&aws_config);
-  let s3 = aws_sdk_s3::Client::new(&aws_config);
+  let s3 = s3::S3Client::new(&aws_config);
   run_grpc_server(db, s3).await
 }
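The S3Client type used above is defined in services/blob/src/s3.rs, which this diff doesn't show. A minimal sketch consistent with these call sites, assuming the wrapper holds the SDK client behind an Arc (the field name and the Clone derive are assumptions):

use std::sync::Arc;
use aws_types::sdk_config::SdkConfig;

// Sketch only; the actual struct lives in services/blob/src/s3.rs.
// Holding the SDK client behind an Arc keeps S3Client cheap to clone.
#[derive(Clone)]
pub struct S3Client {
  client: Arc<aws_sdk_s3::Client>,
}

impl S3Client {
  pub fn new(aws_config: &SdkConfig) -> Self {
    S3Client {
      client: Arc::new(aws_sdk_s3::Client::new(aws_config)),
    }
  }
}

Cloning an Arc is cheap, which is what lets service.rs further down store S3Client by value instead of wrapping aws_sdk_s3::Client in an Arc itself.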
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -140,7 +140,8 @@

 impl MultiPartUploadSession {
   /// Starts a new upload session and returns its instance
-  pub async fn start(
+  /// Don't call this directly, use [`S3Client::start_upload_session()`] instead
+  async fn start(
     client: &Arc<aws_sdk_s3::Client>,
     s3_path: &S3Path,
   ) -> Result<Self> {
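The public entry point named in the new doc comment is outside this hunk. Under the assumptions of the sketch above, it reduces to a thin delegation:

use anyhow::Result;

impl S3Client {
  /// Sketch: thin public wrapper over the now-private constructor.
  pub async fn start_upload_session(
    &self,
    s3_path: &S3Path,
  ) -> Result<MultiPartUploadSession> {
    MultiPartUploadSession::start(&self.client, s3_path).await
  }
}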
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,7 +1,7 @@
-use anyhow::{Context, Result};
+use anyhow::Result;
 use blob::blob_service_server::BlobService;
 use chrono::Utc;
-use std::{pin::Pin, sync::Arc};
+use std::pin::Pin;
 use tokio::sync::mpsc;
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tonic::{Request, Response, Status};
@@ -13,7 +13,7 @@
     MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
   },
   database::{BlobItem, DatabaseClient, ReverseIndexItem},
-  s3::{MultiPartUploadSession, S3Path},
+  s3::{MultiPartUploadSession, S3Client, S3Path},
 };

 pub mod blob {
@@ -22,14 +22,14 @@

 pub struct MyBlobService {
   db: DatabaseClient,
-  s3: Arc<aws_sdk_s3::Client>,
+  s3: S3Client,
 }

 impl MyBlobService {
-  pub fn new(db_client: DatabaseClient, s3_client: aws_sdk_s3::Client) -> Self {
+  pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self {
     MyBlobService {
       db: db_client,
-      s3: Arc::new(s3_client),
+      s3: s3_client,
     }
   }
@@ -141,14 +141,8 @@
     let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
     tracing::Span::current().record("s3_path", s3_path.to_full_path());

-    let object_metadata = self
-      .s3
-      .head_object()
-      .bucket(s3_path.bucket_name.clone())
-      .key(s3_path.object_name.clone())
-      .send()
-      .await
-      .map_err(|err| {
+    let object_metadata =
+      self.s3.get_object_metadata(&s3_path).await.map_err(|err| {
         error!("Failed to get S3 object metadata: {:?}", err);
         Status::aborted("server error")
       })?;
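get_object_metadata() itself is not part of this diff. A sketch matching this call site, folding the removed head_object() builder chain into the wrapper; the return type path aws_sdk_s3::output::HeadObjectOutput fits the SDK generation imported in main.rs but is an assumption:

use anyhow::{Context, Result};

impl S3Client {
  /// Sketch: wraps HeadObject for the bucket/key pair in `s3_path`.
  pub async fn get_object_metadata(
    &self,
    s3_path: &S3Path,
  ) -> Result<aws_sdk_s3::output::HeadObjectOutput> {
    let response = self
      .client
      .head_object()
      .bucket(s3_path.bucket_name.clone())
      .key(s3_path.object_name.clone())
      .send()
      .await
      .context("HeadObject request failed")?;
    Ok(response)
  }
}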
@@ -168,28 +162,11 @@
     let mut offset: u64 = 0;
     while offset < file_size {
       let next_size = std::cmp::min(chunk_size, file_size - offset);
-      let range = format!("bytes={}-{}", offset, offset + next_size - 1);
-      trace!(range, "Getting {} bytes of data", next_size);
-
-      let data = match s3
-        .get_object()
-        .bucket(&s3_path.bucket_name)
-        .key(&s3_path.object_name)
-        .range(range)
-        .send()
-        .await
-        .context("Failed to retrieve object data")
-      {
-        Ok(part) => {
-          part.body.collect().await.context("Failed to collect bytes")
-        }
-        Err(e) => Err(e),
-      };
+      let range = offset..(offset + next_size);
+      trace!(?range, "Getting {} bytes of data", next_size);

-      let response = match data {
-        Ok(data) => Ok(blob::GetResponse {
-          data_chunk: data.into_bytes().to_vec(),
-        }),
+      let response = match s3.get_object_bytes(&s3_path, range).await {
+        Ok(data) => Ok(blob::GetResponse { data_chunk: data }),
         Err(err) => {
           error!("Failed to download data chunk: {:?}", err);
           Err(Status::aborted("download failed"))
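The range argument is now a half-open Range<u64>, so converting to the inclusive bytes=start-end HTTP header (the old `- 1` at the call site) moves into the wrapper. A sketch of get_object_bytes() consistent with this call site (body assumed):

use std::ops::Range;
use anyhow::{Context, Result};

impl S3Client {
  /// Sketch: fetches the half-open `range` of the object as raw bytes.
  pub async fn get_object_bytes(
    &self,
    s3_path: &S3Path,
    range: Range<u64>,
  ) -> Result<Vec<u8>> {
    // HTTP Range bounds are inclusive, hence the `- 1` that used to
    // live at the call site.
    let range_header = format!("bytes={}-{}", range.start, range.end - 1);
    let response = self
      .client
      .get_object()
      .bucket(&s3_path.bucket_name)
      .key(&s3_path.object_name)
      .range(range_header)
      .send()
      .await
      .context("Failed to retrieve object data")?;
    let data = response
      .body
      .collect()
      .await
      .context("Failed to collect bytes")?;
    Ok(data.into_bytes().to_vec())
  }
}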
@@ -258,17 +235,10 @@
     let s3_path = self
       .find_s3_path_by_reverse_index(&reverse_index_item)
       .await?;
-    self
-      .s3
-      .delete_object()
-      .bucket(&s3_path.bucket_name)
-      .key(&s3_path.object_name)
-      .send()
-      .await
-      .map_err(|err| {
-        error!("Failed to delete S3 object: {:?}", err);
-        Status::aborted("Internal error")
-      })?;
+    self.s3.delete_object(&s3_path).await.map_err(|err| {
+      error!("Failed to delete S3 object: {:?}", err);
+      Status::aborted("Internal error")
+    })?;

     if let Err(err) = self.db.remove_blob_item(blob_hash).await {
       error!("Failed to remove blob item from database: {:?}", err);
@@ -299,11 +269,11 @@
   uploader: Option<MultiPartUploadSession>,
   db: DatabaseClient,
-  s3: Arc<aws_sdk_s3::Client>,
+  s3: S3Client,
 }

 impl PutHandler {
-  fn new(db: &DatabaseClient, s3: &Arc<aws_sdk_s3::Client>) -> Self {
+  fn new(db: &DatabaseClient, s3: &S3Client) -> Self {
     PutHandler {
       should_close_stream: false,
       action: None,
@@ -407,8 +377,7 @@
     if self.uploader.is_none() {
       debug!("Uploader doesn't exist, starting new session");
       self.uploader =
-        match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
-        {
+        match self.s3.start_upload_session(&blob_item.s3_path).await {
           Ok(session) => Some(session),
           Err(err) => {
             self.should_close_stream = true;