diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp index c199adf7b..efc0a472d 100644 --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.cpp @@ -1,54 +1,57 @@ #include "BackupOperationsExecutor.h" #include "DatabaseManager.h" #include "GlobalDBSingleton.h" #include "Logger.h" #include "WorkerThread.h" +#include "lib.rs.h" namespace comm { -void BackupOperationsExecutor::createMainCompaction(std::string backupID) { - taskType job = [backupID]() { +void BackupOperationsExecutor::createMainCompaction( + std::string backupID, + size_t futureID) { + taskType job = [backupID, futureID]() { try { DatabaseManager::getQueryExecutor().createMainCompaction(backupID); + ::resolveUnitFuture(futureID); } catch (const std::exception &e) { - // TODO: Inform Rust networking about main - // compaction creation failure + ::rejectFuture(futureID, rust::String(e.what())); Logger::log( "Main compaction creation failed. Details: " + std::string(e.what())); } }; GlobalDBSingleton::instance.scheduleOrRunCancellable(job); } void BackupOperationsExecutor::restoreFromMainCompaction( std::string mainCompactionPath, std::string mainCompactionEncryptionKey) { taskType job = [mainCompactionPath, mainCompactionEncryptionKey]() { try { DatabaseManager::getQueryExecutor().restoreFromMainCompaction( mainCompactionPath, mainCompactionEncryptionKey); } catch (const std::exception &e) { // TODO: Inform Rust networking about failure // of restoration from main compaction. Logger::log( "Restore from main compaction failed. Details: " + std::string(e.what())); } }; GlobalDBSingleton::instance.scheduleOrRunCancellable(job); } void BackupOperationsExecutor::restoreFromBackupLog( const std::vector &backupLog) { taskType job = [backupLog]() { try { DatabaseManager::getQueryExecutor().restoreFromBackupLog(backupLog); } catch (const std::exception &e) { // TODO: Inform Rust networking about failure // of restoration from backup log. Logger::log( "Restore from backup log failed. Details: " + std::string(e.what())); } }; GlobalDBSingleton::instance.scheduleOrRunCancellable(job); } } // namespace comm diff --git a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h index 7acb24d0f..d9e50075c 100644 --- a/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h +++ b/native/cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h @@ -1,15 +1,15 @@ #pragma once #include #include namespace comm { class BackupOperationsExecutor { public: - static void createMainCompaction(std::string backupID); + static void createMainCompaction(std::string backupID, size_t futureID); static void restoreFromMainCompaction( std::string mainCompactionPath, std::string mainCompactionEncryptionKey); static void restoreFromBackupLog(const std::vector &backupLog); }; } // namespace comm diff --git a/native/native_rust_library/RustBackupExecutor.cpp b/native/native_rust_library/RustBackupExecutor.cpp index 9721e8a35..8c9d69df0 100644 --- a/native/native_rust_library/RustBackupExecutor.cpp +++ b/native/native_rust_library/RustBackupExecutor.cpp @@ -1,45 +1,46 @@ #include "RustBackupExecutor.h" #include "../cpp/CommonCpp/NativeModules/PersistentStorageUtilities/BackupOperationsUtilities/BackupOperationsExecutor.h" #include "../cpp/CommonCpp/Tools/PlatformSpecificTools.h" #include namespace comm { rust::String getBackupDirectoryPath() { return rust::String(PlatformSpecificTools::getBackupDirectoryPath()); } rust::String getBackupFilePath(rust::Str backupID, bool isAttachments) { return rust::String(PlatformSpecificTools::getBackupFilePath( std::string(backupID), isAttachments)); } rust::String getBackupLogFilePath(rust::Str backupID, rust::Str logID, bool isAttachments) { return rust::String(PlatformSpecificTools::getBackupLogFilePath( std::string(backupID), std::string(logID), isAttachments)); } rust::String getBackupUserKeysFilePath(rust::Str backupID) { return rust::String( PlatformSpecificTools::getBackupUserKeysFilePath(std::string(backupID))); } -void createMainCompaction(rust::String backupID) { - BackupOperationsExecutor::createMainCompaction(std::string(backupID)); +void createMainCompaction(rust::Str backupID, size_t futureID) { + BackupOperationsExecutor::createMainCompaction( + std::string(backupID), futureID); } void restoreFromMainCompaction( rust::String mainCompactionPath, rust::String mainCompactionEncryptionKey) { BackupOperationsExecutor::restoreFromMainCompaction( std::string(mainCompactionPath), std::string(mainCompactionEncryptionKey)); } void restoreFromBackupLog(rust::Vec backupLog) { BackupOperationsExecutor::restoreFromBackupLog( std::move(std::vector(backupLog.begin(), backupLog.end()))); } } // namespace comm diff --git a/native/native_rust_library/RustBackupExecutor.h b/native/native_rust_library/RustBackupExecutor.h index 0e340a3bf..53f156af9 100644 --- a/native/native_rust_library/RustBackupExecutor.h +++ b/native/native_rust_library/RustBackupExecutor.h @@ -1,18 +1,18 @@ #pragma once #include "cxx.h" namespace comm { rust::String getBackupDirectoryPath(); rust::String getBackupFilePath(rust::Str backupID, bool isAttachments); rust::String getBackupLogFilePath(rust::Str backupID, rust::Str logID, bool isAttachments); rust::String getBackupUserKeysFilePath(rust::Str backupID); -void createMainCompaction(rust::String backupID); +void createMainCompaction(rust::Str backupID, size_t futureID); void restoreFromMainCompaction( rust::String mainCompactionPath, rust::String mainCompactionEncryptionKey); void restoreFromBackupLog(rust::Vec backupLog); } // namespace comm diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs index 0ad5ac8d3..10e5d4944 100644 --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -1,252 +1,253 @@ +mod compaction_upload_promises; mod file_info; mod upload_handler; use crate::argon2_tools::{compute_backup_key, compute_backup_key_str}; use crate::constants::{aes, secure_store}; use crate::ffi::secure_store_get; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; use crate::{handle_string_result_as_callback, handle_void_result_as_callback}; use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest, LatestBackupIDResponse, LogUploadConfirmation, LogWSResponse, RequestedData, SinkExt, StreamExt, UploadLogRequest, UserIdentity, }; use serde::{Deserialize, Serialize}; use serde_json::json; use std::error::Error; pub mod ffi { use super::*; pub use upload_handler::ffi::*; pub fn create_backup_sync( backup_id: String, backup_secret: String, pickle_key: String, pickled_account: String, user_data: String, promise_id: u32, ) { RUNTIME.spawn(async move { let result = create_backup( backup_id, backup_secret, pickle_key, pickled_account, user_data, ) .await; handle_void_result_as_callback(result, promise_id); }); } pub fn restore_backup_sync(backup_secret: String, promise_id: u32) { RUNTIME.spawn(async move { let result = restore_backup(backup_secret).await; handle_string_result_as_callback(result, promise_id); }); } } pub async fn create_backup( backup_id: String, backup_secret: String, pickle_key: String, pickled_account: String, user_data: String, ) -> Result<(), Box> { let mut backup_key = compute_backup_key(backup_secret.as_bytes(), backup_id.as_bytes())?; let mut user_data = user_data.into_bytes(); let mut backup_data_key = [0; aes::KEY_SIZE]; crate::ffi::generate_key(&mut backup_data_key)?; let encrypted_user_data = encrypt(&mut backup_data_key, &mut user_data)?; let user_keys = UserKeys { backup_data_key, pickle_key, pickled_account, }; let encrypted_user_keys = user_keys.encrypt(&mut backup_key)?; let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let backup_data = BackupData { backup_id: backup_id.clone(), user_data: encrypted_user_data, user_keys: encrypted_user_keys, attachments: Vec::new(), }; backup_client .upload_backup(&user_identity, backup_data) .await?; let (tx, rx) = backup_client.upload_logs(&user_identity).await?; tokio::pin!(tx); tokio::pin!(rx); let log_data = UploadLogRequest { backup_id: backup_id.clone(), log_id: 1, content: (1..100).collect(), attachments: None, }; tx.send(log_data.clone()).await?; match rx.next().await { Some(Ok(LogUploadConfirmation { backup_id: response_backup_id, log_id: 1, })) if backup_id == response_backup_id => { // Correctly uploaded } response => { return Err(Box::new(InvalidWSLogResponse(format!("{response:?}")))) } }; Ok(()) } pub async fn restore_backup( backup_secret: String, ) -> Result> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; let latest_backup_descriptor = BackupDescriptor::Latest { username: user_identity.user_id.clone(), }; let backup_id_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await?; let LatestBackupIDResponse { backup_id } = serde_json::from_slice(&backup_id_response)?; let mut backup_key = compute_backup_key_str(&backup_secret, &backup_id)?; let mut encrypted_user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; let mut user_keys = UserKeys::from_encrypted(&mut encrypted_user_keys, &mut backup_key)?; let backup_data_descriptor = BackupDescriptor::BackupID { backup_id: backup_id.clone(), user_identity: user_identity.clone(), }; let mut encrypted_user_data = backup_client .download_backup_data(&backup_data_descriptor, RequestedData::UserData) .await?; let user_data = decrypt(&mut user_keys.backup_data_key, &mut encrypted_user_data)?; let user_data: serde_json::Value = serde_json::from_slice(&user_data)?; let (tx, rx) = backup_client.download_logs(&user_identity).await?; tokio::pin!(tx); tokio::pin!(rx); tx.send(DownloadLogsRequest { backup_id: backup_id.clone(), from_id: None, }) .await?; match rx.next().await { Some(Ok(LogWSResponse::LogDownload { log_id: 1, content, attachments: None, })) if content == (1..100).collect::>() => {} response => { return Err(Box::new(InvalidWSLogResponse(format!("{response:?}")))) } }; match rx.next().await { Some(Ok(LogWSResponse::LogDownloadFinished { last_log_id: None })) => {} response => { return Err(Box::new(InvalidWSLogResponse(format!("{response:?}")))) } }; Ok( json!({ "userData": user_data, "pickleKey": user_keys.pickle_key, "pickledAccount": user_keys.pickled_account, }) .to_string(), ) } fn get_user_identity_from_secure_store() -> Result { Ok(UserIdentity { user_id: secure_store_get(secure_store::USER_ID)?, access_token: secure_store_get(secure_store::COMM_SERVICES_ACCESS_TOKEN)?, device_id: secure_store_get(secure_store::DEVICE_ID)?, }) } #[derive(Debug, Serialize, Deserialize)] struct UserKeys { backup_data_key: [u8; 32], pickle_key: String, pickled_account: String, } impl UserKeys { fn encrypt(&self, backup_key: &mut [u8]) -> Result, Box> { let mut json = serde_json::to_vec(self)?; encrypt(backup_key, &mut json) } fn from_encrypted( data: &mut [u8], backup_key: &mut [u8], ) -> Result> { let decrypted = decrypt(backup_key, data)?; Ok(serde_json::from_slice(&decrypted)?) } } fn encrypt(key: &mut [u8], data: &mut [u8]) -> Result, Box> { let encrypted_len = data.len() + aes::IV_LENGTH + aes::TAG_LENGTH; let mut encrypted = vec![0; encrypted_len]; crate::ffi::encrypt(key, data, &mut encrypted)?; Ok(encrypted) } fn decrypt(key: &mut [u8], data: &mut [u8]) -> Result, Box> { let decrypted_len = data.len() - aes::IV_LENGTH - aes::TAG_LENGTH; let mut decrypted = vec![0; decrypted_len]; crate::ffi::decrypt(key, data, &mut decrypted)?; Ok(decrypted) } #[derive(Debug, derive_more::Display)] struct InvalidWSLogResponse(String); impl Error for InvalidWSLogResponse {} diff --git a/native/native_rust_library/src/backup/compaction_upload_promises.rs b/native/native_rust_library/src/backup/compaction_upload_promises.rs new file mode 100644 index 000000000..6f84d4855 --- /dev/null +++ b/native/native_rust_library/src/backup/compaction_upload_promises.rs @@ -0,0 +1,24 @@ +use crate::handle_void_result_as_callback; +use lazy_static::lazy_static; +use std::{collections::HashMap, sync::Mutex}; + +lazy_static! { + static ref COMPACTION_UPLOAD_PROMISES: Mutex> = + Default::default(); +} + +pub fn insert(backup_id: String, promise_id: u32) { + if let Ok(mut backups_to_promises) = COMPACTION_UPLOAD_PROMISES.lock() { + backups_to_promises.insert(backup_id, promise_id); + }; +} + +pub fn resolve(backup_id: &str, result: Result<(), String>) { + let Ok(mut backups_to_promises) = COMPACTION_UPLOAD_PROMISES.lock() else { + return; + }; + let Some(promise_id) = backups_to_promises.remove(backup_id) else { + return; + }; + handle_void_result_as_callback(result, promise_id); +} diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs index 243f138cd..82ad07f7d 100644 --- a/native/native_rust_library/src/backup/upload_handler.rs +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -1,317 +1,319 @@ use super::file_info::BackupFileInfo; use super::get_user_identity_from_secure_store; +use crate::backup::compaction_upload_promises; use crate::constants::BACKUP_SERVICE_CONNECTION_RETRY_DELAY; use crate::ffi::{ get_backup_directory_path, get_backup_file_path, get_backup_log_file_path, get_backup_user_keys_file_path, }; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; use backup_client::UserIdentity; use backup_client::{ BackupClient, Error as BackupError, LogUploadConfirmation, Stream, StreamExt, WSError, }; use backup_client::{BackupData, Sink, UploadLogRequest}; use lazy_static::lazy_static; use std::collections::HashSet; use std::convert::Infallible; use std::error::Error; use std::future::Future; use std::io::BufRead; use std::io::ErrorKind; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::Notify; use tokio::task::JoinHandle; lazy_static! { pub static ref UPLOAD_HANDLER: Arc>>> = Arc::new(Mutex::new(None)); static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc = Arc::new(Notify::new()); static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from( get_backup_directory_path().expect("Getting backup directory path failed") ); } pub mod ffi { use super::*; pub fn start_backup_handler() -> Result<(), Box> { let mut handle = UPLOAD_HANDLER.lock()?; match handle.take() { // Don't start backup handler if it's already running Some(handle) if !handle.is_finished() => (), _ => { *handle = Some(RUNTIME.spawn(super::start()?)); } } Ok(()) } pub fn stop_backup_handler() -> Result<(), Box> { let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { return Ok(()); }; if handler.is_finished() { return Ok(()); } handler.abort(); Ok(()) } pub fn trigger_backup_file_upload() { TRIGGER_BACKUP_FILE_UPLOAD.notify_one(); } } pub fn start() -> Result, Box> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; Ok(async move { loop { let (tx, rx) = match backup_client.upload_logs(&user_identity).await { Ok(ws) => ws, Err(err) => { println!( "Backup handler error when estabilishing connection: '{err:?}'" ); tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; continue; } }; let mut tx = Box::pin(tx); let mut rx = Box::pin(rx); let logs_waiting_for_confirmation = Mutex::new(HashSet::::new()); loop { let err = tokio::select! { Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err, Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err, }; println!("Backup handler error: '{err:?}'"); match err { BackupHandlerError::BackupError(_) | BackupHandlerError::BackupWSError(_) | BackupHandlerError::WSClosed | BackupHandlerError::LockError => break, BackupHandlerError::IoError(_) | BackupHandlerError::CxxException(_) => continue, } } tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; println!("Retrying backup log upload"); } }) } async fn watch_and_upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, tx: &mut Pin>>, logs_waiting_for_confirmation: &Mutex>, ) -> Result { loop { let mut file_stream = match tokio::fs::read_dir(&*BACKUP_FOLDER_PATH).await { Ok(file_stream) => file_stream, Err(err) if err.kind() == ErrorKind::NotFound => { TRIGGER_BACKUP_FILE_UPLOAD.notified().await; continue; } Err(err) => return Err(err.into()), }; while let Some(file) = file_stream.next_entry().await? { let path = file.path(); if logs_waiting_for_confirmation.lock()?.contains(&path) { continue; } let Ok(BackupFileInfo { backup_id, log_id, additional_data, }) = path.clone().try_into() else { continue; }; // Skip additional data files (attachments, user keys). They will be // handled when we iterate over the corresponding files with the // main content if additional_data.is_some() { continue; } if let Some(log_id) = log_id { log::upload_files(tx, backup_id, log_id).await?; logs_waiting_for_confirmation.lock()?.insert(path); } else { compaction::upload_files(backup_client, user_identity, backup_id) .await?; } } TRIGGER_BACKUP_FILE_UPLOAD.notified().await; } } async fn delete_confirmed_logs( rx: &mut Pin>>>, logs_waiting_for_confirmation: &Mutex>, ) -> Result { while let Some(LogUploadConfirmation { backup_id, log_id }) = rx.next().await.transpose()? { let path = get_backup_log_file_path(&backup_id, &log_id.to_string(), false)?; logs_waiting_for_confirmation .lock()? .remove(&PathBuf::from(path)); tokio::spawn(log::cleanup_files(backup_id, log_id)); } Err(BackupHandlerError::WSClosed) } -mod compaction { +pub mod compaction { use super::*; pub async fn upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, backup_id: String, ) -> Result<(), BackupHandlerError> { let user_data_path = get_backup_file_path(&backup_id, false)?; let user_data = tokio::fs::read(&user_data_path).await?; let user_keys_path = get_backup_user_keys_file_path(&backup_id)?; let user_keys = tokio::fs::read(&user_keys_path).await?; let attachments_path = get_backup_file_path(&backup_id, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => data.lines().collect::>()?, Err(err) if err.kind() == ErrorKind::NotFound => Vec::new(), Err(err) => return Err(err.into()), }; let backup_data = BackupData { backup_id: backup_id.clone(), user_data, user_keys, attachments, }; backup_client .upload_backup(user_identity, backup_data) .await?; + compaction_upload_promises::resolve(&backup_id, Ok(())); tokio::spawn(cleanup_files(backup_id)); Ok(()) } pub async fn cleanup_files(backup_id: String) { let backup_files_cleanup = async { let user_data_path = get_backup_file_path(&backup_id, false)?; tokio::fs::remove_file(&user_data_path).await?; let user_keys_path = get_backup_user_keys_file_path(&backup_id)?; tokio::fs::remove_file(&user_keys_path).await?; let attachments_path = get_backup_file_path(&backup_id, true)?; match tokio::fs::remove_file(&attachments_path).await { Ok(()) => Result::<_, Box>::Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } }; if let Err(err) = backup_files_cleanup.await { println!("Error when cleaning up the backup files: {err:?}"); } } } mod log { use backup_client::SinkExt; use super::*; pub async fn upload_files( tx: &mut Pin>>, backup_id: String, log_id: usize, ) -> Result<(), BackupHandlerError> { let log_id_string = log_id.to_string(); let content_path = get_backup_log_file_path(&backup_id, &log_id_string, false)?; let content = tokio::fs::read(&content_path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id_string, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => Some(data.lines().collect::>()?), Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let log_data = UploadLogRequest { backup_id, log_id, content, attachments, }; tx.send(log_data.clone()).await?; Ok(()) } pub async fn cleanup_files(backup_id: String, log_id: usize) { let backup_files_cleanup = async { let log_id = log_id.to_string(); let path = get_backup_log_file_path(&backup_id, &log_id, false)?; tokio::fs::remove_file(&path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id, true)?; match tokio::fs::remove_file(&attachments_path).await { Ok(()) => Result::<_, Box>::Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } }; if let Err(err) = backup_files_cleanup.await { println!("{err:?}"); } } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupHandlerError { BackupError(BackupError), BackupWSError(WSError), WSClosed, IoError(std::io::Error), CxxException(cxx::Exception), LockError, } impl From> for BackupHandlerError { fn from(_: std::sync::PoisonError) -> Self { Self::LockError } } diff --git a/native/native_rust_library/src/future_manager.rs b/native/native_rust_library/src/future_manager.rs new file mode 100644 index 000000000..507b6eea6 --- /dev/null +++ b/native/native_rust_library/src/future_manager.rs @@ -0,0 +1,77 @@ +use crate::RUNTIME; +use lazy_static::lazy_static; +use std::any::Any; +use std::convert::Infallible; +use std::{collections::HashMap, future::Future}; +use tokio::sync::oneshot; +use tokio::sync::Mutex; + +lazy_static! { + static ref FUTURE_MANAGER: Mutex = Default::default(); +} + +pub mod ffi { + use super::*; + + pub fn resolve_unit_future(future_id: usize) { + RUNTIME.spawn(async move { + super::resolve_future(future_id, Ok(())).await; + }); + } + + pub fn reject_future(future_id: usize, error: String) { + RUNTIME.spawn(async move { + super::resolve_future::(future_id, Err(error)).await; + }); + } +} + +#[derive(Debug, Default)] +pub struct FutureManager { + promises: + HashMap, String>>>, + next_id: usize, +} + +pub async fn new_future( +) -> (usize, impl Future>) { + let mut manager = FUTURE_MANAGER.lock().await; + + let id = manager.next_id; + manager.next_id += 1; + + let (tx, rx) = oneshot::channel(); + manager.promises.insert(id, tx); + + let future = async move { + match rx.await { + Ok(Ok(any_value)) => { + if let Ok(boxed_value) = any_value.downcast() { + Ok(*boxed_value) + } else { + Err("Type mismatch".to_string()) + } + } + Ok(Err(err)) => Err(err), + Err(_) => Err("Promise sender dropped".to_string()), + } + }; + + (id, future) +} + +pub async fn resolve_future( + id: usize, + value: Result, +) { + let mut manager = FUTURE_MANAGER.lock().await; + + let tx = match manager.promises.remove(&id) { + Some(tx) => tx, + None => return, + }; + + let boxed_value = + value.map(|value| Box::new(value) as Box); + let _ = tx.send(boxed_value); +} diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs index fa9f67092..3015a663a 100644 --- a/native/native_rust_library/src/lib.rs +++ b/native/native_rust_library/src/lib.rs @@ -1,1499 +1,1510 @@ use backup::ffi::*; use comm_opaque2::client::{Login, Registration}; use comm_opaque2::grpc::opaque_error_to_grpc_status as handle_error; use exact_user_search::find_user_id_for_wallet_address; use ffi::{bool_callback, string_callback, void_callback}; +use future_manager::ffi::*; use grpc_clients::identity::protos::auth::{ GetDeviceListRequest, UpdateDeviceListRequest, }; use grpc_clients::identity::protos::authenticated::{ InboundKeyInfo, InboundKeysForUserRequest, KeyserverKeysResponse, OutboundKeyInfo, OutboundKeysForUserRequest, RefreshUserPrekeysRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UploadOneTimeKeysRequest, }; use grpc_clients::identity::protos::unauth::{ AuthResponse, DeviceKeyUpload, DeviceType, Empty, IdentityKeyInfo, OpaqueLoginFinishRequest, OpaqueLoginStartRequest, Prekey, RegistrationFinishRequest, RegistrationStartRequest, SecondaryDeviceKeysUploadRequest, WalletAuthRequest, }; use grpc_clients::identity::{ get_auth_client, get_unauthenticated_client, REQUEST_METADATA_COOKIE_KEY, RESPONSE_METADATA_COOKIE_KEY, }; use lazy_static::lazy_static; use serde::Serialize; use std::sync::Arc; use tokio::runtime::{Builder, Runtime}; use tonic::{Request, Status}; use tracing::instrument; use wallet_registration::register_wallet_user; mod argon2_tools; mod backup; mod constants; mod exact_user_search; +mod future_manager; mod wallet_registration; use argon2_tools::compute_backup_key_str; mod generated { // We get the CODE_VERSION from this generated file include!(concat!(env!("OUT_DIR"), "/version.rs")); // We get the IDENTITY_SOCKET_ADDR from this generated file include!(concat!(env!("OUT_DIR"), "/socket_config.rs")); } pub use generated::CODE_VERSION; pub use generated::{BACKUP_SOCKET_ADDR, IDENTITY_SOCKET_ADDR}; #[cfg(not(target_os = "android"))] pub const DEVICE_TYPE: DeviceType = DeviceType::Ios; #[cfg(target_os = "android")] pub const DEVICE_TYPE: DeviceType = DeviceType::Android; lazy_static! { static ref RUNTIME: Arc = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap()); } #[cxx::bridge] mod ffi { extern "Rust" { #[cxx_name = "identityRegisterPasswordUser"] fn register_password_user( username: String, password: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityLogInPasswordUser"] fn log_in_password_user( username: String, password: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityRegisterWalletUser"] fn register_wallet_user( siwe_message: String, siwe_signature: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityLogInWalletUser"] fn log_in_wallet_user( siwe_message: String, siwe_signature: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityUpdateUserPassword"] fn update_user_password( user_id: String, device_id: String, access_token: String, password: String, promise_id: u32, ); #[cxx_name = "identityDeleteUser"] fn delete_user( user_id: String, device_id: String, access_token: String, promise_id: u32, ); #[cxx_name = "identityGetOutboundKeysForUser"] fn get_outbound_keys_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, promise_id: u32, ); #[cxx_name = "identityGetInboundKeysForUser"] fn get_inbound_keys_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, promise_id: u32, ); #[cxx_name = "identityRefreshUserPrekeys"] fn refresh_user_prekeys( auth_user_id: String, auth_device_id: String, auth_access_token: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, promise_id: u32, ); #[cxx_name = "identityGenerateNonce"] fn generate_nonce(promise_id: u32); #[cxx_name = "identityVersionSupported"] fn version_supported(promise_id: u32); #[cxx_name = "identityUploadOneTimeKeys"] fn upload_one_time_keys( auth_user_id: String, auth_device_id: String, auth_access_token: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityGetKeyserverKeys"] fn get_keyserver_keys( user_id: String, device_id: String, access_token: String, keyserver_id: String, promise_id: u32, ); #[cxx_name = "identityGetDeviceListForUser"] fn get_device_list_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, since_timestamp: i64, promise_id: u32, ); #[cxx_name = "identityUpdateDeviceList"] fn update_device_list( auth_user_id: String, auth_device_id: String, auth_access_token: String, update_payload: String, promise_id: u32, ); #[cxx_name = "identityUploadSecondaryDeviceKeysAndLogIn"] fn upload_secondary_device_keys_and_log_in( user_id: String, challenge_response: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ); #[cxx_name = "identityFindUserIDForWalletAddress"] fn find_user_id_for_wallet_address(wallet_address: String, promise_id: u32); // Argon2 #[cxx_name = "compute_backup_key"] fn compute_backup_key_str( password: &str, backup_id: &str, ) -> Result<[u8; 32]>; } unsafe extern "C++" { include!("RustCallback.h"); #[namespace = "comm"] #[cxx_name = "stringCallback"] fn string_callback(error: String, promise_id: u32, ret: String); #[namespace = "comm"] #[cxx_name = "voidCallback"] fn void_callback(error: String, promise_id: u32); #[namespace = "comm"] #[cxx_name = "boolCallback"] fn bool_callback(error: String, promise_id: u32, ret: bool); } // AES cryptography #[namespace = "comm"] unsafe extern "C++" { include!("RustAESCrypto.h"); #[allow(unused)] #[cxx_name = "aesGenerateKey"] fn generate_key(buffer: &mut [u8]) -> Result<()>; /// The first two argument aren't mutated but creation of Java ByteBuffer /// requires the underlying bytes to be mutable. #[allow(unused)] #[cxx_name = "aesEncrypt"] fn encrypt( key: &mut [u8], plaintext: &mut [u8], sealed_data: &mut [u8], ) -> Result<()>; /// The first two argument aren't mutated but creation of Java ByteBuffer /// requires the underlying bytes to be mutable. #[allow(unused)] #[cxx_name = "aesDecrypt"] fn decrypt( key: &mut [u8], sealed_data: &mut [u8], plaintext: &mut [u8], ) -> Result<()>; } // Comm Services Auth Metadata Emission #[namespace = "comm"] unsafe extern "C++" { include!("RustCSAMetadataEmitter.h"); #[allow(unused)] #[cxx_name = "sendAuthMetadataToJS"] fn send_auth_metadata_to_js( access_token: String, user_id: String, ) -> Result<()>; } // Backup extern "Rust" { #[cxx_name = "startBackupHandler"] fn start_backup_handler() -> Result<()>; #[cxx_name = "stopBackupHandler"] fn stop_backup_handler() -> Result<()>; #[cxx_name = "triggerBackupFileUpload"] fn trigger_backup_file_upload(); #[cxx_name = "createBackup"] fn create_backup_sync( backup_id: String, backup_secret: String, pickle_key: String, pickled_account: String, user_data: String, promise_id: u32, ); #[cxx_name = "restoreBackup"] fn restore_backup_sync(backup_secret: String, promise_id: u32); } // Secure store #[namespace = "comm"] unsafe extern "C++" { include!("RustSecureStore.h"); #[allow(unused)] #[cxx_name = "secureStoreSet"] fn secure_store_set(key: &str, value: String) -> Result<()>; #[cxx_name = "secureStoreGet"] fn secure_store_get(key: &str) -> Result; } // C++ Backup creation #[namespace = "comm"] unsafe extern "C++" { include!("RustBackupExecutor.h"); #[allow(unused)] #[cxx_name = "getBackupDirectoryPath"] fn get_backup_directory_path() -> Result; #[allow(unused)] #[cxx_name = "getBackupFilePath"] fn get_backup_file_path( backup_id: &str, is_attachments: bool, ) -> Result; #[allow(unused)] #[cxx_name = "getBackupLogFilePath"] fn get_backup_log_file_path( backup_id: &str, log_id: &str, is_attachments: bool, ) -> Result; #[allow(unused)] #[cxx_name = "getBackupUserKeysFilePath"] fn get_backup_user_keys_file_path(backup_id: &str) -> Result; #[cxx_name = "createMainCompaction"] - fn create_main_compaction(backup_id: String) -> Result<()>; + fn create_main_compaction(backup_id: &str, future_id: usize); #[allow(unused)] #[cxx_name = "restoreFromMainCompaction"] fn restore_from_main_compaction( main_compaction_path: String, main_compaction_encryption_key: String, ) -> Result<()>; #[allow(unused)] #[cxx_name = "restoreFromBackupLog"] fn restore_from_backup_log(backup_log: Vec) -> Result<()>; } + + // Future handling from C++ + extern "Rust" { + #[cxx_name = "resolveUnitFuture"] + fn resolve_unit_future(future_id: usize); + + #[cxx_name = "rejectFuture"] + fn reject_future(future_id: usize, error: String); + } } fn handle_string_result_as_callback( result: Result, promise_id: u32, ) where E: std::fmt::Display, { match result { Err(e) => string_callback(e.to_string(), promise_id, "".to_string()), Ok(r) => string_callback("".to_string(), promise_id, r), } } fn handle_void_result_as_callback(result: Result<(), E>, promise_id: u32) where E: std::fmt::Display, { match result { Err(e) => void_callback(e.to_string(), promise_id), Ok(_) => void_callback("".to_string(), promise_id), } } fn handle_bool_result_as_callback(result: Result, promise_id: u32) where E: std::fmt::Display, { match result { Err(e) => bool_callback(e.to_string(), promise_id, false), Ok(r) => bool_callback("".to_string(), promise_id, r), } } fn generate_nonce(promise_id: u32) { RUNTIME.spawn(async move { let result = fetch_nonce().await; handle_string_result_as_callback(result, promise_id); }); } async fn fetch_nonce() -> Result { let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let nonce = identity_client .generate_nonce(Empty {}) .await? .into_inner() .nonce; Ok(nonce) } fn version_supported(promise_id: u32) { RUNTIME.spawn(async move { let result = version_supported_helper().await; handle_bool_result_as_callback(result, promise_id); }); } async fn version_supported_helper() -> Result { let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client.ping(Empty {}).await; match response { Ok(_) => Ok(true), Err(e) => { if grpc_clients::error::is_version_unsupported(&e) { Ok(false) } else { Err(e.into()) } } } } fn get_keyserver_keys( user_id: String, device_id: String, access_token: String, keyserver_id: String, promise_id: u32, ) { RUNTIME.spawn(async move { let get_keyserver_keys_request = OutboundKeysForUserRequest { user_id: keyserver_id, }; let auth_info = AuthInfo { access_token, user_id, device_id, }; let result = get_keyserver_keys_helper(get_keyserver_keys_request, auth_info).await; handle_string_result_as_callback(result, promise_id); }); } async fn get_keyserver_keys_helper( get_keyserver_keys_request: OutboundKeysForUserRequest, auth_info: AuthInfo, ) -> Result { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .get_keyserver_keys(get_keyserver_keys_request) .await? .into_inner(); let keyserver_keys = OutboundKeyInfoResponse::try_from(response)?; Ok(serde_json::to_string(&keyserver_keys)?) } struct AuthInfo { user_id: String, device_id: String, access_token: String, } #[instrument] fn register_password_user( username: String, password: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ) { RUNTIME.spawn(async move { let password_user_info = PasswordUserInfo { username, password, key_payload, key_payload_signature, content_prekey, content_prekey_signature, notif_prekey, notif_prekey_signature, content_one_time_keys, notif_one_time_keys, }; let result = register_password_user_helper(password_user_info).await; handle_string_result_as_callback(result, promise_id); }); } struct PasswordUserInfo { username: String, password: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct UserIDAndDeviceAccessToken { #[serde(rename = "userID")] user_id: String, access_token: String, } impl From for UserIDAndDeviceAccessToken { fn from(value: AuthResponse) -> Self { let AuthResponse { user_id, access_token, } = value; Self { user_id, access_token, } } } async fn register_password_user_helper( password_user_info: PasswordUserInfo, ) -> Result { let mut client_registration = Registration::new(); let opaque_registration_request = client_registration .start(&password_user_info.password) .map_err(handle_error)?; let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: password_user_info.username, device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: password_user_info.key_payload, payload_signature: password_user_info.key_payload_signature, social_proof: None, }), content_upload: Some(Prekey { prekey: password_user_info.content_prekey, prekey_signature: password_user_info.content_prekey_signature, }), notif_upload: Some(Prekey { prekey: password_user_info.notif_prekey, prekey_signature: password_user_info.notif_prekey_signature, }), one_time_content_prekeys: password_user_info.content_one_time_keys, one_time_notif_prekeys: password_user_info.notif_one_time_keys, device_type: DEVICE_TYPE.into(), }), }; let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .register_password_user_start(registration_start_request) .await?; // We need to get the load balancer cookie from from the response and send it // in the subsequent request to ensure it is routed to the same identity // service instance as the first request let cookie = response .metadata() .get(RESPONSE_METADATA_COOKIE_KEY) .cloned(); let registration_start_response = response.into_inner(); let opaque_registration_upload = client_registration .finish( &password_user_info.password, ®istration_start_response.opaque_registration_response, ) .map_err(handle_error)?; let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let mut finish_request = Request::new(registration_finish_request); // Cookie won't be available in local dev environments if let Some(cookie_metadata) = cookie { finish_request .metadata_mut() .insert(REQUEST_METADATA_COOKIE_KEY, cookie_metadata); } let registration_finish_response = identity_client .register_password_user_finish(finish_request) .await? .into_inner(); let user_id_and_access_token = UserIDAndDeviceAccessToken::from(registration_finish_response); Ok(serde_json::to_string(&user_id_and_access_token)?) } #[instrument] fn log_in_password_user( username: String, password: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ) { RUNTIME.spawn(async move { let password_user_info = PasswordUserInfo { username, password, key_payload, key_payload_signature, content_prekey, content_prekey_signature, notif_prekey, notif_prekey_signature, content_one_time_keys, notif_one_time_keys, }; let result = log_in_password_user_helper(password_user_info).await; handle_string_result_as_callback(result, promise_id); }); } async fn log_in_password_user_helper( password_user_info: PasswordUserInfo, ) -> Result { let mut client_login = Login::new(); let opaque_login_request = client_login .start(&password_user_info.password) .map_err(handle_error)?; let login_start_request = OpaqueLoginStartRequest { opaque_login_request, username: password_user_info.username, device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: password_user_info.key_payload, payload_signature: password_user_info.key_payload_signature, social_proof: None, }), content_upload: Some(Prekey { prekey: password_user_info.content_prekey, prekey_signature: password_user_info.content_prekey_signature, }), notif_upload: Some(Prekey { prekey: password_user_info.notif_prekey, prekey_signature: password_user_info.notif_prekey_signature, }), one_time_content_prekeys: password_user_info.content_one_time_keys, one_time_notif_prekeys: password_user_info.notif_one_time_keys, device_type: DEVICE_TYPE.into(), }), }; let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .log_in_password_user_start(login_start_request) .await?; // We need to get the load balancer cookie from from the response and send it // in the subsequent request to ensure it is routed to the same identity // service instance as the first request let cookie = response .metadata() .get(RESPONSE_METADATA_COOKIE_KEY) .cloned(); let login_start_response = response.into_inner(); let opaque_login_upload = client_login .finish(&login_start_response.opaque_login_response) .map_err(handle_error)?; let login_finish_request = OpaqueLoginFinishRequest { session_id: login_start_response.session_id, opaque_login_upload, }; let mut finish_request = Request::new(login_finish_request); // Cookie won't be available in local dev environments if let Some(cookie_metadata) = cookie { finish_request .metadata_mut() .insert(REQUEST_METADATA_COOKIE_KEY, cookie_metadata); } let login_finish_response = identity_client .log_in_password_user_finish(finish_request) .await? .into_inner(); let user_id_and_access_token = UserIDAndDeviceAccessToken::from(login_finish_response); Ok(serde_json::to_string(&user_id_and_access_token)?) } struct WalletUserInfo { siwe_message: String, siwe_signature: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, } #[instrument] fn log_in_wallet_user( siwe_message: String, siwe_signature: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ) { RUNTIME.spawn(async move { let wallet_user_info = WalletUserInfo { siwe_message, siwe_signature, key_payload, key_payload_signature, content_prekey, content_prekey_signature, notif_prekey, notif_prekey_signature, content_one_time_keys, notif_one_time_keys, }; let result = log_in_wallet_user_helper(wallet_user_info).await; handle_string_result_as_callback(result, promise_id); }); } async fn log_in_wallet_user_helper( wallet_user_info: WalletUserInfo, ) -> Result { let login_request = WalletAuthRequest { siwe_message: wallet_user_info.siwe_message, siwe_signature: wallet_user_info.siwe_signature, device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: wallet_user_info.key_payload, payload_signature: wallet_user_info.key_payload_signature, social_proof: None, // The SIWE message and signature are the social proof }), content_upload: Some(Prekey { prekey: wallet_user_info.content_prekey, prekey_signature: wallet_user_info.content_prekey_signature, }), notif_upload: Some(Prekey { prekey: wallet_user_info.notif_prekey, prekey_signature: wallet_user_info.notif_prekey_signature, }), one_time_content_prekeys: wallet_user_info.content_one_time_keys, one_time_notif_prekeys: wallet_user_info.notif_one_time_keys, device_type: DEVICE_TYPE.into(), }), }; let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let login_response = identity_client .log_in_wallet_user(login_request) .await? .into_inner(); let user_id_and_access_token = UserIDAndDeviceAccessToken::from(login_response); Ok(serde_json::to_string(&user_id_and_access_token)?) } struct UpdatePasswordInfo { user_id: String, device_id: String, access_token: String, password: String, } fn update_user_password( user_id: String, device_id: String, access_token: String, password: String, promise_id: u32, ) { RUNTIME.spawn(async move { let update_password_info = UpdatePasswordInfo { access_token, user_id, device_id, password, }; let result = update_user_password_helper(update_password_info).await; handle_void_result_as_callback(result, promise_id); }); } async fn update_user_password_helper( update_password_info: UpdatePasswordInfo, ) -> Result<(), Error> { let mut client_registration = Registration::new(); let opaque_registration_request = client_registration .start(&update_password_info.password) .map_err(handle_error)?; let update_password_start_request = UpdateUserPasswordStartRequest { opaque_registration_request, }; let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, update_password_info.user_id, update_password_info.device_id, update_password_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .update_user_password_start(update_password_start_request) .await?; // We need to get the load balancer cookie from from the response and send it // in the subsequent request to ensure it is routed to the same identity // service instance as the first request let cookie = response .metadata() .get(RESPONSE_METADATA_COOKIE_KEY) .cloned(); let update_password_start_response = response.into_inner(); let opaque_registration_upload = client_registration .finish( &update_password_info.password, &update_password_start_response.opaque_registration_response, ) .map_err(handle_error)?; let update_password_finish_request = UpdateUserPasswordFinishRequest { session_id: update_password_start_response.session_id, opaque_registration_upload, }; let mut finish_request = Request::new(update_password_finish_request); // Cookie won't be available in local dev environments if let Some(cookie_metadata) = cookie { finish_request .metadata_mut() .insert(REQUEST_METADATA_COOKIE_KEY, cookie_metadata); } identity_client .update_user_password_finish(finish_request) .await?; Ok(()) } fn delete_user( user_id: String, device_id: String, access_token: String, promise_id: u32, ) { RUNTIME.spawn(async move { let auth_info = AuthInfo { access_token, user_id, device_id, }; let result = delete_user_helper(auth_info).await; handle_void_result_as_callback(result, promise_id); }); } async fn delete_user_helper(auth_info: AuthInfo) -> Result<(), Error> { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; identity_client.delete_user(Empty {}).await?; Ok(()) } struct GetOutboundKeysRequestInfo { user_id: String, } struct GetInboundKeysRequestInfo { user_id: String, } // This struct should not be altered without also updating // OutboundKeyInfoResponse in lib/types/identity-service-types.js #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct OutboundKeyInfoResponse { pub payload: String, pub payload_signature: String, pub social_proof: Option, pub content_prekey: String, pub content_prekey_signature: String, pub notif_prekey: String, pub notif_prekey_signature: String, pub one_time_content_prekey: Option, pub one_time_notif_prekey: Option, } // This struct should not be altered without also updating // InboundKeyInfoResponse in lib/types/identity-service-types.js #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct InboundKeyInfoResponse { pub payload: String, pub payload_signature: String, pub social_proof: Option, pub content_prekey: String, pub content_prekey_signature: String, pub notif_prekey: String, pub notif_prekey_signature: String, } impl TryFrom for OutboundKeyInfoResponse { type Error = Error; fn try_from(key_info: OutboundKeyInfo) -> Result { let identity_info = key_info.identity_info.ok_or(Error::MissingResponseData)?; let IdentityKeyInfo { payload, payload_signature, social_proof, } = identity_info; let content_prekey = key_info.content_prekey.ok_or(Error::MissingResponseData)?; let Prekey { prekey: content_prekey_value, prekey_signature: content_prekey_signature, } = content_prekey; let notif_prekey = key_info.notif_prekey.ok_or(Error::MissingResponseData)?; let Prekey { prekey: notif_prekey_value, prekey_signature: notif_prekey_signature, } = notif_prekey; let one_time_content_prekey = key_info.one_time_content_prekey; let one_time_notif_prekey = key_info.one_time_notif_prekey; Ok(Self { payload, payload_signature, social_proof, content_prekey: content_prekey_value, content_prekey_signature, notif_prekey: notif_prekey_value, notif_prekey_signature, one_time_content_prekey, one_time_notif_prekey, }) } } impl TryFrom for OutboundKeyInfoResponse { type Error = Error; fn try_from(response: KeyserverKeysResponse) -> Result { let key_info = response.keyserver_info.ok_or(Error::MissingResponseData)?; Self::try_from(key_info) } } fn get_outbound_keys_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, promise_id: u32, ) { RUNTIME.spawn(async move { let get_outbound_keys_request_info = GetOutboundKeysRequestInfo { user_id }; let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let result = get_outbound_keys_for_user_helper( get_outbound_keys_request_info, auth_info, ) .await; handle_string_result_as_callback(result, promise_id); }); } async fn get_outbound_keys_for_user_helper( get_outbound_keys_request_info: GetOutboundKeysRequestInfo, auth_info: AuthInfo, ) -> Result { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .get_outbound_keys_for_user(OutboundKeysForUserRequest { user_id: get_outbound_keys_request_info.user_id, }) .await? .into_inner(); let outbound_key_info: Vec = response .devices .into_values() .map(OutboundKeyInfoResponse::try_from) .collect::, _>>()?; Ok(serde_json::to_string(&outbound_key_info)?) } impl TryFrom for InboundKeyInfoResponse { type Error = Error; fn try_from(key_info: InboundKeyInfo) -> Result { let identity_info = key_info.identity_info.ok_or(Error::MissingResponseData)?; let IdentityKeyInfo { payload, payload_signature, social_proof, } = identity_info; let content_prekey = key_info.content_prekey.ok_or(Error::MissingResponseData)?; let Prekey { prekey: content_prekey_value, prekey_signature: content_prekey_signature, } = content_prekey; let notif_prekey = key_info.notif_prekey.ok_or(Error::MissingResponseData)?; let Prekey { prekey: notif_prekey_value, prekey_signature: notif_prekey_signature, } = notif_prekey; Ok(Self { payload, payload_signature, social_proof, content_prekey: content_prekey_value, content_prekey_signature, notif_prekey: notif_prekey_value, notif_prekey_signature, }) } } fn get_inbound_keys_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, promise_id: u32, ) { RUNTIME.spawn(async move { let get_inbound_keys_request_info = GetInboundKeysRequestInfo { user_id }; let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let result = get_inbound_keys_for_user_helper( get_inbound_keys_request_info, auth_info, ) .await; handle_string_result_as_callback(result, promise_id); }); } async fn get_inbound_keys_for_user_helper( get_inbound_keys_request_info: GetInboundKeysRequestInfo, auth_info: AuthInfo, ) -> Result { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .get_inbound_keys_for_user(InboundKeysForUserRequest { user_id: get_inbound_keys_request_info.user_id, }) .await? .into_inner(); let inbound_key_info: Vec = response .devices .into_values() .map(InboundKeyInfoResponse::try_from) .collect::, _>>()?; Ok(serde_json::to_string(&inbound_key_info)?) } fn refresh_user_prekeys( auth_user_id: String, auth_device_id: String, auth_access_token: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, promise_id: u32, ) { RUNTIME.spawn(async move { let refresh_request = RefreshUserPrekeysRequest { new_content_prekeys: Some(Prekey { prekey: content_prekey, prekey_signature: content_prekey_signature, }), new_notif_prekeys: Some(Prekey { prekey: notif_prekey, prekey_signature: notif_prekey_signature, }), }; let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let result = refresh_user_prekeys_helper(refresh_request, auth_info).await; handle_void_result_as_callback(result, promise_id); }); } async fn refresh_user_prekeys_helper( refresh_request: RefreshUserPrekeysRequest, auth_info: AuthInfo, ) -> Result<(), Error> { get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await? .refresh_user_prekeys(refresh_request) .await?; Ok(()) } #[instrument] fn upload_one_time_keys( auth_user_id: String, auth_device_id: String, auth_access_token: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ) { RUNTIME.spawn(async move { let upload_request = UploadOneTimeKeysRequest { content_one_time_prekeys: content_one_time_keys, notif_one_time_prekeys: notif_one_time_keys, }; let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let result = upload_one_time_keys_helper(auth_info, upload_request).await; handle_void_result_as_callback(result, promise_id); }); } async fn upload_one_time_keys_helper( auth_info: AuthInfo, upload_request: UploadOneTimeKeysRequest, ) -> Result<(), Error> { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; identity_client.upload_one_time_keys(upload_request).await?; Ok(()) } fn get_device_list_for_user( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, since_timestamp: i64, promise_id: u32, ) { RUNTIME.spawn(async move { let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let since_timestamp = Option::from(since_timestamp).filter(|&t| t > 0); let result = get_device_list_for_user_helper(auth_info, user_id, since_timestamp) .await; handle_string_result_as_callback(result, promise_id); }); } async fn get_device_list_for_user_helper( auth_info: AuthInfo, user_id: String, since_timestamp: Option, ) -> Result { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let response = identity_client .get_device_list_for_user(GetDeviceListRequest { user_id, since_timestamp, }) .await? .into_inner(); let payload = serde_json::to_string(&response.device_list_updates)?; Ok(payload) } fn update_device_list( auth_user_id: String, auth_device_id: String, auth_access_token: String, update_payload: String, promise_id: u32, ) { RUNTIME.spawn(async move { let auth_info = AuthInfo { access_token: auth_access_token, user_id: auth_user_id, device_id: auth_device_id, }; let result = update_device_list_helper(auth_info, update_payload).await; handle_void_result_as_callback(result, promise_id); }); } async fn update_device_list_helper( auth_info: AuthInfo, update_payload: String, ) -> Result<(), Error> { let mut identity_client = get_auth_client( IDENTITY_SOCKET_ADDR, auth_info.user_id, auth_info.device_id, auth_info.access_token, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let update_request = UpdateDeviceListRequest { new_device_list: update_payload, }; identity_client .update_device_list_for_user(update_request) .await?; Ok(()) } fn upload_secondary_device_keys_and_log_in( user_id: String, challenge_response: String, key_payload: String, key_payload_signature: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, promise_id: u32, ) { RUNTIME.spawn(async move { let device_key_upload = DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: key_payload, payload_signature: key_payload_signature, social_proof: None, }), content_upload: Some(Prekey { prekey: content_prekey, prekey_signature: content_prekey_signature, }), notif_upload: Some(Prekey { prekey: notif_prekey, prekey_signature: notif_prekey_signature, }), one_time_content_prekeys: content_one_time_keys, one_time_notif_prekeys: notif_one_time_keys, device_type: DEVICE_TYPE.into(), }; let result = upload_secondary_device_keys_and_log_in_helper( user_id, challenge_response, device_key_upload, ) .await; handle_string_result_as_callback(result, promise_id); }); } async fn upload_secondary_device_keys_and_log_in_helper( user_id: String, challenge_response: String, device_key_upload: DeviceKeyUpload, ) -> Result { let mut identity_client = get_unauthenticated_client( IDENTITY_SOCKET_ADDR, CODE_VERSION, DEVICE_TYPE.as_str_name().to_lowercase(), ) .await?; let request = SecondaryDeviceKeysUploadRequest { user_id, challenge_response, device_key_upload: Some(device_key_upload), }; let response = identity_client .upload_keys_for_registered_device_and_log_in(request) .await? .into_inner(); let user_id_and_access_token = UserIDAndDeviceAccessToken::from(response); Ok(serde_json::to_string(&user_id_and_access_token)?) } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(fmt = "{}", "_0.message()")] TonicGRPC(Status), #[display(fmt = "{}", "_0")] SerdeJson(serde_json::Error), #[display(fmt = "Missing response data")] MissingResponseData, #[display(fmt = "{}", "_0")] GRPClient(grpc_clients::error::Error), } #[cfg(test)] mod tests { use super::{BACKUP_SOCKET_ADDR, CODE_VERSION, IDENTITY_SOCKET_ADDR}; #[test] fn test_code_version_exists() { assert!(CODE_VERSION > 0); } #[test] fn test_identity_socket_addr_exists() { assert!(IDENTITY_SOCKET_ADDR.len() > 0); assert!(BACKUP_SOCKET_ADDR.len() > 0); } }