Page Menu | Home | Phabricator

D6385.id21299.diff
No One | Temporary

D6385.id21299.diff

diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs
--- a/services/backup/src/main.rs
+++ b/services/backup/src/main.rs
@@ -4,6 +4,7 @@
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;
+use crate::blob::BlobClient;
use crate::service::{BackupServiceServer, MyBackupService};
pub mod blob;
@@ -27,9 +28,12 @@
Ok(())
}
-async fn run_grpc_server(db: database::DatabaseClient) -> Result<()> {
+async fn run_grpc_server(
+ db: database::DatabaseClient,
+ blob_client: BlobClient,
+) -> Result<()> {
let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?;
- let backup_service = MyBackupService::new(db);
+ let backup_service = MyBackupService::new(db, blob_client);
info!("Starting gRPC server listening at {}", addr.to_string());
Server::builder()
@@ -47,6 +51,7 @@
let aws_config = config::load_aws_config().await;
let db = database::DatabaseClient::new(&aws_config);
+ let blob_client = blob::init_blob_client();
- run_grpc_server(db).await
+ run_grpc_server(db, blob_client).await
}
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ b/services/backup/src/service/handlers/add_attachments.rs
@@ -5,12 +5,14 @@
use super::handle_db_error;
use super::proto;
use crate::{
+ blob::BlobClient,
constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
database::{DatabaseClient, LogItem},
};
pub async fn handle_add_attachments(
db: &DatabaseClient,
+ blob_client: &BlobClient,
request: proto::AddAttachmentsRequest,
) -> Result<(), Status> {
let proto::AddAttachmentsRequest {
@@ -68,7 +70,7 @@
&& log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
{
debug!("Log item too large. Persisting in blob service...");
- log_item = move_to_blob(log_item).await?;
+ log_item = move_to_blob(log_item, blob_client).await?;
}
db.put_log_item(log_item).await.map_err(handle_db_error)?;
@@ -77,7 +79,10 @@
Ok(())
}
-async fn move_to_blob(log_item: LogItem) -> Result<LogItem, Status> {
+async fn move_to_blob(
+ log_item: LogItem,
+ blob_client: &BlobClient,
+) -> Result<LogItem, Status> {
let holder = crate::utils::generate_blob_holder(
&log_item.data_hash,
&log_item.backup_id,
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -2,7 +2,7 @@
use tracing::{debug, error, trace, warn};
use crate::{
- blob::{start_simple_uploader, BlobUploader},
+ blob::{start_simple_uploader, BlobClient, BlobUploader},
database::{BackupItem, DatabaseClient},
service::proto,
};
@@ -33,6 +33,7 @@
// client instances
db: DatabaseClient,
+ blob_client: BlobClient,
// internal state
state: HandlerState,
@@ -41,14 +42,15 @@
}
impl CreateBackupHandler {
- pub fn new(db: &DatabaseClient) -> Self {
+ pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self {
CreateBackupHandler {
should_close_stream: false,
user_id: None,
device_id: None,
key_entropy: None,
data_hash: None,
- db: db.clone(),
+ db,
+ blob_client,
state: HandlerState::ReceivingParams,
backup_id: String::new(),
holder: None,
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -7,7 +7,7 @@
use super::handle_db_error;
use super::proto::{self, PullBackupResponse};
use crate::{
- blob::BlobDownloader,
+ blob::{BlobClient, BlobDownloader},
constants::{
BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
@@ -17,6 +17,7 @@
};
pub struct PullBackupHandler {
+ blob_client: BlobClient,
backup_item: BackupItem,
logs: Vec<LogItem>,
}
@@ -24,6 +25,7 @@
impl PullBackupHandler {
pub async fn new(
db: &DatabaseClient,
+ blob_client: &BlobClient,
request: proto::PullBackupRequest,
) -> Result<Self, Status> {
let proto::PullBackupRequest { user_id, backup_id } = request;
@@ -42,7 +44,11 @@
.await
.map_err(handle_db_error)?;
- Ok(PullBackupHandler { backup_item, logs })
+ Ok(PullBackupHandler {
+ backup_item,
+ logs,
+ blob_client: blob_client.clone(),
+ })
}
/// Consumes the handler and provides a response `Stream`. The stream will
@@ -59,7 +65,7 @@
try_stream! {
debug!("Pulling backup...");
{
- let compaction_stream = data_stream(&self.backup_item);
+ let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone());
tokio::pin!(compaction_stream);
while let Some(response) = compaction_stream.try_next().await? {
yield response;
@@ -79,7 +85,7 @@
if log.persisted_in_blob {
trace!(parent: &span, "Log persisted in blob");
- let log_data_stream = data_stream(&log).instrument(span);
+ let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span);
tokio::pin!(log_data_stream);
while let Some(response) = log_data_stream.try_next().await? {
yield response;
@@ -102,6 +108,7 @@
/// stream of [`PullBackupResponse`] objects, handles gRPC message size details.
fn data_stream<Item>(
item: &Item,
+ blob_client: BlobClient,
) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
where
Item: BlobStoredItem,
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -4,7 +4,7 @@
use super::handle_db_error;
use crate::{
- blob::BlobUploader,
+ blob::{BlobClient, BlobUploader},
constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
database::{DatabaseClient, LogItem},
service::proto::SendLogResponse,
@@ -34,13 +34,15 @@
// client instances
db: DatabaseClient,
+ blob_client: BlobClient,
uploader: Option<BlobUploader>,
}
impl SendLogHandler {
- pub fn new(db: &DatabaseClient) -> Self {
+ pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self {
SendLogHandler {
db: db.clone(),
+ blob_client: blob_client.clone(),
uploader: None,
user_id: None,
backup_id: None,
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -8,6 +8,7 @@
use tracing_futures::Instrument;
use crate::{
+ blob::BlobClient,
constants::MPSC_CHANNEL_BUFFER_CAPACITY,
database::{DatabaseClient, Error as DBError},
};
@@ -34,11 +35,15 @@
pub struct MyBackupService {
db: DatabaseClient,
+ blob_client: BlobClient,
}
impl MyBackupService {
- pub fn new(db_client: DatabaseClient) -> Self {
- MyBackupService { db: db_client }
+ pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self {
+ MyBackupService {
+ db: db_client,
+ blob_client,
+ }
}
}
@@ -62,8 +67,9 @@
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let db = self.db.clone();
+ let blob_client = self.blob_client.clone();
let worker = async move {
- let mut handler = CreateBackupHandler::new(&db);
+ let mut handler = CreateBackupHandler::new(db, blob_client);
while let Some(message) = in_stream.next().await {
let response = match message {
Ok(proto::CreateNewBackupRequest {
@@ -119,7 +125,7 @@
use proto::send_log_request::Data::*;
info!("SendLog request: {:?}", request);
- let mut handler = SendLogHandler::new(&self.db);
+ let mut handler = SendLogHandler::new(&self.db, &self.blob_client);
let mut in_stream = request.into_inner();
while let Some(message) = in_stream.next().await {
@@ -183,7 +189,8 @@
info!("PullBackup request: {:?}", request);
let handler =
- PullBackupHandler::new(&self.db, request.into_inner()).await?;
+ PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner())
+ .await?;
let stream = handler.into_response_stream().in_current_span();
Ok(Response::new(Box::pin(stream) as Self::PullBackupStream))
@@ -205,6 +212,7 @@
handlers::add_attachments::handle_add_attachments(
&self.db,
+ &self.blob_client,
request.into_inner(),
)
.await?;

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 21, 9:54 AM (19 h, 16 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2685911
Default Alt Text
D6385.id21299.diff (8 KB)

Event Timeline