diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs index 082ec0759..f7cd39ac2 100644 --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -1,513 +1,513 @@ mod compaction_upload_promises; mod file_info; mod upload_handler; use crate::argon2_tools::{compute_backup_key, compute_backup_key_str}; use crate::constants::{aes, secure_store}; use crate::ffi::{ create_main_compaction, get_backup_directory_path, get_backup_user_keys_file_path, get_siwe_backup_message_path, restore_from_backup_log, restore_from_main_compaction, secure_store_get, string_callback, void_callback, }; use crate::utils::future_manager; use crate::utils::jsi_callbacks::handle_string_result_as_callback; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; use backup_client::{ BackupClient, BackupDescriptor, LatestBackupIDResponse, RequestedData, TryStreamExt, UserIdentity, }; use serde::{Deserialize, Serialize}; use siwe::Message; use std::error::Error; use std::path::PathBuf; pub mod ffi { use super::*; pub use upload_handler::ffi::*; pub fn create_backup( backup_id: String, backup_secret: String, pickle_key: String, pickled_account: String, siwe_backup_msg: String, promise_id: u32, ) { compaction_upload_promises::insert(backup_id.clone(), promise_id); RUNTIME.spawn(async move { let result = create_userkeys_compaction( backup_id.clone(), backup_secret, pickle_key, pickled_account, ) .await .map_err(|err| err.to_string()); if let Err(err) = result { compaction_upload_promises::resolve(&backup_id, Err(err)); return; } if !siwe_backup_msg.is_empty() { if let Err(err) = create_siwe_backup_msg_compaction(&backup_id, siwe_backup_msg).await { compaction_upload_promises::resolve(&backup_id, Err(err.to_string())); return; } } let (future_id, future) = future_manager::new_future::<()>().await; create_main_compaction(&backup_id, future_id); if let Err(err) = future.await { compaction_upload_promises::resolve(&backup_id, Err(err)); tokio::spawn(upload_handler::compaction::cleanup_files(backup_id)); return; } trigger_backup_file_upload(); // The promise will be resolved when the backup is uploaded }); } pub fn restore_backup( backup_secret: String, backup_id: String, max_version: String, promise_id: u32, ) { RUNTIME.spawn(async move { let backup_id = Option::from(backup_id).filter(|it| !it.is_empty()); let result = download_backup(backup_secret, backup_id) .await .map_err(|err| err.to_string()); let result = match result { Ok(result) => result, Err(error) => { void_callback(error, promise_id); return; } }; let (future_id, future) = future_manager::new_future::<()>().await; restore_from_main_compaction( &result.backup_restoration_path.to_string_lossy(), &result.backup_data_key, &max_version, future_id, ); if let Err(error) = future.await { void_callback(error, promise_id); return; } if let Err(error) = download_and_apply_logs(&result.backup_id, result.backup_log_data_key) .await { void_callback(error.to_string(), promise_id); return; } void_callback(String::new(), promise_id); }); } pub fn retrieve_backup_keys(backup_secret: String, promise_id: u32) { RUNTIME.spawn(async move { let latest_backup_id_response = download_latest_backup_id() .await .map_err(|err| err.to_string()); let backup_id = match latest_backup_id_response { Ok(result) => result.backup_id, Err(error) => { string_callback(error, promise_id, "".to_string()); return; } }; let result = download_backup_keys(backup_id, backup_secret) .await .map_err(|err| err.to_string()); let result = match result { Ok(result) => result, Err(error) => { string_callback(error, promise_id, "".to_string()); return; } }; let serialize_result = serde_json::to_string(&result); handle_string_result_as_callback(serialize_result, promise_id); }); } pub fn restore_backup_data( backup_id: String, backup_data_key: String, backup_log_data_key: String, max_version: String, promise_id: u32, ) { RUNTIME.spawn(async move { let backup_keys = BackupKeysResult { backup_id, backup_data_key, backup_log_data_key, }; let result = download_backup_data(backup_keys) .await .map_err(|err| err.to_string()); let result = match result { Ok(result) => result, Err(error) => { void_callback(error, promise_id); return; } }; let (future_id, future) = future_manager::new_future::<()>().await; restore_from_main_compaction( &result.backup_restoration_path.to_string_lossy(), &result.backup_data_key, &max_version, future_id, ); if let Err(error) = future.await { void_callback(error, promise_id); return; } if let Err(error) = download_and_apply_logs(&result.backup_id, result.backup_log_data_key) .await { void_callback(error.to_string(), promise_id); return; } void_callback(String::new(), promise_id); }); } pub fn retrieve_latest_siwe_backup_data(promise_id: u32) { RUNTIME.spawn(async move { let result = download_latest_backup_id() .await .map_err(|err| err.to_string()); let result = match result { Ok(result) => result, Err(error) => { string_callback(error, promise_id, "".to_string()); return; } }; let LatestBackupIDResponse { backup_id, siwe_backup_msg, } = result; let siwe_backup_msg_string = match siwe_backup_msg { Some(siwe_backup_msg_value) => siwe_backup_msg_value, None => { string_callback( "Backup message unavailable".to_string(), promise_id, "".to_string(), ); return; } }; let siwe_backup_msg_obj: Message = match siwe_backup_msg_string.parse() { Ok(siwe_backup_msg_obj) => siwe_backup_msg_obj, Err(error) => { string_callback(error.to_string(), promise_id, "".to_string()); return; } }; let siwe_backup_msg_nonce = siwe_backup_msg_obj.nonce; let siwe_backup_msg_statement = match siwe_backup_msg_obj.statement { Some(statement) => statement, None => { string_callback( "Backup message invalid: missing statement".to_string(), promise_id, "".to_string(), ); return; } }; let siwe_backup_msg_issued_at = siwe_backup_msg_obj.issued_at.to_string(); let siwe_backup_data = SIWEBackupData { backup_id: backup_id, siwe_backup_msg: siwe_backup_msg_string, siwe_backup_msg_nonce: siwe_backup_msg_nonce, siwe_backup_msg_statement: siwe_backup_msg_statement, siwe_backup_msg_issued_at: siwe_backup_msg_issued_at, }; let serialize_result = serde_json::to_string(&siwe_backup_data); handle_string_result_as_callback(serialize_result, promise_id); }); } } pub async fn create_userkeys_compaction( backup_id: String, backup_secret: String, pickle_key: String, pickled_account: String, ) -> Result<(), Box> { let mut backup_key = compute_backup_key(backup_secret.as_bytes(), backup_id.as_bytes())?; let backup_data_key = secure_store_get(secure_store::SECURE_STORE_ENCRYPTION_KEY_ID)?; let backup_log_data_key = secure_store_get(secure_store::SECURE_STORE_BACKUP_LOGS_ENCRYPTION_KEY_ID)?; let user_keys = UserKeys { backup_data_key, backup_log_data_key, pickle_key, pickled_account, }; let encrypted_user_keys = user_keys.encrypt(&mut backup_key)?; let user_keys_file = get_backup_user_keys_file_path(&backup_id)?; tokio::fs::write(user_keys_file, encrypted_user_keys).await?; Ok(()) } pub async fn create_siwe_backup_msg_compaction( backup_id: &str, siwe_backup_msg: String, ) -> Result<(), Box> { let siwe_backup_msg_file = get_siwe_backup_message_path(backup_id)?; tokio::fs::write(siwe_backup_msg_file, siwe_backup_msg).await?; Ok(()) } async fn download_backup( backup_secret: String, backup_id: Option, ) -> Result> { let backup_id = match backup_id { Some(backup_id) => backup_id, None => { let latest_backup_id_response = download_latest_backup_id().await?; latest_backup_id_response.backup_id } }; let backup_keys = download_backup_keys(backup_id, backup_secret).await?; download_backup_data(backup_keys).await } async fn download_latest_backup_id( ) -> Result> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let latest_backup_descriptor = BackupDescriptor::Latest { - username: user_identity.user_id.clone(), + user_identifier: user_identity.user_id.clone(), }; let backup_id_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await?; let LatestBackupIDResponse { backup_id, siwe_backup_msg, } = serde_json::from_slice(&backup_id_response)?; Ok(LatestBackupIDResponse { backup_id, siwe_backup_msg, }) } async fn download_backup_keys( backup_id: String, backup_secret: String, ) -> Result> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let latest_backup_descriptor = BackupDescriptor::Latest { - username: user_identity.user_id.clone(), + user_identifier: user_identity.user_id.clone(), }; let mut encrypted_user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; let mut backup_key = compute_backup_key_str(&backup_secret, &backup_id)?; let user_keys = UserKeys::from_encrypted(&mut encrypted_user_keys, &mut backup_key)?; Ok(BackupKeysResult { backup_id, backup_data_key: user_keys.backup_data_key, backup_log_data_key: user_keys.backup_log_data_key, }) } async fn download_backup_data( backup_keys: BackupKeysResult, ) -> Result> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let BackupKeysResult { backup_id, backup_data_key, backup_log_data_key, } = backup_keys; let backup_data_descriptor = BackupDescriptor::BackupID { backup_id: backup_id.clone(), user_identity: user_identity.clone(), }; let encrypted_user_data = backup_client .download_backup_data(&backup_data_descriptor, RequestedData::UserData) .await?; let backup_restoration_path = PathBuf::from(get_backup_directory_path()?).join("restore_compaction"); tokio::fs::write(&backup_restoration_path, encrypted_user_data).await?; Ok(CompactionDownloadResult { backup_restoration_path, backup_id, backup_data_key, backup_log_data_key, }) } async fn download_and_apply_logs( backup_id: &str, backup_log_data_key: String, ) -> Result<(), Box> { let mut backup_log_data_key = backup_log_data_key.into_bytes(); let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let stream = backup_client.download_logs(&user_identity, backup_id).await; let mut stream = Box::pin(stream); while let Some(mut log) = stream.try_next().await? { let data = decrypt( backup_log_data_key.as_mut_slice(), log.content.as_mut_slice(), )?; let (future_id, future) = future_manager::new_future::<()>().await; restore_from_backup_log(data, future_id); future.await?; } Ok(()) } fn get_user_identity_from_secure_store() -> Result { Ok(UserIdentity { user_id: secure_store_get(secure_store::USER_ID)?, access_token: secure_store_get(secure_store::COMM_SERVICES_ACCESS_TOKEN)?, device_id: secure_store_get(secure_store::DEVICE_ID)?, }) } // This struct should match `BackupKeys` in `lib/types/backup-types.js` #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct BackupKeysResult { #[serde(rename = "backupID")] backup_id: String, backup_data_key: String, backup_log_data_key: String, } #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct SIWEBackupData { #[serde(rename = "backupID")] backup_id: String, siwe_backup_msg: String, siwe_backup_msg_statement: String, siwe_backup_msg_nonce: String, siwe_backup_msg_issued_at: String, } struct CompactionDownloadResult { backup_id: String, backup_restoration_path: PathBuf, backup_data_key: String, backup_log_data_key: String, } #[derive(Debug, Serialize, Deserialize)] struct UserKeys { backup_data_key: String, backup_log_data_key: String, pickle_key: String, pickled_account: String, } impl UserKeys { fn encrypt(&self, backup_key: &mut [u8]) -> Result, Box> { let mut json = serde_json::to_vec(self)?; encrypt(backup_key, &mut json) } fn from_encrypted( data: &mut [u8], backup_key: &mut [u8], ) -> Result> { let decrypted = decrypt(backup_key, data)?; Ok(serde_json::from_slice(&decrypted)?) } } fn encrypt(key: &mut [u8], data: &mut [u8]) -> Result, Box> { let encrypted_len = data.len() + aes::IV_LENGTH + aes::TAG_LENGTH; let mut encrypted = vec![0; encrypted_len]; crate::ffi::encrypt(key, data, &mut encrypted)?; Ok(encrypted) } fn decrypt(key: &mut [u8], data: &mut [u8]) -> Result, Box> { let decrypted_len = data.len() - aes::IV_LENGTH - aes::TAG_LENGTH; let mut decrypted = vec![0; decrypted_len]; crate::ffi::decrypt(key, data, &mut decrypted)?; Ok(decrypted) } diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index 9f24f4b84..77268d3e9 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,331 +1,331 @@ use actix_web::{ error::ErrorBadRequest, web::{self, Bytes}, HttpResponse, Responder, }; use comm_lib::{ auth::UserIdentity, backup::LatestBackupIDResponse, blob::{client::BlobServiceClient, types::BlobInfo}, http::{ auth_service::Authenticated, multipart::{get_named_text_field, get_text_field}, }, tools::Defer, }; use std::convert::Infallible; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::identity::find_user_id; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, blob_client: Authenticated, db_client: web::Data, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result { let backup_id = get_named_text_field("backup_id", &mut multipart).await?; let blob_client = blob_client.with_user_identity(user.clone()); tracing::Span::current().record("backup_id", &backup_id); info!("Backup data upload started"); let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let (user_data_blob_info, user_data_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; let attachments_hashes: Vec = match get_text_field(&mut multipart).await? { Some((name, attachments)) => { if name != "attachments" { warn!( name, "Malformed request: 'attachments' text field expected." ); return Err(ErrorBadRequest("Bad request")); } attachments.lines().map(ToString::to_string).collect() } None => Vec::new(), }; let mut attachments = Vec::new(); let mut attachments_revokes = Vec::new(); for attachment_hash in attachments_hashes { let (holder, revoke) = create_attachment_holder(&attachment_hash, &blob_client).await?; attachments.push(BlobInfo { blob_hash: attachment_hash, holder, }); attachments_revokes.push(revoke); } let siwe_backup_msg_option: Option = match get_text_field(&mut multipart).await? { Some((name, siwe_backup_msg)) => { if name == "siwe_backup_msg" { Some(siwe_backup_msg) } else { None } } _ => None, }; let item = BackupItem::new( user.user_id.clone(), backup_id, user_keys_blob_info, user_data_blob_info, attachments, siwe_backup_msg_option, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; user_keys_revoke.cancel(); user_data_revoke.cancel(); for attachment_revoke in attachments_revokes { attachment_revoke.cancel(); } db_client .remove_old_backups(&user.user_id, &blob_client) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument(skip_all, fields(hash_field_name, data_field_name))] async fn forward_field_to_blob<'revoke, 'blob: 'revoke>( multipart: &mut actix_multipart::Multipart, blob_client: &'blob BlobServiceClient, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name, multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error. break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; let revoke_info = blob_info.clone(); let revoke_holder = Defer::new(|| { blob_client .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder) }); Ok((blob_info, revoke_holder)) } #[instrument(skip_all)] async fn create_attachment_holder<'revoke, 'blob: 'revoke>( attachment: &str, blob_client: &'blob BlobServiceClient, ) -> Result<(String, Defer<'revoke>), BackupError> { let holder = uuid::Uuid::new_v4().to_string(); if !blob_client .assign_holder(attachment, &holder) .await .map_err(BackupError::from)? { warn!("Blob attachment with hash {attachment:?} doesn't exist"); } let revoke_hash = attachment.to_string(); let revoke_holder = holder.clone(); let revoke_holder = Defer::new(|| { blob_client.schedule_revoke_holder(revoke_hash, revoke_holder) }); Ok((holder, revoke_holder)) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, blob_client: BlobServiceClient, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_id( path: web::Path, db_client: web::Data, ) -> actix_web::Result { - let username = path.into_inner(); - let user_id = find_user_id(&username).await?; + let user_identifier = path.into_inner(); + let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupIDResponse { backup_id: backup_item.backup_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path, db_client: web::Data, blob_client: Authenticated, ) -> actix_web::Result { - let username = path.into_inner(); - let user_id = find_user_id(&username).await?; + let user_identifier = path.into_inner(); + let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs index c9d54a69a..b01fc29aa 100644 --- a/services/backup/src/http/mod.rs +++ b/services/backup/src/http/mod.rs @@ -1,85 +1,85 @@ use actix_web::{web, App, HttpResponse, HttpServer}; use anyhow::Result; use comm_lib::{ auth::AuthService, blob::client::BlobServiceClient, http::auth::get_comm_authentication_middleware, }; use tracing::info; use crate::{database::DatabaseClient, http::handlers::log::handle_ws, CONFIG}; mod handlers { pub(super) mod backup; pub(super) mod log; pub(super) mod user_data; } pub async fn run_http_server( db_client: DatabaseClient, blob_client: BlobServiceClient, auth_service: AuthService, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); let db = web::Data::new(db_client); let blob = web::Data::new(blob_client); HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .wrap(comm_lib::http::cors_config( CONFIG.localstack_endpoint.is_some(), )) .app_data(db.clone()) .app_data(blob.clone()) .app_data(auth_service.to_owned()) .route("/health", web::get().to(HttpResponse::Ok)) .service( // Backup services that don't require authetication web::scope("/backups/latest") .service( - web::resource("{username}/backup_id") + web::resource("{user_identifier}/backup_id") .route(web::get().to(handlers::backup::get_latest_backup_id)), ) - .service(web::resource("{username}/user_keys").route( + .service(web::resource("{user_identifier}/user_keys").route( web::get().to(handlers::backup::download_latest_backup_keys), )), ) .service( // Backup services requiring authetication web::scope("/backups") .wrap(get_comm_authentication_middleware()) .service( web::resource("").route(web::post().to(handlers::backup::upload)), ) .service( web::resource("{backup_id}/user_keys") .route(web::get().to(handlers::backup::download_user_keys)), ) .service( web::resource("{backup_id}/user_data") .route(web::get().to(handlers::backup::download_user_data)), ), ) .service( web::scope("/logs") .service(web::resource("").route(web::get().to(handle_ws))), ) .service( web::scope("/user_data") .wrap(get_comm_authentication_middleware()) .service( web::resource("{user_id}") .route(web::delete().to(handlers::user_data::delete_user_data)), ), ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) } diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index 3ad50c9cb..48351ca00 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,235 +1,235 @@ use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadedLog, Error as BackupClientError, LogUploadConfirmation, RequestedData, SinkExt, StreamExt, TryStreamExt, }; use bytesize::ByteSize; use comm_lib::{ auth::UserIdentity, backup::{LatestBackupIDResponse, UploadLogRequest}, }; use commtest::identity::device::register_user_device; use commtest::{ service_addr, tools::{generate_stable_nbytes, Error}, }; use grpc_clients::identity::DeviceType; use reqwest::StatusCode; use std::collections::HashSet; use uuid::Uuid; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; let device_info = register_user_device(None, Some(DeviceType::Ios)).await; let user_identity = UserIdentity { user_id: device_info.user_id.clone(), access_token: device_info.access_token, device_id: device_info.device_id, }; let backup_datas = generate_backup_data(); // Upload backups for (backup_data, log_datas) in &backup_datas { backup_client .upload_backup(&user_identity, backup_data.clone()) .await?; let (mut tx, rx) = backup_client.upload_logs(&user_identity).await?; for log_data in log_datas { tx.send(log_data.clone()).await?; } let result: HashSet = rx.take(log_datas.len()).try_collect().await?; let expected = log_datas .iter() .map(|data| LogUploadConfirmation { backup_id: data.backup_id.clone(), log_id: data.log_id, }) .collect(); assert_eq!(result, expected); } // Test direct lookup let (backup_data, log_datas) = &backup_datas[1]; let second_backup_descriptor = BackupDescriptor::BackupID { backup_id: backup_data.backup_id.clone(), user_identity: user_identity.clone(), }; let user_keys = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); let user_data = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserData) .await?; assert_eq!(user_data, backup_data.user_data); // Test latest backup lookup for nonexistent user let latest_backup_descriptor = BackupDescriptor::Latest { - username: "nonexistent_user".to_string(), + user_identifier: "nonexistent_user".to_string(), }; let nonexistent_user_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await; match nonexistent_user_response { Ok(_) => panic!("Expected error, but got success response"), Err(BackupClientError::ReqwestError(error)) => { assert_eq!( error.status(), Some(StatusCode::BAD_REQUEST), "Expected bad request status" ); } Err(_) => panic!("Unexpected error type"), } // Test latest backup lookup let latest_backup_descriptor = BackupDescriptor::Latest { - username: device_info.username, + user_identifier: device_info.username, }; let backup_id_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await?; let response: LatestBackupIDResponse = serde_json::from_slice(&backup_id_response)?; assert_eq!(response.backup_id, backup_data.backup_id); let user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); // Test log download let log_stream = backup_client .download_logs(&user_identity, &backup_data.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; let expected_logs: Vec = log_datas .iter() .map(|data| DownloadedLog { content: data.content.clone(), attachments: data.attachments.clone(), }) .collect(); assert_eq!(downloaded_logs, expected_logs); // Test backup cleanup let (removed_backup, _) = &backup_datas[0]; let removed_backup_descriptor = BackupDescriptor::BackupID { backup_id: removed_backup.backup_id.clone(), user_identity: user_identity.clone(), }; let response = backup_client .download_backup_data(&removed_backup_descriptor, RequestedData::UserKeys) .await; let Err(BackupClientError::ReqwestError(error)) = response else { panic!("First backup should have been removed, instead got response: {response:?}"); }; assert_eq!( error.status(), Some(StatusCode::NOT_FOUND), "Expected status 'not found'" ); // Test log cleanup let log_stream = backup_client .download_logs(&user_identity, &removed_backup.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; if !downloaded_logs.is_empty() { panic!( "Logs for first backup should have been removed, \ instead got: {downloaded_logs:?}" ) } Ok(()) } fn generate_backup_data() -> [(BackupData, Vec); 2] { [ ( BackupData { backup_id: "b1".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'a'), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'A'), ), attachments: vec![], siwe_backup_msg: None, }, generate_log_data("b1", b'a'), ), ( BackupData { backup_id: "b2".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'b'), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'B'), ), attachments: vec![], siwe_backup_msg: None, }, generate_log_data("b2", b'b'), ), ] } fn generate_log_data(backup_id: &str, value: u8) -> Vec { const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize; const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize; (1..30) .map(|log_id| { let size = if log_id % 2 == 0 { IN_DB_SIZE } else { IN_BLOB_SIZE }; let attachments = if log_id % 10 == 0 { Some(vec![Uuid::new_v4().to_string()]) } else { None }; let mut content = generate_stable_nbytes(size, Some(value)); let unique_suffix = log_id.to_string(); content.extend(unique_suffix.as_bytes()); UploadLogRequest { backup_id: backup_id.to_string(), log_id, content, attachments, } }) .collect() } diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs index c3e7c96a6..8b0b74ed6 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -1,188 +1,189 @@ use backup_client::{ BackupClient, BackupData, BackupDescriptor, RequestedData, }; use bytesize::ByteSize; use comm_lib::{auth::UserIdentity, backup::LatestBackupIDResponse}; use commtest::identity::device::register_user_device; use commtest::{ service_addr, tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, }; use grpc_clients::identity::DeviceType; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; let number_of_threads = obtain_number_of_threads(); let rt = Runtime::new().unwrap(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); let mut backup_data = vec![]; for i in 0..number_of_threads { backup_data.push(BackupData { backup_id: format!("b{i}"), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(i as u8), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(i as u8), ), attachments: vec![], siwe_backup_msg: None, }); } let device_info_1 = register_user_device(None, Some(DeviceType::Ios)).await; let device_info_2 = register_user_device(None, Some(DeviceType::Ios)).await; let user_identities = [ UserIdentity { user_id: device_info_1.user_id.clone(), access_token: device_info_1.access_token, device_id: device_info_1.device_id, }, UserIdentity { user_id: device_info_2.user_id.clone(), access_token: device_info_2.access_token, device_id: device_info_2.device_id, }, ]; tokio::task::spawn_blocking(move || { println!("Creating new backups"); rt.block_on(async { let mut set = JoinSet::new(); for (i, item) in backup_data.iter().cloned().enumerate() { let backup_client = backup_client.clone(); let user = user_identities[i % user_identities.len()].clone(); set.spawn(async move { backup_client.upload_backup(&user, item).await.unwrap(); }); } while let Some(result) = set.join_next().await { result.unwrap(); } }); let mut latest_ids_for_user = vec![]; println!("Reading latest ids"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); - let username = if user.user_id == device_info_1.user_id { + let user_identifier = if user.user_id == device_info_1.user_id { device_info_1.username.clone() } else { device_info_2.username.clone() }; - let descriptor = BackupDescriptor::Latest { username }; + let descriptor = BackupDescriptor::Latest { user_identifier }; handlers.push(tokio::spawn(async move { let response = backup_client .download_backup_data(&descriptor, RequestedData::BackupID) .await .unwrap(); serde_json::from_slice::(&response).unwrap() })); } for handler in handlers { latest_ids_for_user.push(handler.await.unwrap().backup_id); } }); assert_eq!(latest_ids_for_user.len(), user_identities.len()); let mut latest_user_keys_for_user = vec![]; println!("Reading latest user keys"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); - let username = if user.user_id == device_info_1.user_id { + + let user_identifier = if user.user_id == device_info_1.user_id { device_info_1.username.clone() } else { device_info_2.username.clone() }; - let descriptor = BackupDescriptor::Latest { username }; + let descriptor = BackupDescriptor::Latest { user_identifier }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserKeys) .await .unwrap() })); } for handler in handlers { latest_user_keys_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_keys_for_user.len(), user_identities.len()); for (backup_id, user_keys) in latest_ids_for_user.iter().zip(latest_user_keys_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_keys, user_keys); } let mut latest_user_data_for_user = vec![]; println!("Reading latest user data"); rt.block_on(async { let mut handlers = vec![]; for (i, backup_id) in latest_ids_for_user.iter().enumerate() { let backup_client = backup_client.clone(); let descriptor = BackupDescriptor::BackupID { backup_id: backup_id.clone(), user_identity: user_identities[i % user_identities.len()].clone(), }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserData) .await .unwrap() })); } for handler in handlers { latest_user_data_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_data_for_user.len(), user_identities.len()); for (backup_id, user_data) in latest_ids_for_user.iter().zip(latest_user_data_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_data, user_data); } }) .await .expect("Task panicked"); Ok(()) } diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs index 038cfa720..9cc9cdf90 100644 --- a/shared/backup_client/src/lib.rs +++ b/shared/backup_client/src/lib.rs @@ -1,385 +1,385 @@ #[cfg(target_arch = "wasm32")] mod web; use async_stream::{stream, try_stream}; pub use comm_lib::auth::UserIdentity; pub use comm_lib::backup::{ DownloadLogsRequest, LatestBackupIDResponse, LogWSRequest, LogWSResponse, UploadLogRequest, }; pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use hex::ToHex; use reqwest::{ header::InvalidHeaderValue, multipart::{Form, Part}, Body, }; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::time::Duration; use tokio_tungstenite_wasm::{ connect, Error as TungsteniteError, Message::Binary, }; const LOG_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(5); const LOG_DOWNLOAD_MAX_RETRY: usize = 3; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::wasm_bindgen; #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] #[derive(Debug, Clone)] pub struct BackupClient { url: reqwest::Url, } impl BackupClient { pub fn new>(url: T) -> Result { Ok(BackupClient { url: url.try_into()?, }) } } /// Backup functions impl BackupClient { pub async fn upload_backup( &self, user_identity: &UserIdentity, backup_data: BackupData, ) -> Result<(), Error> { let BackupData { backup_id, user_keys, user_data, attachments, siwe_backup_msg, } = backup_data; let client = reqwest::Client::new(); let mut form = Form::new() .text("backup_id", backup_id) .text( "user_keys_hash", Sha256::digest(&user_keys).encode_hex::(), ) .part("user_keys", Part::stream(Body::from(user_keys))) .text( "user_data_hash", Sha256::digest(&user_data).encode_hex::(), ) .part("user_data", Part::stream(Body::from(user_data))) .text("attachments", attachments.join("\n")); if let Some(siwe_backup_msg_value) = siwe_backup_msg { form = form.text("siwe_backup_msg", siwe_backup_msg_value); } let response = client .post(self.url.join("backups")?) .bearer_auth(user_identity.as_authorization_token()?) .multipart(form) .send() .await?; response.error_for_status()?; Ok(()) } pub async fn download_backup_data( &self, backup_descriptor: &BackupDescriptor, requested_data: RequestedData, ) -> Result, Error> { let client = reqwest::Client::new(); let url = self.url.join("backups/")?; let url = match backup_descriptor { BackupDescriptor::BackupID { backup_id, .. } => { url.join(&format!("{backup_id}/"))? } - BackupDescriptor::Latest { username } => { - url.join(&format!("latest/{username}/"))? + BackupDescriptor::Latest { user_identifier } => { + url.join(&format!("latest/{user_identifier}/"))? } }; let url = match &requested_data { RequestedData::BackupID => url.join("backup_id")?, RequestedData::UserKeys => url.join("user_keys")?, RequestedData::UserData => url.join("user_data")?, }; let mut request = client.get(url); if let BackupDescriptor::BackupID { user_identity, .. } = backup_descriptor { request = request.bearer_auth(user_identity.as_authorization_token()?) } let response = request.send().await?; let result = response.error_for_status()?.bytes().await?.to_vec(); Ok(result) } } /// Log functions impl BackupClient { pub async fn upload_logs( &self, user_identity: &UserIdentity, ) -> Result< ( impl Sink, impl Stream>, ), Error, > { let (tx, rx) = self.create_log_ws_connection(user_identity).await?; let rx = rx.map(|response| match response? { LogWSResponse::LogUploaded { backup_id, log_id } => { Ok(LogUploadConfirmation { backup_id, log_id }) } LogWSResponse::ServerError => Err(Error::ServerError), msg => Err(Error::InvalidBackupMessage(msg)), }); Ok((tx, rx)) } /// Handles complete log download. /// It will try and retry download a few times, but if the issues persist /// the next item returned will be the last received error and the stream /// will be closed. pub async fn download_logs<'this>( &'this self, user_identity: &'this UserIdentity, backup_id: &'this str, ) -> impl Stream> + 'this { stream! { let mut last_downloaded_log = None; let mut fail_count = 0; 'retry: loop { let stream = self.log_download_stream(user_identity, backup_id, &mut last_downloaded_log).await; let mut stream = Box::pin(stream); while let Some(item) = stream.next().await { match item { Ok(log) => yield Ok(log), Err(err) => { println!("Error when downloading logs: {err:?}"); fail_count += 1; if fail_count >= LOG_DOWNLOAD_MAX_RETRY { yield Err(err); break 'retry; } #[cfg(target_arch = "wasm32")] let _ = web::sleep(LOG_DOWNLOAD_RETRY_DELAY).await; #[cfg(not(target_arch = "wasm32"))] tokio::time::sleep(LOG_DOWNLOAD_RETRY_DELAY).await; continue 'retry; } } } // Everything downloaded return; } println!("Log download failed!"); } } /// Handles singular connection websocket connection. Returns error in case /// anything goes wrong e.g. missing log or connection error. async fn log_download_stream<'stream>( &'stream self, user_identity: &'stream UserIdentity, backup_id: &'stream str, last_downloaded_log: &'stream mut Option, ) -> impl Stream> + 'stream { try_stream! { let (mut tx, mut rx) = self.create_log_ws_connection(user_identity).await?; tx.send(DownloadLogsRequest { backup_id: backup_id.to_string(), from_id: *last_downloaded_log, }) .await?; while let Some(response) = rx.try_next().await? { let expected_log_id = last_downloaded_log.unwrap_or(0); match response { LogWSResponse::LogDownload { content, attachments, log_id, } if log_id == expected_log_id + 1 => { *last_downloaded_log = Some(log_id); yield DownloadedLog { content, attachments, }; } LogWSResponse::LogDownload { .. } => { Err(Error::LogMissing)?; } LogWSResponse::LogDownloadFinished { last_log_id: Some(log_id), } if log_id == expected_log_id => { tx.send(DownloadLogsRequest { backup_id: backup_id.to_string(), from_id: *last_downloaded_log, }) .await? } LogWSResponse::LogDownloadFinished { last_log_id: None } => return, LogWSResponse::LogDownloadFinished { .. } => { Err(Error::LogMissing)?; } msg => Err(Error::InvalidBackupMessage(msg))?, } } Err(Error::WSClosed)?; } } async fn create_log_ws_connection>( &self, user_identity: &UserIdentity, ) -> Result< ( impl Sink, impl Stream>, ), Error, > { let url = self.create_ws_url()?; let stream = connect(url).await?; let (mut tx, rx) = stream.split(); tx.send(Binary(bincode::serialize(&LogWSRequest::Authenticate( user_identity.clone(), ))?)) .await?; let tx = tx.with(|request: Request| async { let request: LogWSRequest = request.into(); let request = bincode::serialize(&request)?; Ok(Binary(request)) }); let rx = rx.filter_map(|msg| async { let bytes = match msg { Ok(Binary(bytes)) => bytes, Ok(_) => return Some(Err(Error::InvalidWSMessage)), Err(err) => return Some(Err(err.into())), }; match bincode::deserialize(&bytes) { Ok(response) => Some(Ok(response)), Err(err) => Some(Err(err.into())), } }); let tx = Box::pin(tx); let mut rx = Box::pin(rx); if let Some(response) = rx.try_next().await? { match response { LogWSResponse::AuthSuccess => {} LogWSResponse::Unauthenticated => Err(Error::Unauthenticated)?, msg => Err(Error::InvalidBackupMessage(msg))?, } } Ok((tx, rx)) } fn create_ws_url(&self) -> Result { let mut url = self.url.clone(); match url.scheme() { "http" => url.set_scheme("ws").map_err(|_| Error::UrlSchemaError)?, "https" => url.set_scheme("wss").map_err(|_| Error::UrlSchemaError)?, _ => (), }; let url = url.join("logs")?; Ok(url) } } #[derive(Debug, Clone)] pub struct BackupData { pub backup_id: String, pub user_keys: Vec, pub user_data: Vec, pub attachments: Vec, pub siwe_backup_msg: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum BackupDescriptor { BackupID { #[serde(rename = "backupID")] backup_id: String, #[serde(rename = "userIdentity")] user_identity: UserIdentity, }, Latest { - username: String, + user_identifier: String, }, } #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] #[derive(Debug, Clone)] pub enum RequestedData { BackupID, UserKeys, UserData, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogUploadConfirmation { pub backup_id: String, pub log_id: usize, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DownloadedLog { pub content: Vec, pub attachments: Option>, } #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { InvalidAuthorizationHeader, UrlSchemaError, UrlError(url::ParseError), ReqwestError(reqwest::Error), TungsteniteError(TungsteniteError), JsonError(serde_json::Error), BincodeError(bincode::Error), InvalidWSMessage, #[display(fmt = "Error::InvalidBackupMessage({:?})", _0)] InvalidBackupMessage(LogWSResponse), ServerError, LogMissing, WSClosed, Unauthenticated, } impl std::error::Error for Error {} impl From for Error { fn from(_: InvalidHeaderValue) -> Self { Self::InvalidAuthorizationHeader } }