diff --git a/native/native_rust_library/Cargo.lock b/native/native_rust_library/Cargo.lock --- a/native/native_rust_library/Cargo.lock +++ b/native/native_rust_library/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -269,9 +269,9 @@ [[package]] name = "bytes" -version = "1.2.1" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" @@ -1259,6 +1259,7 @@ "serde_json", "siwe", "tokio", + "tokio-util", "tonic", "tracing", ] @@ -2241,16 +2242,15 @@ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] diff --git a/native/native_rust_library/Cargo.toml b/native/native_rust_library/Cargo.toml --- a/native/native_rust_library/Cargo.toml +++ b/native/native_rust_library/Cargo.toml @@ -7,6 +7,7 @@ [dependencies] cxx = "1.0" tokio = { version = "1.24", features = ["macros", "rt-multi-thread"] } +tokio-util = "0.7.15" tonic = "0.9.1" lazy_static = "1.4" tracing = "0.1" diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs --- a/native/native_rust_library/src/backup/upload_handler.rs +++ b/native/native_rust_library/src/backup/upload_handler.rs @@ -22,61 +22,158 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, LazyLock, Mutex}; use tokio::sync::Notify; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; lazy_static! { - pub static ref UPLOAD_HANDLER: Arc>>> = - Arc::new(Mutex::new(None)); static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc = Arc::new(Notify::new()); static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from( get_backup_directory_path().expect("Getting backup directory path failed") ); } +static BACKUP_HANDLER: LazyLock = + LazyLock::new(BackupHandler::new); pub mod ffi { use super::*; pub fn start_backup_handler() -> Result<(), Box> { - let mut handle = UPLOAD_HANDLER.lock()?; + BACKUP_HANDLER.start_if_not_running(super::start_handler_routine)?; + Ok(()) + } + + pub fn stop_backup_handler() -> Result<(), Box> { + BACKUP_HANDLER.stop()?; + Ok(()) + } + + pub fn trigger_backup_file_upload() { + TRIGGER_BACKUP_FILE_UPLOAD.notify_one(); + } +} + +type TaskResult<'err, T> = Result>; + +struct BackupHandlerTask { + handle: JoinHandle<()>, + cancel_token: tokio_util::sync::CancellationToken, + task_id: u32, +} + +struct BackupHandler { + task: Arc>>, +} + +impl BackupHandlerTask { + /// True if task hasn't finished and isn't pending cancelation + fn is_active(&self) -> bool { + !self.handle.is_finished() && !self.cancel_token.is_cancelled() + } + + /// Requests this task routine to gracefully stop + fn request_stop(&self) { + self.cancel_token.cancel(); + } - if let Some(handle) = &*handle { - if !handle.is_finished() { - return Ok(()); // Early exit if a running future is detected + /// Creates a backup handler task and asynchronously starts it + fn start_new<'task, F>( + prev_task: Option, + routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>, + ) -> TaskResult<'task, Self> + where + F: Future + Send + 'static, + { + let cancel_token = CancellationToken::new(); + let task_id = prev_task.as_ref().map_or(1, |prev| prev.task_id + 1); + + let routine_future = routine(task_id, cancel_token.clone())?; + let handle = RUNTIME.spawn(async move { + // previous task might still be running, wait for it to complete + if let Some(prev_task) = prev_task { + if let Err(cancel_reason) = prev_task.handle.await { + println!( + "Backup handler task {} has just been unexpectedly canceled: {:?}", + prev_task.task_id, cancel_reason + ); + } } + // start the new task + routine_future.await + }); + + Ok(BackupHandlerTask { + handle, + task_id, + cancel_token, + }) + } +} + +impl<'task> BackupHandler { + pub fn new() -> Self { + Self { + task: Arc::new(Mutex::new(None)), } + } - // No running future or the existing one is finished - *handle = Some(RUNTIME.spawn(super::start()?)); + pub fn start_if_not_running( + &'task self, + routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>, + ) -> TaskResult<'task, ()> + where + F: Future + Send + 'static, + { + let mut task = self.task.lock()?; + + let prev_task = if let Some(prev_task) = &*task { + // There might be situation that task is still running + // but is pending cancelation + let is_canceled = prev_task.cancel_token.is_cancelled(); + if !prev_task.handle.is_finished() && !is_canceled { + return Ok(()); + } + + task.take() + } else { + None + }; + + let new_task = BackupHandlerTask::start_new(prev_task, routine)?; + *task = Some(new_task); Ok(()) } - pub fn stop_backup_handler() -> Result<(), Box> { - let Some(handler) = UPLOAD_HANDLER.lock()?.take() else { + fn stop(&'task self) -> TaskResult<'task, ()> { + let Some(task) = &*self.task.lock()? else { return Ok(()); }; - if handler.is_finished() { + if !task.is_active() { return Ok(()); } - handler.abort(); + task.request_stop(); Ok(()) } - - pub fn trigger_backup_file_upload() { - TRIGGER_BACKUP_FILE_UPLOAD.notify_one(); - } } -pub fn start() -> Result, Box> { +pub fn start_handler_routine( + task_id: u32, + cancel_token: CancellationToken, +) -> Result, Box> { let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?; let user_identity = get_user_identity_from_secure_store()?; Ok(async move { - loop { - let (tx, rx) = match backup_client.upload_logs(&user_identity).await { + println!("Backup handler task id={task_id} started."); + 'task_loop: loop { + let logs_upload_stream = tokio::select!( + result = backup_client.upload_logs(&user_identity) => result, + _ = cancel_token.cancelled() => { break 'task_loop; } + ); + let (tx, rx) = match logs_upload_stream { Ok(ws) => ws, Err(err) => { println!( @@ -96,6 +193,7 @@ let err = tokio::select! { Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err, Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err, + _ = cancel_token.cancelled() => { break 'task_loop; } }; println!("Backup handler error: '{err:?}'"); @@ -109,9 +207,13 @@ } } - tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await; + tokio::select!( + _ = tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY) => (), + _ = cancel_token.cancelled() => { break 'task_loop; } + ); println!("Retrying backup log upload"); } + println!("Backup handler task id={task_id} stopped."); }) }