diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs index 82ad07f7d..60bf1e24b 100644 --- a/native/native_rust_library/src/backup/upload_handler.rs +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -1,319 +1,318 @@ use super::file_info::BackupFileInfo; use super::get_user_identity_from_secure_store; use crate::backup::compaction_upload_promises; use crate::constants::BACKUP_SERVICE_CONNECTION_RETRY_DELAY; use crate::ffi::{ get_backup_directory_path, get_backup_file_path, get_backup_log_file_path, get_backup_user_keys_file_path, }; use crate::BACKUP_SOCKET_ADDR; use crate::RUNTIME; use backup_client::UserIdentity; use backup_client::{ BackupClient, Error as BackupError, LogUploadConfirmation, Stream, StreamExt, - WSError, }; use backup_client::{BackupData, Sink, UploadLogRequest}; use lazy_static::lazy_static; use std::collections::HashSet; use std::convert::Infallible; use std::error::Error; use std::future::Future; use std::io::BufRead; use std::io::ErrorKind; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::sync::Notify; use tokio::task::JoinHandle; lazy_static! { pub static ref UPLOAD_HANDLER: Arc>>> = Arc::new(Mutex::new(None)); static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc = Arc::new(Notify::new()); static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from( get_backup_directory_path().expect("Getting backup directory path failed") ); } pub mod ffi { use super::*; pub fn start_backup_handler() -> Result<(), Box> { let mut handle = UPLOAD_HANDLER.lock()?; match handle.take() { // Don't start backup handler if it's already running Some(handle) if !handle.is_finished() => (), _ => { *handle = Some(RUNTIME.spawn(super::start()?)); } } Ok(()) } pub fn stop_backup_handler() -> Result<(), Box> { let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { return Ok(()); }; if handler.is_finished() { return Ok(()); } handler.abort(); Ok(()) } pub fn trigger_backup_file_upload() { TRIGGER_BACKUP_FILE_UPLOAD.notify_one(); } } pub fn start() -> Result, Box> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; Ok(async move { loop { let (tx, rx) = match backup_client.upload_logs(&user_identity).await { Ok(ws) => ws, Err(err) => { println!( "Backup handler error when estabilishing connection: '{err:?}'" ); tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; continue; } }; let mut tx = Box::pin(tx); let mut rx = Box::pin(rx); let logs_waiting_for_confirmation = Mutex::new(HashSet::::new()); loop { let err = tokio::select! { Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err, Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err, }; println!("Backup handler error: '{err:?}'"); match err { BackupHandlerError::BackupError(_) - | BackupHandlerError::BackupWSError(_) | BackupHandlerError::WSClosed | BackupHandlerError::LockError => break, BackupHandlerError::IoError(_) | BackupHandlerError::CxxException(_) => continue, } } tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; println!("Retrying backup log upload"); } }) } async fn watch_and_upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, - tx: &mut Pin>>, + tx: &mut Pin>>, logs_waiting_for_confirmation: &Mutex>, ) -> Result { loop { let mut file_stream = match tokio::fs::read_dir(&*BACKUP_FOLDER_PATH).await { Ok(file_stream) => file_stream, Err(err) if err.kind() == ErrorKind::NotFound => { TRIGGER_BACKUP_FILE_UPLOAD.notified().await; continue; } Err(err) => return Err(err.into()), }; while let Some(file) = file_stream.next_entry().await? { let path = file.path(); if logs_waiting_for_confirmation.lock()?.contains(&path) { continue; } let Ok(BackupFileInfo { backup_id, log_id, additional_data, }) = path.clone().try_into() else { continue; }; // Skip additional data files (attachments, user keys). They will be // handled when we iterate over the corresponding files with the // main content if additional_data.is_some() { continue; } if let Some(log_id) = log_id { log::upload_files(tx, backup_id, log_id).await?; logs_waiting_for_confirmation.lock()?.insert(path); } else { compaction::upload_files(backup_client, user_identity, backup_id) .await?; } } TRIGGER_BACKUP_FILE_UPLOAD.notified().await; } } async fn delete_confirmed_logs( - rx: &mut Pin>>>, + rx: &mut Pin< + Box>>, + >, logs_waiting_for_confirmation: &Mutex>, ) -> Result { while let Some(LogUploadConfirmation { backup_id, log_id }) = rx.next().await.transpose()? { let path = get_backup_log_file_path(&backup_id, &log_id.to_string(), false)?; logs_waiting_for_confirmation .lock()? .remove(&PathBuf::from(path)); tokio::spawn(log::cleanup_files(backup_id, log_id)); } Err(BackupHandlerError::WSClosed) } pub mod compaction { use super::*; pub async fn upload_files( backup_client: &BackupClient, user_identity: &UserIdentity, backup_id: String, ) -> Result<(), BackupHandlerError> { let user_data_path = get_backup_file_path(&backup_id, false)?; let user_data = tokio::fs::read(&user_data_path).await?; let user_keys_path = get_backup_user_keys_file_path(&backup_id)?; let user_keys = tokio::fs::read(&user_keys_path).await?; let attachments_path = get_backup_file_path(&backup_id, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => data.lines().collect::>()?, Err(err) if err.kind() == ErrorKind::NotFound => Vec::new(), Err(err) => return Err(err.into()), }; let backup_data = BackupData { backup_id: backup_id.clone(), user_data, user_keys, attachments, }; backup_client .upload_backup(user_identity, backup_data) .await?; compaction_upload_promises::resolve(&backup_id, Ok(())); tokio::spawn(cleanup_files(backup_id)); Ok(()) } pub async fn cleanup_files(backup_id: String) { let backup_files_cleanup = async { let user_data_path = get_backup_file_path(&backup_id, false)?; tokio::fs::remove_file(&user_data_path).await?; let user_keys_path = get_backup_user_keys_file_path(&backup_id)?; tokio::fs::remove_file(&user_keys_path).await?; let attachments_path = get_backup_file_path(&backup_id, true)?; match tokio::fs::remove_file(&attachments_path).await { Ok(()) => Result::<_, Box>::Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } }; if let Err(err) = backup_files_cleanup.await { println!("Error when cleaning up the backup files: {err:?}"); } } } mod log { use backup_client::SinkExt; use super::*; pub async fn upload_files( - tx: &mut Pin>>, + tx: &mut Pin>>, backup_id: String, log_id: usize, ) -> Result<(), BackupHandlerError> { let log_id_string = log_id.to_string(); let content_path = get_backup_log_file_path(&backup_id, &log_id_string, false)?; let content = tokio::fs::read(&content_path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id_string, true)?; let attachments = match tokio::fs::read(&attachments_path).await { Ok(data) => Some(data.lines().collect::>()?), Err(err) if err.kind() == ErrorKind::NotFound => None, Err(err) => return Err(err.into()), }; let log_data = UploadLogRequest { backup_id, log_id, content, attachments, }; tx.send(log_data.clone()).await?; Ok(()) } pub async fn cleanup_files(backup_id: String, log_id: usize) { let backup_files_cleanup = async { let log_id = log_id.to_string(); let path = get_backup_log_file_path(&backup_id, &log_id, false)?; tokio::fs::remove_file(&path).await?; let attachments_path = get_backup_log_file_path(&backup_id, &log_id, true)?; match tokio::fs::remove_file(&attachments_path).await { Ok(()) => Result::<_, Box>::Ok(()), Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), } }; if let Err(err) = backup_files_cleanup.await { println!("{err:?}"); } } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupHandlerError { BackupError(BackupError), - BackupWSError(WSError), WSClosed, IoError(std::io::Error), CxxException(cxx::Exception), LockError, } impl From> for BackupHandlerError { fn from(_: std::sync::PoisonError) -> Self { Self::LockError } } diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs index 8f46204b8..026400f7b 100644 --- a/shared/backup_client/src/lib.rs +++ b/shared/backup_client/src/lib.rs @@ -1,309 +1,303 @@ pub use comm_lib::auth::UserIdentity; pub use comm_lib::backup::{ DownloadLogsRequest, LatestBackupIDResponse, LogWSRequest, LogWSResponse, UploadLogRequest, }; pub use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use hex::ToHex; use reqwest::{ header::InvalidHeaderValue, multipart::{Form, Part}, Body, }; use sha2::{Digest, Sha256}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{ connect_async, tungstenite::{ client::IntoClientRequest, http::{header, Request}, Error as TungsteniteError, Message::{Binary, Ping}, }, }; #[derive(Debug, Clone)] pub struct BackupClient { url: reqwest::Url, } impl BackupClient { pub fn new>(url: T) -> Result { Ok(BackupClient { url: url.try_into()?, }) } } /// Backup functions impl BackupClient { pub async fn upload_backup( &self, user_identity: &UserIdentity, backup_data: BackupData, ) -> Result<(), Error> { let BackupData { backup_id, user_keys, user_data, attachments, } = backup_data; let client = reqwest::Client::new(); let form = Form::new() .text("backup_id", backup_id) .text( "user_keys_hash", Sha256::digest(&user_keys).encode_hex::(), ) .part("user_keys", Part::stream(Body::from(user_keys))) .text( "user_data_hash", Sha256::digest(&user_data).encode_hex::(), ) .part("user_data", Part::stream(Body::from(user_data))) .text("attachments", attachments.join("\n")); let response = client .post(self.url.join("backups")?) .bearer_auth(user_identity.as_authorization_token()?) .multipart(form) .send() .await?; response.error_for_status()?; Ok(()) } pub async fn download_backup_data( &self, backup_descriptor: &BackupDescriptor, requested_data: RequestedData, ) -> Result, Error> { let client = reqwest::Client::new(); let url = self.url.join("backups/")?; let url = match backup_descriptor { BackupDescriptor::BackupID { backup_id, .. } => { url.join(&format!("{backup_id}/"))? } BackupDescriptor::Latest { username } => { url.join(&format!("latest/{username}/"))? } }; let url = match &requested_data { RequestedData::BackupID => url.join("backup_id")?, RequestedData::UserKeys => url.join("user_keys")?, RequestedData::UserData => url.join("user_data")?, }; let mut request = client.get(url); if let BackupDescriptor::BackupID { user_identity, .. } = backup_descriptor { request = request.bearer_auth(user_identity.as_authorization_token()?) } let response = request.send().await?; let result = response.error_for_status()?.bytes().await?.to_vec(); Ok(result) } } /// Log functions impl BackupClient { pub async fn upload_logs( &self, user_identity: &UserIdentity, ) -> Result< ( - impl Sink, - impl Stream>, + impl Sink, + impl Stream>, ), Error, > { let request = self.create_ws_request(user_identity)?; let (stream, response) = connect_async(request).await?; if response.status().is_client_error() { - return Err(Error::WSInitError(TungsteniteError::Http(response))); + return Err(Error::TungsteniteError(TungsteniteError::Http(response))); } let (tx, rx) = stream.split(); let tx = tx.with(|request: UploadLogRequest| async { let request = LogWSRequest::UploadLog(request); let request = bincode::serialize(&request)?; Ok(Binary(request)) }); let rx = rx.filter_map(|msg| async { let response = match get_log_ws_response(msg) { Some(Ok(response)) => response, Some(Err(err)) => return Some(Err(err)), None => return None, }; match response { LogWSResponse::LogUploaded { backup_id, log_id } => { Some(Ok(LogUploadConfirmation { backup_id, log_id })) } LogWSResponse::LogDownload { .. } | LogWSResponse::LogDownloadFinished { .. } => { - Some(Err(WSError::InvalidBackupMessage)) + Some(Err(Error::InvalidBackupMessage)) } - LogWSResponse::ServerError => Some(Err(WSError::ServerError)), + LogWSResponse::ServerError => Some(Err(Error::ServerError)), } }); Ok((tx, rx)) } pub async fn download_logs( &self, user_identity: &UserIdentity, ) -> Result< ( - impl Sink, - impl Stream>, + impl Sink, + impl Stream>, ), Error, > { let request = self.create_ws_request(user_identity)?; let (stream, response) = connect_async(request).await?; if response.status().is_client_error() { - return Err(Error::WSInitError(TungsteniteError::Http(response))); + return Err(Error::TungsteniteError(TungsteniteError::Http(response))); } let (tx, rx) = stream.split(); let tx = tx.with(|request: DownloadLogsRequest| async { let request = LogWSRequest::DownloadLogs(request); let request = bincode::serialize(&request)?; Ok(Binary(request)) }); let rx = rx.filter_map(|msg| async { let response = match get_log_ws_response(msg) { Some(Ok(response)) => response, Some(Err(err)) => return Some(Err(err)), None => return None, }; match response { LogWSResponse::LogDownloadFinished { .. } | LogWSResponse::LogDownload { .. } => Some(Ok(response)), LogWSResponse::LogUploaded { .. } => { - Some(Err(WSError::InvalidBackupMessage)) + Some(Err(Error::InvalidBackupMessage)) } - LogWSResponse::ServerError => Some(Err(WSError::ServerError)), + LogWSResponse::ServerError => Some(Err(Error::ServerError)), } }); Ok((tx, rx)) } fn create_ws_request( &self, user_identity: &UserIdentity, ) -> Result, Error> { let mut url = self.url.clone(); match url.scheme() { - "http" => url.set_scheme("ws")?, - "https" => url.set_scheme("wss")?, + "http" => url.set_scheme("ws").map_err(|_| Error::UrlSchemaError)?, + "https" => url.set_scheme("wss").map_err(|_| Error::UrlSchemaError)?, _ => (), }; let url = url.join("logs")?; let mut request = url.into_client_request().unwrap(); let token = user_identity.as_authorization_token()?; request .headers_mut() .insert(header::AUTHORIZATION, format!("Bearer {token}").parse()?); Ok(request) } } #[derive(Debug, Clone)] pub struct BackupData { pub backup_id: String, pub user_keys: Vec, pub user_data: Vec, pub attachments: Vec, } #[derive(Debug, Clone)] pub enum BackupDescriptor { BackupID { backup_id: String, user_identity: UserIdentity, }, Latest { username: String, }, } #[derive(Debug, Clone, Copy)] pub enum RequestedData { BackupID, UserKeys, UserData, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogUploadConfirmation { pub backup_id: String, pub log_id: usize, } #[derive( Debug, derive_more::Display, derive_more::Error, derive_more::From, )] pub enum Error { InvalidAuthorizationHeader, + UrlSchemaError, UrlError(url::ParseError), ReqwestError(reqwest::Error), - WSInitError(TungsteniteError), + TungsteniteError(TungsteniteError), JsonError(serde_json::Error), + BincodeError(bincode::Error), + InvalidWSMessage, + InvalidBackupMessage, + ServerError, } impl From for Error { fn from(_: InvalidHeaderValue) -> Self { Self::InvalidAuthorizationHeader } } -#[derive( - Debug, derive_more::Display, derive_more::Error, derive_more::From, -)] -pub enum WSError { - BincodeError(bincode::Error), - TungsteniteError(TungsteniteError), - InvalidWSMessage, - InvalidBackupMessage, - ServerError, -} - fn get_log_ws_response( msg: Result, -) -> Option> { +) -> Option> { let bytes = match msg { Ok(Binary(bytes)) => bytes, // Handled by tungstenite Ok(Ping(_)) => return None, - Ok(_) => return Some(Err(WSError::InvalidWSMessage)), + Ok(_) => return Some(Err(Error::InvalidWSMessage)), Err(err) => return Some(Err(err.into())), }; match bincode::deserialize(&bytes) { Ok(response) => Some(Ok(response)), Err(err) => Some(Err(err.into())), } }