diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs --- a/services/blob/src/database.rs +++ b/services/blob/src/database.rs @@ -15,8 +15,8 @@ BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME, BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME, - BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, - BLOB_TABLE_S3_PATH_FIELD, + BLOB_S3_BUCKET_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, + BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD, }, s3::S3Path, }; @@ -28,6 +28,20 @@ pub created: DateTime, } +impl BlobItem { + pub fn new(blob_hash: impl Into) -> Self { + let hash_str = blob_hash.into(); + BlobItem { + blob_hash: hash_str.clone(), + s3_path: S3Path { + bucket_name: BLOB_S3_BUCKET_NAME.to_string(), + object_name: hash_str, + }, + created: Utc::now(), + } + } +} + #[derive(Clone, Debug)] pub struct ReverseIndexItem { pub holder: String, diff --git a/services/blob/src/grpc.rs b/services/blob/src/grpc.rs --- a/services/blob/src/grpc.rs +++ b/services/blob/src/grpc.rs @@ -1,7 +1,6 @@ use anyhow::Result; use aws_sdk_dynamodb::Error as DynamoDBError; use blob::blob_service_server::BlobService; -use chrono::Utc; use std::{net::SocketAddr, pin::Pin}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; @@ -11,7 +10,7 @@ use crate::{ config::CONFIG, constants::{ - BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, + GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }, database::{BlobItem, DatabaseClient, Error as DBError, ReverseIndexItem}, @@ -347,14 +346,7 @@ // Hash doesn't exist, so we're starting a new upload session Ok(None) => { debug!("Blob not found, starting upload action"); - self.action = Some(PutAction::UploadNewBlob(BlobItem { - blob_hash: blob_hash.to_string(), - s3_path: S3Path { - bucket_name: BLOB_S3_BUCKET_NAME.to_string(), - object_name: blob_hash.to_string(), - }, - created: Utc::now(), - })); + self.action = Some(PutAction::UploadNewBlob(BlobItem::new(blob_hash))); Ok(blob::PutResponse { data_exists: false }) } Err(db_err) => {