Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3504658
D6385.id21299.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Referenced Files
None
Subscribers
None
D6385.id21299.diff
View Options
diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs
--- a/services/backup/src/main.rs
+++ b/services/backup/src/main.rs
@@ -4,6 +4,7 @@
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;
+use crate::blob::BlobClient;
use crate::service::{BackupServiceServer, MyBackupService};
pub mod blob;
@@ -27,9 +28,12 @@
Ok(())
}
-async fn run_grpc_server(db: database::DatabaseClient) -> Result<()> {
+async fn run_grpc_server(
+ db: database::DatabaseClient,
+ blob_client: BlobClient,
+) -> Result<()> {
let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?;
- let backup_service = MyBackupService::new(db);
+ let backup_service = MyBackupService::new(db, blob_client);
info!("Starting gRPC server listening at {}", addr.to_string());
Server::builder()
@@ -47,6 +51,7 @@
let aws_config = config::load_aws_config().await;
let db = database::DatabaseClient::new(&aws_config);
+ let blob_client = blob::init_blob_client();
- run_grpc_server(db).await
+ run_grpc_server(db, blob_client).await
}
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ b/services/backup/src/service/handlers/add_attachments.rs
@@ -5,12 +5,14 @@
use super::handle_db_error;
use super::proto;
use crate::{
+ blob::BlobClient,
constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
database::{DatabaseClient, LogItem},
};
pub async fn handle_add_attachments(
db: &DatabaseClient,
+ blob_client: &BlobClient,
request: proto::AddAttachmentsRequest,
) -> Result<(), Status> {
let proto::AddAttachmentsRequest {
@@ -68,7 +70,7 @@
&& log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
{
debug!("Log item too large. Persisting in blob service...");
- log_item = move_to_blob(log_item).await?;
+ log_item = move_to_blob(log_item, blob_client).await?;
}
db.put_log_item(log_item).await.map_err(handle_db_error)?;
@@ -77,7 +79,10 @@
Ok(())
}
-async fn move_to_blob(log_item: LogItem) -> Result<LogItem, Status> {
+async fn move_to_blob(
+ log_item: LogItem,
+ blob_client: &BlobClient,
+) -> Result<LogItem, Status> {
let holder = crate::utils::generate_blob_holder(
&log_item.data_hash,
&log_item.backup_id,
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -2,7 +2,7 @@
use tracing::{debug, error, trace, warn};
use crate::{
- blob::{start_simple_uploader, BlobUploader},
+ blob::{start_simple_uploader, BlobClient, BlobUploader},
database::{BackupItem, DatabaseClient},
service::proto,
};
@@ -33,6 +33,7 @@
// client instances
db: DatabaseClient,
+ blob_client: BlobClient,
// internal state
state: HandlerState,
@@ -41,14 +42,15 @@
}
impl CreateBackupHandler {
- pub fn new(db: &DatabaseClient) -> Self {
+ pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self {
CreateBackupHandler {
should_close_stream: false,
user_id: None,
device_id: None,
key_entropy: None,
data_hash: None,
- db: db.clone(),
+ db,
+ blob_client,
state: HandlerState::ReceivingParams,
backup_id: String::new(),
holder: None,
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -7,7 +7,7 @@
use super::handle_db_error;
use super::proto::{self, PullBackupResponse};
use crate::{
- blob::BlobDownloader,
+ blob::{BlobClient, BlobDownloader},
constants::{
BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
@@ -17,6 +17,7 @@
};
pub struct PullBackupHandler {
+ blob_client: BlobClient,
backup_item: BackupItem,
logs: Vec<LogItem>,
}
@@ -24,6 +25,7 @@
impl PullBackupHandler {
pub async fn new(
db: &DatabaseClient,
+ blob_client: &BlobClient,
request: proto::PullBackupRequest,
) -> Result<Self, Status> {
let proto::PullBackupRequest { user_id, backup_id } = request;
@@ -42,7 +44,11 @@
.await
.map_err(handle_db_error)?;
- Ok(PullBackupHandler { backup_item, logs })
+ Ok(PullBackupHandler {
+ backup_item,
+ logs,
+ blob_client: blob_client.clone(),
+ })
}
/// Consumes the handler and provides a response `Stream`. The stream will
@@ -59,7 +65,7 @@
try_stream! {
debug!("Pulling backup...");
{
- let compaction_stream = data_stream(&self.backup_item);
+ let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone());
tokio::pin!(compaction_stream);
while let Some(response) = compaction_stream.try_next().await? {
yield response;
@@ -79,7 +85,7 @@
if log.persisted_in_blob {
trace!(parent: &span, "Log persisted in blob");
- let log_data_stream = data_stream(&log).instrument(span);
+ let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span);
tokio::pin!(log_data_stream);
while let Some(response) = log_data_stream.try_next().await? {
yield response;
@@ -102,6 +108,7 @@
/// stream of [`PullBackupResponse`] objects, handles gRPC message size details.
fn data_stream<Item>(
item: &Item,
+ blob_client: BlobClient,
) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
where
Item: BlobStoredItem,
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -4,7 +4,7 @@
use super::handle_db_error;
use crate::{
- blob::BlobUploader,
+ blob::{BlobClient, BlobUploader},
constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
database::{DatabaseClient, LogItem},
service::proto::SendLogResponse,
@@ -34,13 +34,15 @@
// client instances
db: DatabaseClient,
+ blob_client: BlobClient,
uploader: Option<BlobUploader>,
}
impl SendLogHandler {
- pub fn new(db: &DatabaseClient) -> Self {
+ pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self {
SendLogHandler {
db: db.clone(),
+ blob_client: blob_client.clone(),
uploader: None,
user_id: None,
backup_id: None,
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -8,6 +8,7 @@
use tracing_futures::Instrument;
use crate::{
+ blob::BlobClient,
constants::MPSC_CHANNEL_BUFFER_CAPACITY,
database::{DatabaseClient, Error as DBError},
};
@@ -34,11 +35,15 @@
pub struct MyBackupService {
db: DatabaseClient,
+ blob_client: BlobClient,
}
impl MyBackupService {
- pub fn new(db_client: DatabaseClient) -> Self {
- MyBackupService { db: db_client }
+ pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self {
+ MyBackupService {
+ db: db_client,
+ blob_client,
+ }
}
}
@@ -62,8 +67,9 @@
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let db = self.db.clone();
+ let blob_client = self.blob_client.clone();
let worker = async move {
- let mut handler = CreateBackupHandler::new(&db);
+ let mut handler = CreateBackupHandler::new(db, blob_client);
while let Some(message) = in_stream.next().await {
let response = match message {
Ok(proto::CreateNewBackupRequest {
@@ -119,7 +125,7 @@
use proto::send_log_request::Data::*;
info!("SendLog request: {:?}", request);
- let mut handler = SendLogHandler::new(&self.db);
+ let mut handler = SendLogHandler::new(&self.db, &self.blob_client);
let mut in_stream = request.into_inner();
while let Some(message) = in_stream.next().await {
@@ -183,7 +189,8 @@
info!("PullBackup request: {:?}", request);
let handler =
- PullBackupHandler::new(&self.db, request.into_inner()).await?;
+ PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner())
+ .await?;
let stream = handler.into_response_stream().in_current_span();
Ok(Response::new(Box::pin(stream) as Self::PullBackupStream))
@@ -205,6 +212,7 @@
handlers::add_attachments::handle_add_attachments(
&self.db,
+ &self.blob_client,
request.into_inner(),
)
.await?;
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 9:54 AM (19 h, 16 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2685911
Default Alt Text
D6385.id21299.diff (8 KB)
Attached To
Mode
D6385: [backup-service] Propagate 'BlobClient' to handlers
Attached
Detach File
Event Timeline
Log In to Comment