D8796: [backup] Remove gRPC from backup
D8796.id30150.diff (60 KB)
diff --git a/services/backup/Cargo.lock b/services/backup/Cargo.lock
--- a/services/backup/Cargo.lock
+++ b/services/backup/Cargo.lock
@@ -48,17 +48,6 @@
"syn 2.0.28",
]
-[[package]]
-name = "async-trait"
-version = "0.1.59"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 1.0.105",
-]
-
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -383,52 +372,6 @@
"tracing",
]
-[[package]]
-name = "axum"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
-dependencies = [
- "async-trait",
- "axum-core",
- "bitflags",
- "bytes",
- "futures-util",
- "http",
- "http-body",
- "hyper",
- "itoa",
- "matchit",
- "memchr",
- "mime",
- "percent-encoding",
- "pin-project-lite",
- "rustversion",
- "serde",
- "sync_wrapper",
- "tower",
- "tower-http",
- "tower-layer",
- "tower-service",
-]
-
-[[package]]
-name = "axum-core"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
-dependencies = [
- "async-trait",
- "bytes",
- "futures-util",
- "http",
- "http-body",
- "mime",
- "rustversion",
- "tower-layer",
- "tower-service",
-]
-
[[package]]
name = "backup"
version = "0.1.0"
@@ -442,11 +385,9 @@
"clap",
"comm-services-lib",
"once_cell",
- "prost",
"rand",
"tokio",
"tokio-stream",
- "tonic",
"tonic-build",
"tracing",
"tracing-futures",
@@ -955,12 +896,6 @@
"pin-project-lite",
]
-[[package]]
-name = "http-range-header"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
-
[[package]]
name = "httparse"
version = "1.8.0"
@@ -1012,18 +947,6 @@
"tokio-rustls",
]
-[[package]]
-name = "hyper-timeout"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
-dependencies = [
- "hyper",
- "pin-project-lite",
- "tokio",
- "tokio-io-timeout",
-]
-
[[package]]
name = "iana-time-zone"
version = "0.1.53"
@@ -1158,24 +1081,12 @@
"regex-automata",
]
-[[package]]
-name = "matchit"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
-
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
-[[package]]
-name = "mime"
-version = "0.3.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
-
[[package]]
name = "mio"
version = "0.8.5"
@@ -1569,12 +1480,6 @@
"base64",
]
-[[package]]
-name = "rustversion"
-version = "1.0.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
-
[[package]]
name = "ryu"
version = "1.0.11"
@@ -1727,12 +1632,6 @@
"unicode-ident",
]
-[[package]]
-name = "sync_wrapper"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
-
[[package]]
name = "tempfile"
version = "3.3.0"
@@ -1820,16 +1719,6 @@
"windows-sys 0.42.0",
]
-[[package]]
-name = "tokio-io-timeout"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
-dependencies = [
- "pin-project-lite",
- "tokio",
-]
-
[[package]]
name = "tokio-macros"
version = "1.8.2"
@@ -1877,38 +1766,6 @@
"tracing",
]
-[[package]]
-name = "tonic"
-version = "0.8.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
-dependencies = [
- "async-stream",
- "async-trait",
- "axum",
- "base64",
- "bytes",
- "futures-core",
- "futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
- "hyper-timeout",
- "percent-encoding",
- "pin-project",
- "prost",
- "prost-derive",
- "tokio",
- "tokio-stream",
- "tokio-util",
- "tower",
- "tower-layer",
- "tower-service",
- "tracing",
- "tracing-futures",
-]
-
[[package]]
name = "tonic-build"
version = "0.8.4"
@@ -1930,37 +1787,14 @@
dependencies = [
"futures-core",
"futures-util",
- "indexmap",
"pin-project",
"pin-project-lite",
- "rand",
- "slab",
"tokio",
- "tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
-[[package]]
-name = "tower-http"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
-dependencies = [
- "bitflags",
- "bytes",
- "futures-core",
- "futures-util",
- "http",
- "http-body",
- "http-range-header",
- "pin-project-lite",
- "tower",
- "tower-layer",
- "tower-service",
-]
-
[[package]]
name = "tower-layer"
version = "0.3.2"
diff --git a/services/backup/Cargo.toml b/services/backup/Cargo.toml
--- a/services/backup/Cargo.toml
+++ b/services/backup/Cargo.toml
@@ -16,14 +16,12 @@
clap = { version = "4.0", features = ["derive", "env"] }
comm-services-lib = { path = "../comm-services-lib" }
once_cell = "1.17"
-prost = "0.11"
rand = "0.8.5"
-tokio = { version = "1.24", features = ["rt-multi-thread"]}
+tokio = { version = "1.24", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1"
-tonic = "0.8"
tracing = "0.1"
-tracing-futures = { version = "0.2", features = ["futures-03"] }
-tracing-subscriber = { version = "0.3", features = ["env-filter"]}
+tracing-futures = { version = "0.2", features = ["futures-03"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.2", features = ["v4"] }
[build-dependencies]
diff --git a/services/backup/Dockerfile b/services/backup/Dockerfile
--- a/services/backup/Dockerfile
+++ b/services/backup/Dockerfile
@@ -21,8 +21,6 @@
# Copy actual application sources
COPY services/backup .
-COPY shared/protos/backup.proto ../../shared/protos/
-COPY shared/protos/blob.proto ../../shared/protos/
# Remove the previously-built binary so that only the application itself is
# rebuilt
diff --git a/services/backup/build.rs b/services/backup/build.rs
deleted file mode 100644
--- a/services/backup/build.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-fn main() {
- println!("cargo:rerun-if-changed=src/main.rs");
-
- println!("cargo:rerun-if-changed=../../shared/protos/backup.proto");
- println!("cargo:rerun-if-changed=../../shared/protos/blob.proto");
- tonic_build::compile_protos("../../shared/protos/backup.proto")
- .expect("Failed to compile Backup protobuf file");
- tonic_build::compile_protos("../../shared/protos/blob.proto")
- .expect("Failed to compile Blob protobuf file");
-}
diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs
deleted file mode 100644
--- a/services/backup/src/blob/downloader.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use anyhow::{bail, Result};
-use tokio::{
- sync::mpsc::{self, Receiver},
- task::JoinHandle,
-};
-use tracing::{error, instrument, Instrument};
-
-use super::{proto, BlobClient};
-use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
-
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-/// The BlobDownloader instance is a handle holder of a Tokio task running the
-/// actual blob client instance. The communication is done via a MPSC channel
-/// and is one-sided - the data is transmitted from the client task to the
-/// caller. Blob chunks received in response stream are waiting
-/// for the channel to have capacity, so it is recommended to read them quickly
-/// to make room for more.
-/// The client task can be stopped and awaited for result via the `terminate()`
-/// method.
-pub struct BlobDownloader {
- rx: Receiver<Vec<u8>>,
- handle: JoinHandle<anyhow::Result<()>>,
-}
-
-impl BlobDownloader {
- /// Connects to the Blob service and keeps the client connection open
- /// in a separate Tokio task.
- #[instrument(name = "blob_downloader")]
- pub fn start(holder: String, mut blob_client: BlobClient) -> Self {
- let (blob_res_tx, blob_res_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let client_thread = async move {
- let response = blob_client
- .get(proto::GetRequest { holder })
- .await
- .map_err(|err| {
- error!("Get request failed: {:?}", err);
- err
- })?;
- let mut inner_response = response.into_inner();
- loop {
- match inner_response.message().await? {
- Some(data) => {
- let data: Vec<u8> = data.data_chunk;
- if let Err(err) = blob_res_tx.send(data).await {
- bail!(err);
- }
- }
- // Response stream was closed
- None => break,
- }
- }
- Ok(())
- };
- let handle = tokio::spawn(client_thread.in_current_span());
-
- BlobDownloader {
- rx: blob_res_rx,
- handle,
- }
- }
-
- /// Receives the next chunk of blob data if ready or sleeps
- /// until the data is available.
- ///
- /// Returns `None` when the transmission is finished, but this doesn't
- /// determine if it was successful. After receiving `None`, the client
- /// should be consumed by calling [`GetClient::terminate`] to handle
- /// possible errors.
- pub async fn next_chunk(&mut self) -> Option<Vec<u8>> {
- self.rx.recv().await
- }
-
- /// Stops receiving messages and awaits the client thread to exit
- /// and returns its status.
- pub async fn terminate(mut self) -> Result<()> {
- self.rx.close();
- self.handle.await?
- }
-}
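Editorial note: the removed `BlobDownloader` follows a handle-holder pattern; the gRPC client runs in a spawned Tokio task and streams blob chunks back over a bounded MPSC channel, and the caller drains the channel before awaiting the task's result. Below is a minimal, tonic-free sketch of that pattern for illustration only; the type name and the fake data source are stand-ins, not part of the original code.
```
use tokio::{sync::mpsc, task::JoinHandle};

/// Stand-in for the removed BlobDownloader: a spawned task pushes chunks into
/// a bounded channel; the caller drains them, then awaits the task's result.
struct ChunkReader {
    rx: mpsc::Receiver<Vec<u8>>,
    handle: JoinHandle<Result<(), String>>,
}

impl ChunkReader {
    fn start() -> Self {
        let (tx, rx) = mpsc::channel(10);
        let handle = tokio::spawn(async move {
            // The real task read a gRPC response stream here; this one just
            // emits two fake chunks. send() waits for channel capacity, which
            // is the backpressure behaviour described in the doc comment above.
            for chunk in [vec![1u8, 2, 3], vec![4u8, 5]] {
                tx.send(chunk).await.map_err(|e| e.to_string())?;
            }
            Ok(())
        });
        ChunkReader { rx, handle }
    }

    async fn next_chunk(&mut self) -> Option<Vec<u8>> {
        self.rx.recv().await
    }

    async fn terminate(mut self) -> Result<(), String> {
        self.rx.close();
        self.handle.await.map_err(|e| e.to_string())?
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut reader = ChunkReader::start();
    while let Some(chunk) = reader.next_chunk().await {
        println!("got {} bytes", chunk.len());
    }
    reader.terminate().await
}
```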
diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs
deleted file mode 100644
--- a/services/backup/src/blob/mod.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-mod proto {
- tonic::include_proto!("blob");
-}
-use proto::blob_service_client::BlobServiceClient;
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-mod downloader;
-mod uploader;
-pub use downloader::*;
-pub use uploader::*;
-
-pub type BlobClient = BlobServiceClient<tonic::transport::Channel>;
-
-/// Creates a new Blob service client instance. It does not attempt to connect
-/// to the service until first use.
-pub fn init_blob_client() -> BlobClient {
- let service_url = &crate::CONFIG.blob_service_url;
- let channel =
- tonic::transport::Channel::from_static(service_url).connect_lazy();
- BlobServiceClient::new(channel)
-}
diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs
deleted file mode 100644
--- a/services/backup/src/blob/uploader.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-use anyhow::{anyhow, bail, Result};
-use tokio::{
- sync::mpsc::{self, Receiver, Sender},
- task::JoinHandle,
-};
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::Status;
-use tracing::{error, instrument, Instrument};
-
-use super::{proto, BlobClient};
-use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
-
-pub use proto::put_request::Data as PutRequestData;
-pub use proto::{PutRequest, PutResponse};
-
-pub struct BlobUploader {
- req_tx: Sender<proto::PutRequest>,
- res_rx: Receiver<proto::PutResponse>,
- handle: JoinHandle<anyhow::Result<()>>,
-}
-
-/// The BlobUploader instance is a handle holder of a Tokio task running the
-/// actual blob client instance. The communication is done via two MPSC
-/// channels - one sending requests to the client task, and another for sending
-/// responses back to the caller. These messages should go in pairs
-/// - one request for one response.
-/// The client task can be stopped and awaited for result via the `terminate()`
-/// method.
-impl BlobUploader {
- /// Connects to the Blob service and keeps the client connection open
- /// in a separate Tokio task.
- #[instrument(name = "blob_uploader")]
- pub fn start(mut blob_client: BlobClient) -> Self {
- let (blob_req_tx, blob_req_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let (blob_res_tx, blob_res_rx) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let client_thread = async move {
- match blob_client
- .put(tonic::Request::new(ReceiverStream::new(blob_req_rx)))
- .await
- {
- Ok(response) => {
- let mut response_stream = response.into_inner();
- loop {
- match response_stream.message().await? {
- Some(response_message) => {
- // warning: this will produce an error if there's more unread
- // responses than MPSC_CHANNEL_BUFFER_CAPACITY
- // so you should always read the response MPSC channel
- // right after sending a request to dequeue the responses
- // and make room for more.
- // The PutClient::put() function should take care of that
- if let Err(err) = blob_res_tx.try_send(response_message) {
- bail!(err);
- }
- }
- // Response stream was closed
- None => break,
- }
- }
- }
- Err(err) => {
- error!("Put request failed: {:?}", err);
- bail!(err.to_string());
- }
- };
- Ok(())
- };
- let handle = tokio::spawn(client_thread.in_current_span());
-
- BlobUploader {
- req_tx: blob_req_tx,
- res_rx: blob_res_rx,
- handle,
- }
- }
-
- /// Sends a [`PutRequest`] to the stream and waits for blob service
- /// to send a response. After all data is sent, the [`PutClient::terminate`]
- /// should be called to end the transmission and handle possible errors.
- pub async fn put(&mut self, req: PutRequest) -> Result<PutResponse> {
- self.req_tx.try_send(req)?;
- self
- .res_rx
- .recv()
- .await
- .ok_or_else(|| anyhow!("Blob client channel closed"))
- }
-
- /// Convenience wrapper for
- /// ```
- /// BlobClient::put(PutRequest {
- /// data: Some(PutRequestData::DataChunk(data))
- /// })
- /// ```
- pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
- self
- .put(PutRequest {
- data: Some(PutRequestData::DataChunk(data)),
- })
- .await
- }
-
- /// Closes the connection and awaits the blob client task to finish.
- pub async fn terminate(self) -> Result<()> {
- drop(self.req_tx);
- let thread_result = self.handle.await?;
- thread_result
- }
-}
-
-/// Starts a put client instance. Fulfills request with blob hash and holder.
-///
-/// `None` is returned if given `holder` already exists.
-///
-/// ## Example
-/// ```
-/// if let Some(mut client) =
-/// start_simple_uploader("my_holder", "my_hash").await? {
-/// let my_data = vec![1,2,3,4];
-/// let _ = client.put_data(my_data).await;
-///
-/// let status = client.terminate().await;
-/// }
-/// ```
-pub async fn start_simple_uploader(
- holder: &str,
- blob_hash: &str,
- blob_client: BlobClient,
-) -> Result<Option<BlobUploader>, Status> {
- // start upload request
- let mut uploader = BlobUploader::start(blob_client);
-
- // send holder
- uploader
- .put(PutRequest {
- data: Some(PutRequestData::Holder(holder.to_string())),
- })
- .await
- .map_err(|err| {
- error!("Failed to set blob holder: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- // send hash
- let PutResponse { data_exists } = uploader
- .put(PutRequest {
- data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
- })
- .await
- .map_err(|err| {
- error!("Failed to set blob hash: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- // Blob with given holder already exists, nothing to do
- if data_exists {
- // the connection is already terminated by server,
- // but it's good to await it anyway
- uploader.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- return Ok(None);
- }
- Ok(Some(uploader))
-}
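Editorial note: the removed `BlobUploader` pairs two bounded channels around a client-streaming call; requests flow from the caller into the task, where the receiver is wrapped in a `ReceiverStream` so it can be fed to the gRPC `put` stream, and each request is answered by exactly one response read back by the caller. A rough sketch of that pairing with the gRPC call replaced by an echoing worker; the names are illustrative only.
```
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // Request channel: caller -> worker. Response channel: worker -> caller.
    let (req_tx, req_rx) = mpsc::channel::<String>(10);
    let (res_tx, mut res_rx) = mpsc::channel::<String>(10);

    // Worker task standing in for the gRPC client that holds the open stream.
    // ReceiverStream turns the mpsc receiver into a Stream, the same trick the
    // removed code used to feed requests into the blob client's put() call.
    let worker = tokio::spawn(async move {
        let mut requests = ReceiverStream::new(req_rx);
        while let Some(req) = requests.next().await {
            // Exactly one response per request, mirroring the put()/recv() pairing.
            if res_tx.send(format!("ack: {req}")).await.is_err() {
                break;
            }
        }
    });

    // Caller side: send a request, then read its response right away so the
    // response channel never fills up (the concern noted in the comments above).
    req_tx.send("chunk-1".into()).await.unwrap();
    println!("{}", res_rx.recv().await.unwrap());

    // Dropping the sender ends the request stream; the worker then exits.
    drop(req_tx);
    worker.await.unwrap();
}
```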
diff --git a/services/backup/src/config.rs b/services/backup/src/config.rs
--- a/services/backup/src/config.rs
+++ b/services/backup/src/config.rs
@@ -3,16 +3,12 @@
use tracing::info;
use crate::constants::{
- DEFAULT_BLOB_SERVICE_URL, DEFAULT_GRPC_SERVER_PORT, DEFAULT_LOCALSTACK_URL,
- SANDBOX_ENV_VAR,
+ DEFAULT_BLOB_SERVICE_URL, DEFAULT_LOCALSTACK_URL, SANDBOX_ENV_VAR,
};
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct AppConfig {
- /// gRPC server listening port
- #[arg(long = "port", default_value_t = DEFAULT_GRPC_SERVER_PORT)]
- pub listening_port: u64,
/// Run the service in sandbox
#[arg(long = "sandbox", default_value_t = false)]
// support the env var for compatibility reasons
diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -31,8 +31,6 @@
pub const GRPC_METADATA_SIZE_PER_MESSAGE: usize = 5;
// Configuration defaults
-
-pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;
pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566";
pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053";
diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs
--- a/services/backup/src/main.rs
+++ b/services/backup/src/main.rs
@@ -1,17 +1,10 @@
use anyhow::Result;
-use std::net::SocketAddr;
-use tonic::transport::Server;
-use tracing::{info, Level};
+use tracing::Level;
use tracing_subscriber::EnvFilter;
-use crate::blob::BlobClient;
-use crate::service::{BackupServiceServer, MyBackupService};
-
-pub mod blob;
pub mod config;
pub mod constants;
pub mod database;
-pub mod service;
pub mod utils;
// re-export this to be available as crate::CONFIG
@@ -28,30 +21,13 @@
Ok(())
}
-async fn run_grpc_server(
- db: database::DatabaseClient,
- blob_client: BlobClient,
-) -> Result<()> {
- let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?;
- let backup_service = MyBackupService::new(db, blob_client);
-
- info!("Starting gRPC server listening at {}", addr.to_string());
- Server::builder()
- .add_service(BackupServiceServer::new(backup_service))
- .serve(addr)
- .await?;
-
- Ok(())
-}
-
#[tokio::main]
async fn main() -> Result<()> {
config::parse_cmdline_args();
configure_logging()?;
let aws_config = config::load_aws_config().await;
- let db = database::DatabaseClient::new(&aws_config);
- let blob_client = blob::init_blob_client();
+ let _db = database::DatabaseClient::new(&aws_config);
- run_grpc_server(db, blob_client).await
+ Ok(())
}
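Editorial note on the Cargo.toml change above: the `macros` feature added to the tokio dependency is what provides `#[tokio::main]`, which `main.rs` keeps using; with tonic and its dependency tree removed, that feature is presumably no longer enabled transitively and has to be requested explicitly. A minimal sketch of the requirement, for illustration only:
```
// Requires, as in the updated Cargo.toml above:
// tokio = { version = "1.24", features = ["rt-multi-thread", "macros"] }
#[tokio::main]
async fn main() {
    println!("async runtime up");
}
```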
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use tonic::Status;
-use tracing::debug;
-use tracing::error;
-
-use super::handle_db_error;
-use super::proto;
-use crate::{
- blob::BlobClient,
- constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
- database::{DatabaseClient, LogItem},
-};
-
-pub async fn handle_add_attachments(
- db: &DatabaseClient,
- blob_client: &BlobClient,
- request: proto::AddAttachmentsRequest,
-) -> Result<(), Status> {
- let proto::AddAttachmentsRequest {
- user_id,
- backup_id,
- log_id,
- holders,
- } = request;
-
- if user_id.is_empty() {
- return Err(Status::invalid_argument(
- "user id required but not provided",
- ));
- }
- if backup_id.is_empty() {
- return Err(Status::invalid_argument(
- "backup id required but not provided",
- ));
- }
- if holders.is_empty() {
- return Err(Status::invalid_argument(
- "holders required but not provided",
- ));
- }
-
- if log_id.is_empty() {
- let backup_item_result = db
- .find_backup_item(&user_id, &backup_id)
- .await
- .map_err(handle_db_error)?;
- let mut backup_item = backup_item_result.ok_or_else(|| {
- debug!("Backup item not found");
- Status::not_found("Backup item not found")
- })?;
-
- add_new_attachments(&mut backup_item.attachment_holders, &holders);
-
- db.put_backup_item(backup_item)
- .await
- .map_err(handle_db_error)?;
- } else {
- let log_item_result = db
- .find_log_item(&backup_id, &log_id)
- .await
- .map_err(handle_db_error)?;
- let mut log_item = log_item_result.ok_or_else(|| {
- debug!("Log item not found");
- Status::not_found("Log item not found")
- })?;
-
- add_new_attachments(&mut log_item.attachment_holders, &holders);
-
- // log item too large for database, move it to blob-service storage
- if !log_item.persisted_in_blob
- && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
- {
- debug!("Log item too large. Persisting in blob service...");
- log_item = move_to_blob(log_item, blob_client).await?;
- }
-
- db.put_log_item(log_item).await.map_err(handle_db_error)?;
- }
-
- Ok(())
-}
-
-async fn move_to_blob(
- log_item: LogItem,
- blob_client: &BlobClient,
-) -> Result<LogItem, Status> {
- let holder = crate::utils::generate_blob_holder(
- &log_item.data_hash,
- &log_item.backup_id,
- Some(&log_item.log_id),
- );
-
- if let Some(mut uploader) = crate::blob::start_simple_uploader(
- &holder,
- &log_item.data_hash,
- blob_client.clone(),
- )
- .await?
- {
- let blob_chunk = log_item.value.into_bytes();
- uploader.put_data(blob_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- uploader.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- } else {
- debug!("Blob holder for log ID={} already exists", &log_item.log_id);
- }
-
- Ok(LogItem {
- persisted_in_blob: true,
- value: holder,
- ..log_item
- })
-}
-
-/// Modifies the [`current_holders_str`] by appending attachment holders
-/// contained in [`new_holders`]. Removes duplicates. Both arguments
-/// are expected to be [`ATTACHMENT_HOLDER_SEPARATOR`] separated strings.
-fn add_new_attachments(current_holders_str: &mut String, new_holders: &str) {
- let mut current_holders = parse_attachment_holders(current_holders_str);
-
- let new_holders = parse_attachment_holders(new_holders);
- current_holders.extend(new_holders);
-
- *current_holders_str = current_holders
- .into_iter()
- .collect::<Vec<_>>()
- .join(ATTACHMENT_HOLDER_SEPARATOR);
-}
-
-/// Parses an [`ATTACHMENT_HOLDER_SEPARATOR`] separated string into a `HashSet`
-/// of attachment holder string slices.
-fn parse_attachment_holders(
- holders_str: &str,
-) -> std::collections::HashSet<&str> {
- holders_str
- .split(ATTACHMENT_HOLDER_SEPARATOR)
- .filter(|holder| !holder.is_empty())
- .collect()
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use std::collections::HashSet;
-
- #[test]
- fn test_parse_attachments() {
- let holders = "h1;h2;h3";
- let expected = HashSet::from(["h1", "h2", "h3"]);
- assert_eq!(parse_attachment_holders(holders), expected);
- }
-
- #[test]
- fn test_empty_attachments() {
- let actual = parse_attachment_holders("");
- let expected = HashSet::new();
- assert_eq!(actual, expected);
- }
-
- #[test]
- fn test_add_attachments() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "holder3;holder4";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2", "holder3", "holder4"])
- );
- }
-
- #[test]
- fn test_add_to_empty() {
- let mut current_holders = String::new();
- let new_holders = "holder3;holder4";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder3", "holder4"])
- );
- }
-
- #[test]
- fn test_add_none() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2"])
- );
- }
-
- #[test]
- fn test_remove_duplicates() {
- let mut current_holders = "holder1;holder2".to_string();
- let new_holders = "holder2;holder3";
- add_new_attachments(&mut current_holders, new_holders);
- assert_eq!(
- parse_attachment_holders(&current_holders),
- HashSet::from(["holder1", "holder2", "holder3"])
- );
- }
-}
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/create_backup.rs
+++ /dev/null
@@ -1,234 +0,0 @@
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-
-use crate::{
- blob::{start_simple_uploader, BlobClient, BlobUploader},
- database::{BackupItem, DatabaseClient},
- service::proto,
-};
-
-use super::handle_db_error;
-
-type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;
-
-enum HandlerState {
- /// Initial state. Handler is receiving non-data inputs
- ReceivingParams,
- /// Handler is receiving data chunks
- ReceivingData { uploader: BlobUploader },
- /// A special case when Blob service claims that a blob with given
- /// [`CreateBackupHandler::data_hash`] already exists
- DataAlreadyExists,
-}
-
-pub struct CreateBackupHandler {
- // flow control
- pub should_close_stream: bool,
-
- // inputs
- user_id: Option<String>,
- device_id: Option<String>,
- key_entropy: Option<Vec<u8>>,
- data_hash: Option<String>,
-
- // client instances
- db: DatabaseClient,
- blob_client: BlobClient,
-
- // internal state
- state: HandlerState,
- backup_id: String,
- holder: Option<String>,
-}
-
-impl CreateBackupHandler {
- pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self {
- CreateBackupHandler {
- should_close_stream: false,
- user_id: None,
- device_id: None,
- key_entropy: None,
- data_hash: None,
- db,
- blob_client,
- state: HandlerState::ReceivingParams,
- backup_id: String::new(),
- holder: None,
- }
- }
-
- pub async fn handle_user_id(
- &mut self,
- user_id: String,
- ) -> CreateBackupResult {
- if self.user_id.is_some() {
- warn!("user ID already provided");
- return Err(Status::invalid_argument("User ID already provided"));
- }
- self.user_id = Some(user_id);
- self.handle_internal().await
- }
- pub async fn handle_device_id(
- &mut self,
- device_id: String,
- ) -> CreateBackupResult {
- if self.device_id.is_some() {
- warn!("Device ID already provided");
- return Err(Status::invalid_argument("Device ID already provided"));
- }
- tracing::Span::current().record("device_id", &device_id);
- self.device_id = Some(device_id);
- self.handle_internal().await
- }
- pub async fn handle_key_entropy(
- &mut self,
- key_entropy: Vec<u8>,
- ) -> CreateBackupResult {
- if self.key_entropy.is_some() {
- warn!("Key entropy already provided");
- return Err(Status::invalid_argument("Key entropy already provided"));
- }
- self.key_entropy = Some(key_entropy);
- self.handle_internal().await
- }
- pub async fn handle_data_hash(
- &mut self,
- data_hash: Vec<u8>,
- ) -> CreateBackupResult {
- if self.data_hash.is_some() {
- warn!("Data hash already provided");
- return Err(Status::invalid_argument("Data hash already provided"));
- }
- let hash_str = String::from_utf8(data_hash).map_err(|err| {
- error!("Failed to convert data_hash into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- debug!("Received data hash: {}", &hash_str);
- self.data_hash = Some(hash_str);
- self.handle_internal().await
- }
-
- pub async fn handle_data_chunk(
- &mut self,
- data_chunk: Vec<u8>,
- ) -> CreateBackupResult {
- let HandlerState::ReceivingData { ref mut uploader } = self.state else {
- self.should_close_stream = true;
- error!("Data chunk sent before other inputs");
- return Err(Status::invalid_argument(
- "Data chunk sent before other inputs",
- ));
- };
-
- // empty chunk ends transmission
- if data_chunk.is_empty() {
- self.should_close_stream = true;
- return Ok(proto::CreateNewBackupResponse {
- backup_id: self.backup_id.clone(),
- });
- }
-
- trace!("Received {} bytes of data", data_chunk.len());
- uploader.put_data(data_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- Ok(proto::CreateNewBackupResponse {
- // actual Backup ID should be sent only once, the time it is generated
- // see handle_internal()
- backup_id: String::new(),
- })
- }
-
- /// This function should be called after the input stream is finished.
- pub async fn finish(self) -> Result<(), Status> {
- match self.state {
- HandlerState::ReceivingParams => {
- // client probably aborted early
- trace!("Nothing to store in database. Finishing early");
- return Ok(());
- }
- HandlerState::ReceivingData { uploader } => {
- uploader.terminate().await.map_err(|err| {
- error!("Uploader task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
- HandlerState::DataAlreadyExists => (),
- }
-
- let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else {
- error!("Holder / UserID absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
- if self.backup_id.is_empty() {
- error!("Backup ID was not generated. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- }
- let backup_item = BackupItem::new(user_id, self.backup_id, holder);
-
- self
- .db
- .put_backup_item(backup_item)
- .await
- .map_err(handle_db_error)?;
-
- Ok(())
- }
-
- // internal param handler helper
- async fn handle_internal(&mut self) -> CreateBackupResult {
- if !matches!(self.state, HandlerState::ReceivingParams) {
- error!("CreateBackupHandler already received all non-data params.");
- return Err(Status::failed_precondition("Backup data chunk expected"));
- }
-
- // all non-data inputs must be set before receiving backup data chunks
- let (Some(data_hash), Some(device_id), Some(_), Some(_)) = (
- self.data_hash.as_ref(),
- self.device_id.as_ref(),
- self.user_id.as_ref(),
- self.key_entropy.as_ref(),
- ) else {
- // return empty backup ID when inputs are incomplete
- return Ok(proto::CreateNewBackupResponse {
- backup_id: "".to_string(),
- });
- };
-
- let backup_id = generate_backup_id(device_id);
- let holder =
- crate::utils::generate_blob_holder(data_hash, &backup_id, None);
- self.backup_id = backup_id.clone();
- self.holder = Some(holder.clone());
- tracing::Span::current().record("backup_id", &backup_id);
- tracing::Span::current().record("blob_holder", &holder);
-
- match start_simple_uploader(&holder, data_hash, self.blob_client.clone())
- .await?
- {
- Some(uploader) => {
- self.state = HandlerState::ReceivingData { uploader };
- trace!("Everything prepared, waiting for data...");
- }
- None => {
- // Blob with given data_hash already exists
- debug!("Blob already exists, finishing");
- self.should_close_stream = true;
- self.state = HandlerState::DataAlreadyExists;
- }
- };
-
- Ok(proto::CreateNewBackupResponse { backup_id })
- }
-}
-
-/// Generates ID for a new backup
-fn generate_backup_id(device_id: &str) -> String {
- format!(
- "{device_id}_{timestamp}",
- device_id = device_id,
- timestamp = chrono::Utc::now().timestamp_millis()
- )
-}
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ /dev/null
@@ -1,330 +0,0 @@
-use async_stream::try_stream;
-use tokio_stream::{Stream, StreamExt};
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-use tracing_futures::Instrument;
-
-use super::handle_db_error;
-use super::proto::{self, PullBackupResponse};
-use crate::{
- blob::{BlobClient, BlobDownloader},
- constants::{
- BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_LOG_ID,
- },
- database::{BackupItem, DatabaseClient, LogItem},
-};
-
-pub struct PullBackupHandler {
- blob_client: BlobClient,
- backup_item: BackupItem,
- logs: Vec<LogItem>,
-}
-
-impl PullBackupHandler {
- pub async fn new(
- db: &DatabaseClient,
- blob_client: &BlobClient,
- request: proto::PullBackupRequest,
- ) -> Result<Self, Status> {
- let proto::PullBackupRequest { user_id, backup_id } = request;
- let backup_item = db
- .find_backup_item(&user_id, &backup_id)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Backup item not found");
- Status::not_found("Backup item not found")
- })?;
-
- let backup_id = backup_item.backup_id.as_str();
- let logs = db
- .find_log_items_for_backup(backup_id)
- .await
- .map_err(handle_db_error)?;
-
- Ok(PullBackupHandler {
- backup_item,
- logs,
- blob_client: blob_client.clone(),
- })
- }
-
- /// Consumes the handler and provides a response `Stream`. The stream will
- /// produce the following in order:
- /// - Backup compaction data chunks
- /// - Backup logs
- /// - Whole log, if stored in db
- /// - Log chunks, if stored in blob
- pub fn into_response_stream(
- self,
- ) -> impl Stream<Item = Result<PullBackupResponse, Status>> {
- use proto::pull_backup_response::*;
-
- try_stream! {
- debug!("Pulling backup...");
- {
- let compaction_stream = data_stream(&self.backup_item, self.blob_client.clone());
- tokio::pin!(compaction_stream);
- while let Some(response) = compaction_stream.try_next().await? {
- yield response;
- }
- }
- trace!("Backup data pull complete.");
-
- if self.logs.is_empty() {
- debug!("No logs to pull. Finishing");
- return;
- }
-
- debug!("Pulling logs...");
- for log in self.logs {
- trace!("Pulling log ID={}", &log.log_id);
- let span = tracing::trace_span!("log", log_id = &log.log_id);
-
- if log.persisted_in_blob {
- trace!(parent: &span, "Log persisted in blob");
- let log_data_stream = data_stream(&log, self.blob_client.clone()).instrument(span);
- tokio::pin!(log_data_stream);
- while let Some(response) = log_data_stream.try_next().await? {
- yield response;
- }
- } else {
- trace!(parent: &span, "Log persisted in database");
- yield proto::PullBackupResponse {
- attachment_holders: Some(log.attachment_holders),
- id: Some(Id::LogId(log.log_id)),
- data: Some(Data::LogChunk(log.value.into_bytes())),
- };
- }
- }
- trace!("Pulled all logs, done");
- }
- }
-}
-
-/// Downloads a blob-stored [`BlobStoredItem`] and streams its content into
-/// stream of [`PullBackupResponse`] objects, handles gRPC message size details.
-fn data_stream<Item>(
- item: &Item,
- blob_client: BlobClient,
-) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
-where
- Item: BlobStoredItem,
-{
- try_stream! {
- let mut buffer = ResponseBuffer::default();
- let mut downloader =
- BlobDownloader::start(item.get_holder().to_string(), blob_client);
-
- let mut is_first_chunk = true;
- loop {
- if !buffer.is_saturated() {
- if let Some(data) = downloader.next_chunk().await {
- buffer.put(data);
- }
- }
- if buffer.is_empty() {
- break;
- }
-
- // get data chunk, shortened by length of metadata
- let padding = item.metadata_size(is_first_chunk);
- let chunk = buffer.get_chunk(padding);
-
- trace!(
- with_attachments = is_first_chunk,
- data_size = chunk.len(),
- "Sending data chunk"
- );
- yield item.to_response(chunk, is_first_chunk);
- is_first_chunk = false;
- }
-
- downloader.terminate().await.map_err(|err| {
- error!("Blob downloader failed: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
-}
-
-/// Represents downloadable item stored in Blob service
-trait BlobStoredItem {
- // Blob holder representing this item
- fn get_holder(&self) -> &str;
-
- /// Generates a gRPC response for given `data_chunk`.
- /// The response may be in extended version, with `include_extra_info`,
- /// usually sent with the first chunk
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse;
-
- /// Size in bytes of non-data fields contained in response message.
- fn metadata_size(&self, include_extra_info: bool) -> usize;
-}
-
-impl BlobStoredItem for BackupItem {
- fn get_holder(&self) -> &str {
- &self.compaction_holder
- }
-
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse {
- use proto::pull_backup_response::*;
- let attachment_holders = if include_extra_info {
- Some(self.attachment_holders.clone())
- } else {
- None
- };
- proto::PullBackupResponse {
- id: Some(Id::BackupId(self.backup_id.clone())),
- data: Some(Data::CompactionChunk(data_chunk)),
- attachment_holders,
- }
- }
-
- fn metadata_size(&self, include_extra_info: bool) -> usize {
- let mut extra_bytes: usize = 0;
- extra_bytes += BACKUP_TABLE_FIELD_BACKUP_ID.as_bytes().len();
- extra_bytes += self.backup_id.as_bytes().len();
- if include_extra_info {
- extra_bytes += BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
- extra_bytes += self.attachment_holders.as_bytes().len();
- }
- extra_bytes
- }
-}
-
-impl BlobStoredItem for LogItem {
- fn get_holder(&self) -> &str {
- &self.value
- }
-
- fn to_response(
- &self,
- data_chunk: Vec<u8>,
- include_extra_info: bool,
- ) -> proto::PullBackupResponse {
- use proto::pull_backup_response::*;
- let attachment_holders = if include_extra_info {
- Some(self.attachment_holders.clone())
- } else {
- None
- };
- proto::PullBackupResponse {
- id: Some(Id::LogId(self.log_id.clone())),
- data: Some(Data::LogChunk(data_chunk)),
- attachment_holders,
- }
- }
-
- fn metadata_size(&self, include_extra_info: bool) -> usize {
- let mut extra_bytes: usize = 0;
- extra_bytes += LOG_TABLE_FIELD_LOG_ID.as_bytes().len();
- extra_bytes += self.log_id.as_bytes().len();
- if include_extra_info {
- extra_bytes += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len();
- extra_bytes += self.attachment_holders.as_bytes().len();
- }
- extra_bytes
- }
-}
-
-/// A utility structure that buffers downloaded data and allows to retrieve it
-/// as chunks of arbitrary size, not greater than provided `limit`.
-struct ResponseBuffer {
- buf: Vec<u8>,
- limit: usize,
-}
-
-impl Default for ResponseBuffer {
- /// Buffer size defaults to max usable gRPC message size
- fn default() -> Self {
- ResponseBuffer::new(GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE)
- }
-}
-
-impl ResponseBuffer {
- pub fn new(limit: usize) -> Self {
- ResponseBuffer {
- buf: Vec::new(),
- limit,
- }
- }
-
- pub fn put(&mut self, data: Vec<u8>) {
- if data.len() > self.limit {
- warn!("Data saved to buffer is larger than chunk limit.");
- }
-
- self.buf.extend(data);
- }
-
- /// Gets chunk of size `limit - padding` and leaves remainder in buffer
- pub fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
- let mut chunk = std::mem::take(&mut self.buf);
-
- let target_size = self.limit - padding;
- if chunk.len() > target_size {
- // after this operation, chunk=0..target_size, self.buf=target_size..end
- self.buf = chunk.split_off(target_size);
- }
- return chunk;
- }
-
- /// Does buffer length exceed given limit
- pub fn is_saturated(&self) -> bool {
- self.buf.len() >= self.limit
- }
-
- pub fn is_empty(&self) -> bool {
- self.buf.is_empty()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- const LIMIT: usize = 100;
-
- #[test]
- fn test_response_buffer() {
- let mut buffer = ResponseBuffer::new(LIMIT);
- assert_eq!(buffer.is_empty(), true);
-
- // put 80 bytes of data
- buffer.put(vec![0u8; 80]);
- assert_eq!(buffer.is_empty(), false);
- assert_eq!(buffer.is_saturated(), false);
-
- // put next 80 bytes, should be saturated as 160 > 100
- buffer.put(vec![0u8; 80]);
- let buf_size = buffer.buf.len();
- assert_eq!(buffer.is_saturated(), true);
- assert_eq!(buf_size, 160);
-
- // get one chunk
- let padding: usize = 10;
- let expected_chunk_size = LIMIT - padding;
- let chunk = buffer.get_chunk(padding);
- assert_eq!(chunk.len(), expected_chunk_size); // 90
-
- // buffer should not be saturated now (160 - 90 < 100)
- let remaining_buf_size = buffer.buf.len();
- assert_eq!(remaining_buf_size, buf_size - expected_chunk_size);
- assert_eq!(buffer.is_saturated(), false);
-
- // get last chunk
- let chunk = buffer.get_chunk(padding);
- assert_eq!(chunk.len(), remaining_buf_size);
- assert_eq!(buffer.is_empty(), true);
- }
-}
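Editorial note: the removed `ResponseBuffer` keeps each streamed message under the gRPC chunk limit while reserving `padding` bytes for the non-data fields reported by `metadata_size`. A compressed, standalone restatement of that logic with made-up constants, just to show the arithmetic:
```
/// Illustrative restatement of the removed ResponseBuffer logic:
/// accumulate bytes, then hand out chunks of at most `limit - padding`.
struct ChunkBuffer {
    buf: Vec<u8>,
    limit: usize,
}

impl ChunkBuffer {
    fn new(limit: usize) -> Self {
        ChunkBuffer { buf: Vec::new(), limit }
    }

    fn put(&mut self, data: Vec<u8>) {
        self.buf.extend(data);
    }

    /// Returns up to `limit - padding` bytes, leaving the rest buffered.
    fn get_chunk(&mut self, padding: usize) -> Vec<u8> {
        let target = self.limit - padding;
        let mut chunk = std::mem::take(&mut self.buf);
        if chunk.len() > target {
            self.buf = chunk.split_off(target);
        }
        chunk
    }
}

fn main() {
    // With a 100-byte limit and 10 bytes of metadata, 160 buffered bytes
    // come out as a 90-byte chunk followed by a 70-byte chunk.
    let mut b = ChunkBuffer::new(100);
    b.put(vec![0u8; 160]);
    assert_eq!(b.get_chunk(10).len(), 90);
    assert_eq!(b.get_chunk(10).len(), 70);
    assert!(b.buf.is_empty());
}
```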
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
deleted file mode 100644
--- a/services/backup/src/service/handlers/send_log.rs
+++ /dev/null
@@ -1,272 +0,0 @@
-use tonic::Status;
-use tracing::{debug, error, trace, warn};
-use uuid::Uuid;
-
-use super::handle_db_error;
-use crate::{
- blob::{BlobClient, BlobUploader},
- constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
- database::{DatabaseClient, LogItem},
- service::proto::SendLogResponse,
-};
-
-enum LogPersistence {
- /// Log entirely stored in DynamoDB database
- DB,
- /// Log contents stored with Blob service
- BLOB { holder: String },
-}
-
-pub struct SendLogHandler {
- // flow control
- pub should_close_stream: bool,
-
- // inputs
- user_id: Option<String>,
- backup_id: Option<String>,
- log_hash: Option<String>,
-
- // internal state
- log_id: Option<String>,
- log_buffer: Vec<u8>,
- persistence_method: LogPersistence,
- should_receive_data: bool,
-
- // client instances
- db: DatabaseClient,
- blob_client: BlobClient,
- uploader: Option<BlobUploader>,
-}
-
-impl SendLogHandler {
- pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self {
- SendLogHandler {
- db: db.clone(),
- blob_client: blob_client.clone(),
- uploader: None,
- user_id: None,
- backup_id: None,
- log_hash: None,
- log_id: None,
- log_buffer: Vec::new(),
- persistence_method: LogPersistence::DB,
- should_receive_data: false,
- should_close_stream: false,
- }
- }
-
- pub async fn handle_user_id(
- &mut self,
- user_id: String,
- ) -> Result<(), Status> {
- if self.user_id.is_some() {
- warn!("user ID already provided");
- return Err(Status::invalid_argument("User ID already provided"));
- }
- self.user_id = Some(user_id);
- self.handle_internal().await
- }
- pub async fn handle_backup_id(
- &mut self,
- backup_id: String,
- ) -> Result<(), Status> {
- if self.backup_id.is_some() {
- warn!("backup ID already provided");
- return Err(Status::invalid_argument("Backup ID already provided"));
- }
- tracing::Span::current().record("backup_id", &backup_id);
- self.backup_id = Some(backup_id);
- self.handle_internal().await
- }
- pub async fn handle_log_hash(
- &mut self,
- log_hash: Vec<u8>,
- ) -> Result<(), Status> {
- if self.log_hash.is_some() {
- warn!("Log hash already provided");
- return Err(Status::invalid_argument("Log hash already provided"));
- }
- let hash_str = String::from_utf8(log_hash).map_err(|err| {
- error!("Failed to convert data_hash into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- debug!("Received log hash: {}", &hash_str);
- self.log_hash = Some(hash_str);
- self.handle_internal().await
- }
- pub async fn handle_log_data(
- &mut self,
- data_chunk: Vec<u8>,
- ) -> Result<(), Status> {
- if !self.should_receive_data || self.log_id.is_none() {
- self.should_close_stream = true;
- error!("Data chunk sent before other inputs");
- return Err(Status::invalid_argument(
- "Data chunk sent before other inputs",
- ));
- }
-
- // empty chunk ends transmission
- if data_chunk.is_empty() {
- self.should_close_stream = true;
- return Ok(());
- }
-
- match self.persistence_method {
- LogPersistence::DB => {
- self.log_buffer.extend(data_chunk);
- self.ensure_size_constraints().await?;
- }
- LogPersistence::BLOB { .. } => {
- let Some(client) = self.uploader.as_mut() else {
- self.should_close_stream = true;
- error!("Put client uninitialized. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
- client.put_data(data_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
- }
- }
- Ok(())
- }
-
- pub async fn finish(self) -> Result<SendLogResponse, Status> {
- if let Some(client) = self.uploader {
- client.terminate().await.map_err(|err| {
- error!("Put client task closed with error: {:?}", err);
- Status::aborted("Internal error")
- })?;
- } else {
- trace!("No uploader initialized. Skipping termination");
- }
-
- if !self.should_receive_data {
- // client probably aborted early
- trace!("Nothing to store in database. Finishing early");
- return Ok(SendLogResponse {
- log_checkpoint: "".to_string(),
- });
- }
-
- let (Some(backup_id), Some(log_id), Some(data_hash)) = (
- self.backup_id,
- self.log_id,
- self.log_hash
- ) else {
- error!("Log info absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
-
- let (log_value, persisted_in_blob) = match self.persistence_method {
- LogPersistence::BLOB { holder } => (holder, true),
- LogPersistence::DB => {
- let contents = String::from_utf8(self.log_buffer).map_err(|err| {
- error!("Failed to convert log contents data into string: {:?}", err);
- Status::aborted("Unexpected error")
- })?;
- (contents, false)
- }
- };
-
- let log_item = LogItem {
- backup_id,
- log_id: log_id.clone(),
- persisted_in_blob,
- value: log_value,
- attachment_holders: String::new(),
- data_hash,
- };
-
- self
- .db
- .put_log_item(log_item)
- .await
- .map_err(handle_db_error)?;
-
- Ok(SendLogResponse {
- log_checkpoint: log_id,
- })
- }
-
- // internal param handler helper
- async fn handle_internal(&mut self) -> Result<(), Status> {
- if self.should_receive_data {
- error!("SendLogHandler is already expecting data chunks");
- return Err(Status::failed_precondition("Log data chunk expected"));
- }
-
- // all non-data inputs must be set before receiving log contents
- let (Some(backup_id), Some(_), Some(_)) = (
- self.backup_id.as_ref(),
- self.user_id.as_ref(),
- self.log_hash.as_ref()
- ) else { return Ok(()); };
-
- let log_id = generate_log_id(backup_id);
- tracing::Span::current().record("log_id", &log_id);
- self.log_id = Some(log_id);
-
- trace!("Everything prepared, waiting for data...");
- self.should_receive_data = true;
- Ok(())
- }
-
- /// Ensures log fits db size constraints. If not, it is moved to blob
- /// persistence
- async fn ensure_size_constraints(&mut self) -> Result<(), Status> {
- let (Some(backup_id), Some(log_id), Some(log_hash)) = (
- self.backup_id.as_ref(),
- self.log_id.as_ref(),
- self.log_hash.as_ref()
- ) else {
- self.should_close_stream = true;
- error!("Log info absent in data mode. This should never happen!");
- return Err(Status::failed_precondition("Internal error"));
- };
-
- let log_size = LogItem::size_from_components(
- backup_id,
- log_id,
- log_hash,
- &self.log_buffer,
- );
- if log_size > LOG_DATA_SIZE_DATABASE_LIMIT {
- debug!("Log too large, switching persistence to Blob");
- let holder =
- crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
- match crate::blob::start_simple_uploader(
- &holder,
- &log_hash,
- self.blob_client.clone(),
- )
- .await?
- {
- Some(mut uploader) => {
- let blob_chunk = std::mem::take(&mut self.log_buffer);
- uploader.put_data(blob_chunk).await.map_err(|err| {
- error!("Failed to upload data chunk: {:?}", err);
- Status::aborted("Internal error")
- })?;
- self.uploader = Some(uploader);
- }
- None => {
- debug!("Log hash already exists");
- self.should_close_stream = true;
- }
- }
- self.persistence_method = LogPersistence::BLOB { holder };
- }
- Ok(())
- }
-}
-
-fn generate_log_id(backup_id: &str) -> String {
- format!(
- "{backup_id}{sep}{uuid}",
- backup_id = backup_id,
- sep = ID_SEPARATOR,
- uuid = Uuid::new_v4()
- )
-}
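Editorial note: the removed `SendLogHandler` buffers log bytes in memory while the item still fits in DynamoDB and, once the size limit is crossed, uploads the buffered bytes to the blob service and streams any further chunks there directly. A schematic, dependency-free sketch of that switch; the limit value and the helper below are made up for illustration.
```
// Illustrative sketch only: the real limit lives in constants.rs and the real
// blob path goes through BlobUploader / start_simple_uploader.
const DB_SIZE_LIMIT: usize = 4096; // hypothetical value

enum Persistence {
    Db,
    Blob { holder: String },
}

struct LogSink {
    buffer: Vec<u8>,
    persistence: Persistence,
}

impl LogSink {
    fn push_chunk(&mut self, chunk: Vec<u8>) {
        // Once we are in blob mode, every chunk goes straight to the uploader.
        if let Persistence::Blob { holder } = &self.persistence {
            upload_to_blob(holder, chunk);
            return;
        }
        // Still in DB mode: keep buffering and re-check the size constraint.
        self.buffer.extend(chunk);
        if self.buffer.len() > DB_SIZE_LIMIT {
            let holder = "generated-holder".to_string(); // stand-in for generate_blob_holder
            let flushed = std::mem::take(&mut self.buffer);
            upload_to_blob(&holder, flushed);
            self.persistence = Persistence::Blob { holder };
        }
    }
}

fn upload_to_blob(holder: &str, data: Vec<u8>) {
    // Stand-in for BlobUploader::put_data.
    println!("uploading {} bytes under holder {holder}", data.len());
}

fn main() {
    let mut sink = LogSink { buffer: Vec::new(), persistence: Persistence::Db };
    sink.push_chunk(vec![0u8; 3000]); // buffered in memory
    sink.push_chunk(vec![0u8; 3000]); // crosses the limit, triggers the switch
    sink.push_chunk(vec![0u8; 100]);  // goes straight to blob
}
```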
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
deleted file mode 100644
--- a/services/backup/src/service/mod.rs
+++ /dev/null
@@ -1,241 +0,0 @@
-use aws_sdk_dynamodb::Error as DynamoDBError;
-use comm_services_lib::database::Error as DBError;
-use proto::backup_service_server::BackupService;
-use std::pin::Pin;
-use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
-use tonic::{Request, Response, Status};
-use tracing::{debug, error, info, instrument, trace, warn};
-use tracing_futures::Instrument;
-
-use crate::{
- blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY,
- database::DatabaseClient,
-};
-
-mod proto {
- tonic::include_proto!("backup");
-}
-pub use proto::backup_service_server::BackupServiceServer;
-
-/// submodule containing gRPC endpoint handler implementations
-mod handlers {
- pub(super) mod add_attachments;
- pub(super) mod create_backup;
- pub(super) mod pull_backup;
- pub(super) mod send_log;
-
- // re-exports for convenient usage in handlers
- pub(self) use super::handle_db_error;
- pub(self) use super::proto;
-}
-use self::handlers::create_backup::CreateBackupHandler;
-use self::handlers::pull_backup::PullBackupHandler;
-use self::handlers::send_log::SendLogHandler;
-
-pub struct MyBackupService {
- db: DatabaseClient,
- blob_client: BlobClient,
-}
-
-impl MyBackupService {
- pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self {
- MyBackupService {
- db: db_client,
- blob_client,
- }
- }
-}
-
-// gRPC implementation
-#[tonic::async_trait]
-impl BackupService for MyBackupService {
- type CreateNewBackupStream = Pin<
- Box<
- dyn Stream<Item = Result<proto::CreateNewBackupResponse, Status>> + Send,
- >,
- >;
-
- #[instrument(skip_all, fields(device_id, backup_id, blob_holder))]
- async fn create_new_backup(
- &self,
- request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
- ) -> Result<Response<Self::CreateNewBackupStream>, Status> {
- use proto::create_new_backup_request::Data::*;
-
- info!("CreateNewBackup request: {:?}", request);
- let mut in_stream = request.into_inner();
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let db = self.db.clone();
- let blob_client = self.blob_client.clone();
- let worker = async move {
- let mut handler = CreateBackupHandler::new(db, blob_client);
- while let Some(message) = in_stream.next().await {
- let response = match message {
- Ok(proto::CreateNewBackupRequest {
- data: Some(UserId(user_id)),
- }) => handler.handle_user_id(user_id).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(DeviceId(device_id)),
- }) => handler.handle_device_id(device_id).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(KeyEntropy(key_entropy)),
- }) => handler.handle_key_entropy(key_entropy).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(NewCompactionHash(hash)),
- }) => handler.handle_data_hash(hash).await,
- Ok(proto::CreateNewBackupRequest {
- data: Some(NewCompactionChunk(chunk)),
- }) => handler.handle_data_chunk(chunk).await,
- unexpected => {
- error!("Received an unexpected request: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
-
- trace!("Sending response: {:?}", response);
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if handler.should_close_stream {
- trace!("Handler requested to close stream");
- break;
- }
- }
- if let Err(status) = handler.finish().await {
- trace!("Sending error response: {:?}", status);
- let _ = tx.send(Err(status)).await;
- }
- debug!("Request finished processing");
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(
- Box::pin(out_stream) as Self::CreateNewBackupStream
- ))
- }
-
- #[instrument(skip_all, fields(backup_id, log_id))]
- async fn send_log(
- &self,
- request: Request<tonic::Streaming<proto::SendLogRequest>>,
- ) -> Result<Response<proto::SendLogResponse>, Status> {
- use proto::send_log_request::Data::*;
-
- info!("SendLog request: {:?}", request);
- let mut handler = SendLogHandler::new(&self.db, &self.blob_client);
-
- let mut in_stream = request.into_inner();
- while let Some(message) = in_stream.next().await {
- let result = match message {
- Ok(proto::SendLogRequest {
- data: Some(UserId(user_id)),
- }) => handler.handle_user_id(user_id).await,
- Ok(proto::SendLogRequest {
- data: Some(BackupId(backup_id)),
- }) => handler.handle_backup_id(backup_id).await,
- Ok(proto::SendLogRequest {
- data: Some(LogHash(log_hash)),
- }) => handler.handle_log_hash(log_hash).await,
- Ok(proto::SendLogRequest {
- data: Some(LogData(chunk)),
- }) => handler.handle_log_data(chunk).await,
- unexpected => {
- error!("Received an unexpected request: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
-
- if let Err(err) = result {
- error!("An error occurred when processing request: {:?}", err);
- return Err(err);
- }
- if handler.should_close_stream {
- trace!("Handler requested to close request stream");
- break;
- }
- }
-
- let response = handler.finish().await;
- debug!("Finished. Sending response: {:?}", response);
- response.map(|response_body| Response::new(response_body))
- }
-
- type RecoverBackupKeyStream = Pin<
- Box<
- dyn Stream<Item = Result<proto::RecoverBackupKeyResponse, Status>> + Send,
- >,
- >;
-
- #[instrument(skip(self))]
- async fn recover_backup_key(
- &self,
- _request: Request<tonic::Streaming<proto::RecoverBackupKeyRequest>>,
- ) -> Result<Response<Self::RecoverBackupKeyStream>, Status> {
- Err(Status::unimplemented("unimplemented"))
- }
-
- type PullBackupStream = Pin<
- Box<dyn Stream<Item = Result<proto::PullBackupResponse, Status>> + Send>,
- >;
-
- #[instrument(skip_all, fields(backup_id = &request.get_ref().backup_id))]
- async fn pull_backup(
- &self,
- request: Request<proto::PullBackupRequest>,
- ) -> Result<Response<Self::PullBackupStream>, Status> {
- info!("PullBackup request: {:?}", request);
-
- let handler =
- PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner())
- .await?;
-
- let stream = handler.into_response_stream().in_current_span();
- Ok(Response::new(Box::pin(stream) as Self::PullBackupStream))
- }
-
- #[instrument(skip_all,
- fields(
- backup_id = &request.get_ref().backup_id,
- log_id = &request.get_ref().log_id)
- )]
- async fn add_attachments(
- &self,
- request: Request<proto::AddAttachmentsRequest>,
- ) -> Result<Response<()>, Status> {
- info!(
- "AddAttachment request. New holders: {}",
- &request.get_ref().holders
- );
-
- handlers::add_attachments::handle_add_attachments(
- &self.db,
- &self.blob_client,
- request.into_inner(),
- )
- .await?;
-
- info!("Request processed successfully");
- Ok(Response::new(()))
- }
-}
-
-/// A helper converting our Database errors into gRPC responses
-fn handle_db_error(db_error: DBError) -> Status {
- match db_error {
- DBError::AwsSdk(DynamoDBError::InternalServerError(_))
- | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
- _,
- ))
- | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
- warn!("AWS transient error occurred");
- Status::unavailable("please retry")
- }
- e => {
- error!("Encountered an unexpected error: {}", e);
- Status::failed_precondition("unexpected error")
- }
- }
-}
diff --git a/shared/protos/backup.proto b/shared/protos/backup.proto
deleted file mode 100644
--- a/shared/protos/backup.proto
+++ /dev/null
@@ -1,96 +0,0 @@
-syntax = "proto3";
-
-package backup;
-
-import "google/protobuf/empty.proto";
-
-/**
- * API - description
- * CreateNewBackup - This method is called when we want to create a new backup.
- * We send a new backup key encrypted with the user's password and also the
- * new compaction. New logs that will be sent from now on will be assigned to
- * this backup.
- * SendLog - User sends a new log to the backup service. The log is being
- * assigned to the latest(or desired) backup's compaction item.
- * RecoverBackupKey - Pulls data necessary for regenerating the backup key
- * on the client-side for the latest(or desired) backup
- * PullBackup - Fetches compaction + all logs assigned to it for the
- * specified backup(default is the last backup)
- */
-
-service BackupService {
- rpc CreateNewBackup(stream CreateNewBackupRequest) returns (stream CreateNewBackupResponse) {}
- rpc SendLog(stream SendLogRequest) returns (SendLogResponse) {}
- rpc RecoverBackupKey(stream RecoverBackupKeyRequest) returns (stream RecoverBackupKeyResponse) {}
- rpc PullBackup(PullBackupRequest) returns (stream PullBackupResponse) {}
- rpc AddAttachments(AddAttachmentsRequest) returns (google.protobuf.Empty) {}
-}
-
-// CreateNewBackup
-
-message CreateNewBackupRequest {
- oneof data {
- string userID = 1;
- string deviceID = 2;
- bytes keyEntropy = 3;
- bytes newCompactionHash = 4;
- bytes newCompactionChunk = 5;
- }
-}
-
-message CreateNewBackupResponse {
- string backupID = 1;
-}
-
-// SendLog
-
-message SendLogRequest {
- oneof data {
- string userID = 1;
- string backupID = 2;
- bytes logHash = 3;
- bytes logData = 4;
- }
-}
-
-message SendLogResponse {
- string logCheckpoint = 1;
-}
-
-// RecoverBackupKey
-
-message RecoverBackupKeyRequest {
- string userID = 1;
-}
-
-message RecoverBackupKeyResponse {
- string backupID = 4;
-}
-
-// PullBackup
-
-message PullBackupRequest {
- string userID = 1;
- string backupID = 2;
-}
-
-message PullBackupResponse {
- oneof id {
- string backupID = 1;
- string logID = 2;
- }
- oneof data {
- bytes compactionChunk = 3;
- bytes logChunk = 4;
- }
- optional string attachmentHolders = 5;
-}
-
-// AddAttachment
-
-message AddAttachmentsRequest {
- string userID = 1;
- string backupID = 2;
- string logID = 3;
- string holders = 4;
-}