diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h --- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h +++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h @@ -123,10 +123,10 @@ jsi::Runtime &rt, jsi::String accessToken) override; virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) override; - virtual jsi::Value createNewBackup( - jsi::Runtime &rt, - jsi::String backupSecret, - jsi::String userData) override; + virtual void startBackupHandler(jsi::Runtime &rt) override; + virtual jsi::Value stopBackupHandler(jsi::Runtime &rt) override; + virtual jsi::Value + createNewBackup(jsi::Runtime &rt, jsi::String backupSecret) override; virtual jsi::Value restoreBackup(jsi::Runtime &rt, jsi::String backupSecret) override; diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp --- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp +++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp @@ -1242,12 +1242,26 @@ }); } -jsi::Value CommCoreModule::createNewBackup( - jsi::Runtime &rt, - jsi::String backupSecret, - jsi::String userData) { +void CommCoreModule::startBackupHandler(jsi::Runtime &rt) { + try { + ::startBackupHandler(); + } catch (const std::exception &e) { + throw jsi::JSError(rt, e.what()); + } +} + +jsi::Value CommCoreModule::stopBackupHandler(jsi::Runtime &rt) { + return createPromiseAsJSIValue( + rt, [=](jsi::Runtime &innerRt, std::shared_ptr promise) { + auto currentID = RustPromiseManager::instance.addPromise( + promise, this->jsInvoker_, innerRt); + ::stopBackupHandler(currentID); + }); +} + +jsi::Value +CommCoreModule::createNewBackup(jsi::Runtime &rt, jsi::String backupSecret) { std::string backupSecretStr = backupSecret.utf8(rt); - std::string userDataStr = userData.utf8(rt); return createPromiseAsJSIValue( rt, [=](jsi::Runtime &innerRt, std::shared_ptr promise) { 
this->cryptoThread->scheduleTask([=, &innerRt]() { @@ -1282,7 +1296,6 @@ rust::string(backupSecretStr), rust::string(pickleKey), rust::string(pickledAccount), - rust::string(userDataStr), currentID); } else { this->jsInvoker_->invokeAsync( diff --git a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp --- a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp +++ b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp @@ -140,8 +140,15 @@ static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_clearCommServicesAccessToken(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { return static_cast(&turboModule)->clearCommServicesAccessToken(rt); } +static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_startBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { + static_cast(&turboModule)->startBackupHandler(rt); + return jsi::Value::undefined(); +} +static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_stopBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { + return static_cast(&turboModule)->stopBackupHandler(rt); +} static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { - return static_cast(&turboModule)->createNewBackup(rt, args[0].asString(rt), args[1].asString(rt)); + return static_cast(&turboModule)->createNewBackup(rt, args[0].asString(rt)); } static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_restoreBackup(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { return static_cast(&turboModule)->restoreBackup(rt, args[0].asString(rt)); @@ -190,7 +197,9 @@ methodMap_["getCommServicesAuthMetadata"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_getCommServicesAuthMetadata}; 
methodMap_["setCommServicesAccessToken"] = MethodMetadata {1, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_setCommServicesAccessToken}; methodMap_["clearCommServicesAccessToken"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_clearCommServicesAccessToken}; - methodMap_["createNewBackup"] = MethodMetadata {2, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup}; + methodMap_["startBackupHandler"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_startBackupHandler}; + methodMap_["stopBackupHandler"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_stopBackupHandler}; + methodMap_["createNewBackup"] = MethodMetadata {1, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup}; methodMap_["restoreBackup"] = MethodMetadata {1, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_restoreBackup}; } diff --git a/native/cpp/CommonCpp/_generated/commJSI.h b/native/cpp/CommonCpp/_generated/commJSI.h --- a/native/cpp/CommonCpp/_generated/commJSI.h +++ b/native/cpp/CommonCpp/_generated/commJSI.h @@ -61,7 +61,9 @@ virtual jsi::Value getCommServicesAuthMetadata(jsi::Runtime &rt) = 0; virtual jsi::Value setCommServicesAccessToken(jsi::Runtime &rt, jsi::String accessToken) = 0; virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) = 0; - virtual jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret, jsi::String userData) = 0; + virtual void startBackupHandler(jsi::Runtime &rt) = 0; + virtual jsi::Value stopBackupHandler(jsi::Runtime &rt) = 0; + virtual jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret) = 0; virtual jsi::Value restoreBackup(jsi::Runtime &rt, jsi::String backupSecret) = 0; }; @@ -412,13 +414,29 @@ return bridging::callFromJs( rt, &T::clearCommServicesAccessToken, jsInvoker_, instance_); } - jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret, jsi::String userData) override { + void startBackupHandler(jsi::Runtime &rt) override { 
static_assert( - bridging::getParameterCount(&T::createNewBackup) == 3, - "Expected createNewBackup(...) to have 3 parameters"); + bridging::getParameterCount(&T::startBackupHandler) == 1, + "Expected startBackupHandler(...) to have 1 parameters"); + + return bridging::callFromJs( + rt, &T::startBackupHandler, jsInvoker_, instance_); + } + jsi::Value stopBackupHandler(jsi::Runtime &rt) override { + static_assert( + bridging::getParameterCount(&T::stopBackupHandler) == 1, + "Expected stopBackupHandler(...) to have 1 parameters"); + + return bridging::callFromJs( + rt, &T::stopBackupHandler, jsInvoker_, instance_); + } + jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret) override { + static_assert( + bridging::getParameterCount(&T::createNewBackup) == 2, + "Expected createNewBackup(...) to have 2 parameters"); return bridging::callFromJs( - rt, &T::createNewBackup, jsInvoker_, instance_, std::move(backupSecret), std::move(userData)); + rt, &T::createNewBackup, jsInvoker_, instance_, std::move(backupSecret)); } jsi::Value restoreBackup(jsi::Runtime &rt, jsi::String backupSecret) override { static_assert( diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -1,23 +1,50 @@ use crate::argon2_tools::{compute_backup_key, compute_backup_key_str}; -use crate::constants::{aes, secure_store}; +use crate::constants::{ + aes, secure_store, BACKUP_SERVICE_CONNECTION_RETRY_DELAY, +}; use crate::ffi::secure_store_get; -use crate::handle_string_result_as_callback; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; +use crate::{handle_string_result_as_callback, handle_void_result_as_callback}; use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest, - LatestBackupIDResponse, LogUploadConfirmation, LogWSResponse, RequestedData, - SinkExt, StreamExt, UploadLogRequest, UserIdentity, + Error as 
BackupError, LatestBackupIDResponse, LogUploadConfirmation, + LogWSResponse, RequestedData, SinkExt, Stream, StreamExt, UploadLogRequest, + UserIdentity, WSError, }; +use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::convert::Infallible; use std::error::Error; +use std::future::{self, Future}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; pub mod ffi { - use crate::handle_void_result_as_callback; - use super::*; + pub fn start_backup_handler() -> Result<(), Box> { + let mut handle = UPLOAD_HANDLER.lock()?; + match handle.as_ref() { + // Don't start backup handler if it's already running; as_ref() (not take()) keeps the live handle stored so stop_backup_handler can still abort it + Some(handle) if !handle.is_finished() => (), + _ => { + *handle = Some(RUNTIME.spawn(backup_upload_handler()?)); + } + } + + Ok(()) + } + + pub fn stop_backup_handler_sync(promise_id: u32) { + RUNTIME.spawn(async move { + let result = stop_backup_handler().await; + handle_void_result_as_callback(result, promise_id); + }); + } + pub fn create_backup_sync( backup_id: String, backup_secret: String, @@ -47,6 +74,77 @@ } } +lazy_static! { + static ref UPLOAD_HANDLER: Arc>>> = + Arc::new(Mutex::new(None)); +} + +async fn stop_backup_handler() -> Result<(), Box> { + let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { + return Ok(()); + }; + if handler.is_finished() { + return Ok(()); + } + + handler.abort(); + Ok(()) +} + +fn backup_upload_handler( +) -> Result, Box> { + let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; + let user_identity = get_user_identity_from_secure_store()?; + + Ok(async move { + loop { + let (tx, rx) = match backup_client.upload_logs(&user_identity).await { + Ok(ws) => ws, + Err(err) => { + println!("Backup handler error: '{err:?}'"); + tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; + continue; + } + }; + + let mut _tx = Box::pin(tx); + let mut rx = Box::pin(rx); + + let err = tokio::select! 
{ + Err(err) = backup_data_sender() => err, + Err(err) = backup_confirmation_receiver(&mut rx) => err, + }; + + println!("Backup handler error: '{err:?}'"); + + tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; + } + }) +} + +async fn backup_data_sender() -> Result { + loop { + let () = future::pending().await; + } +} + +async fn backup_confirmation_receiver( + rx: &mut Pin>>>, +) -> Result { + while let Some(_) = rx.next().await.transpose()? {} + + Err(BackupHandlerError::WSClosed) +} + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +enum BackupHandlerError { + BackupError(BackupError), + BackupWSError(WSError), + WSClosed, +} + pub async fn create_backup( backup_id: String, backup_secret: String, diff --git a/native/native_rust_library/src/constants.rs b/native/native_rust_library/src/constants.rs --- a/native/native_rust_library/src/constants.rs +++ b/native/native_rust_library/src/constants.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + pub mod aes { pub const KEY_SIZE: usize = 32; // bytes pub const IV_LENGTH: usize = 12; // bytes - unique Initialization Vector (nonce) @@ -10,3 +12,6 @@ pub const USER_ID: &str = "userID"; pub const DEVICE_ID: &str = "deviceID"; } + +pub const BACKUP_SERVICE_CONNECTION_RETRY_DELAY: Duration = + Duration::from_secs(5); diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs --- a/native/native_rust_library/src/lib.rs +++ b/native/native_rust_library/src/lib.rs @@ -227,6 +227,11 @@ // Backup extern "Rust" { + #[cxx_name = "startBackupHandler"] + fn start_backup_handler() -> Result<()>; + #[cxx_name = "stopBackupHandler"] + fn stop_backup_handler_sync(promise_id: u32); + #[cxx_name = "createBackup"] fn create_backup_sync( backup_id: String, diff --git a/native/schema/CommCoreModuleSchema.js b/native/schema/CommCoreModuleSchema.js --- a/native/schema/CommCoreModuleSchema.js +++ b/native/schema/CommCoreModuleSchema.js @@ -122,7 +122,9 @@ 
+getCommServicesAuthMetadata: () => Promise; +setCommServicesAccessToken: (accessToken: string) => Promise; +clearCommServicesAccessToken: () => Promise; - +createNewBackup: (backupSecret: string, userData: string) => Promise; + +startBackupHandler: () => void; + +stopBackupHandler: () => Promise; + +createNewBackup: (backupSecret: string) => Promise; +restoreBackup: (backupSecret: string) => Promise; } diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs --- a/shared/backup_client/src/lib.rs +++ b/shared/backup_client/src/lib.rs @@ -3,9 +3,8 @@ DownloadLogsRequest, LatestBackupIDResponse, LogWSRequest, LogWSResponse, UploadLogRequest, }; -pub use futures_util::{SinkExt, StreamExt, TryStreamExt}; +pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; -use futures_util::{Sink, Stream}; use hex::ToHex; use reqwest::{ header::InvalidHeaderValue,