D8796.id29840.diff

diff --git a/services/backup/Cargo.lock b/services/backup/Cargo.lock
--- a/services/backup/Cargo.lock
+++ b/services/backup/Cargo.lock
@@ -47,17 +47,6 @@
"syn",
]
-[[package]]
-name = "async-trait"
-version = "0.1.59"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -382,52 +371,6 @@
"tracing",
]
-[[package]]
-name = "axum"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
-dependencies = [
- "async-trait",
- "axum-core",
- "bitflags",
- "bytes",
- "futures-util",
- "http",
- "http-body",
- "hyper",
- "itoa",
- "matchit",
- "memchr",
- "mime",
- "percent-encoding",
- "pin-project-lite",
- "rustversion",
- "serde",
- "sync_wrapper",
- "tower",
- "tower-http",
- "tower-layer",
- "tower-service",
-]
-
-[[package]]
-name = "axum-core"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
-dependencies = [
- "async-trait",
- "bytes",
- "futures-util",
- "http",
- "http-body",
- "mime",
- "rustversion",
- "tower-layer",
- "tower-service",
-]
-
[[package]]
name = "backup"
version = "0.1.0"
@@ -441,11 +384,9 @@
"clap",
"comm-services-lib",
"once_cell",
- "prost",
"rand",
"tokio",
"tokio-stream",
- "tonic",
"tonic-build",
"tracing",
"tracing-futures",
@@ -963,12 +904,6 @@
"pin-project-lite",
]
-[[package]]
-name = "http-range-header"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
-
[[package]]
name = "httparse"
version = "1.8.0"
@@ -1020,18 +955,6 @@
"tokio-rustls",
]
-[[package]]
-name = "hyper-timeout"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
-dependencies = [
- "hyper",
- "pin-project-lite",
- "tokio",
- "tokio-io-timeout",
-]
-
[[package]]
name = "iana-time-zone"
version = "0.1.53"
@@ -1166,24 +1089,12 @@
"regex-automata",
]
-[[package]]
-name = "matchit"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
-
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
-[[package]]
-name = "mime"
-version = "0.3.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
-
[[package]]
name = "mio"
version = "0.8.5"
@@ -1361,9 +1272,9 @@
[[package]]
name = "proc-macro2"
-version = "1.0.47"
+version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
+checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
@@ -1425,9 +1336,9 @@
[[package]]
name = "quote"
-version = "1.0.21"
+version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
+checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@@ -1577,12 +1488,6 @@
"base64 0.13.1",
]
-[[package]]
-name = "rustversion"
-version = "1.0.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
-
[[package]]
name = "ryu"
version = "1.0.11"
@@ -1749,12 +1654,6 @@
"unicode-ident",
]
-[[package]]
-name = "sync_wrapper"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
-
[[package]]
name = "tempfile"
version = "3.3.0"
@@ -1842,16 +1741,6 @@
"windows-sys 0.42.0",
]
-[[package]]
-name = "tokio-io-timeout"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
-dependencies = [
- "pin-project-lite",
- "tokio",
-]
-
[[package]]
name = "tokio-macros"
version = "1.8.2"
@@ -1899,38 +1788,6 @@
"tracing",
]
-[[package]]
-name = "tonic"
-version = "0.8.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
-dependencies = [
- "async-stream",
- "async-trait",
- "axum",
- "base64 0.13.1",
- "bytes",
- "futures-core",
- "futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
- "hyper-timeout",
- "percent-encoding",
- "pin-project",
- "prost",
- "prost-derive",
- "tokio",
- "tokio-stream",
- "tokio-util",
- "tower",
- "tower-layer",
- "tower-service",
- "tracing",
- "tracing-futures",
-]
-
[[package]]
name = "tonic-build"
version = "0.8.4"
@@ -1952,37 +1809,14 @@
dependencies = [
"futures-core",
"futures-util",
- "indexmap",
"pin-project",
"pin-project-lite",
- "rand",
- "slab",
"tokio",
- "tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
-[[package]]
-name = "tower-http"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
-dependencies = [
- "bitflags",
- "bytes",
- "futures-core",
- "futures-util",
- "http",
- "http-body",
- "http-range-header",
- "pin-project-lite",
- "tower",
- "tower-layer",
- "tower-service",
-]
-
[[package]]
name = "tower-layer"
version = "0.3.2"
diff --git a/services/backup/Cargo.toml b/services/backup/Cargo.toml
--- a/services/backup/Cargo.toml
+++ b/services/backup/Cargo.toml
@@ -16,14 +16,12 @@
clap = { version = "4.0", features = ["derive", "env"] }
comm-services-lib = { path = "../comm-services-lib" }
once_cell = "1.17"
-prost = "0.11"
rand = "0.8.5"
-tokio = { version = "1.24", features = ["rt-multi-thread"]}
+tokio = { version = "1.24", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1"
-tonic = "0.8"
tracing = "0.1"
-tracing-futures = { version = "0.2", features = ["futures-03"] }
-tracing-subscriber = { version = "0.3", features = ["env-filter"]}
+tracing-futures = { version = "0.2", features = ["futures-03"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.2", features = ["v4"] }
[build-dependencies]
diff --git a/services/backup/Dockerfile b/services/backup/Dockerfile
--- a/services/backup/Dockerfile
+++ b/services/backup/Dockerfile
@@ -21,8 +21,6 @@
# Copy actual application sources
COPY services/backup .
-COPY shared/protos/backup.proto ../../shared/protos/
-COPY shared/protos/blob.proto ../../shared/protos/
# Remove the previously-built binary so that only the application itself is
# rebuilt
diff --git a/services/backup/build.rs b/services/backup/build.rs
deleted file mode 100644
--- a/services/backup/build.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-fn main() {
- println!("cargo:rerun-if-changed=src/main.rs");
-
- println!("cargo:rerun-if-changed=../../shared/protos/backup.proto");
- println!("cargo:rerun-if-changed=../../shared/protos/blob.proto");
- tonic_build::compile_protos("../../shared/protos/backup.proto")
- .expect("Failed to compile Backup protobuf file");
- tonic_build::compile_protos("../../shared/protos/blob.proto")
- .expect("Failed to compile Blob protobuf file");
-}
diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs
deleted file mode 100644
--- a/services/backup/src/blob/downloader.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use anyhow::{bail, Result};
-use tokio::{
- sync::mpsc::{self, Receiver},
- task::JoinHandle,
-};
-use tracing::{error, instrument, Instrument};
-
-use super::{proto, BlobClient};
-use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
-
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-/// The BlobDownloader instance is a handle holder of a Tokio task running the
-/// actual blob client instance. The communication is done via a MPSC channel
-/// and is one-sided - the data is transmitted from the client task to the
-/// caller. Blob chunks received in response stream are waiting
-/// for the channel to have capacity, so it is recommended to read them quickly
-/// to make room for more.
-/// The client task can be stopped and awaited for result via the `terminate()`
-/// method.
-pub struct BlobDownloader {
- rx: Receiver<Vec<u8>>,
- handle: JoinHandle<anyhow::Result<()>>,
-}
-
-impl BlobDownloader {
- /// Connects to the Blob service and keeps the client connection open
- /// in a separate Tokio task.
- #[instrument(name = "blob_downloader")]
- pub fn start(holder: String, mut blob_client: BlobClient) -> Self {
- let (blob_res_tx, blob_res_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let client_thread = async move {
- let response = blob_client
- .get(proto::GetRequest { holder })
- .await
- .map_err(|err| {
- error!("Get request failed: {:?}", err);
- err
- })?;
- let mut inner_response = response.into_inner();
- loop {
- match inner_response.message().await? {
- Some(data) => {
- let data: Vec<u8> = data.data_chunk;
- if let Err(err) = blob_res_tx.send(data).await {
- bail!(err);
- }
- }
- // Response stream was closed
- None => break,
- }
- }
- Ok(())
- };
- let handle = tokio::spawn(client_thread.in_current_span());
-
- BlobDownloader {
- rx: blob_res_rx,
- handle,
- }
- }
-
- /// Receives the next chunk of blob data if ready or sleeps
- /// until the data is available.
- ///
- /// Returns `None` when the transmission is finished, but this doesn't
- /// determine if it was successful. After receiving `None`, the client
- /// should be consumed by calling [`GetClient::terminate`] to handle
- /// possible errors.
- pub async fn next_chunk(&mut self) -> Option<Vec<u8>> {
- self.rx.recv().await
- }
-
- /// Stops receiving messages and awaits the client thread to exit
- /// and returns its status.
- pub async fn terminate(mut self) -> Result<()> {
- self.rx.close();
- self.handle.await?
- }
-}
diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs
deleted file mode 100644
--- a/services/backup/src/blob/mod.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-mod proto {
- tonic::include_proto!("blob");
-}
-use proto::blob_service_client::BlobServiceClient;
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-mod downloader;
-mod uploader;
-pub use downloader::*;
-pub use uploader::*;
-
-pub type BlobClient = BlobServiceClient<tonic::transport::Channel>;
-
-/// Creates a new Blob service client instance. It does not attempt to connect
-/// to the service until first use.
-pub fn init_blob_client() -> BlobClient {
- let service_url = &crate::CONFIG.blob_service_url;
- let channel =
- tonic::transport::Channel::from_static(service_url).connect_lazy();
- BlobServiceClient::new(channel)
-}
diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs
deleted file mode 100644
--- a/services/backup/src/blob/uploader.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-use anyhow::{anyhow, bail, Result};
-use tokio::{
- sync::mpsc::{self, Receiver, Sender},
- task::JoinHandle,
-};
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::Status;
-use tracing::{error, instrument, Instrument};
-
-use super::{proto, BlobClient};
-use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
-
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-pub struct BlobUploader {
- req_tx: Sender<proto::PutRequest>,
- res_rx: Receiver<proto::PutResponse>,
- handle: JoinHandle<anyhow::Result<()>>,
-}
-
-/// The BlobUploader instance is a handle holder of a Tokio task running the
-/// actual blob client instance. The communication is done via two MPSC
-/// channels - one sending requests to the client task, and another for sending
-/// responses back to the caller. These messages should go in pairs
-/// - one request for one response.
-/// The client task can be stopped and awaited for result via the `terminate()`
-/// method.
-impl BlobUploader {
- /// Connects to the Blob service and keeps the client connection open
- /// in a separate Tokio task.
- #[instrument(name = "blob_uploader")]
- pub fn start(mut blob_client: BlobClient) -> Self {
- let (blob_req_tx, blob_req_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let (blob_res_tx, blob_res_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let client_thread = async move {
- match blob_client
- .put(tonic::Request::new(ReceiverStream::new(blob_req_rx)))
- .await
- {
- Ok(response) => {
- let mut response_stream = response.into_inner();
- loop {
- match response_stream.message().await? {
- Some(response_message) => {
- // warning: this will produce an error if there's more unread
- // responses than MPSC_CHANNEL_BUFFER_CAPACITY
- // so you should always read the response MPSC channel
- // right after sending a request to dequeue the responses
- // and make room for more.
- // The PutClient::put() function should take care of that
- if let Err(err) = blob_res_tx.try_send(response_message) {
- bail!(err);
- }
- }
- // Response stream was closed
- None => break,
- }
- }
- }
- Err(err) => {
- error!("Put request failed: {:?}", err);
- bail!(err.to_string());
- }
- };
- Ok(())
- };
- let handle = tokio::spawn(client_thread.in_current_span());
-
- BlobUploader {
- req_tx: blob_req_tx,
- res_rx: blob_res_rx,
- handle,
- }
- }
-
- /// Sends a [`PutRequest`] to the stream and waits for blob service
- /// to send a response. After all data is sent, the [`PutClient::terminate`]
- /// should be called to end the transmission and handle possible errors.
- pub async fn put(&mut self, req: PutRequest) -> Result<PutResponse> {
- self.req_tx.try_send(req)?;
- self
- .res_rx
- .recv()
- .await
- .ok_or_else(|| anyhow!("Blob client channel closed"))
- }
-
- /// Convenience wrapper for
- /// ```
- /// BlobClient::put(PutRequest {
- /// data: Some(PutRequestData::DataChunk(data))
- /// })
- /// ```
- pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
- self
- .put(PutRequest {
- data: Some(PutRequestData::DataChunk(data)),
- })
- .await
- }
-
- /// Closes the connection and awaits the blob client task to finish.
- pub async fn terminate(self) -> Result<()> {
- drop(self.req_tx);
- let thread_result = self.handle.await?;
- thread_result
- }
-}
-
-/// Starts a put client instance. Fulfills request with blob hash and holder.
-///
-/// `None` is returned if given `holder` already exists.
-///
-/// ## Example
-/// ```
-/// if let Some(mut client) =
-/// start_simple_uploader("my_holder", "my_hash").await? {
-/// let my_data = vec![1,2,3,4];
-/// let _ = client.put_data(my_data).await;
-///
-/// let status = client.terminate().await;
-/// }
-/// ```
-pub async fn start_simple_uploader(
- holder: &str,
- blob_hash: &str,
- blob_client: BlobClient,
-) -> Result<Option<BlobUploader>, Status> {
- // start upload request
- let mut uploader = BlobUploader::start(blob_client);
-
- // send holder
- uploader
- .put(PutRequest {
- data: Some(PutRequestData::Holder(holder.to_string())),
- })
- .await
- .map_err(|err| {
- error!("Failed to set blob holder: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- // send hash
- let PutResponse { data_exists } = uploader
- .put(PutRequest {
- data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
- })
- .await
- .map_err(|err| {
- error!("Failed to set blob hash: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- // Blob with given holder already exists, nothing to do
- if data_exists {
- // the connection is already terminated by server,
- // but it's good to await it anyway
- uploader.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- return Ok(None);
- }
- Ok(Some(uploader))
-}
diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs
--- a/services/backup/src/main.rs
+++ b/services/backup/src/main.rs
@@ -1,17 +1,10 @@
use anyhow::Result;
-use std::net::SocketAddr;
-use tonic::transport::Server;
-use tracing::{info, Level};
+use tracing::Level;
use tracing_subscriber::EnvFilter;
-use crate::blob::BlobClient;
-use crate::service::{BackupServiceServer, MyBackupService};
-
-pub mod blob;
pub mod config;
pub mod constants;
pub mod database;
-pub mod service;
pub mod utils;
// re-export this to be available as crate::CONFIG
@@ -28,30 +21,13 @@
Ok(())
}
-async fn run_grpc_server(
- db: database::DatabaseClient,
- blob_client: BlobClient,
-) -> Result<()> {
- let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?;
- let backup_service = MyBackupService::new(db, blob_client);
-
- info!("Starting gRPC server listening at {}", addr.to_string());
- Server::builder()
- .add_service(BackupServiceServer::new(backup_service))
- .serve(addr)
- .await?;
-
- Ok(())
-}
-
#[tokio::main]
async fn main() -> Result<()> {
config::parse_cmdline_args();
configure_logging()?;
let aws_config = config::load_aws_config().await;
- let db = database::DatabaseClient::new(&aws_config);
- let blob_client = blob::init_blob_client();
+ let _db = database::DatabaseClient::new(&aws_config);
- run_grpc_server(db, blob_client).await
+ Ok(())
}
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use tonic::Status;
-use tracing::debug;
-use tracing::error;
-
-use super::handle_db_error;
-use super::proto;
-use crate::{
- blob::BlobClient,
- constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
- database::{DatabaseClient, LogItem},
-};
-
-pub async fn handle_add_attachments(
- db: &DatabaseClient,
- blob_client: &BlobClient,
- request: proto::AddAttachmentsRequest,
-) -> Result<(), Status> {
- let proto::AddAttachmentsRequest {
- user_id,
- backup_id,
- log_id,
- holders,
- } = request;
-
- if user_id.is_empty() {
- return Err(Status::invalid_argument(
- "user id required but not provided",
- ));
- }
- if backup_id.is_empty() {
- return Err(Status::invalid_argument(
- "backup id required but not provided",
- ));
- }
- if holders.is_empty() {
- return Err(Status::invalid_argument(
- "holders required but not provided",
- ));
- }
-
- if log_id.is_empty() {
- let backup_item_result = db
- .find_backup_item(&user_id, &backup_id)
- .await
- .map_err(handle_db_error)?;
- let mut backup_item = backup_item_result.ok_or_else(|| {
- debug!("Backup item not found");
- Status::not_found("Backup item not found")
- })?;
-
- add_new_attachments(&mut backup_item.attachment_holders, &holders);
-
- db.put_backup_item(backup_item)
- .await
- .map_err(handle_db_error)?;
- } else {
- let log_item_result = db
- .find_log_item(&backup_id, &log_id)
- .await
- .map_err(handle_db_error)?;
- let mut log_item = log_item_result.ok_or_else(|| {
- debug!("Log item not found");
- Status::not_found("Log item not found")
- })?;
-
- add_new_attachments(&mut log_item.attachment_holders, &holders);
-
- // log item too large for database, move it to blob-service storage
- if !log_item.persisted_in_blob
- && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
- {
- debug!("Log item too large. Persisting in blob service...");
- log_item = move_to_blob(log_item, blob_client).await?;
- }
-
- db.put_log_item(log_item).await.map_err(handle_db_error)?;
- }
-
- Ok(())
-}
-
-async fn move_to_blob(
- log_item: LogItem,
- blob_client: &BlobClient,
-) -> Result<LogItem, Status> {
- let holder = crate::utils::generate_blob_holder(
- &log_item.data_hash,
- &log_item.backup_id,
- Some(&log_item.log_id),
- );
-
- if let Some(mut uploader) = crate::blob::start_simple_uploader(
- &holder,
- &log_item.data_hash,
- blob_client.clone(),
- )
- .await?
- {
- let blob_chunk = log_item.value.into_bytes();
- uploader.put_data(blob_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- uploader.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- } else {
- debug!("Blob holder for log ID={} already exists", &log_item.log_id);
- }
-
- Ok(LogItem {
- persisted_in_blob: true,
- value: holder,
- ..log_item
- })
-}
-
-/// Modifies the [`current_holders_str`] by appending attachment holders
-/// contained in [`new_holders`]. Removes duplicates. Both arguments
-/// are expected to be [`ATTACHMENT_HOLDER_SEPARATOR`] separated strings.
-fn add_new_attachments(current_holders_str: &mut String, new_holders: &str) {
- let mut current_holders = parse_attachment_holders(current_holders_str);
-
- let new_holders = parse_attachment_holders(new_holders);
- current_holders.extend(new_holders);
-
- *current_holders_str = current_holders
- .into_iter()
- .collect::<Vec<_>>()
- .join(ATTACHMENT_HOLDER_SEPARATOR);
-}
-
-/// Parses an [`ATTACHMENT_HOLDER_SEPARATOR`] separated string into a `HashSet`
-/// of attachment holder string slices.
-fn parse_attachment_holders(
- holders_str: &str,
-) -> std::collections::HashSet<&str> {
- holders_str
- .split(ATTACHMENT_HOLDER_SEPARATOR)
- .filter(|holder| !holder.is_empty())
- .collect()
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use std::collections::HashSet;
-
- #[test]
- fn test_parse_attachments() {
- let holders = "h1;h2;h3";
- let expected = HashSet::from(["h1", "h2", "h3"]);
- assert_eq!(parse_attachment_holders(holders), expected);
- }
-
- #[test]
- fn test_empty_attachments() {
- let actual = parse_attachment_holders("");
- let expected = HashSet::new();
- assert_eq!(actual, expected);
- }
-
- #[test]
- fn test_add_attachments() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "holder3;holder4";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2", "holder3", "holder4"])
- );
- }
-
- #[test]
- fn test_add_to_empty() {
- let mut current_holders = String::new();
- let new_holders = "holder3;holder4";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder3", "holder4"])
- );
- }
-
- #[test]
- fn test_add_none() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2"])
- );
- }
-
- #[test]
- fn test_remove_duplicates() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "holder2;holder3";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2", "holder3"])
- );
- }
-}
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/create_backup.rs
+++ /dev/null
@@ -1,234 +0,0 @@
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-
-use crate::{
- blob::{start_simple_uploader, BlobClient, BlobUploader},
- database::{BackupItem, DatabaseClient},
- service::proto,
-};
-
-use super::handle_db_error;
-
-type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;
-
-enum HandlerState {
- /// Initial state. Handler is receiving non-data inputs
- ReceivingParams,
- /// Handler is receiving data chunks
- ReceivingData { uploader: BlobUploader },
- /// A special case when Blob service claims that a blob with given
- /// [`CreateBackupHandler::data_hash`] already exists
- DataAlreadyExists,
-}
-
-pub struct CreateBackupHandler {
- // flow control
- pub should_close_stream: bool,
-
- // inputs
- user_id: Option<String>,
- device_id: Option<String>,
- key_entropy: Option<Vec<u8>>,
- data_hash: Option<String>,
-
- // client instances
- db: DatabaseClient,
- blob_client: BlobClient,
-
- // internal state
- state: HandlerState,
- backup_id: String,
- holder: Option<String>,
-}
-
-impl CreateBackupHandler {
- pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self {
- CreateBackupHandler {
- should_close_stream: false,
- user_id: None,
- device_id: None,
- key_entropy: None,
- data_hash: None,
- db,
- blob_client,
- state: HandlerState::ReceivingParams,
- backup_id: String::new(),
- holder: None,
- }
- }
-
- pub async fn handle_user_id(
- &mut self,
- user_id: String,
- ) -> CreateBackupResult {
- if self.user_id.is_some() {
- warn!("user ID already provided");
- return Err(Status::invalid_argument("User ID already provided"));
- }
- self.user_id = Some(user_id);
- self.handle_internal().await
- }
- pub async fn handle_device_id(
- &mut self,
- device_id: String,
- ) -> CreateBackupResult {
- if self.device_id.is_some() {
- warn!("Device ID already provided");
- return Err(Status::invalid_argument("Device ID already provided"));
- }
- tracing::Span::current().record("device_id", &device_id);
- self.device_id = Some(device_id);
- self.handle_internal().await
- }
- pub async fn handle_key_entropy(
- &mut self,
- key_entropy: Vec<u8>,
- ) -> CreateBackupResult {
- if self.key_entropy.is_some() {
- warn!("Key entropy already provided");
- return Err(Status::invalid_argument("Key entropy already provided"));
- }
- self.key_entropy = Some(key_entropy);
- self.handle_internal().await
- }
- pub async fn handle_data_hash(
- &mut self,
- data_hash: Vec<u8>,
- ) -> CreateBackupResult {
- if self.data_hash.is_some() {
- warn!("Data hash already provided");
- return Err(Status::invalid_argument("Data hash already provided"));
- }
- let hash_str = String::from_utf8(data_hash).map_err(|err| {
- error!("Failed to convert data_hash into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- debug!("Received data hash: {}", &hash_str);
- self.data_hash = Some(hash_str);
- self.handle_internal().await
- }
-
- pub async fn handle_data_chunk(
- &mut self,
- data_chunk: Vec<u8>,
- ) -> CreateBackupResult {
- let HandlerState::ReceivingData { ref mut uploader } = self.state else {
- self.should_close_stream = true;
- error!("Data chunk sent before other inputs");
- return Err(Status::invalid_argument(
- "Data chunk sent before other inputs",
- ));
- };
-
- // empty chunk ends transmission
- if data_chunk.is_empty() {
- self.should_close_stream = true;
- return Ok(proto::CreateNewBackupResponse {
- backup_id: self.backup_id.clone(),
- });
- }
-
- trace!("Received {} bytes of data", data_chunk.len());
- uploader.put_data(data_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- Ok(proto::CreateNewBackupResponse {
- // actual Backup ID should be sent only once, the time it is generated
- // see handle_internal()
- backup_id: String::new(),
- })
- }
-
- /// This function should be called after the input stream is finished.
- pub async fn finish(self) -> Result<(), Status> {
- match self.state {
- HandlerState::ReceivingParams => {
- // client probably aborted early
- trace!("Nothing to store in database. Finishing early");
- return Ok(());
- }
- HandlerState::ReceivingData { uploader } => {
- uploader.terminate().await.map_err(|err| {
- error!("Uploader task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
- HandlerState::DataAlreadyExists => (),
- }
-
- let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else {
- error!("Holder / UserID absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
- if self.backup_id.is_empty() {
- error!("Backup ID was not generated. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- }
- let backup_item = BackupItem::new(user_id, self.backup_id, holder);
-
- self
- .db
- .put_backup_item(backup_item)
- .await
- .map_err(handle_db_error)?;
-
- Ok(())
- }
-
- // internal param handler helper
- async fn handle_internal(&mut self) -> CreateBackupResult {
- if !matches!(self.state, HandlerState::ReceivingParams) {
- error!("CreateBackupHandler already received all non-data params.");
- return Err(Status::failed_precondition("Backup data chunk expected"));
- }
-
- // all non-data inputs must be set before receiving backup data chunks
- let (Some(data_hash), Some(device_id), Some(_), Some(_)) = (
- self.data_hash.as_ref(),
- self.device_id.as_ref(),
- self.user_id.as_ref(),
- self.key_entropy.as_ref(),
- ) else {
- // return empty backup ID when inputs are incomplete
- return Ok(proto::CreateNewBackupResponse {
- backup_id: "".to_string(),
- });
- };
-
- let backup_id = generate_backup_id(device_id);
- let holder =
- crate::utils::generate_blob_holder(data_hash, &backup_id, None);
- self.backup_id = backup_id.clone();
- self.holder = Some(holder.clone());
- tracing::Span::current().record("backup_id", &backup_id);
- tracing::Span::current().record("blob_holder", &holder);
-
- match start_simple_uploader(&holder, data_hash, self.blob_client.clone())
- .await?
- {
- Some(uploader) => {
- self.state = HandlerState::ReceivingData { uploader };
- trace!("Everything prepared, waiting for data...");
- }
- None => {
- // Blob with given data_hash already exists
- debug!("Blob already exists, finishing");
- self.should_close_stream = true;
- self.state = HandlerState::DataAlreadyExists;
- }
- };
-
- Ok(proto::CreateNewBackupResponse { backup_id })
- }
-}
-
-/// Generates ID for a new backup
-fn generate_backup_id(device_id: &str) -> String {
- format!(
- "{device_id}_{timestamp}",
- device_id = device_id,
- timestamp = chrono::Utc::now().timestamp_millis()
- )
-}
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ /dev/null
@@ -1,330 +0,0 @@
-use async_stream::try_stream;
-use tokio_stream::{Stream, StreamExt};
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-use tracing_futures::Instrument;
-
-use super::handle_db_error;
-use super::proto::{self, PullBackupResponse};
-use crate::{
- blob::{BlobClient, BlobDownloader},
- constants::{
- BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_LOG_ID,
- },
- database::{BackupItem, DatabaseClient, LogItem},
-};
-
-pub struct PullBackupHandler {
- blob_client: BlobClient,
- backup_item: BackupItem,
- logs: Vec<LogItem>,
-}
-
-impl PullBackupHandler {
- pub async fn new(
- db: &DatabaseClient,
- blob_client: &BlobClient,
- request: proto::PullBackupRequest,
- ) -> Result<Self, Status> {
- let proto::PullBackupRequest { user_id, backup_id } = request;
- let backup_item = db
- .find_backup_item(&user_id, &backup_id)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Backup item not found");
- Status::not_found("Backup item not found")
- })?;
-
- let backup_id = backup_item.backup_id.as_str();
- let logs = db
- .find_log_items_for_backup(backup_id)
- .await
- .map_err(handle_db_error)?;
-
- Ok(PullBackupHandler {
- backup_item,
- logs,
- blob_client: blob_client.clone(),
- })
- }
-
- /// Consumes the handler and provides a response `Stream`. The stream will
- /// produce the following in order:
- /// - Backup compaction data chunks
- /// - Backup logs
- /// - Whole log, if stored in db
- /// - Log chunks, if stored in blob
- pub fn into_response_stream(
- self,
- ) -> impl Stream<Item = Result<PullBackupResponse, Status>> {
- use proto::pull_backup_response::*;
-
- try_stream! {
- debug!("Pulling backup...");
- {
- let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone());
- tokio::pin!(compaction_stream);
- while let Some(response) = compaction_stream.try_next().await? {
- yield response;
- }
- }
- trace!("Backup data pull complete.");
-
- if self.logs.is_empty() {
- debug!("No logs to pull. Finishing");
- return;
- }
-
- debug!("Pulling logs...");
- for log in self.logs {
- trace!("Pulling log ID={}", &log.log_id);
- let span = tracing::trace_span!("log", log_id = &log.log_id);
-
- if log.persisted_in_blob {
- trace!(parent: &span, "Log persisted in blob");
- let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span);
- tokio::pin!(log_data_stream);
- while let Some(response) = log_data_stream.try_next().await? {
- yield response;
- }
- } else {
- trace!(parent: &span, "Log persisted in database");
- yield proto::PullBackupResponse {
- attachment_holders: Some(log.attachment_holders),
- id: Some(Id::LogId(log.log_id)),
- data: Some(Data::LogChunk(log.value.into_bytes())),
- };
- }
- }
- trace!("Pulled all logs, done");
- }
- }
-}
-
-/// Downloads a blob-stored [`BlobStoredItem`] and streams its content into
-/// stream of [`PullBackupResponse`] objects, handles gRPC message size details.
-fn data_stream<Item>(
- item: &Item,
- blob_client: BlobClient,
-) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
-where
- Item: BlobStoredItem,
-{
- try_stream! {
- let mut buffer = ResponseBuffer::default();
- let mut downloader =
- BlobDownloader::start(item.get_holder().to_string(), blob_client);
-
- let mut is_first_chunk = true;
- loop {
- if !buffer.is_saturated() {
- if let Some(data) = downloader.next_chunk().await {
- buffer.put(data);
- }
- }
- if buffer.is_empty() {
- break;
- }
-
- // get data chunk, shortened by length of metadata
- let padding = item.metadata_size(is_first_chunk);
- let chunk = buffer.get_chunk(padding);
-
- trace!(
- with_attachments = is_first_chunk,
- data_size = chunk.len(),
- "Sending data chunk"
- );
- yield item.to_response(chunk, is_first_chunk);
- is_first_chunk = false;
- }
-
- downloader.terminate().await.map_err(|err| {
- error!("Blob downloader failed: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
-}
-
-/// Represents downloadable item stored in Blob service
-trait BlobStoredItem {
- // Blob holder representing this item
- fn get_holder(&self) -> &str;
-
- /// Generates a gRPC response for given `data_chunk`.
- /// The response may be in extended version, with `include_extra_info`,
- /// usually sent with first chunk
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse;
-
- /// Size in bytes of non-data fields contained in response message.
- fn metadata_size(&self, include_extra_info: bool) -> usize;
-}
-
-impl BlobStoredItem for BackupItem {
- fn get_holder(&self) -> &str {
- &self.compaction_holder
- }
-
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse {
- use proto::pull_backup_response::*;
- let attachment_holders = if include_extra_info {
- Some(self.attachment_holders.clone())
- } else {
- None
- };
- proto::PullBackupResponse {
- id: Some(Id::BackupId(self.backup_id.clone())),
- data: Some(Data::CompactionChunk(data_chunk)),
- attachment_holders,
- }
- }
-
- fn metadata_size(&self, include_extra_info: bool) -> usize {
- let mut extra_bytes: usize = 0;
- extra_bytes += BACKUP_TABLE_FIELD_BACKUP_ID.as_bytes().len();
- extra_bytes += self.backup_id.as_bytes().len();
- if include_extra_info {
- extra_bytes += BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
- extra_bytes += self.attachment_holders.as_bytes().len();
- }
- extra_bytes
- }
-}
-
-impl BlobStoredItem for LogItem {
- fn get_holder(&self) -> &str {
- &self.value
- }
-
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse {
- use proto::pull_backup_response::*;
- let attachment_holders = if include_extra_info {
- Some(self.attachment_holders.clone())
- } else {
- None
- };
- proto::PullBackupResponse {
- id: Some(Id::LogId(self.log_id.clone())),
- data: Some(Data::LogChunk(data_chunk)),
- attachment_holders,
- }
- }
-
- fn metadata_size(&self, include_extra_info: bool) -> usize {
- let mut extra_bytes: usize = 0;
- extra_bytes += LOG_TABLE_FIELD_LOG_ID.as_bytes().len();
- extra_bytes += self.log_id.as_bytes().len();
- if include_extra_info {
- extra_bytes += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
- extra_bytes += self.attachment_holders.as_bytes().len();
- }
- extra_bytes
- }
-}
-
-/// A utility structure that buffers downloaded data and allows to retrieve it
-/// as chunks of arbitrary size, not greater than provided `limit`.
-struct ResponseBuffer {
- buf: Vec<u8>,
- limit: usize,
-}
-
-impl Default for ResponseBuffer {
- /// Buffer size defaults to max usable gRPC message size
- fn default() -> Self {
- ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
- }
-}
-
-impl ResponseBuffer {
- pub fn new(limit: usize) -> Self {
- ResponseBuffer {
- buf: Vec::new(),
- limit,
- }
- }
-
- pub fn put(&mut self, data: Vec<u8>) {
- if data.len() > self.limit {
- warn!("Data saved to buffer is larger than chunk limit.");
- }
-
- self.buf.extend(data);
- }
-
- /// Gets chunk of size `limit - padding` and leaves remainder in buffer
- pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
- let mut chunk = std::mem::take(&mut self.buf);
-
- let target_size = self.limit - padding;
- if chunk.len() > target_size {
- // after this operation, chunk=0..target_size, self.buf=target_size..end
- self.buf = chunk.split_off(target_size);
- }
- return chunk;
- }
-
- /// Does buffer length exceed given limit
- pub fn is_saturated(&self) -> bool {
- self.buf.len() >= self.limit
- }
-
- pub fn is_empty(&self) -> bool {
- self.buf.is_empty()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- const LIMIT: usize = 100;
-
- #[test]
- fn test_response_buffer() {
- let mut buffer = ResponseBuffer::new(LIMIT);
- assert_eq!(buffer.is_empty(), true);
-
- // put 80 bytes of data
- buffer.put(vec![0u8; 80]);
- assert_eq!(buffer.is_empty(), false);
- assert_eq!(buffer.is_saturated(), false);
-
- // put next 80 bytes, should be saturated as 160 > 100
- buffer.put(vec![0u8; 80]);
- let buf_size = buffer.buf.len();
- assert_eq!(buffer.is_saturated(), true);
- assert_eq!(buf_size, 160);
-
- // get one chunk
- let padding: usize = 10;
- let expected_chunk_size = LIMIT - padding;
- let chunk = buffer.get_chunk(padding);
- assert_eq!(chunk.len(), expected_chunk_size); // 90
-
- // buffer should not be saturated now (160 - 90 < 100)
- let remaining_buf_size = buffer.buf.len();
- assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
- assert_eq!(buffer.is_saturated(), false);
-
- // get last chunk
- let chunk = buffer.get_chunk(padding);
- assert_eq!(chunk.len(), remaining_buf_size);
- assert_eq!(buffer.is_empty(), true);
- }
-}
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/send_log.rs
+++ /dev/null
@@ -1,272 +0,0 @@
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-use uuid::Uuid;
-
-use super::handle_db_error;
-use crate::{
- blob::{BlobClient, BlobUploader},
- constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
- database::{DatabaseClient, LogItem},
- service::proto::SendLogResponse,
-};
-
-enum LogPersistence {
- /// Log entirely stored in DynamoDB database
- DB,
- /// Log contents stored with Blob service
- BLOB { holder: String },
-}
-
-pub struct SendLogHandler {
- // flow control
- pub should_close_stream: bool,
-
- // inputs
- user_id: Option<String>,
- backup_id: Option<String>,
- log_hash: Option<String>,
-
- // internal state
- log_id: Option<String>,
- log_buffer: Vec<u8>,
- persistence_method: LogPersistence,
- should_receive_data: bool,
-
- // client instances
- db: DatabaseClient,
- blob_client: BlobClient,
- uploader: Option<BlobUploader>,
-}
-
-impl SendLogHandler {
- pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self {
- SendLogHandler {
- db: db.clone(),
- blob_client: blob_client.clone(),
- uploader: None,
- user_id: None,
- backup_id: None,
- log_hash: None,
- log_id: None,
- log_buffer: Vec::new(),
- persistence_method: LogPersistence::DB,
- should_receive_data: false,
- should_close_stream: false,
- }
- }
-
- pub async fn handle_user_id(
- &mut self,
- user_id: String,
- ) -> Result<(), Status> {
- if self.user_id.is_some() {
- warn!("user ID already provided");
- return Err(Status::invalid_argument("User ID already provided"));
- }
- self.user_id = Some(user_id);
- self.handle_internal().await
- }
- pub async fn handle_backup_id(
- &mut self,
- backup_id: String,
- ) -> Result<(), Status> {
- if self.backup_id.is_some() {
- warn!("backup ID already provided");
- return Err(Status::invalid_argument("Backup ID already provided"));
- }
- tracing::Span::current().record("backup_id", &backup_id);
- self.backup_id = Some(backup_id);
- self.handle_internal().await
- }
- pub async fn handle_log_hash(
- &mut self,
- log_hash: Vec<u8>,
- ) -> Result<(), Status> {
- if self.log_hash.is_some() {
- warn!("Log hash already provided");
- return Err(Status::invalid_argument("Log hash already provided"));
- }
- let hash_str = String::from_utf8(log_hash).map_err(|err| {
- error!("Failed to convert data_hash into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- debug!("Received log hash: {}", &hash_str);
- self.log_hash = Some(hash_str);
- self.handle_internal().await
- }
- pub async fn handle_log_data(
- &mut self,
- data_chunk: Vec<u8>,
- ) -> Result<(), Status> {
- if !self.should_receive_data || self.log_id.is_none() {
- self.should_close_stream = true;
- error!("Data chunk sent before other inputs");
- return Err(Status::invalid_argument(
- "Data chunk sent before other inputs",
- ));
- }
-
- // empty chunk ends transmission
- if data_chunk.is_empty() {
- self.should_close_stream = true;
- return Ok(());
- }
-
- match self.persistence_method {
- LogPersistence::DB => {
- self.log_buffer.extend(data_chunk);
- self.ensure_size_constraints().await?;
- }
- LogPersistence::BLOB { .. } => {
- let Some(client) = self.uploader.as_mut() else {
- self.should_close_stream = true;
- error!("Put client uninitialized. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
- client.put_data(data_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
- }
- Ok(())
- }
-
- pub async fn finish(self) -> Result<SendLogResponse, Status> {
- if let Some(client) = self.uploader {
- client.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- } else {
- trace!("No uploader initialized. Skipping termination");
- }
-
- if !self.should_receive_data {
- // client probably aborted early
- trace!("Nothing to store in database. Finishing early");
- return Ok(SendLogResponse {
- log_checkpoint: "".to_string(),
- });
- }
-
- let (Some(backup_id), Some(log_id), Some(data_hash)) = (
- self.backup_id,
- self.log_id,
- self.log_hash
- ) else {
- error!("Log info absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
-
- let (log_value, persisted_in_blob) = match self.persistence_method {
- LogPersistence::BLOB { holder } => (holder, true),
- LogPersistence::DB => {
- let contents = String::from_utf8(self.log_buffer).map_err(|err| {
- error!("Failed to convert log contents data into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- (contents, false)
- }
- };
-
- let log_item = LogItem {
- backup_id,
- log_id: log_id.clone(),
- persisted_in_blob,
- value: log_value,
- attachment_holders: String::new(),
- data_hash,
- };
-
- self
- .db
- .put_log_item(log_item)
- .await
- .map_err(handle_db_error)?;
-
- Ok(SendLogResponse {
- log_checkpoint: log_id,
- })
- }
-
- // internal param handler helper
- async fn handle_internal(&mut self) -> Result<(), Status> {
- if self.should_receive_data {
- error!("SendLogHandler is already expecting data chunks");
- return Err(Status::failed_precondition("Log data chunk expected"));
- }
-
- // all non-data inputs must be set before receiving log contents
- let (Some(backup_id), Some(_), Some(_)) = (
- self.backup_id.as_ref(),
- self.user_id.as_ref(),
- self.log_hash.as_ref()
- ) else { return Ok(()); };
-
- let log_id = generate_log_id(backup_id);
- tracing::Span::current().record("log_id", &log_id);
- self.log_id = Some(log_id);
-
- trace!("Everything prepared, waiting for data...");
- self.should_receive_data = true;
- Ok(())
- }
-
- /// Ensures log fits db size constraints. If not, it is moved to blob
- /// persistence
- async fn ensure_size_constraints(&mut self) -> Result<(), Status> {
- let (Some(backup_id), Some(log_id), Some(log_hash)) = (
- self.backup_id.as_ref(),
- self.log_id.as_ref(),
- self.log_hash.as_ref()
- ) else {
- self.should_close_stream = true;
- error!("Log info absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
-
- let log_size = LogItem::size_from_components(
- backup_id,
- log_id,
- log_hash,
- &self.log_buffer,
- );
- if log_size > LOG_DATA_SIZE_DATABASE_LIMIT {
- debug!("Log too large, switching persistence to Blob");
- let holder =
- crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
- match crate::blob::start_simple_uploader(
- &holder,
- &log_hash,
- self.blob_client.clone(),
- )
- .await?
- {
- Some(mut uploader) => {
- let blob_chunk = std::mem::take(&mut self.log_buffer);
- uploader.put_data(blob_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
- self.uploader = Some(uploader);
- }
- None => {
- debug!("Log hash already exists");
- self.should_close_stream = true;
- }
- }
- self.persistence_method = LogPersistence::BLOB { holder };
- }
- Ok(())
- }
-}
-
-fn generate_log_id(backup_id: &str) -> String {
- format!(
- "{backup_id}{sep}{uuid}",
- backup_id = backup_id,
- sep = ID_SEPARATOR,
- uuid = Uuid::new_v4()
- )
-}
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
deleted file mode 100644
--- a/services/backup/src/service/mod.rs
+++ /dev/null
@@ -1,241 +0,0 @@
-use aws_sdk_dynamodb::Error as DynamoDBError;
-use comm_services_lib::database::Error as DBError;
-use proto::backup_service_server::BackupService;
-use std::pin::Pin;
-use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
-use tonic::{Request, Response, Status};
-use tracing::{debug, error, info, instrument, trace, warn};
-use tracing_futures::Instrument;
-
-use crate::{
- blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY,
- database::DatabaseClient,
-};
-
-mod proto {
- tonic::include_proto!("backup");
-}
-pub use proto::backup_service_server::BackupServiceServer;
-
-/// submodule containing gRPC endpoint handler implementations
-mod handlers {
- pub(super) mod add_attachments;
- pub(super) mod create_backup;
- pub(super) mod pull_backup;
- pub(super) mod send_log;
-
- // re-exports for convenient usage in handlers
- pub(self) use super::handle_db_error;
- pub(self) use super::proto;
-}
-use self::handlers::create_backup::CreateBackupHandler;
-use self::handlers::pull_backup::PullBackupHandler;
-use self::handlers::send_log::SendLogHandler;
-
-pub struct MyBackupService {
- db: DatabaseClient,
- blob_client: BlobClient,
-}
-
-impl MyBackupService {
- pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self {
- MyBackupService {
- db: db_client,
- blob_client,
- }
- }
-}
-
-// gRPC implementation
-#[tonic::async_trait]
-impl BackupService for MyBackupService {
- type CreateNewBackupStream = Pin<
- Box<
- dyn Stream<Item = Result<proto::CreateNewBackupResponse, Status>> + Send,
- >,
- >;
-
- #[instrument(skip_all, fields(device_id, backup_id, blob_holder))]
- async fn create_new_backup(
- &self,
- request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
- ) -> Result<Response<Self::CreateNewBackupStream>, Status> {
- use proto::create_new_backup_request::Data::*;
-
- info!("CreateNewBackup request: {:?}", request);
- let mut in_stream = request.into_inner();
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let db = self.db.clone();
- let blob_client = self.blob_client.clone();
- let worker = async move {
- let mut handler = CreateBackupHandler::new(db, blob_client);
- while let Some(message) = in_stream.next().await {
- let response = match message {
- Ok(proto::CreateNewBackupRequest {
- data: Some(UserId(user_id)),
- }) => handler.handle_user_id(user_id).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(DeviceId(device_id)),
- }) => handler.handle_device_id(device_id).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(KeyEntropy(key_entropy)),
- }) => handler.handle_key_entropy(key_entropy).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(NewCompactionHash(hash)),
- }) => handler.handle_data_hash(hash).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(NewCompactionChunk(chunk)),
- }) => handler.handle_data_chunk(chunk).await,
- unexpected => {
- error!("Received an unexpected request: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
-
- trace!("Sending response: {:?}", response);
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if handler.should_close_stream {
- trace!("Handler requested to close stream");
- break;
- }
- }
- if let Err(status) = handler.finish().await {
- trace!("Sending error response: {:?}", status);
- let _ = tx.send(Err(status)).await;
- }
- debug!("Request finished processing");
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(
- Box::pin(out_stream) as Self::CreateNewBackupStream
- ))
- }
-
- #[instrument(skip_all, fields(backup_id, log_id))]
- async fn send_log(
- &self,
- request: Request<tonic::Streaming<proto::SendLogRequest>>,
- ) -> Result<Response<proto::SendLogResponse>, Status> {
- use proto::send_log_request::Data::*;
-
- info!("SendLog request: {:?}", request);
- let mut handler = SendLogHandler::new(&self.db, &self.blob_client);
-
- let mut in_stream = request.into_inner();
- while let Some(message) = in_stream.next().await {
- let result = match message {
- Ok(proto::SendLogRequest {
- data: Some(UserId(user_id)),
- }) => handler.handle_user_id(user_id).await,
- Ok(proto::SendLogRequest {
- data: Some(BackupId(backup_id)),
- }) => handler.handle_backup_id(backup_id).await,
- Ok(proto::SendLogRequest {
- data: Some(LogHash(log_hash)),
- }) => handler.handle_log_hash(log_hash).await,
- Ok(proto::SendLogRequest {
- data: Some(LogData(chunk)),
- }) => handler.handle_log_data(chunk).await,
- unexpected => {
- error!("Received an unexpected request: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
-
- if let Err(err) = result {
- error!("An error occurred when processing request: {:?}", err);
- return Err(err);
- }
- if handler.should_close_stream {
- trace!("Handler requested to close request stream");
- break;
- }
- }
-
- let response = handler.finish().await;
- debug!("Finished. Sending response: {:?}", response);
- response.map(|response_body| Response::new(response_body))
- }
-
- type RecoverBackupKeyStream = Pin<
- Box<
- dyn Stream<Item = Result<proto::RecoverBackupKeyResponse, Status>> + Send,
- >,
- >;
-
- #[instrument(skip(self))]
- async fn recover_backup_key(
- &self,
- _request: Request<tonic::Streaming<proto::RecoverBackupKeyRequest>>,
- ) -> Result<Response<Self::RecoverBackupKeyStream>, Status> {
- Err(Status::unimplemented("unimplemented"))
- }
-
- type PullBackupStream = Pin<
- Box<dyn Stream<Item = Result<proto::PullBackupResponse, Status>> + Send>,
- >;
-
- #[instrument(skip_all, fields(backup_id = &request.get_ref().backup_id))]
- async fn pull_backup(
- &self,
- request: Request<proto::PullBackupRequest>,
- ) -> Result<Response<Self::PullBackupStream>, Status> {
- info!("PullBackup request: {:?}", request);
-
- let handler =
- PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner())
- .await?;
-
- let stream = handler.into_response_stream().in_current_span();
- Ok(Response::new(Box::pin(stream) as Self::PullBackupStream))
- }
-
- #[instrument(skip_all,
- fields(
- backup_id = &request.get_ref().backup_id,
- log_id = &request.get_ref().log_id)
- )]
- async fn add_attachments(
- &self,
- request: Request<proto::AddAttachmentsRequest>,
- ) -> Result<Response<()>, Status> {
- info!(
- "AddAttachment request. New holders: {}",
- &request.get_ref().holders
- );
-
- handlers::add_attachments::handle_add_attachments(
- &self.db,
- &self.blob_client,
- request.into_inner(),
- )
- .await?;
-
- info!("Request processed successfully");
- Ok(Response::new(()))
- }
-}
-
-/// A helper converting our Database errors into gRPC responses
-fn handle_db_error(db_error: DBError) -> Status {
- match db_error {
- DBError::AwsSdk(DynamoDBError::InternalServerError(_))
- | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
- _,
- ))
- | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
- warn!("AWS transient error occurred");
- Status::unavailable("please retry")
- }
- e => {
- error!("Encountered an unexpected error: {}", e);
- Status::failed_precondition("unexpected error")
- }
- }
-}
diff --git a/shared/protos/backup.proto b/shared/protos/backup.proto
deleted file mode 100644
--- a/shared/protos/backup.proto
+++ /dev/null
@@ -1,96 +0,0 @@
-syntax = "proto3";
-
-package backup;
-
-import "google/protobuf/empty.proto";
-
-/**
- * API - description
- * CreateNewBackup - This method is called when we want to create a new backup.
- * We send a new backup key encrypted with the user's password and also the
- * new compaction. New logs that will be sent from now on will be assigned to
- * this backup.
- * SendLog - User sends a new log to the backup service. The log is being
- * assigned to the latest(or desired) backup's compaction item.
- * RecoverBackupKey - Pulls data necessary for regenerating the backup key
- * on the client-side for the latest(or desired) backup
- * PullBackup - Fetches compaction + all logs assigned to it for the
- * specified backup(default is the last backup)
- */
-
-service BackupService {
- rpc CreateNewBackup(stream CreateNewBackupRequest) returns (stream CreateNewBackupResponse) {}
- rpc SendLog(stream SendLogRequest) returns (SendLogResponse) {}
- rpc RecoverBackupKey(stream RecoverBackupKeyRequest) returns (stream RecoverBackupKeyResponse) {}
- rpc PullBackup(PullBackupRequest) returns (stream PullBackupResponse) {}
- rpc AddAttachments(AddAttachmentsRequest) returns (google.protobuf.Empty) {}
-}
-
-// CreateNewBackup
-
-message CreateNewBackupRequest {
- oneof data {
- string userID = 1;
- string deviceID = 2;
- bytes keyEntropy = 3;
- bytes newCompactionHash = 4;
- bytes newCompactionChunk = 5;
- }
-}
-
-message CreateNewBackupResponse {
- string backupID = 1;
-}
-
-// SendLog
-
-message SendLogRequest {
- oneof data {
- string userID = 1;
- string backupID = 2;
- bytes logHash = 3;
- bytes logData = 4;
- }
-}
-
-message SendLogResponse {
- string logCheckpoint = 1;
-}
-
-// RecoverBackupKey
-
-message RecoverBackupKeyRequest {
- string userID = 1;
-}
-
-message RecoverBackupKeyResponse {
- string backupID = 4;
-}
-
-// PullBackup
-
-message PullBackupRequest {
- string userID = 1;
- string backupID = 2;
-}
-
-message PullBackupResponse {
- oneof id {
- string backupID = 1;
- string logID = 2;
- }
- oneof data {
- bytes compactionChunk = 3;
- bytes logChunk = 4;
- }
- optional string attachmentHolders = 5;
-}
-
-// AddAttachment
-
-message AddAttachmentsRequest {
- string userID = 1;
- string backupID = 2;
- string logID = 3;
- string holders = 4;
-}
