diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h @@ -6,7 +6,7 @@ namespace comm { class BackupOperationsExecutor { public: - static void createMainCompaction(std::string backupID); + static void createMainCompaction(std::string backupID, size_t futureID); static void restoreFromMainCompaction( std::string mainCompactionPath, std::string mainCompactionEncryptionKey); diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp @@ -3,15 +3,18 @@ #include "GlobalDBSingleton.h" #include "Logger.h" #include "WorkerThread.h" +#include "lib.rs.h" namespace comm { -void BackupOperationsExecutor::createMainCompaction(std::string backupID) { - taskType job = [backupID]() { +void BackupOperationsExecutor::createMainCompaction( + std::string backupID, + size_t futureID) { + taskType job = [backupID, futureID]() { try { DatabaseManager::getQueryExecutor().createMainCompaction(backupID); + ::resolveUnitFuture(futureID); } catch (const std::exception &e) { - // TODO: Inform Rust networking about main - // compaction creation failure + ::rejectFuture(futureID, rust::String(e.what())); Logger::log( "Main compaction creation failed. Details: " + std::string(e.what())); } diff --git a/native/native_rust_library/RustBackupExecutor.h b/native/native_rust_library/RustBackupExecutor.h --- a/native/native_rust_library/RustBackupExecutor.h +++ b/native/native_rust_library/RustBackupExecutor.h @@ -9,7 +9,7 @@ rust::String getBackupLogFilePath(rust::Str backupID, rust::Str logID, bool isAttachments); rust::String getBackupUserKeysFilePath(rust::Str backupID); -void createMainCompaction(rust::String backupID); +void createMainCompaction(rust::Str backupID, size_t futureID); void restoreFromMainCompaction( rust::String mainCompactionPath, rust::String mainCompactionEncryptionKey); diff --git a/native/native_rust_library/RustBackupExecutor.cpp b/native/native_rust_library/RustBackupExecutor.cpp --- a/native/native_rust_library/RustBackupExecutor.cpp +++ b/native/native_rust_library/RustBackupExecutor.cpp @@ -26,8 +26,9 @@ PlatformSpecificTools::getBackupUserKeysFilePath(std::string(backupID))); } -void createMainCompaction(rust::String backupID) { - BackupOperationsExecutor::createMainCompaction(std::string(backupID)); +void createMainCompaction(rust::Str backupID, size_t futureID) { + BackupOperationsExecutor::createMainCompaction( + std::string(backupID), futureID); } void restoreFromMainCompaction( diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -1,3 +1,4 @@ +mod compaction_upload_promises; mod file_info; mod upload_handler; diff --git a/native/native_rust_library/src/backup/compaction_upload_promises.rs b/native/native_rust_library/src/backup/compaction_upload_promises.rs new file mode 100644 --- /dev/null +++ b/native/native_rust_library/src/backup/compaction_upload_promises.rs @@ -0,0 +1,24 @@ +use crate::handle_void_result_as_callback; +use lazy_static::lazy_static; +use std::{collections::HashMap, sync::Mutex}; + +lazy_static! { + static ref COMPACTION_UPLOAD_PROMISES: Mutex> = + Default::default(); +} + +pub fn insert(backup_id: String, promise_id: u32) { + if let Ok(mut backups_to_promises) = COMPACTION_UPLOAD_PROMISES.lock() { + backups_to_promises.insert(backup_id, promise_id); + }; +} + +pub fn resolve(backup_id: &str, result: Result<(), String>) { + let Ok(mut backups_to_promises) = COMPACTION_UPLOAD_PROMISES.lock() else { + return; + }; + let Some(promise_id) = backups_to_promises.remove(backup_id) else { + return; + }; + handle_void_result_as_callback(result, promise_id); +} diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs --- a/native/native_rust_library/src/backup/upload_handler.rs +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -1,5 +1,6 @@ use super::file_info::BackupFileInfo; use super::get_user_identity_from_secure_store; +use crate::backup::compaction_upload_promises; use crate::constants::BACKUP_SERVICE_CONNECTION_RETRY_DELAY; use crate::ffi::{ get_backup_directory_path, get_backup_file_path, get_backup_log_file_path, @@ -185,7 +186,7 @@ Err(BackupHandlerError::WSClosed) } -mod compaction { +pub mod compaction { use super::*; pub async fn upload_files( @@ -217,6 +218,7 @@ .upload_backup(user_identity, backup_data) .await?; + compaction_upload_promises::resolve(&backup_id, Ok(())); tokio::spawn(cleanup_files(backup_id)); Ok(()) diff --git a/native/native_rust_library/src/future_manager.rs b/native/native_rust_library/src/future_manager.rs new file mode 100644 --- /dev/null +++ b/native/native_rust_library/src/future_manager.rs @@ -0,0 +1,77 @@ +use crate::RUNTIME; +use lazy_static::lazy_static; +use std::any::Any; +use std::convert::Infallible; +use std::{collections::HashMap, future::Future}; +use tokio::sync::oneshot; +use tokio::sync::Mutex; + +lazy_static! { + static ref FUTURE_MANAGER: Mutex = Default::default(); +} + +pub mod ffi { + use super::*; + + pub fn resolve_unit_future(future_id: usize) { + RUNTIME.spawn(async move { + super::resolve_future(future_id, Ok(())).await; + }); + } + + pub fn reject_future(future_id: usize, error: String) { + RUNTIME.spawn(async move { + super::resolve_future::(future_id, Err(error)).await; + }); + } +} + +#[derive(Debug, Default)] +pub struct FutureManager { + promises: + HashMap, String>>>, + next_id: usize, +} + +pub async fn new_future( +) -> (usize, impl Future>) { + let mut manager = FUTURE_MANAGER.lock().await; + + let id = manager.next_id; + manager.next_id += 1; + + let (tx, rx) = oneshot::channel(); + manager.promises.insert(id, tx); + + let future = async move { + match rx.await { + Ok(Ok(any_value)) => { + if let Ok(boxed_value) = any_value.downcast() { + Ok(*boxed_value) + } else { + Err("Type mismatch".to_string()) + } + } + Ok(Err(err)) => Err(err), + Err(_) => Err("Promise sender dropped".to_string()), + } + }; + + (id, future) +} + +pub async fn resolve_future( + id: usize, + value: Result, +) { + let mut manager = FUTURE_MANAGER.lock().await; + + let tx = match manager.promises.remove(&id) { + Some(tx) => tx, + None => return, + }; + + let boxed_value = + value.map(|value| Box::new(value) as Box); + let _ = tx.send(boxed_value); +} diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs --- a/native/native_rust_library/src/lib.rs +++ b/native/native_rust_library/src/lib.rs @@ -3,6 +3,7 @@ use comm_opaque2::grpc::opaque_error_to_grpc_status as handle_error; use exact_user_search::find_user_id_for_wallet_address; use ffi::{bool_callback, string_callback, void_callback}; +use future_manager::ffi::*; use grpc_clients::identity::protos::auth::{ GetDeviceListRequest, UpdateDeviceListRequest, }; @@ -34,6 +35,7 @@ mod backup; mod constants; mod exact_user_search; +mod future_manager; mod wallet_registration; use argon2_tools::compute_backup_key_str; @@ -363,7 +365,7 @@ fn get_backup_user_keys_file_path(backup_id: &str) -> Result; #[cxx_name = "createMainCompaction"] - fn create_main_compaction(backup_id: String) -> Result<()>; + fn create_main_compaction(backup_id: &str, future_id: usize); #[allow(unused)] #[cxx_name = "restoreFromMainCompaction"] @@ -376,6 +378,15 @@ #[cxx_name = "restoreFromBackupLog"] fn restore_from_backup_log(backup_log: Vec) -> Result<()>; } + + // Future handling from C++ + extern "Rust" { + #[cxx_name = "resolveUnitFuture"] + fn resolve_unit_future(future_id: usize); + + #[cxx_name = "rejectFuture"] + fn reject_future(future_id: usize, error: String); + } } fn handle_string_result_as_callback(