diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs --- a/native/native_rust_library/src/backup.rs +++ b/native/native_rust_library/src/backup.rs @@ -3,7 +3,7 @@ aes, secure_store, BACKUP_SERVICE_CONNECTION_RETRY_DELAY, }; use crate::ffi::{ - get_backup_directory_path, get_backup_file_path, + get_backup_directory_path, get_backup_file_path, get_backup_log_file_path, get_backup_user_keys_file_path, secure_store_get, }; use crate::BACKUP_SOCKET_ADDR; @@ -12,13 +12,14 @@ use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest, Error as BackupError, LatestBackupIDResponse, LogUploadConfirmation, - LogWSResponse, RequestedData, SinkExt, Stream, StreamExt, UploadLogRequest, - UserIdentity, WSError, + LogWSResponse, RequestedData, Sink, SinkExt, Stream, StreamExt, + UploadLogRequest, UserIdentity, WSError, }; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::HashSet; use std::convert::Infallible; use std::error::Error; use std::future::Future; @@ -120,20 +121,23 @@ } }; - let mut _tx = Box::pin(tx); + let mut tx = Box::pin(tx); let mut rx = Box::pin(rx); + let logs_waiting_for_confirmation = Mutex::new(HashSet::::new()); + loop { let err = tokio::select! { - Err(err) = backup_data_sender(&backup_client, &user_identity) => err, - Err(err) = backup_confirmation_receiver(&mut rx) => err, + Err(err) = backup_data_sender(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err, + Err(err) = backup_confirmation_receiver(&mut rx, &logs_waiting_for_confirmation) => err, }; println!("Backup handler error: '{err:?}'"); match err { BackupHandlerError::BackupError(_) | BackupHandlerError::BackupWSError(_) - | BackupHandlerError::WSClosed => break, + | BackupHandlerError::WSClosed + | BackupHandlerError::LockError => break, BackupHandlerError::IoError(_) | BackupHandlerError::CxxException(_) => continue, } @@ -148,6 +152,8 @@ async fn backup_data_sender( backup_client: &BackupClient, user_identity: &UserIdentity, + tx: &mut Pin>>, + logs_waiting_for_confirmation: &Mutex>, ) -> Result { loop { let mut file_stream = match tokio::fs::read_dir(&*BACKUP_FOLDER_PATH).await @@ -163,11 +169,15 @@ while let Some(file) = file_stream.next_entry().await? { let path = file.path(); + if logs_waiting_for_confirmation.lock()?.contains(&path) { + continue; + } + let Ok(BackupDataFileInfo { backup_id, log_id, additional_data, - }) = path.try_into() + }) = path.clone().try_into() else { continue; }; @@ -179,7 +189,9 @@ continue; } - if let Some(_) = log_id { + if let Some(log_id) = log_id { + upload_backup_log_file(tx, backup_id, log_id).await?; + logs_waiting_for_confirmation.lock()?.insert(path); } else { upload_backup_compaction_file(backup_client, user_identity, backup_id) .await?; @@ -192,8 +204,27 @@ async fn backup_confirmation_receiver( rx: &mut Pin>>>, + logs_waiting_for_confirmation: &Mutex>, ) -> Result { - while let Some(_) = rx.next().await.transpose()? {} + while let Some(LogUploadConfirmation { backup_id, log_id }) = + rx.next().await.transpose()? + { + let log_id = log_id.to_string(); + + let path = get_backup_log_file_path(&backup_id, &log_id, false)?; + tokio::fs::remove_file(&path).await?; + + let attachments_path = get_backup_log_file_path(&backup_id, &log_id, true)?; + match tokio::fs::remove_file(&attachments_path).await { + Ok(()) => (), + Err(err) if err.kind() == ErrorKind::NotFound => (), + Err(err) => return Err(err.into()), + } + + logs_waiting_for_confirmation + .lock()? + .remove(&PathBuf::from(path)); + } Err(BackupHandlerError::WSClosed) } @@ -295,6 +326,35 @@ } } +async fn upload_backup_log_file( + tx: &mut Pin>>, + backup_id: String, + log_id: usize, +) -> Result<(), BackupHandlerError> { + let log_id_string = log_id.to_string(); + let content_path = + get_backup_log_file_path(&backup_id, &log_id_string, false)?; + let content = tokio::fs::read(&content_path).await?; + + let attachments_path = + get_backup_log_file_path(&backup_id, &log_id_string, true)?; + let attachments = match tokio::fs::read(&attachments_path).await { + Ok(data) => Some(data.lines().collect::>()?), + Err(err) if err.kind() == ErrorKind::NotFound => None, + Err(err) => return Err(err.into()), + }; + + let log_data = UploadLogRequest { + backup_id, + log_id, + content, + attachments, + }; + tx.send(log_data.clone()).await?; + + Ok(()) +} + #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] @@ -304,6 +364,13 @@ WSClosed, IoError(std::io::Error), CxxException(cxx::Exception), + LockError, +} + +impl From> for BackupHandlerError { + fn from(_: std::sync::PoisonError) -> Self { + Self::LockError + } } pub async fn create_backup(