diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs --- a/services/backup/src/main.rs +++ b/services/backup/src/main.rs @@ -4,6 +4,7 @@ use tracing::{info, Level}; use tracing_subscriber::EnvFilter; +use crate::blob::BlobClient; use crate::service::{BackupServiceServer, MyBackupService}; pub mod blob; @@ -27,9 +28,12 @@ Ok(()) } -async fn run_grpc_server(db: database::DatabaseClient) -> Result<()> { +async fn run_grpc_server( + db: database::DatabaseClient, + blob_client: BlobClient, +) -> Result<()> { let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?; - let backup_service = MyBackupService::new(db); + let backup_service = MyBackupService::new(db, blob_client); info!("Starting gRPC server listening at {}", addr.to_string()); Server::builder() @@ -47,6 +51,7 @@ let aws_config = config::load_aws_config().await; let db = database::DatabaseClient::new(&aws_config); + let blob_client = blob::init_blob_client(); - run_grpc_server(db).await + run_grpc_server(db, blob_client).await } diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs --- a/services/backup/src/service/handlers/add_attachments.rs +++ b/services/backup/src/service/handlers/add_attachments.rs @@ -5,12 +5,14 @@ use super::handle_db_error; use super::proto; use crate::{ + blob::BlobClient, constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, database::{DatabaseClient, LogItem}, }; pub async fn handle_add_attachments( db: &DatabaseClient, + blob_client: &BlobClient, request: proto::AddAttachmentsRequest, ) -> Result<(), Status> { let proto::AddAttachmentsRequest { @@ -68,7 +70,7 @@ && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT { debug!("Log item too large. 
Persisting in blob service..."); - log_item = move_to_blob(log_item).await?; + log_item = move_to_blob(log_item, blob_client).await?; } db.put_log_item(log_item).await.map_err(handle_db_error)?; @@ -77,7 +79,10 @@ Ok(()) } -async fn move_to_blob(log_item: LogItem) -> Result<LogItem, Status> { +async fn move_to_blob( + log_item: LogItem, + blob_client: &BlobClient, +) -> Result<LogItem, Status> { let holder = crate::utils::generate_blob_holder( &log_item.data_hash, &log_item.backup_id, diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -2,7 +2,7 @@ use tracing::{debug, error, trace, warn}; use crate::{ - blob::{start_simple_uploader, BlobUploader}, + blob::{start_simple_uploader, BlobClient, BlobUploader}, database::{BackupItem, DatabaseClient}, service::proto, }; @@ -33,6 +33,7 @@ // client instances db: DatabaseClient, + blob_client: BlobClient, // internal state state: HandlerState, @@ -41,14 +42,15 @@ } impl CreateBackupHandler { - pub fn new(db: &DatabaseClient) -> Self { + pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self { CreateBackupHandler { should_close_stream: false, user_id: None, device_id: None, key_entropy: None, data_hash: None, - db: db.clone(), + db, + blob_client, state: HandlerState::ReceivingParams, backup_id: String::new(), holder: None, diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -7,7 +7,7 @@ use super::handle_db_error; use super::proto::{self, PullBackupResponse}; use crate::{ - blob::BlobDownloader, + blob::{BlobClient, BlobDownloader}, constants::{ BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, @@ 
-17,6 +17,7 @@ }; pub struct PullBackupHandler { + blob_client: BlobClient, backup_item: BackupItem, logs: Vec<LogItem>, } @@ -24,6 +25,7 @@ impl PullBackupHandler { pub async fn new( db: &DatabaseClient, + blob_client: &BlobClient, request: proto::PullBackupRequest, ) -> Result<Self, Status> { let proto::PullBackupRequest { user_id, backup_id } = request; @@ -42,7 +44,11 @@ .await .map_err(handle_db_error)?; - Ok(PullBackupHandler { backup_item, logs }) + Ok(PullBackupHandler { + backup_item, + logs, + blob_client: blob_client.clone(), + }) } /// Consumes the handler and provides a response `Stream`. The stream will @@ -59,7 +65,7 @@ try_stream! { debug!("Pulling backup..."); { - let compaction_stream = data_stream(&self.backup_item); + let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone()); tokio::pin!(compaction_stream); while let Some(response) = compaction_stream.try_next().await? { yield response; @@ -79,7 +85,7 @@ if log.persisted_in_blob { trace!(parent: &span, "Log persisted in blob"); - let log_data_stream = data_stream(&log).instrument(span); + let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span); tokio::pin!(log_data_stream); while let Some(response) = log_data_stream.try_next().await? { yield response; @@ -102,6 +108,7 @@ /// stream of [`PullBackupResponse`] objects, handles gRPC message size details. 
fn data_stream( item: &Item, + blob_client: BlobClient, ) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_ where Item: BlobStoredItem, diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs --- a/services/backup/src/service/handlers/send_log.rs +++ b/services/backup/src/service/handlers/send_log.rs @@ -4,7 +4,7 @@ use super::handle_db_error; use crate::{ - blob::BlobUploader, + blob::{BlobClient, BlobUploader}, constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, database::{DatabaseClient, LogItem}, service::proto::SendLogResponse, @@ -34,13 +34,15 @@ // client instances db: DatabaseClient, + blob_client: BlobClient, uploader: Option<BlobUploader>, } impl SendLogHandler { - pub fn new(db: &DatabaseClient) -> Self { + pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self { SendLogHandler { db: db.clone(), + blob_client: blob_client.clone(), uploader: None, user_id: None, backup_id: None, diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs --- a/services/backup/src/service/mod.rs +++ b/services/backup/src/service/mod.rs @@ -8,6 +8,7 @@ use tracing_futures::Instrument; use crate::{ + blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::{DatabaseClient, Error as DBError}, }; @@ -34,11 +35,15 @@ pub struct MyBackupService { db: DatabaseClient, + blob_client: BlobClient, } impl MyBackupService { - pub fn new(db_client: DatabaseClient) -> Self { - MyBackupService { db: db_client } + pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self { + MyBackupService { + db: db_client, + blob_client, + } + } } @@ -62,8 +67,9 @@ let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db = self.db.clone(); + let blob_client = self.blob_client.clone(); let worker = async move { - let mut handler = CreateBackupHandler::new(&db); + let mut handler = CreateBackupHandler::new(db, blob_client); while let Some(message) = in_stream.next().await { let 
response = match message { Ok(proto::CreateNewBackupRequest { @@ -119,7 +125,7 @@ use proto::send_log_request::Data::*; info!("SendLog request: {:?}", request); - let mut handler = SendLogHandler::new(&self.db); + let mut handler = SendLogHandler::new(&self.db, &self.blob_client); let mut in_stream = request.into_inner(); while let Some(message) = in_stream.next().await { @@ -183,7 +189,8 @@ info!("PullBackup request: {:?}", request); let handler = - PullBackupHandler::new(&self.db, request.into_inner()).await?; + PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner()) + .await?; let stream = handler.into_response_stream().in_current_span(); Ok(Response::new(Box::pin(stream) as Self::PullBackupStream)) @@ -205,6 +212,7 @@ handlers::add_attachments::handle_add_attachments( &self.db, + &self.blob_client, request.into_inner(), ) .await?;