diff --git a/services/backup/Cargo.lock b/services/backup/Cargo.lock --- a/services/backup/Cargo.lock +++ b/services/backup/Cargo.lock @@ -48,17 +48,6 @@ "syn 2.0.28", ] -[[package]] -name = "async-trait" -version = "0.1.59" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.105", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -383,52 +372,6 @@ "tracing", ] -[[package]] -name = "axum" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" -dependencies = [ - "async-trait", - "axum-core", - "bitflags", - "bytes", - "futures-util", - "http", - "http-body", - "hyper", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper", - "tower", - "tower-http", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http", - "http-body", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "backup" version = "0.1.0" @@ -442,11 +385,9 @@ "clap", "comm-services-lib", "once_cell", - "prost", "rand", "tokio", "tokio-stream", - "tonic", "tonic-build", "tracing", "tracing-futures", @@ -955,12 +896,6 @@ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" - [[package]] name = "httparse" version = "1.8.0" @@ -1012,18 +947,6 @@ "tokio-rustls", ] -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - [[package]] name = "iana-time-zone" version = "0.1.53" @@ -1158,24 +1081,12 @@ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" - [[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "mime" -version = "0.3.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" - [[package]] name = "mio" version = "0.8.5" @@ -1569,12 +1480,6 @@ "base64", ] -[[package]] -name = "rustversion" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" - [[package]] name = "ryu" version = "1.0.11" @@ -1727,12 +1632,6 @@ "unicode-ident", ] -[[package]] -name = "sync_wrapper" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" - [[package]] name = "tempfile" version = "3.3.0" @@ -1820,16 +1719,6 @@ "windows-sys 0.42.0", ] -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-macros" version = "1.8.2" @@ -1877,38 +1766,6 @@ "tracing", ] -[[package]] -name = "tonic" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "prost-derive", - "tokio", - "tokio-stream", - "tokio-util", - "tower", - "tower-layer", - "tower-service", - "tracing", - "tracing-futures", -] - [[package]] name = "tonic-build" version = "0.8.4" @@ -1930,37 +1787,14 @@ dependencies = [ "futures-core", "futures-util", - "indexmap", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", - "tokio-util", "tower-layer", "tower-service", "tracing", ] -[[package]] -name = "tower-http" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" -dependencies = [ - "bitflags", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", - "pin-project-lite", - "tower", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-layer" version = "0.3.2" diff --git a/services/backup/Cargo.toml b/services/backup/Cargo.toml --- a/services/backup/Cargo.toml +++ b/services/backup/Cargo.toml @@ -16,14 +16,12 @@ clap = { version = "4.0", features = ["derive", "env"] } comm-services-lib = { path = "../comm-services-lib" } once_cell = "1.17" -prost = "0.11" rand = "0.8.5" -tokio = { version = "1.24", features = ["rt-multi-thread"]} +tokio = { version = "1.24", features = ["rt-multi-thread", "macros"] } tokio-stream = "0.1" -tonic = "0.8" tracing = "0.1" -tracing-futures = { version = "0.2", features = ["futures-03"] } -tracing-subscriber = { version = "0.3", features = ["env-filter"]} +tracing-futures = { version = "0.2", features = ["futures-03"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1.2", features = ["v4"] } [build-dependencies] diff --git a/services/backup/Dockerfile b/services/backup/Dockerfile --- a/services/backup/Dockerfile +++ b/services/backup/Dockerfile @@ -21,8 +21,6 @@ # Copy actual application sources COPY services/backup . -COPY shared/protos/backup.proto ../../shared/protos/ -COPY shared/protos/blob.proto ../../shared/protos/ # Remove the previously-built binary so that only the application itself is # rebuilt diff --git a/services/backup/build.rs b/services/backup/build.rs deleted file mode 100644 --- a/services/backup/build.rs +++ /dev/null @@ -1,10 +0,0 @@ -fn main() { - println!("cargo:rerun-if-changed=src/main.rs"); - - println!("cargo:rerun-if-changed=../../shared/protos/backup.proto"); - println!("cargo:rerun-if-changed=../../shared/protos/blob.proto"); - tonic_build::compile_protos("../../shared/protos/backup.proto") - .expect("Failed to compile Backup protobuf file"); - tonic_build::compile_protos("../../shared/protos/blob.proto") - .expect("Failed to compile Blob protobuf file"); -} diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs deleted file mode 100644 --- a/services/backup/src/blob/downloader.rs +++ /dev/null @@ -1,82 +0,0 @@ -use anyhow::{bail, Result}; -use tokio::{ - sync::mpsc::{self, Receiver}, - task::JoinHandle, -}; -use tracing::{error, instrument, Instrument}; - -use super::{proto, BlobClient}; -use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; - -pub use proto::put_request::Data as PutRequestData; -pub use proto::{PutRequest, PutResponse}; - -/// The BlobDownloader instance is a handle holder of a Tokio task running the -/// actual blob client instance. The communication is done via a MPSC channel -/// and is one-sided - the data is transmitted from the client task to the -/// caller. Blob chunks received in response stream are waiting -/// for the channel to have capacity, so it is recommended to read them quickly -/// to make room for more. -/// The client task can be stopped and awaited for result via the `terminate()` -/// method. -pub struct BlobDownloader { - rx: Receiver>, - handle: JoinHandle>, -} - -impl BlobDownloader { - /// Connects to the Blob service and keeps the client connection open - /// in a separate Tokio task. - #[instrument(name = "blob_downloader")] - pub fn start(holder: String, mut blob_client: BlobClient) -> Self { - let (blob_res_tx, blob_res_rx) = - mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let client_thread = async move { - let response = blob_client - .get(proto::GetRequest { holder }) - .await - .map_err(|err| { - error!("Get request failed: {:?}", err); - err - })?; - let mut inner_response = response.into_inner(); - loop { - match inner_response.message().await? { - Some(data) => { - let data: Vec = data.data_chunk; - if let Err(err) = blob_res_tx.send(data).await { - bail!(err); - } - } - // Response stream was closed - None => break, - } - } - Ok(()) - }; - let handle = tokio::spawn(client_thread.in_current_span()); - - BlobDownloader { - rx: blob_res_rx, - handle, - } - } - - /// Receives the next chunk of blob data if ready or sleeps - /// until the data is available. - /// - /// Returns `None` when the transmission is finished, but this doesn't - /// determine if it was successful. After receiving `None`, the client - /// should be consumed by calling [`GetClient::terminate`] to handle - /// possible errors. - pub async fn next_chunk(&mut self) -> Option> { - self.rx.recv().await - } - - /// Stops receiving messages and awaits the client thread to exit - /// and returns its status. - pub async fn terminate(mut self) -> Result<()> { - self.rx.close(); - self.handle.await? - } -} diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs deleted file mode 100644 --- a/services/backup/src/blob/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -mod proto { - tonic::include_proto!("blob"); -} -use proto::blob_service_client::BlobServiceClient; -pub use proto::put_request::Data as PutRequestData; -pub use proto::{PutRequest, PutResponse}; - -mod downloader; -mod uploader; -pub use downloader::*; -pub use uploader::*; - -pub type BlobClient = BlobServiceClient; - -/// Creates a new Blob service client instance. It does not attempt to connect -/// to the service until first use. -pub fn init_blob_client() -> BlobClient { - let service_url = &crate::CONFIG.blob_service_url; - let channel = - tonic::transport::Channel::from_static(service_url).connect_lazy(); - BlobServiceClient::new(channel) -} diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs deleted file mode 100644 --- a/services/backup/src/blob/uploader.rs +++ /dev/null @@ -1,168 +0,0 @@ -use anyhow::{anyhow, bail, Result}; -use tokio::{ - sync::mpsc::{self, Receiver, Sender}, - task::JoinHandle, -}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::Status; -use tracing::{error, instrument, Instrument}; - -use super::{proto, BlobClient}; -use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; - -pub use proto::put_request::Data as PutRequestData; -pub use proto::{PutRequest, PutResponse}; - -pub struct BlobUploader { - req_tx: Sender, - res_rx: Receiver, - handle: JoinHandle>, -} - -/// The BlobUploader instance is a handle holder of a Tokio task running the -/// actual blob client instance. The communication is done via two MPSC -/// channels - one sending requests to the client task, and another for sending -/// responses back to the caller. These messages should go in pairs -/// - one request for one response. -/// The client task can be stopped and awaited for result via the `terminate()` -/// method. -impl BlobUploader { - /// Connects to the Blob service and keeps the client connection open - /// in a separate Tokio task. - #[instrument(name = "blob_uploader")] - pub fn start(mut blob_client: BlobClient) -> Self { - let (blob_req_tx, blob_req_rx) = - mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let (blob_res_tx, blob_res_rx) = - mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let client_thread = async move { - match blob_client - .put(tonic::Request::new(ReceiverStream::new(blob_req_rx))) - .await - { - Ok(response) => { - let mut response_stream = response.into_inner(); - loop { - match response_stream.message().await? { - Some(response_message) => { - // warning: this will produce an error if there's more unread - // responses than MPSC_CHANNEL_BUFFER_CAPACITY - // so you should always read the response MPSC channel - // right after sending a request to dequeue the responses - // and make room for more. - // The PutClient::put() function should take care of that - if let Err(err) = blob_res_tx.try_send(response_message) { - bail!(err); - } - } - // Response stream was closed - None => break, - } - } - } - Err(err) => { - error!("Put request failed: {:?}", err); - bail!(err.to_string()); - } - }; - Ok(()) - }; - let handle = tokio::spawn(client_thread.in_current_span()); - - BlobUploader { - req_tx: blob_req_tx, - res_rx: blob_res_rx, - handle, - } - } - - /// Sends a [`PutRequest`] to the stream and waits for blob service - /// to send a response. After all data is sent, the [`PutClient::terminate`] - /// should be called to end the transmission and handle possible errors. - pub async fn put(&mut self, req: PutRequest) -> Result { - self.req_tx.try_send(req)?; - self - .res_rx - .recv() - .await - .ok_or_else(|| anyhow!("Blob client channel closed")) - } - - /// Convenience wrapper for - /// ``` - /// BlobClient::put(PutRequest { - /// data: Some(PutRequestData::DataChunk(data)) - /// }) - /// ``` - pub async fn put_data(&mut self, data: Vec) -> Result { - self - .put(PutRequest { - data: Some(PutRequestData::DataChunk(data)), - }) - .await - } - - /// Closes the connection and awaits the blob client task to finish. - pub async fn terminate(self) -> Result<()> { - drop(self.req_tx); - let thread_result = self.handle.await?; - thread_result - } -} - -/// Starts a put client instance. Fulfills request with blob hash and holder. -/// -/// `None` is returned if given `holder` already exists. -/// -/// ## Example -/// ``` -/// if let Some(mut client) = -/// start_simple_uploader("my_holder", "my_hash").await? { -/// let my_data = vec![1,2,3,4]; -/// let _ = client.put_data(my_data).await; -/// -/// let status = client.terminate().await; -/// } -/// ``` -pub async fn start_simple_uploader( - holder: &str, - blob_hash: &str, - blob_client: BlobClient, -) -> Result, Status> { - // start upload request - let mut uploader = BlobUploader::start(blob_client); - - // send holder - uploader - .put(PutRequest { - data: Some(PutRequestData::Holder(holder.to_string())), - }) - .await - .map_err(|err| { - error!("Failed to set blob holder: {:?}", err); - Status::aborted("Internal error") - })?; - - // send hash - let PutResponse { data_exists } = uploader - .put(PutRequest { - data: Some(PutRequestData::BlobHash(blob_hash.to_string())), - }) - .await - .map_err(|err| { - error!("Failed to set blob hash: {:?}", err); - Status::aborted("Internal error") - })?; - - // Blob with given holder already exists, nothing to do - if data_exists { - // the connection is already terminated by server, - // but it's good to await it anyway - uploader.terminate().await.map_err(|err| { - error!("Put client task closed with error: {:?}", err); - Status::aborted("Internal error") - })?; - return Ok(None); - } - Ok(Some(uploader)) -} diff --git a/services/backup/src/config.rs b/services/backup/src/config.rs --- a/services/backup/src/config.rs +++ b/services/backup/src/config.rs @@ -3,16 +3,12 @@ use tracing::info; use crate::constants::{ - DEFAULT_BLOB_SERVICE_URL, DEFAULT_GRPC_SERVER_PORT, DEFAULT_LOCALSTACK_URL, - SANDBOX_ENV_VAR, + DEFAULT_BLOB_SERVICE_URL, DEFAULT_LOCALSTACK_URL, SANDBOX_ENV_VAR, }; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { - /// gRPC server listening port - #[arg(long = "port", default_value_t = DEFAULT_GRPC_SERVER_PORT)] - pub listening_port: u64, /// Run the service in sandbox #[arg(long = "sandbox", default_value_t = false)] // support the env var for compatibility reasons diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -31,8 +31,6 @@ pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5; // Configuration defaults - -pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051; pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566"; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs --- a/services/backup/src/main.rs +++ b/services/backup/src/main.rs @@ -1,17 +1,10 @@ use anyhow::Result; -use std::net::SocketAddr; -use tonic::transport::Server; -use tracing::{info, Level}; +use tracing::Level; use tracing_subscriber::EnvFilter; -use crate::blob::BlobClient; -use crate::service::{BackupServiceServer, MyBackupService}; - -pub mod blob; pub mod config; pub mod constants; pub mod database; -pub mod service; pub mod utils; // re-export this to be available as crate::CONFIG @@ -28,30 +21,13 @@ Ok(()) } -async fn run_grpc_server( - db: database::DatabaseClient, - blob_client: BlobClient, -) -> Result<()> { - let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?; - let backup_service = MyBackupService::new(db, blob_client); - - info!("Starting gRPC server listening at {}", addr.to_string()); - Server::builder() - .add_service(BackupServiceServer::new(backup_service)) - .serve(addr) - .await?; - - Ok(()) -} - #[tokio::main] async fn main() -> Result<()> { config::parse_cmdline_args(); configure_logging()?; let aws_config = config::load_aws_config().await; - let db = database::DatabaseClient::new(&aws_config); - let blob_client = blob::init_blob_client(); + let _db = database::DatabaseClient::new(&aws_config); - run_grpc_server(db, blob_client).await + Ok(()) } diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs deleted file mode 100644 --- a/services/backup/src/service/handlers/add_attachments.rs +++ /dev/null @@ -1,208 +0,0 @@ -use tonic::Status; -use tracing::debug; -use tracing::error; - -use super::handle_db_error; -use super::proto; -use crate::{ - blob::BlobClient, - constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, - database::{DatabaseClient, LogItem}, -}; - -pub async fn handle_add_attachments( - db: &DatabaseClient, - blob_client: &BlobClient, - request: proto::AddAttachmentsRequest, -) -> Result<(), Status> { - let proto::AddAttachmentsRequest { - user_id, - backup_id, - log_id, - holders, - } = request; - - if user_id.is_empty() { - return Err(Status::invalid_argument( - "user id required but not provided", - )); - } - if backup_id.is_empty() { - return Err(Status::invalid_argument( - "backup id required but not provided", - )); - } - if holders.is_empty() { - return Err(Status::invalid_argument( - "holders required but not provided", - )); - } - - if log_id.is_empty() { - let backup_item_result = db - .find_backup_item(&user_id, &backup_id) - .await - .map_err(handle_db_error)?; - let mut backup_item = backup_item_result.ok_or_else(|| { - debug!("Backup item not found"); - Status::not_found("Backup item not found") - })?; - - add_new_attachments(&mut backup_item.attachment_holders, &holders); - - db.put_backup_item(backup_item) - .await - .map_err(handle_db_error)?; - } else { - let log_item_result = db - .find_log_item(&backup_id, &log_id) - .await - .map_err(handle_db_error)?; - let mut log_item = log_item_result.ok_or_else(|| { - debug!("Log item not found"); - Status::not_found("Log item not found") - })?; - - add_new_attachments(&mut log_item.attachment_holders, &holders); - - // log item too large for database, move it to blob-service stroage - if !log_item.persisted_in_blob - && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT - { - debug!("Log item too large. Persisting in blob service..."); - log_item = move_to_blob(log_item, blob_client).await?; - } - - db.put_log_item(log_item).await.map_err(handle_db_error)?; - } - - Ok(()) -} - -async fn move_to_blob( - log_item: LogItem, - blob_client: &BlobClient, -) -> Result { - let holder = crate::utils::generate_blob_holder( - &log_item.data_hash, - &log_item.backup_id, - Some(&log_item.log_id), - ); - - if let Some(mut uploader) = crate::blob::start_simple_uploader( - &holder, - &log_item.data_hash, - blob_client.clone(), - ) - .await? - { - let blob_chunk = log_item.value.into_bytes(); - uploader.put_data(blob_chunk).await.map_err(|err| { - error!("Failed to upload data chunk: {:?}", err); - Status::aborted("Internal error") - })?; - - uploader.terminate().await.map_err(|err| { - error!("Put client task closed with error: {:?}", err); - Status::aborted("Internal error") - })?; - } else { - debug!("Blob holder for log ID={} already exists", &log_item.log_id); - } - - Ok(LogItem { - persisted_in_blob: true, - value: holder, - ..log_item - }) -} - -/// Modifies the [`current_holders_str`] by appending attachment holders -/// contained in [`new_holders`]. Removes duplicates. Both arguments -/// are expected to be [`ATTACHMENT_HOLDER_SEPARATOR`] separated strings. -fn add_new_attachments(current_holders_str: &mut String, new_holders: &str) { - let mut current_holders = parse_attachment_holders(current_holders_str); - - let new_holders = parse_attachment_holders(new_holders); - current_holders.extend(new_holders); - - *current_holders_str = current_holders - .into_iter() - .collect::>() - .join(ATTACHMENT_HOLDER_SEPARATOR); -} - -/// Parses an [`ATTACHMENT_HOLDER_SEPARATOR`] separated string into a `HashSet` -/// of attachment holder string slices. -fn parse_attachment_holders( - holders_str: &str, -) -> std::collections::HashSet<&str> { - holders_str - .split(ATTACHMENT_HOLDER_SEPARATOR) - .filter(|holder| !holder.is_empty()) - .collect() -} - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashSet; - - #[test] - fn test_parse_attachments() { - let holders = "h1;h2;h3"; - let expected = HashSet::from(["h1", "h2", "h3"]); - assert_eq!(parse_attachment_holders(holders), expected); - } - - #[test] - fn test_empty_attachments() { - let actual = parse_attachment_holders(""); - let expected = HashSet::new(); - assert_eq!(actual, expected); - } - - #[test] - fn test_add_attachments() { - let mut current_holders = "holder1;holder2".to_string(); - let new_holders = "holder3;holder4"; - add_new_attachments(&mut current_holders, new_holders); - assert_eq!( - parse_attachment_holders(¤t_holders), - HashSet::from(["holder1", "holder2", "holder3", "holder4"]) - ); - } - - #[test] - fn test_add_to_empty() { - let mut current_holders = String::new(); - let new_holders = "holder3;holder4"; - add_new_attachments(&mut current_holders, new_holders); - assert_eq!( - parse_attachment_holders(¤t_holders), - HashSet::from(["holder3", "holder4"]) - ); - } - - #[test] - fn test_add_none() { - let mut current_holders = "holder1;holder2".to_string(); - let new_holders = ""; - add_new_attachments(&mut current_holders, new_holders); - assert_eq!( - parse_attachment_holders(¤t_holders), - HashSet::from(["holder1", "holder2"]) - ); - } - - #[test] - fn test_remove_duplicates() { - let mut current_holders = "holder1;holder2".to_string(); - let new_holders = "holder2;holder3"; - add_new_attachments(&mut current_holders, new_holders); - assert_eq!( - parse_attachment_holders(¤t_holders), - HashSet::from(["holder1", "holder2", "holder3"]) - ); - } -} diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs deleted file mode 100644 --- a/services/backup/src/service/handlers/create_backup.rs +++ /dev/null @@ -1,234 +0,0 @@ -use tonic::Status; -use tracing::{debug, error, trace, warn}; - -use crate::{ - blob::{start_simple_uploader, BlobClient, BlobUploader}, - database::{BackupItem, DatabaseClient}, - service::proto, -}; - -use super::handle_db_error; - -type CreateBackupResult = Result; - -enum HandlerState { - /// Initial state. Handler is receiving non-data inputs - ReceivingParams, - /// Handler is receiving data chunks - ReceivingData { uploader: BlobUploader }, - /// A special case when Blob service claims that a blob with given - /// [`CreateBackupHandler::data_hash`] already exists - DataAlreadyExists, -} - -pub struct CreateBackupHandler { - // flow control - pub should_close_stream: bool, - - // inputs - user_id: Option, - device_id: Option, - key_entropy: Option>, - data_hash: Option, - - // client instances - db: DatabaseClient, - blob_client: BlobClient, - - // internal state - state: HandlerState, - backup_id: String, - holder: Option, -} - -impl CreateBackupHandler { - pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self { - CreateBackupHandler { - should_close_stream: false, - user_id: None, - device_id: None, - key_entropy: None, - data_hash: None, - db, - blob_client, - state: HandlerState::ReceivingParams, - backup_id: String::new(), - holder: None, - } - } - - pub async fn handle_user_id( - &mut self, - user_id: String, - ) -> CreateBackupResult { - if self.user_id.is_some() { - warn!("user ID already provided"); - return Err(Status::invalid_argument("User ID already provided")); - } - self.user_id = Some(user_id); - self.handle_internal().await - } - pub async fn handle_device_id( - &mut self, - device_id: String, - ) -> CreateBackupResult { - if self.device_id.is_some() { - warn!("Device ID already provided"); - return Err(Status::invalid_argument("Device ID already provided")); - } - tracing::Span::current().record("device_id", &device_id); - self.device_id = Some(device_id); - self.handle_internal().await - } - pub async fn handle_key_entropy( - &mut self, - key_entropy: Vec, - ) -> CreateBackupResult { - if self.key_entropy.is_some() { - warn!("Key entropy already provided"); - return Err(Status::invalid_argument("Key entropy already provided")); - } - self.key_entropy = Some(key_entropy); - self.handle_internal().await - } - pub async fn handle_data_hash( - &mut self, - data_hash: Vec, - ) -> CreateBackupResult { - if self.data_hash.is_some() { - warn!("Data hash already provided"); - return Err(Status::invalid_argument("Data hash already provided")); - } - let hash_str = String::from_utf8(data_hash).map_err(|err| { - error!("Failed to convert data_hash into string: {:?}", err); - Status::aborted("Unexpected error") - })?; - debug!("Received data hash: {}", &hash_str); - self.data_hash = Some(hash_str); - self.handle_internal().await - } - - pub async fn handle_data_chunk( - &mut self, - data_chunk: Vec, - ) -> CreateBackupResult { - let HandlerState::ReceivingData { ref mut uploader } = self.state else { - self.should_close_stream = true; - error!("Data chunk sent before other inputs"); - return Err(Status::invalid_argument( - "Data chunk sent before other inputs", - )); - }; - - // empty chunk ends transmission - if data_chunk.is_empty() { - self.should_close_stream = true; - return Ok(proto::CreateNewBackupResponse { - backup_id: self.backup_id.clone(), - }); - } - - trace!("Received {} bytes of data", data_chunk.len()); - uploader.put_data(data_chunk).await.map_err(|err| { - error!("Failed to upload data chunk: {:?}", err); - Status::aborted("Internal error") - })?; - - Ok(proto::CreateNewBackupResponse { - // actual Backup ID should be sent only once, the time it is generated - // see handle_internal() - backup_id: String::new(), - }) - } - - /// This function should be called after the input stream is finished. - pub async fn finish(self) -> Result<(), Status> { - match self.state { - HandlerState::ReceivingParams => { - // client probably aborted early - trace!("Nothing to store in database. Finishing early"); - return Ok(()); - } - HandlerState::ReceivingData { uploader } => { - uploader.terminate().await.map_err(|err| { - error!("Uploader task closed with error: {:?}", err); - Status::aborted("Internal error") - })?; - } - HandlerState::DataAlreadyExists => (), - } - - let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else { - error!("Holder / UserID absent in data mode. This should never happen!"); - return Err(Status::failed_precondition("Internal error")); - }; - if self.backup_id.is_empty() { - error!("Backup ID was not generated. This should never happen!"); - return Err(Status::failed_precondition("Internal error")); - } - let backup_item = BackupItem::new(user_id, self.backup_id, holder); - - self - .db - .put_backup_item(backup_item) - .await - .map_err(handle_db_error)?; - - Ok(()) - } - - // internal param handler helper - async fn handle_internal(&mut self) -> CreateBackupResult { - if !matches!(self.state, HandlerState::ReceivingParams) { - error!("CreateBackupHandler already received all non-data params."); - return Err(Status::failed_precondition("Backup data chunk expected")); - } - - // all non-data inputs must be set before receiving backup data chunks - let (Some(data_hash), Some(device_id), Some(_), Some(_)) = ( - self.data_hash.as_ref(), - self.device_id.as_ref(), - self.user_id.as_ref(), - self.key_entropy.as_ref(), - ) else { - // return empty backup ID when inputs are incomplete - return Ok(proto::CreateNewBackupResponse { - backup_id: "".to_string(), - }); - }; - - let backup_id = generate_backup_id(device_id); - let holder = - crate::utils::generate_blob_holder(data_hash, &backup_id, None); - self.backup_id = backup_id.clone(); - self.holder = Some(holder.clone()); - tracing::Span::current().record("backup_id", &backup_id); - tracing::Span::current().record("blob_holder", &holder); - - match start_simple_uploader(&holder, data_hash, self.blob_client.clone()) - .await? - { - Some(uploader) => { - self.state = HandlerState::ReceivingData { uploader }; - trace!("Everything prepared, waiting for data..."); - } - None => { - // Blob with given data_hash already exists - debug!("Blob already exists, finishing"); - self.should_close_stream = true; - self.state = HandlerState::DataAlreadyExists; - } - }; - - Ok(proto::CreateNewBackupResponse { backup_id }) - } -} - -/// Generates ID for a new backup -fn generate_backup_id(device_id: &str) -> String { - format!( - "{device_id}_{timestamp}", - device_id = device_id, - timestamp = chrono::Utc::now().timestamp_millis() - ) -} diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs deleted file mode 100644 --- a/services/backup/src/service/handlers/pull_backup.rs +++ /dev/null @@ -1,330 +0,0 @@ -use async_stream::try_stream; -use tokio_stream::{Stream, StreamExt}; -use tonic::Status; -use tracing::{debug, error, trace, warn}; -use tracing_futures::Instrument; - -use super::handle_db_error; -use super::proto::{self, PullBackupResponse}; -use crate::{ - blob::{BlobClient, BlobDownloader}, - constants::{ - BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, - GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, - LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_LOG_ID, - }, - database::{BackupItem, DatabaseClient, LogItem}, -}; - -pub struct PullBackupHandler { - blob_client: BlobClient, - backup_item: BackupItem, - logs: Vec, -} - -impl PullBackupHandler { - pub async fn new( - db: &DatabaseClient, - blob_client: &BlobClient, - request: proto::PullBackupRequest, - ) -> Result { - let proto::PullBackupRequest { user_id, backup_id } = request; - let backup_item = db - .find_backup_item(&user_id, &backup_id) - .await - .map_err(handle_db_error)? - .ok_or_else(|| { - debug!("Backup item not found"); - Status::not_found("Backup item not found") - })?; - - let backup_id = backup_item.backup_id.as_str(); - let logs = db - .find_log_items_for_backup(backup_id) - .await - .map_err(handle_db_error)?; - - Ok(PullBackupHandler { - backup_item, - logs, - blob_client: blob_client.clone(), - }) - } - - /// Consumes the handler and provides a response `Stream`. The stream will - /// produce the following in order: - /// - Backup compaction data chunks - /// - Backup logs - /// - Whole log, if stored in db - /// - Log chunks, if stored in blob - pub fn into_response_stream( - self, - ) -> impl Stream> { - use proto::pull_backup_response::*; - - try_stream! { - debug!("Pulling backup..."); - { - let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone()); - tokio::pin!(compaction_stream); - while let Some(response) = compaction_stream.try_next().await? { - yield response; - } - } - trace!("Backup data pull complete."); - - if self.logs.is_empty() { - debug!("No logs to pull. Finishing"); - return; - } - - debug!("Pulling logs..."); - for log in self.logs { - trace!("Pulling log ID={}", &log.log_id); - let span = tracing::trace_span!("log", log_id = &log.log_id); - - if log.persisted_in_blob { - trace!(parent: &span, "Log persisted in blob"); - let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span); - tokio::pin!(log_data_stream); - while let Some(response) = log_data_stream.try_next().await? { - yield response; - } - } else { - trace!(parent: &span, "Log persisted in database"); - yield proto::PullBackupResponse { - attachment_holders: Some(log.attachment_holders), - id: Some(Id::LogId(log.log_id)), - data: Some(Data::LogChunk(log.value.into_bytes())), - }; - } - } - trace!("Pulled all logs, done"); - } - } -} - -/// Downloads a blob-stored [`BlobStoredItem`] and streams its content into -/// stream of [`PullBackupResponse`] objects, handles gRPC message size details. -fn data_stream( - item: &Item, - blob_client: BlobClient, -) -> impl Stream> + '_ -where - Item: BlobStoredItem, -{ - try_stream! { - let mut buffer = ResponseBuffer::default(); - let mut downloader = - BlobDownloader::start(item.get_holder().to_string(), blob_client); - - let mut is_first_chunk = true; - loop { - if !buffer.is_saturated() { - if let Some(data) = downloader.next_chunk().await { - buffer.put(data); - } - } - if buffer.is_empty() { - break; - } - - // get data chunk, shortened by length of metadata - let padding = item.metadata_size(is_first_chunk); - let chunk = buffer.get_chunk(padding); - - trace!( - with_attachments = is_first_chunk, - data_size = chunk.len(), - "Sending data chunk" - ); - yield item.to_response(chunk, is_first_chunk); - is_first_chunk = false; - } - - downloader.terminate().await.map_err(|err| { - error!("Blob downloader failed: {:?}", err); - Status::aborted("Internal error") - })?; - } -} - -/// Represents downloadable item stored in Blob service -trait BlobStoredItem { - // Blob holder representing this item - fn get_holder(&self) -> &str; - - /// Generates a gRPC response for given `data_chunk`. - /// The response may be in extended version, with `include_extra_info`, - /// ususally sent with first chunk - fn to_response( - &self, - data_chunk: Vec, - include_extra_info: bool, - ) -> proto::PullBackupResponse; - - /// Size in bytes of non-data fields contained in response message. - fn metadata_size(&self, include_extra_info: bool) -> usize; -} - -impl BlobStoredItem for BackupItem { - fn get_holder(&self) -> &str { - &self.compaction_holder - } - - fn to_response( - &self, - data_chunk: Vec, - include_extra_info: bool, - ) -> proto::PullBackupResponse { - use proto::pull_backup_response::*; - let attachment_holders = if include_extra_info { - Some(self.attachment_holders.clone()) - } else { - None - }; - proto::PullBackupResponse { - id: Some(Id::BackupId(self.backup_id.clone())), - data: Some(Data::CompactionChunk(data_chunk)), - attachment_holders, - } - } - - fn metadata_size(&self, include_extra_info: bool) -> usize { - let mut extra_bytes: usize = 0; - extra_bytes += BACKUP_TABLE_FIELD_BACKUP_ID.as_bytes().len(); - extra_bytes += self.backup_id.as_bytes().len(); - if include_extra_info { - extra_bytes += BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len(); - extra_bytes += self.attachment_holders.as_bytes().len(); - } - extra_bytes - } -} - -impl BlobStoredItem for LogItem { - fn get_holder(&self) -> &str { - &self.value - } - - fn to_response( - &self, - data_chunk: Vec, - include_extra_info: bool, - ) -> proto::PullBackupResponse { - use proto::pull_backup_response::*; - let attachment_holders = if include_extra_info { - Some(self.attachment_holders.clone()) - } else { - None - }; - proto::PullBackupResponse { - id: Some(Id::LogId(self.log_id.clone())), - data: Some(Data::LogChunk(data_chunk)), - attachment_holders, - } - } - - fn metadata_size(&self, include_extra_info: bool) -> usize { - let mut extra_bytes: usize = 0; - extra_bytes += LOG_TABLE_FIELD_LOG_ID.as_bytes().len(); - extra_bytes += self.log_id.as_bytes().len(); - if include_extra_info { - extra_bytes += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len(); - extra_bytes += self.attachment_holders.as_bytes().len(); - } - extra_bytes - } -} - -/// A utility structure that buffers downloaded data and allows to retrieve it -/// as chunks of arbitrary size, not greater than provided `limit`. -struct ResponseBuffer { - buf: Vec, - limit: usize, -} - -impl Default for ResponseBuffer { - /// Buffer size defaults to max usable gRPC message size - fn default() -> Self { - ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE) - } -} - -impl ResponseBuffer { - pub fn new(limit: usize) -> Self { - ResponseBuffer { - buf: Vec::new(), - limit, - } - } - - pub fn put(&mut self, data: Vec) { - if data.len() > self.limit { - warn!("Data saved to buffer is larger than chunk limit."); - } - - self.buf.extend(data); - } - - /// Gets chunk of size `limit - padding` and leaves remainder in buffer - pub fn get_chunk(&mut self, padding: usize) -> Vec { - let mut chunk = std::mem::take(&mut self.buf); - - let target_size = self.limit - padding; - if chunk.len() > target_size { - // after this operation, chunk=0..target_size, self.buf=target_size..end - self.buf = chunk.split_off(target_size); - } - return chunk; - } - - /// Does buffer length exceed given limit - pub fn is_saturated(&self) -> bool { - self.buf.len() >= self.limit - } - - pub fn is_empty(&self) -> bool { - self.buf.is_empty() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - const LIMIT: usize = 100; - - #[test] - fn test_response_buffer() { - let mut buffer = ResponseBuffer::new(LIMIT); - assert_eq!(buffer.is_empty(), true); - - // put 80 bytes of data - buffer.put(vec![0u8; 80]); - assert_eq!(buffer.is_empty(), false); - assert_eq!(buffer.is_saturated(), false); - - // put next 80 bytes, should be saturated as 160 > 100 - buffer.put(vec![0u8; 80]); - let buf_size = buffer.buf.len(); - assert_eq!(buffer.is_saturated(), true); - assert_eq!(buf_size, 160); - - // get one chunk - let padding: usize = 10; - let expected_chunk_size = LIMIT - padding; - let chunk = buffer.get_chunk(padding); - assert_eq!(chunk.len(), expected_chunk_size); // 90 - - // buffer should not be saturated now (160 - 90 < 100) - let remaining_buf_size = buffer.buf.len(); - assert_eq!(remaining_buf_size, buf_size - expected_chunk_size); - assert_eq!(buffer.is_saturated(), false); - - // get last chunk - let chunk = buffer.get_chunk(padding); - assert_eq!(chunk.len(), remaining_buf_size); - assert_eq!(buffer.is_empty(), true); - } -} diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs deleted file mode 100644 --- a/services/backup/src/service/handlers/send_log.rs +++ /dev/null @@ -1,272 +0,0 @@ -use tonic::Status; -use tracing::{debug, error, trace, warn}; -use uuid::Uuid; - -use super::handle_db_error; -use crate::{ - blob::{BlobClient, BlobUploader}, - constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, - database::{DatabaseClient, LogItem}, - service::proto::SendLogResponse, -}; - -enum LogPersistence { - /// Log entirely stored in DynamoDB database - DB, - /// Log contents stored with Blob service - BLOB { holder: String }, -} - -pub struct SendLogHandler { - // flow control - pub should_close_stream: bool, - - // inputs - user_id: Option, - backup_id: Option, - log_hash: Option, - - // internal state - log_id: Option, - log_buffer: Vec, - persistence_method: LogPersistence, - should_receive_data: bool, - - // client instances - db: DatabaseClient, - blob_client: BlobClient, - uploader: Option, -} - -impl SendLogHandler { - pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self { - SendLogHandler { - db: db.clone(), - blob_client: blob_client.clone(), - uploader: None, - user_id: None, - backup_id: None, - log_hash: None, - log_id: None, - log_buffer: Vec::new(), - persistence_method: LogPersistence::DB, - should_receive_data: false, - should_close_stream: false, - } - } - - pub async fn handle_user_id( - &mut self, - user_id: String, - ) -> Result<(), Status> { - if self.user_id.is_some() { - warn!("user ID already provided"); - return Err(Status::invalid_argument("User ID already provided")); - } - self.user_id = Some(user_id); - self.handle_internal().await - } - pub async fn handle_backup_id( - &mut self, - backup_id: String, - ) -> Result<(), Status> { - if self.backup_id.is_some() { - warn!("backup ID already provided"); - return Err(Status::invalid_argument("Backup ID already provided")); - } - tracing::Span::current().record("backup_id", &backup_id); - self.backup_id = Some(backup_id); - self.handle_internal().await - } - pub async fn handle_log_hash( - &mut self, - log_hash: Vec, - ) -> Result<(), Status> { - if self.log_hash.is_some() { - warn!("Log hash already provided"); - return Err(Status::invalid_argument("Log hash already provided")); - } - let hash_str = String::from_utf8(log_hash).map_err(|err| { - error!("Failed to convert data_hash into string: {:?}", err); - Status::aborted("Unexpected error") - })?; - debug!("Received log hash: {}", &hash_str); - self.log_hash = Some(hash_str); - self.handle_internal().await - } - pub async fn handle_log_data( - &mut self, - data_chunk: Vec, - ) -> Result<(), Status> { - if !self.should_receive_data || self.log_id.is_none() { - self.should_close_stream = true; - error!("Data chunk sent before other inputs"); - return Err(Status::invalid_argument( - "Data chunk sent before other inputs", - )); - } - - // empty chunk ends transmission - if data_chunk.is_empty() { - self.should_close_stream = true; - return Ok(()); - } - - match self.persistence_method { - LogPersistence::DB => { - self.log_buffer.extend(data_chunk); - self.ensure_size_constraints().await?; - } - LogPersistence::BLOB { .. } => { - let Some(client) = self.uploader.as_mut() else { - self.should_close_stream = true; - error!("Put client uninitialized. This should never happen!"); - return Err(Status::failed_precondition("Internal error")); - }; - client.put_data(data_chunk).await.map_err(|err| { - error!("Failed to upload data chunk: {:?}", err); - Status::aborted("Internal error") - })?; - } - } - Ok(()) - } - - pub async fn finish(self) -> Result { - if let Some(client) = self.uploader { - client.terminate().await.map_err(|err| { - error!("Put client task closed with error: {:?}", err); - Status::aborted("Internal error") - })?; - } else { - trace!("No uploader initialized. Skipping termination"); - } - - if !self.should_receive_data { - // client probably aborted early - trace!("Nothing to store in database. Finishing early"); - return Ok(SendLogResponse { - log_checkpoint: "".to_string(), - }); - } - - let (Some(backup_id), Some(log_id), Some(data_hash)) = ( - self.backup_id, - self.log_id, - self.log_hash - ) else { - error!("Log info absent in data mode. This should never happen!"); - return Err(Status::failed_precondition("Internal error")); - }; - - let (log_value, persisted_in_blob) = match self.persistence_method { - LogPersistence::BLOB { holder } => (holder, true), - LogPersistence::DB => { - let contents = String::from_utf8(self.log_buffer).map_err(|err| { - error!("Failed to convert log contents data into string: {:?}", err); - Status::aborted("Unexpected error") - })?; - (contents, false) - } - }; - - let log_item = LogItem { - backup_id, - log_id: log_id.clone(), - persisted_in_blob, - value: log_value, - attachment_holders: String::new(), - data_hash, - }; - - self - .db - .put_log_item(log_item) - .await - .map_err(handle_db_error)?; - - Ok(SendLogResponse { - log_checkpoint: log_id, - }) - } - - // internal param handler helper - async fn handle_internal(&mut self) -> Result<(), Status> { - if self.should_receive_data { - error!("SendLogHandler is already expecting data chunks"); - return Err(Status::failed_precondition("Log data chunk expected")); - } - - // all non-data inputs must be set before receiving log contents - let (Some(backup_id), Some(_), Some(_)) = ( - self.backup_id.as_ref(), - self.user_id.as_ref(), - self.log_hash.as_ref() - ) else { return Ok(()); }; - - let log_id = generate_log_id(backup_id); - tracing::Span::current().record("log_id", &log_id); - self.log_id = Some(log_id); - - trace!("Everything prepared, waiting for data..."); - self.should_receive_data = true; - Ok(()) - } - - /// Ensures log fits db size constraints. If not, it is moved to blob - /// persistence - async fn ensure_size_constraints(&mut self) -> Result<(), Status> { - let (Some(backup_id), Some(log_id), Some(log_hash)) = ( - self.backup_id.as_ref(), - self.log_id.as_ref(), - self.log_hash.as_ref() - ) else { - self.should_close_stream = true; - error!("Log info absent in data mode. This should never happen!"); - return Err(Status::failed_precondition("Internal error")); - }; - - let log_size = LogItem::size_from_components( - backup_id, - log_id, - log_hash, - &self.log_buffer, - ); - if log_size > LOG_DATA_SIZE_DATABASE_LIMIT { - debug!("Log too large, switching persistence to Blob"); - let holder = - crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id)); - match crate::blob::start_simple_uploader( - &holder, - &log_hash, - self.blob_client.clone(), - ) - .await? - { - Some(mut uploader) => { - let blob_chunk = std::mem::take(&mut self.log_buffer); - uploader.put_data(blob_chunk).await.map_err(|err| { - error!("Failed to upload data chunk: {:?}", err); - Status::aborted("Internal error") - })?; - self.uploader = Some(uploader); - } - None => { - debug!("Log hash already exists"); - self.should_close_stream = true; - } - } - self.persistence_method = LogPersistence::BLOB { holder }; - } - Ok(()) - } -} - -fn generate_log_id(backup_id: &str) -> String { - format!( - "{backup_id}{sep}{uuid}", - backup_id = backup_id, - sep = ID_SEPARATOR, - uuid = Uuid::new_v4() - ) -} diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs deleted file mode 100644 --- a/services/backup/src/service/mod.rs +++ /dev/null @@ -1,241 +0,0 @@ -use aws_sdk_dynamodb::Error as DynamoDBError; -use comm_services_lib::database::Error as DBError; -use proto::backup_service_server::BackupService; -use std::pin::Pin; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tonic::{Request, Response, Status}; -use tracing::{debug, error, info, instrument, trace, warn}; -use tracing_futures::Instrument; - -use crate::{ - blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY, - database::DatabaseClient, -}; - -mod proto { - tonic::include_proto!("backup"); -} -pub use proto::backup_service_server::BackupServiceServer; - -/// submodule containing gRPC endpoint handler implementations -mod handlers { - pub(super) mod add_attachments; - pub(super) mod create_backup; - pub(super) mod pull_backup; - pub(super) mod send_log; - - // re-exports for convenient usage in handlers - pub(self) use super::handle_db_error; - pub(self) use super::proto; -} -use self::handlers::create_backup::CreateBackupHandler; -use self::handlers::pull_backup::PullBackupHandler; -use self::handlers::send_log::SendLogHandler; - -pub struct MyBackupService { - db: DatabaseClient, - blob_client: BlobClient, -} - -impl MyBackupService { - pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self { - MyBackupService { - db: db_client, - blob_client, - } - } -} - -// gRPC implementation -#[tonic::async_trait] -impl BackupService for MyBackupService { - type CreateNewBackupStream = Pin< - Box< - dyn Stream> + Send, - >, - >; - - #[instrument(skip_all, fields(device_id, backup_id, blob_holder))] - async fn create_new_backup( - &self, - request: Request>, - ) -> Result, Status> { - use proto::create_new_backup_request::Data::*; - - info!("CreateNewBackup request: {:?}", request); - let mut in_stream = request.into_inner(); - let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let db = self.db.clone(); - let blob_client = self.blob_client.clone(); - let worker = async move { - let mut handler = CreateBackupHandler::new(db, blob_client); - while let Some(message) = in_stream.next().await { - let response = match message { - Ok(proto::CreateNewBackupRequest { - data: Some(UserId(user_id)), - }) => handler.handle_user_id(user_id).await, - Ok(proto::CreateNewBackupRequest { - data: Some(DeviceId(device_id)), - }) => handler.handle_device_id(device_id).await, - Ok(proto::CreateNewBackupRequest { - data: Some(KeyEntropy(key_entropy)), - }) => handler.handle_key_entropy(key_entropy).await, - Ok(proto::CreateNewBackupRequest { - data: Some(NewCompactionHash(hash)), - }) => handler.handle_data_hash(hash).await, - Ok(proto::CreateNewBackupRequest { - data: Some(NewCompactionChunk(chunk)), - }) => handler.handle_data_chunk(chunk).await, - unexpected => { - error!("Received an unexpected request: {:?}", unexpected); - Err(Status::unknown("unknown error")) - } - }; - - trace!("Sending response: {:?}", response); - if let Err(e) = tx.send(response).await { - error!("Response was dropped: {}", e); - break; - } - if handler.should_close_stream { - trace!("Handler requested to close stream"); - break; - } - } - if let Err(status) = handler.finish().await { - trace!("Sending error response: {:?}", status); - let _ = tx.send(Err(status)).await; - } - debug!("Request finished processing"); - }; - tokio::spawn(worker.in_current_span()); - - let out_stream = ReceiverStream::new(rx); - Ok(Response::new( - Box::pin(out_stream) as Self::CreateNewBackupStream - )) - } - - #[instrument(skip_all, fields(backup_id, log_id))] - async fn send_log( - &self, - request: Request>, - ) -> Result, Status> { - use proto::send_log_request::Data::*; - - info!("SendLog request: {:?}", request); - let mut handler = SendLogHandler::new(&self.db, &self.blob_client); - - let mut in_stream = request.into_inner(); - while let Some(message) = in_stream.next().await { - let result = match message { - Ok(proto::SendLogRequest { - data: Some(UserId(user_id)), - }) => handler.handle_user_id(user_id).await, - Ok(proto::SendLogRequest { - data: Some(BackupId(backup_id)), - }) => handler.handle_backup_id(backup_id).await, - Ok(proto::SendLogRequest { - data: Some(LogHash(log_hash)), - }) => handler.handle_log_hash(log_hash).await, - Ok(proto::SendLogRequest { - data: Some(LogData(chunk)), - }) => handler.handle_log_data(chunk).await, - unexpected => { - error!("Received an unexpected request: {:?}", unexpected); - Err(Status::unknown("unknown error")) - } - }; - - if let Err(err) = result { - error!("An error occurred when processing request: {:?}", err); - return Err(err); - } - if handler.should_close_stream { - trace!("Handler requested to close request stream"); - break; - } - } - - let response = handler.finish().await; - debug!("Finished. Sending response: {:?}", response); - response.map(|response_body| Response::new(response_body)) - } - - type RecoverBackupKeyStream = Pin< - Box< - dyn Stream> + Send, - >, - >; - - #[instrument(skip(self))] - async fn recover_backup_key( - &self, - _request: Request>, - ) -> Result, Status> { - Err(Status::unimplemented("unimplemented")) - } - - type PullBackupStream = Pin< - Box> + Send>, - >; - - #[instrument(skip_all, fields(backup_id = &request.get_ref().backup_id))] - async fn pull_backup( - &self, - request: Request, - ) -> Result, Status> { - info!("PullBackup request: {:?}", request); - - let handler = - PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner()) - .await?; - - let stream = handler.into_response_stream().in_current_span(); - Ok(Response::new(Box::pin(stream) as Self::PullBackupStream)) - } - - #[instrument(skip_all, - fields( - backup_id = &request.get_ref().backup_id, - log_id = &request.get_ref().log_id) - )] - async fn add_attachments( - &self, - request: Request, - ) -> Result, Status> { - info!( - "AddAttachment request. New holders: {}", - &request.get_ref().holders - ); - - handlers::add_attachments::handle_add_attachments( - &self.db, - &self.blob_client, - request.into_inner(), - ) - .await?; - - info!("Request processed successfully"); - Ok(Response::new(())) - } -} - -/// A helper converting our Database errors into gRPC responses -fn handle_db_error(db_error: DBError) -> Status { - match db_error { - DBError::AwsSdk(DynamoDBError::InternalServerError(_)) - | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( - _, - )) - | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { - warn!("AWS transient error occurred"); - Status::unavailable("please retry") - } - e => { - error!("Encountered an unexpected error: {}", e); - Status::failed_precondition("unexpected error") - } - } -} diff --git a/shared/protos/backup.proto b/shared/protos/backup.proto deleted file mode 100644 --- a/shared/protos/backup.proto +++ /dev/null @@ -1,96 +0,0 @@ -syntax = "proto3"; - -package backup; - -import "google/protobuf/empty.proto"; - -/** - * API - description - * CreateNewBackup - This method is called when we want to create a new backup. - * We send a new backup key encrypted with the user's password and also the - * new compaction. New logs that will be sent from now on will be assigned to - * this backup. - * SendLog - User sends a new log to the backup service. The log is being - * assigned to the latest(or desired) backup's compaction item. - * RecoverBackupKey - Pulls data necessary for regenerating the backup key - * on the client-side for the latest(or desired) backup - * PullBackup - Fetches compaction + all logs assigned to it for the - * specified backup(default is the last backup) - */ - -service BackupService { - rpc CreateNewBackup(stream CreateNewBackupRequest) returns (stream CreateNewBackupResponse) {} - rpc SendLog(stream SendLogRequest) returns (SendLogResponse) {} - rpc RecoverBackupKey(stream RecoverBackupKeyRequest) returns (stream RecoverBackupKeyResponse) {} - rpc PullBackup(PullBackupRequest) returns (stream PullBackupResponse) {} - rpc AddAttachments(AddAttachmentsRequest) returns (google.protobuf.Empty) {} -} - -// CreateNewBackup - -message CreateNewBackupRequest { - oneof data { - string userID = 1; - string deviceID = 2; - bytes keyEntropy = 3; - bytes newCompactionHash = 4; - bytes newCompactionChunk = 5; - } -} - -message CreateNewBackupResponse { - string backupID = 1; -} - -// SendLog - -message SendLogRequest { - oneof data { - string userID = 1; - string backupID = 2; - bytes logHash = 3; - bytes logData = 4; - } -} - -message SendLogResponse { - string logCheckpoint = 1; -} - -// RecoverBackupKey - -message RecoverBackupKeyRequest { - string userID = 1; -} - -message RecoverBackupKeyResponse { - string backupID = 4; -} - -// PullBackup - -message PullBackupRequest { - string userID = 1; - string backupID = 2; -} - -message PullBackupResponse { - oneof id { - string backupID = 1; - string logID = 2; - } - oneof data { - bytes compactionChunk = 3; - bytes logChunk = 4; - } - optional string attachmentHolders = 5; -} - -// AddAttachment - -message AddAttachmentsRequest { - string userID = 1; - string backupID = 2; - string logID = 3; - string holders = 4; -}