Add a background backup upload handler to the native Rust library and expose
startBackupHandler / stopBackupHandler to JS through CommCoreModule.

Review changes folded into this patch:
* start_backup_handler inspects the stored JoinHandle with `as_ref()` instead
  of `take()`. `take()` emptied UPLOAD_HANDLER and dropped the handle of a
  handler that was still running, so stop_backup_handler could no longer
  abort it and the next start call spawned a second handler.
* backup_confirmation_receiver drains the stream with `.is_some()` instead of
  `while let Some(_)`.
* Doc comments were added to the new Rust items; hunk line counts are
  adjusted accordingly.

NOTE(review): the extracted text had lost its line breaks and its
angle-bracketed type arguments. Those were restored from the imports and call
sites visible in the patch; the Promise type arguments on the context lines of
CommCoreModuleSchema.js could not be recovered and are left as found. Confirm
against the repository before applying.

diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h
--- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h
+++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h
@@ -123,6 +123,8 @@
       jsi::Runtime &rt,
       jsi::String accessToken) override;
   virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) override;
+  virtual void startBackupHandler(jsi::Runtime &rt) override;
+  virtual jsi::Value stopBackupHandler(jsi::Runtime &rt) override;
   virtual jsi::Value createNewBackup(
       jsi::Runtime &rt,
       jsi::String backupSecret,
diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp
--- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp
+++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp
@@ -1242,6 +1242,23 @@
       });
 }
 
+void CommCoreModule::startBackupHandler(jsi::Runtime &rt) {
+  try {
+    ::startBackupHandler();
+  } catch (const std::exception &e) {
+    throw jsi::JSError(rt, e.what());
+  }
+}
+
+jsi::Value CommCoreModule::stopBackupHandler(jsi::Runtime &rt) {
+  return createPromiseAsJSIValue(
+      rt, [=](jsi::Runtime &innerRt, std::shared_ptr<Promise> promise) {
+        auto currentID = RustPromiseManager::instance.addPromise(
+            promise, this->jsInvoker_, innerRt);
+        ::stopBackupHandler(currentID);
+      });
+}
+
 jsi::Value CommCoreModule::createNewBackup(
     jsi::Runtime &rt,
     jsi::String backupSecret,
diff --git a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp
--- a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp
+++ b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp
@@ -140,6 +140,13 @@
 static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_clearCommServicesAccessToken(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) {
   return static_cast<CommCoreModuleSchemaCxxSpecJSI *>(&turboModule)->clearCommServicesAccessToken(rt);
 }
+static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_startBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) {
+  static_cast<CommCoreModuleSchemaCxxSpecJSI *>(&turboModule)->startBackupHandler(rt);
+  return jsi::Value::undefined();
+}
+static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_stopBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) {
+  return static_cast<CommCoreModuleSchemaCxxSpecJSI *>(&turboModule)->stopBackupHandler(rt);
+}
 static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) {
   return static_cast<CommCoreModuleSchemaCxxSpecJSI *>(&turboModule)->createNewBackup(rt, args[0].asString(rt), args[1].asString(rt));
 }
diff --git a/native/cpp/CommonCpp/_generated/commJSI.h b/native/cpp/CommonCpp/_generated/commJSI.h
--- a/native/cpp/CommonCpp/_generated/commJSI.h
+++ b/native/cpp/CommonCpp/_generated/commJSI.h
@@ -61,6 +61,8 @@
   virtual jsi::Value getCommServicesAuthMetadata(jsi::Runtime &rt) = 0;
   virtual jsi::Value setCommServicesAccessToken(jsi::Runtime &rt, jsi::String accessToken) = 0;
   virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) = 0;
+  virtual void startBackupHandler(jsi::Runtime &rt) = 0;
+  virtual jsi::Value stopBackupHandler(jsi::Runtime &rt) = 0;
   virtual jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret, jsi::String userData) = 0;
   virtual jsi::Value restoreBackup(jsi::Runtime &rt, jsi::String backupSecret) = 0;
 
diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs
--- a/native/native_rust_library/src/backup.rs
+++ b/native/native_rust_library/src/backup.rs
@@ -1,23 +1,55 @@
 use crate::argon2_tools::{compute_backup_key, compute_backup_key_str};
-use crate::constants::{aes, secure_store};
+use crate::constants::{
+  aes, secure_store, BACKUP_SERVICE_CONNECTION_RETRY_DELAY,
+};
 use crate::ffi::secure_store_get;
-use crate::handle_string_result_as_callback;
 use crate::BACKUP_SOCKET_ADDR;
 use crate::RUNTIME;
+use crate::{handle_string_result_as_callback, handle_void_result_as_callback};
 use backup_client::{
   BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest,
-  LatestBackupIDResponse, LogUploadConfirmation, LogWSResponse, RequestedData,
-  SinkExt, StreamExt, UploadLogRequest, UserIdentity,
+  Error as BackupError, LatestBackupIDResponse, LogUploadConfirmation,
+  LogWSResponse, RequestedData, SinkExt, Stream, StreamExt, UploadLogRequest,
+  UserIdentity, WSError,
 };
+use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
+use std::convert::Infallible;
 use std::error::Error;
+use std::future::{self, Future};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use tokio::task::JoinHandle;
 
 pub mod ffi {
-  use crate::handle_void_result_as_callback;
-
   use super::*;
 
+  /// Spawns the backup upload handler on `RUNTIME` unless a previously
+  /// spawned handler is still running.
+  pub fn start_backup_handler() -> Result<(), Box<dyn Error>> {
+    let mut handle = UPLOAD_HANDLER.lock()?;
+    match handle.as_ref() {
+      // Already running: leave the stored handle in place so that
+      // `stop_backup_handler` can still abort the task.
+      Some(running) if !running.is_finished() => (),
+      _ => {
+        *handle = Some(RUNTIME.spawn(backup_upload_handler()?));
+      }
+    }
+
+    Ok(())
+  }
+
+  /// Runs `stop_backup_handler` on `RUNTIME` and passes its result to
+  /// `handle_void_result_as_callback` together with `promise_id`.
+  pub fn stop_backup_handler_sync(promise_id: u32) {
+    RUNTIME.spawn(async move {
+      let result = stop_backup_handler().await;
+      handle_void_result_as_callback(result, promise_id);
+    });
+  }
+
   pub fn create_backup_sync(
     backup_id: String,
     backup_secret: String,
@@ -47,6 +79,86 @@
   }
 }
 
+lazy_static! {
+  /// Join handle of the most recently spawned upload handler, if any.
+  static ref UPLOAD_HANDLER: Arc<Mutex<Option<JoinHandle<Infallible>>>> =
+    Arc::new(Mutex::new(None));
+}
+
+/// Takes the stored handler out of `UPLOAD_HANDLER` and aborts it if it has
+async fn stop_backup_handler() -> Result<(), Box<dyn Error>> {
+  let Some(handler) = UPLOAD_HANDLER.lock()?.take() else {
+    return Ok(());
+  };
+  if handler.is_finished() {
+    return Ok(());
+  }
+
+  handler.abort();
+  Ok(())
+}
+
+/// Builds the future that opens a log-upload connection to the backup
+/// service and, whenever connecting fails or the connection reports an error,
+/// waits `BACKUP_SERVICE_CONNECTION_RETRY_DELAY` and connects again.
+fn backup_upload_handler(
+) -> Result<impl Future<Output = Infallible>, Box<dyn Error>> {
+  let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?;
+  let user_identity = get_user_identity_from_secure_store()?;
+
+  Ok(async move {
+    loop {
+      let (tx, rx) = match backup_client.upload_logs(&user_identity).await {
+        Ok(ws) => ws,
+        Err(err) => {
+          println!("Backup handler error: '{err:?}'");
+          tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await;
+          continue;
+        }
+      };
+
+      let mut _tx = Box::pin(tx);
+      let mut rx = Box::pin(rx);
+
+      let err = tokio::select! {
+        Err(err) = backup_data_sender() => err,
+        Err(err) = backup_confirmation_receiver(&mut rx) => err,
+      };
+
+      println!("Backup handler error: '{err:?}'");
+
+      tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await;
+    }
+  })
+}
+
+/// Currently sends nothing: it awaits a future that never resolves.
+async fn backup_data_sender() -> Result<Infallible, BackupHandlerError> {
+  loop {
+    let () = future::pending().await;
+  }
+}
+
+/// Drains messages from `rx` until the stream yields an error or ends; an
+/// ended stream is reported as `BackupHandlerError::WSClosed`.
+async fn backup_confirmation_receiver(
+  rx: &mut Pin<Box<impl Stream<Item = Result<LogWSResponse, WSError>>>>,
+) -> Result<Infallible, BackupHandlerError> {
+  while rx.next().await.transpose()?.is_some() {}
+
+  Err(BackupHandlerError::WSClosed)
+}
+
+/// Errors after which the upload handler logs, waits and reconnects.
+#[derive(
+  Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+enum BackupHandlerError {
+  BackupError(BackupError),
+  BackupWSError(WSError),
+  WSClosed,
+}
+
 pub async fn create_backup(
   backup_id: String,
   backup_secret: String,
diff --git a/native/native_rust_library/src/constants.rs b/native/native_rust_library/src/constants.rs
--- a/native/native_rust_library/src/constants.rs
+++ b/native/native_rust_library/src/constants.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 pub mod aes {
   pub const KEY_SIZE: usize = 32; // bytes
   pub const IV_LENGTH: usize = 12; // bytes - unique Initialization Vector (nonce)
@@ -10,3 +12,7 @@
   pub const USER_ID: &str = "userID";
   pub const DEVICE_ID: &str = "deviceID";
 }
+
+/// Delay between attempts to (re)connect to the backup service.
+pub const BACKUP_SERVICE_CONNECTION_RETRY_DELAY: Duration =
+  Duration::from_secs(5);
diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs
--- a/native/native_rust_library/src/lib.rs
+++ b/native/native_rust_library/src/lib.rs
@@ -227,6 +227,11 @@
 
   // Backup
   extern "Rust" {
+    #[cxx_name = "startBackupHandler"]
+    fn start_backup_handler() -> Result<()>;
+    #[cxx_name = "stopBackupHandler"]
+    fn stop_backup_handler_sync(promise_id: u32);
+
     #[cxx_name = "createBackup"]
     fn create_backup_sync(
       backup_id: String,
diff --git a/native/schema/CommCoreModuleSchema.js b/native/schema/CommCoreModuleSchema.js
--- a/native/schema/CommCoreModuleSchema.js
+++ b/native/schema/CommCoreModuleSchema.js
@@ -122,6 +122,8 @@
   +getCommServicesAuthMetadata: () => Promise;
   +setCommServicesAccessToken: (accessToken: string) => Promise;
   +clearCommServicesAccessToken: () => Promise;
+  +startBackupHandler: () => void;
+  +stopBackupHandler: () => Promise<void>;
   +createNewBackup: (backupSecret: string, userData: string) => Promise;
   +restoreBackup: (backupSecret: string) => Promise;
 }
diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs
--- a/shared/backup_client/src/lib.rs
+++ b/shared/backup_client/src/lib.rs
@@ -3,9 +3,8 @@
   DownloadLogsRequest, LatestBackupIDResponse, LogWSRequest, LogWSResponse,
   UploadLogRequest,
 };
-pub use futures_util::{SinkExt, StreamExt, TryStreamExt};
+pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
 
-use futures_util::{Sink, Stream};
 use hex::ToHex;
 use reqwest::{
   header::InvalidHeaderValue,