diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h --- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.h +++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.h @@ -123,6 +123,8 @@ jsi::Runtime &rt, jsi::String accessToken) override; virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) override; + virtual void startBackupHandler(jsi::Runtime &rt) override; + virtual void stopBackupHandler(jsi::Runtime &rt) override; virtual jsi::Value createNewBackup( jsi::Runtime &rt, jsi::String backupSecret, diff --git a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp --- a/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp +++ b/native/cpp/CommonCpp/NativeModules/CommCoreModule.cpp @@ -1242,6 +1242,22 @@ }); } +void CommCoreModule::startBackupHandler(jsi::Runtime &rt) { + try { + ::startBackupHandler(); + } catch (const std::exception &e) { + throw jsi::JSError(rt, e.what()); + } +} + +void CommCoreModule::stopBackupHandler(jsi::Runtime &rt) { + try { + ::stopBackupHandler(); + } catch (const std::exception &e) { + throw jsi::JSError(rt, e.what()); + } +} + jsi::Value CommCoreModule::createNewBackup( jsi::Runtime &rt, jsi::String backupSecret, diff --git a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp --- a/native/cpp/CommonCpp/_generated/commJSI-generated.cpp +++ b/native/cpp/CommonCpp/_generated/commJSI-generated.cpp @@ -140,6 +140,14 @@ static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_clearCommServicesAccessToken(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { return static_cast(&turboModule)->clearCommServicesAccessToken(rt); } +static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_startBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { + static_cast(&turboModule)->startBackupHandler(rt); + return jsi::Value::undefined(); +} +static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_stopBackupHandler(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { + static_cast(&turboModule)->stopBackupHandler(rt); + return jsi::Value::undefined(); +} static jsi::Value __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup(jsi::Runtime &rt, TurboModule &turboModule, const jsi::Value* args, size_t count) { return static_cast(&turboModule)->createNewBackup(rt, args[0].asString(rt), args[1].asString(rt)); } @@ -190,6 +198,8 @@ methodMap_["getCommServicesAuthMetadata"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_getCommServicesAuthMetadata}; methodMap_["setCommServicesAccessToken"] = MethodMetadata {1, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_setCommServicesAccessToken}; methodMap_["clearCommServicesAccessToken"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_clearCommServicesAccessToken}; + methodMap_["startBackupHandler"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_startBackupHandler}; + methodMap_["stopBackupHandler"] = MethodMetadata {0, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_stopBackupHandler}; methodMap_["createNewBackup"] = MethodMetadata {2, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_createNewBackup}; methodMap_["restoreBackup"] = MethodMetadata {1, __hostFunction_CommCoreModuleSchemaCxxSpecJSI_restoreBackup}; } diff --git a/native/cpp/CommonCpp/_generated/commJSI.h b/native/cpp/CommonCpp/_generated/commJSI.h --- a/native/cpp/CommonCpp/_generated/commJSI.h +++ b/native/cpp/CommonCpp/_generated/commJSI.h @@ -61,6 +61,8 @@ virtual jsi::Value getCommServicesAuthMetadata(jsi::Runtime &rt) = 0; virtual jsi::Value setCommServicesAccessToken(jsi::Runtime &rt, jsi::String accessToken) = 0; virtual jsi::Value clearCommServicesAccessToken(jsi::Runtime &rt) = 0; + virtual void startBackupHandler(jsi::Runtime &rt) = 0; + virtual void stopBackupHandler(jsi::Runtime &rt) = 0; virtual jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret, jsi::String userData) = 0; virtual jsi::Value restoreBackup(jsi::Runtime &rt, jsi::String backupSecret) = 0; @@ -412,6 +414,22 @@ return bridging::callFromJs( rt, &T::clearCommServicesAccessToken, jsInvoker_, instance_); } + void startBackupHandler(jsi::Runtime &rt) override { + static_assert( + bridging::getParameterCount(&T::startBackupHandler) == 1, + "Expected startBackupHandler(...) to have 1 parameters"); + + return bridging::callFromJs( + rt, &T::startBackupHandler, jsInvoker_, instance_); + } + void stopBackupHandler(jsi::Runtime &rt) override { + static_assert( + bridging::getParameterCount(&T::stopBackupHandler) == 1, + "Expected stopBackupHandler(...) to have 1 parameters"); + + return bridging::callFromJs( + rt, &T::stopBackupHandler, jsInvoker_, instance_); + } jsi::Value createNewBackup(jsi::Runtime &rt, jsi::String backupSecret, jsi::String userData) override { static_assert( bridging::getParameterCount(&T::createNewBackup) == 3, diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -1,9 +1,13 @@ +mod upload_handler; + +use std::error::Error; + use crate::argon2_tools::{compute_backup_key, compute_backup_key_str}; use crate::constants::{aes, secure_store}; use crate::ffi::secure_store_get; -use crate::handle_string_result_as_callback; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; +use crate::{handle_string_result_as_callback, handle_void_result_as_callback}; use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest, LatestBackupIDResponse, LogUploadConfirmation, LogWSResponse, RequestedData, @@ -11,13 +15,12 @@ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::error::Error; pub mod ffi { - use crate::handle_void_result_as_callback; - use super::*; + pub use upload_handler::ffi::*; + pub fn create_backup_sync( backup_id: String, backup_secret: String, diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs new file mode 100644 --- /dev/null +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -0,0 +1,105 @@ +use super::get_user_identity_from_secure_store; +use crate::constants::BACKUP_SERVICE_CONNECTION_RETRY_DELAY; +use crate::BACKUP_SOCKET_ADDR; +use crate::RUNTIME; +use backup_client::{ + BackupClient, Error as BackupError, LogUploadConfirmation, Stream, StreamExt, + WSError, +}; +use lazy_static::lazy_static; +use std::convert::Infallible; +use std::error::Error; +use std::future::{self, Future}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; + +lazy_static! { + pub static ref UPLOAD_HANDLER: Arc>>> = + Arc::new(Mutex::new(None)); +} + +pub mod ffi { + use super::*; + + pub fn start_backup_handler() -> Result<(), Box> { + let mut handle = UPLOAD_HANDLER.lock()?; + match handle.take() { + // Don't start backup handler if it's already running + Some(handle) if !handle.is_finished() => (), + _ => { + *handle = Some(RUNTIME.spawn(super::start()?)); + } + } + + Ok(()) + } + + pub fn stop_backup_handler() -> Result<(), Box> { + let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { + return Ok(()); + }; + if handler.is_finished() { + return Ok(()); + } + + handler.abort(); + Ok(()) + } +} + +pub fn start() -> Result, Box> { + let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; + let user_identity = get_user_identity_from_secure_store()?; + + Ok(async move { + loop { + let (tx, rx) = match backup_client.upload_logs(&user_identity).await { + Ok(ws) => ws, + Err(err) => { + println!( + "Backup handler error when estabilishing connection: '{err:?}'" + ); + tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; + continue; + } + }; + + let mut _tx = Box::pin(tx); + let mut rx = Box::pin(rx); + + let err = tokio::select! { + Err(err) = watch_and_upload_files() => err, + Err(err) = delete_confirmed_logs(&mut rx) => err, + }; + + println!("Backup handler error: '{err:?}'"); + + tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; + println!("Retrying backup log upload"); + } + }) +} + +async fn watch_and_upload_files() -> Result { + loop { + let () = future::pending().await; + } +} + +async fn delete_confirmed_logs( + rx: &mut Pin>>>, +) -> Result { + while let Some(_) = rx.next().await.transpose()? {} + + Err(BackupHandlerError::WSClosed) +} + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +enum BackupHandlerError { + BackupError(BackupError), + BackupWSError(WSError), + WSClosed, +} diff --git a/native/native_rust_library/src/constants.rs b/native/native_rust_library/src/constants.rs --- a/native/native_rust_library/src/constants.rs +++ b/native/native_rust_library/src/constants.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + pub mod aes { pub const KEY_SIZE: usize = 32; // bytes pub const IV_LENGTH: usize = 12; // bytes - unique Initialization Vector (nonce) @@ -10,3 +12,6 @@ pub const USER_ID: &str = "userID"; pub const DEVICE_ID: &str = "deviceID"; } + +pub const BACKUP_SERVICE_CONNECTION_RETRY_DELAY: Duration = + Duration::from_secs(5); diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs --- a/native/native_rust_library/src/lib.rs +++ b/native/native_rust_library/src/lib.rs @@ -240,6 +240,11 @@ // Backup extern "Rust" { + #[cxx_name = "startBackupHandler"] + fn start_backup_handler() -> Result<()>; + #[cxx_name = "stopBackupHandler"] + fn stop_backup_handler() -> Result<()>; + #[cxx_name = "createBackup"] fn create_backup_sync( backup_id: String, diff --git a/native/schema/CommCoreModuleSchema.js b/native/schema/CommCoreModuleSchema.js --- a/native/schema/CommCoreModuleSchema.js +++ b/native/schema/CommCoreModuleSchema.js @@ -122,6 +122,8 @@ +getCommServicesAuthMetadata: () => Promise; +setCommServicesAccessToken: (accessToken: string) => Promise; +clearCommServicesAccessToken: () => Promise; + +startBackupHandler: () => void; + +stopBackupHandler: () => void; +createNewBackup: (backupSecret: string, userData: string) => Promise; +restoreBackup: (backupSecret: string) => Promise; } diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs --- a/shared/backup_client/src/lib.rs +++ b/shared/backup_client/src/lib.rs @@ -3,9 +3,8 @@ DownloadLogsRequest, LatestBackupIDResponse, LogWSRequest, LogWSResponse, UploadLogRequest, }; -pub use futures_util::{SinkExt, StreamExt, TryStreamExt}; +pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; -use futures_util::{Sink, Stream}; use hex::ToHex; use reqwest::{ header::InvalidHeaderValue,