diff --git a/native/backup/use-client-backup.js b/native/backup/use-client-backup.js index 320e38b51..bc6a87226 100644 --- a/native/backup/use-client-backup.js +++ b/native/backup/use-client-backup.js @@ -1,101 +1,122 @@ // @flow import * as React from 'react'; +import { useInvalidCSATLogOut } from 'lib/actions/user-actions.js'; import { isLoggedIn } from 'lib/selectors/user-selectors.js'; import { - latestBackupInfoResponseValidator, type LatestBackupInfo, + latestBackupInfoResponseValidator, type UserKeys, userKeysResponseValidator, } from 'lib/types/backup-types.js'; +import { getMessageForException } from 'lib/utils/errors.js'; import { assertWithValidator } from 'lib/utils/validation-utils.js'; import { useGetBackupSecretForLoggedInUser } from './use-get-backup-secret.js'; import { commCoreModule } from '../native-modules.js'; import { useSelector } from '../redux/redux-utils.js'; type ClientBackup = { +createFullBackup: () => Promise, +createUserKeysBackup: () => Promise, +retrieveLatestBackupInfo: () => Promise<{ +latestBackupInfo: LatestBackupInfo, +userIdentifier: string, }>, +getBackupUserKeys: ( userIdentifier: string, backupSecret: string, backupID: string, ) => Promise, }; async function getBackupUserKeys( userIdentifier: string, backupSecret: string, backupID: string, ): Promise { const userKeysResponse = await commCoreModule.getBackupUserKeys( userIdentifier, backupSecret, backupID, ); return assertWithValidator( JSON.parse(userKeysResponse), userKeysResponseValidator, ); } function useClientBackup(): ClientBackup { const currentUserID = useSelector( state => state.currentUserInfo && state.currentUserInfo.id, ); const currentUserInfo = useSelector(state => state.currentUserInfo); const loggedIn = useSelector(isLoggedIn); const getBackupSecret = useGetBackupSecretForLoggedInUser(); + const invalidTokenLogOut = useInvalidCSATLogOut(); + const authVerifiedEndpoint: (backupCallPromise: Promise) => Promise = + React.useCallback( + async backupCallPromise => { + try { + return await backupCallPromise; + } catch (e) { + const message = getMessageForException(e); + if (message === 'Unauthenticated') { + void invalidTokenLogOut(); + } + throw e; + } + }, + [invalidTokenLogOut], + ); + const createFullBackup = React.useCallback(async () => { if (!loggedIn || !currentUserID) { throw new Error('Attempt to upload backup for not logged in user.'); } const backupSecret = await getBackupSecret(); - return commCoreModule.createFullBackup(backupSecret); - }, [loggedIn, currentUserID, getBackupSecret]); + return authVerifiedEndpoint(commCoreModule.createFullBackup(backupSecret)); + }, [loggedIn, currentUserID, getBackupSecret, authVerifiedEndpoint]); const createUserKeysBackup = React.useCallback(async () => { if (!loggedIn || !currentUserID) { throw new Error('Attempt to upload User Keys for not logged in user.'); } const backupSecret = await getBackupSecret(); - return commCoreModule.createUserKeysBackup(backupSecret); - }, [loggedIn, currentUserID, getBackupSecret]); + return authVerifiedEndpoint( + commCoreModule.createUserKeysBackup(backupSecret), + ); + }, [loggedIn, currentUserID, getBackupSecret, authVerifiedEndpoint]); const retrieveLatestBackupInfo = React.useCallback(async () => { if (!loggedIn || !currentUserID || !currentUserInfo?.username) { throw new Error('Attempt to restore backup for not logged in user.'); } const userIdentifier = currentUserInfo?.username; const response = await commCoreModule.retrieveLatestBackupInfo(userIdentifier); const latestBackupInfo = assertWithValidator( JSON.parse(response), latestBackupInfoResponseValidator, ); return { latestBackupInfo, userIdentifier }; }, [currentUserID, currentUserInfo, loggedIn]); return React.useMemo( () => ({ createFullBackup, createUserKeysBackup, retrieveLatestBackupInfo, getBackupUserKeys, }), [createFullBackup, createUserKeysBackup, retrieveLatestBackupInfo], ); } export { useClientBackup }; diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs index 872d0f5b9..2c0f1358a 100644 --- a/native/native_rust_library/src/backup/upload_handler.rs +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -1,356 +1,357 @@ use super::file_info::BackupFileInfo; use super::get_user_identity_from_secure_store; use crate::backup::compaction_upload_promises; use crate::constants::BACKUP_SERVICE_CONNECTION_RETRY_DELAY; use crate::ffi::{ get_backup_directory_path, get_backup_file_path, get_backup_log_file_path, get_backup_user_keys_file_path, get_siwe_backup_message_path, }; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; use backup_client::UserIdentity; use backup_client::{ BackupClient, Error as BackupError, LogUploadConfirmation, Stream, StreamExt, }; use backup_client::{BackupData, Sink, UploadLogRequest}; use lazy_static::lazy_static; use std::collections::HashSet; use std::convert::Infallible; use std::error::Error; use std::future::Future; use std::io::BufRead; use std::io::ErrorKind; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::Notify; use tokio::task::JoinHandle; lazy_static! { pub static ref UPLOAD_HANDLER: Arc>>> = Arc::new(Mutex::new(None)); static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc = Arc::new(Notify::new()); static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from( get_backup_directory_path().expect("Getting backup directory path failed") ); } pub mod ffi { use super::*; pub fn start_backup_handler() -> Result<(), Box> { let mut handle = UPLOAD_HANDLER.lock()?; match handle.take() { // Don't start backup handler if it's already running Some(handle) if !handle.is_finished() => (), _ => { *handle = Some(RUNTIME.spawn(super::start()?)); } } Ok(()) } pub fn stop_backup_handler() -> Result<(), Box> { let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { return Ok(()); }; if handler.is_finished() { return Ok(()); } handler.abort(); Ok(()) } pub fn trigger_backup_file_upload() { TRIGGER_BACKUP_FILE_UPLOAD.notify_one(); } } pub fn start() -> Result, Box> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; Ok(async move { loop { let (tx, rx) = match backup_client.upload_logs(&user_identity).await { Ok(ws) => ws, Err(err) => { println!( "Backup handler error when estabilishing connection: '{err:?}'" ); tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; continue; } }; let mut tx = Box::pin(tx); let mut rx = Box::pin(rx); let logs_waiting_for_confirmation = Mutex::new(HashSet::::new()); loop { let err = tokio::select! { Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err, Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err, }; println!("Backup handler error: '{err:?}'"); match err { BackupHandlerError::BackupError(_) | BackupHandlerError::WSClosed | BackupHandlerError::LockError => break, BackupHandlerError::IoError(_) | BackupHandlerError::CxxException(_) => continue, BackupHandlerError::FromUtf8Error(_) => break, } } tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; println!("Retrying backup log upload"); } }) } async fn watch_and_upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, tx: &mut Pin>>, logs_waiting_for_confirmation: &Mutex>, ) -> Result { loop { let mut file_stream = match tokio::fs::read_dir(&*BACKUP_FOLDER_PATH).await { Ok(file_stream) => file_stream, Err(err) if err.kind() == ErrorKind::NotFound => { TRIGGER_BACKUP_FILE_UPLOAD.notified().await; continue; } Err(err) => return Err(err.into()), }; let mut compaction_ids = HashSet::new(); let mut logs = Vec::new(); while let Some(file) = file_stream.next_entry().await? { let path = file.path(); if let Ok(BackupFileInfo { backup_id, log_id, additional_data, }) = path.clone().try_into() { // Skip additional data files (attachments). They will be // handled when we iterate over the corresponding files with the // main content if additional_data.is_some() { continue; } match log_id { Some(id) => logs.push((path, backup_id, id)), None => { compaction_ids.insert(backup_id); } } } } for backup_id in compaction_ids { compaction::upload_files(backup_client, user_identity, backup_id).await?; } for (path, backup_id, log_id) in logs { if logs_waiting_for_confirmation.lock()?.contains(&path) { continue; } log::upload_files(tx, backup_id, log_id).await?; logs_waiting_for_confirmation.lock()?.insert(path.clone()); } TRIGGER_BACKUP_FILE_UPLOAD.notified().await; } } async fn delete_confirmed_logs( rx: &mut Pin< Box>>, >, logs_waiting_for_confirmation: &Mutex>, ) -> Result { while let Some(LogUploadConfirmation { backup_id, log_id }) = rx.next().await.transpose()? { let path = get_backup_log_file_path(&backup_id, &log_id.to_string(), false)?; logs_waiting_for_confirmation .lock()? .remove(&PathBuf::from(path)); tokio::spawn(log::cleanup_files(backup_id, log_id)); } Err(BackupHandlerError::WSClosed) } pub mod compaction { use super::*; pub async fn upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, backup_id: String, ) -> Result<(), BackupHandlerError> { let user_data_path = get_backup_file_path(&backup_id, false)?; let user_data = match tokio::fs::read(&user_data_path).await { Ok(data) => Some(data), Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let user_keys_path = get_backup_user_keys_file_path(&backup_id)?; let user_keys = match tokio::fs::read(&user_keys_path).await { Ok(data) => Some(data), Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let attachments_path = get_backup_file_path(&backup_id, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => data.lines().collect::>()?, Err(err) if err.kind() == ErrorKind::NotFound => Vec::new(), Err(err) => return Err(err.into()), }; let siwe_backup_msg_path = get_siwe_backup_message_path(&backup_id)?; let siwe_backup_msg = match tokio::fs::read(&siwe_backup_msg_path).await { Ok(data) => match String::from_utf8(data) { Ok(valid_string) => Some(valid_string), Err(err) => return Err(err.into()), }, Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let backup_data = BackupData { backup_id: backup_id.clone(), user_data, user_keys, attachments, siwe_backup_msg, }; - backup_client + let result = backup_client .upload_backup(user_identity, backup_data) - .await?; + .await + .map_err(|e| e.to_string()); - compaction_upload_promises::resolve(&backup_id, Ok(())); + compaction_upload_promises::resolve(&backup_id, result); tokio::spawn(cleanup_files(backup_id)); Ok(()) } async fn remove_file_if_exists(path: &String) -> Result<(), Box> { match tokio::fs::remove_file(path).await { Ok(()) => Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } } pub async fn cleanup_files(backup_id: String) { let backup_files_cleanup = async { let paths_to_remove = vec![ get_backup_file_path(&backup_id, false)?, get_backup_user_keys_file_path(&backup_id)?, get_backup_file_path(&backup_id, true)?, get_siwe_backup_message_path(&backup_id)?, ]; for path in paths_to_remove { if let Err(e) = remove_file_if_exists(&path).await { println!("Error occurred while removing a file: {:?}", e); } } Ok::<(), Box>(()) }; if let Err(err) = backup_files_cleanup.await { println!("Error when cleaning up the backup files: {:?}", err); } } } mod log { use backup_client::SinkExt; use super::*; pub async fn upload_files( tx: &mut Pin>>, backup_id: String, log_id: usize, ) -> Result<(), BackupHandlerError> { let log_id_string = log_id.to_string(); let content_path = get_backup_log_file_path(&backup_id, &log_id_string, false)?; let content = tokio::fs::read(&content_path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id_string, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => Some(data.lines().collect::>()?), Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let log_data = UploadLogRequest { backup_id, log_id, content, attachments, }; tx.send(log_data.clone()).await?; Ok(()) } pub async fn cleanup_files(backup_id: String, log_id: usize) { let backup_files_cleanup = async { let log_id = log_id.to_string(); let path = get_backup_log_file_path(&backup_id, &log_id, false)?; tokio::fs::remove_file(&path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id, true)?; match tokio::fs::remove_file(&attachments_path).await { Ok(()) => Result::<_, Box>::Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } }; if let Err(err) = backup_files_cleanup.await { println!("{err:?}"); } } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupHandlerError { BackupError(BackupError), WSClosed, IoError(std::io::Error), CxxException(cxx::Exception), LockError, FromUtf8Error(std::string::FromUtf8Error), } impl From> for BackupHandlerError { fn from(_: std::sync::PoisonError) -> Self { Self::LockError } } diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs index 40d1c1697..296f7e731 100644 --- a/shared/backup_client/src/lib.rs +++ b/shared/backup_client/src/lib.rs @@ -1,400 +1,407 @@ #[cfg(target_arch = "wasm32")] mod web; use async_stream::{stream, try_stream}; pub use comm_lib::auth::UserIdentity; pub use comm_lib::backup::{ DownloadLogsRequest, LatestBackupInfoResponse, LogWSRequest, LogWSResponse, UploadLogRequest, }; pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use hex::ToHex; use reqwest::{ header::InvalidHeaderValue, multipart::{Form, Part}, - Body, + Body, StatusCode, }; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::time::Duration; use tokio_tungstenite_wasm::{ connect, Error as TungsteniteError, Message::Binary, }; const LOG_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(5); const LOG_DOWNLOAD_MAX_RETRY: usize = 3; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::wasm_bindgen; #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] #[derive(Debug, Clone)] pub struct BackupClient { url: reqwest::Url, } impl BackupClient { pub fn new>(url: T) -> Result { Ok(BackupClient { url: url.try_into()?, }) } } /// Backup functions impl BackupClient { pub async fn upload_backup( &self, user_identity: &UserIdentity, backup_data: BackupData, ) -> Result<(), Error> { let BackupData { backup_id, user_keys, user_data, attachments, siwe_backup_msg, } = backup_data; let endpoint = match (user_data.clone(), user_keys.clone()) { (None, None) => return Err(Error::InvalidRequest), (Some(_), Some(_)) => "backups", (Some(_), None) => "backups/user_data", (None, Some(_)) => "backups/user_keys", }; let client = reqwest::Client::new(); let mut form = Form::new().text("backup_id", backup_id); if let Some(user_keys_value) = user_keys.clone() { form = form .text( "user_keys_hash", Sha256::digest(&user_keys_value).encode_hex::(), ) .part("user_keys", Part::stream(Body::from(user_keys_value))); } if let Some(user_data_value) = user_data.clone() { form = form .text( "user_data_hash", Sha256::digest(&user_data_value).encode_hex::(), ) .part("user_data", Part::stream(Body::from(user_data_value))) .text("attachments", attachments.join("\n")); } if let Some(siwe_backup_msg_value) = siwe_backup_msg { form = form.text("siwe_backup_msg", siwe_backup_msg_value); } let response = client .post(self.url.join(endpoint)?) .bearer_auth(user_identity.as_authorization_token()?) .multipart(form) .send() .await?; + if matches!( + response.status(), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN + ) { + return Err(Error::Unauthenticated); + } + response.error_for_status()?; Ok(()) } pub async fn download_backup_data( &self, backup_descriptor: &BackupDescriptor, requested_data: RequestedData, ) -> Result, Error> { let client = reqwest::Client::new(); let url = self.url.join("backups/")?; let url = match backup_descriptor { BackupDescriptor::BackupID { backup_id, .. } => { url.join(&format!("{backup_id}/"))? } BackupDescriptor::Latest { user_identifier } => { url.join(&format!("latest/{user_identifier}/"))? } }; let url = match &requested_data { RequestedData::BackupInfo => url.join("backup_info")?, RequestedData::UserKeys => url.join("user_keys")?, RequestedData::UserData => url.join("user_data")?, }; let mut request = client.get(url); if let BackupDescriptor::BackupID { user_identity, .. } = backup_descriptor { request = request.bearer_auth(user_identity.as_authorization_token()?) } let response = request.send().await?; let result = response.error_for_status()?.bytes().await?.to_vec(); Ok(result) } } /// Log functions impl BackupClient { pub async fn upload_logs( &self, user_identity: &UserIdentity, ) -> Result< ( impl Sink, impl Stream>, ), Error, > { let (tx, rx) = self.create_log_ws_connection(user_identity).await?; let rx = rx.map(|response| match response? { LogWSResponse::LogUploaded { backup_id, log_id } => { Ok(LogUploadConfirmation { backup_id, log_id }) } LogWSResponse::ServerError => Err(Error::ServerError), msg => Err(Error::InvalidBackupMessage(msg)), }); Ok((tx, rx)) } /// Handles complete log download. /// It will try and retry download a few times, but if the issues persist /// the next item returned will be the last received error and the stream /// will be closed. pub async fn download_logs<'this>( &'this self, user_identity: &'this UserIdentity, backup_id: &'this str, ) -> impl Stream> + 'this { stream! { let mut last_downloaded_log = None; let mut fail_count = 0; 'retry: loop { let stream = self.log_download_stream(user_identity, backup_id, &mut last_downloaded_log).await; let mut stream = Box::pin(stream); while let Some(item) = stream.next().await { match item { Ok(log) => yield Ok(log), Err(err) => { println!("Error when downloading logs: {err:?}"); fail_count += 1; if fail_count >= LOG_DOWNLOAD_MAX_RETRY { yield Err(err); break 'retry; } #[cfg(target_arch = "wasm32")] let _ = web::sleep(LOG_DOWNLOAD_RETRY_DELAY).await; #[cfg(not(target_arch = "wasm32"))] tokio::time::sleep(LOG_DOWNLOAD_RETRY_DELAY).await; continue 'retry; } } } // Everything downloaded return; } println!("Log download failed!"); } } /// Handles singular connection websocket connection. Returns error in case /// anything goes wrong e.g. missing log or connection error. async fn log_download_stream<'stream>( &'stream self, user_identity: &'stream UserIdentity, backup_id: &'stream str, last_downloaded_log: &'stream mut Option, ) -> impl Stream> + 'stream { try_stream! { let (mut tx, mut rx) = self.create_log_ws_connection(user_identity).await?; tx.send(DownloadLogsRequest { backup_id: backup_id.to_string(), from_id: *last_downloaded_log, }) .await?; while let Some(response) = rx.try_next().await? { let expected_log_id = last_downloaded_log.unwrap_or(0); match response { LogWSResponse::LogDownload { content, attachments, log_id, } if log_id == expected_log_id + 1 => { *last_downloaded_log = Some(log_id); yield DownloadedLog { content, attachments, }; } LogWSResponse::LogDownload { .. } => { Err(Error::LogMissing)?; } LogWSResponse::LogDownloadFinished { last_log_id: Some(log_id), } if log_id == expected_log_id => { tx.send(DownloadLogsRequest { backup_id: backup_id.to_string(), from_id: *last_downloaded_log, }) .await? } LogWSResponse::LogDownloadFinished { last_log_id: None } => return, LogWSResponse::LogDownloadFinished { .. } => { Err(Error::LogMissing)?; } msg => Err(Error::InvalidBackupMessage(msg))?, } } Err(Error::WSClosed)?; } } async fn create_log_ws_connection>( &self, user_identity: &UserIdentity, ) -> Result< ( impl Sink, impl Stream>, ), Error, > { let url = self.create_ws_url()?; let stream = connect(url).await?; let (mut tx, rx) = stream.split(); tx.send(Binary(bincode::serialize(&LogWSRequest::Authenticate( user_identity.clone(), ))?)) .await?; let tx = tx.with(|request: Request| async { let request: LogWSRequest = request.into(); let request = bincode::serialize(&request)?; Ok(Binary(request)) }); let rx = rx.filter_map(|msg| async { let bytes = match msg { Ok(Binary(bytes)) => bytes, Ok(_) => return Some(Err(Error::InvalidWSMessage)), Err(err) => return Some(Err(err.into())), }; match bincode::deserialize(&bytes) { Ok(response) => Some(Ok(response)), Err(err) => Some(Err(err.into())), } }); let tx = Box::pin(tx); let mut rx = Box::pin(rx); if let Some(response) = rx.try_next().await? { match response { LogWSResponse::AuthSuccess => {} LogWSResponse::Unauthenticated => Err(Error::Unauthenticated)?, msg => Err(Error::InvalidBackupMessage(msg))?, } } Ok((tx, rx)) } fn create_ws_url(&self) -> Result { let mut url = self.url.clone(); match url.scheme() { "http" => url.set_scheme("ws").map_err(|_| Error::UrlSchemaError)?, "https" => url.set_scheme("wss").map_err(|_| Error::UrlSchemaError)?, _ => (), }; let url = url.join("logs")?; Ok(url) } } #[derive(Debug, Clone)] pub struct BackupData { pub backup_id: String, pub user_keys: Option>, pub user_data: Option>, pub attachments: Vec, pub siwe_backup_msg: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum BackupDescriptor { BackupID { #[serde(rename = "backupID")] backup_id: String, #[serde(rename = "userIdentity")] user_identity: UserIdentity, }, Latest { user_identifier: String, }, } #[cfg_attr(target_arch = "wasm32", wasm_bindgen)] #[derive(Debug, Clone)] pub enum RequestedData { BackupInfo, UserKeys, UserData, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogUploadConfirmation { pub backup_id: String, pub log_id: usize, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DownloadedLog { pub content: Vec, pub attachments: Option>, } #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { InvalidAuthorizationHeader, UrlSchemaError, UrlError(url::ParseError), ReqwestError(reqwest::Error), TungsteniteError(TungsteniteError), JsonError(serde_json::Error), BincodeError(bincode::Error), InvalidWSMessage, #[display(fmt = "Error::InvalidBackupMessage({:?})", _0)] InvalidBackupMessage(LogWSResponse), ServerError, LogMissing, WSClosed, Unauthenticated, InvalidRequest, } impl std::error::Error for Error {} impl From for Error { fn from(_: InvalidHeaderValue) -> Self { Self::InvalidAuthorizationHeader } }