Page MenuHomePhorge

D14786.1765323206.diff
No OneTemporary

Size
7 KB
Referenced Files
None
Subscribers
None

D14786.1765323206.diff

diff --git a/native/native_rust_library/Cargo.lock b/native/native_rust_library/Cargo.lock
--- a/native/native_rust_library/Cargo.lock
+++ b/native/native_rust_library/Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4
[[package]]
name = "addr2line"
@@ -269,9 +269,9 @@
[[package]]
name = "bytes"
-version = "1.2.1"
+version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
+checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
@@ -1259,6 +1259,7 @@
"serde_json",
"siwe",
"tokio",
+ "tokio-util",
"tonic",
"tracing",
]
@@ -2241,16 +2242,15 @@
[[package]]
name = "tokio-util"
-version = "0.7.4"
+version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
+checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
- "tracing",
]
[[package]]
diff --git a/native/native_rust_library/Cargo.toml b/native/native_rust_library/Cargo.toml
--- a/native/native_rust_library/Cargo.toml
+++ b/native/native_rust_library/Cargo.toml
@@ -7,6 +7,7 @@
[dependencies]
cxx = "1.0"
tokio = { version = "1.24", features = ["macros", "rt-multi-thread"] }
+tokio-util = "0.7.15"
tonic = "0.9.1"
lazy_static = "1.4"
tracing = "0.1"
diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs
--- a/native/native_rust_library/src/backup/upload_handler.rs
+++ b/native/native_rust_library/src/backup/upload_handler.rs
@@ -22,61 +22,158 @@
use std::io::ErrorKind;
use std::path::PathBuf;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, LazyLock, Mutex};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
lazy_static! {
- pub static ref UPLOAD_HANDLER: Arc<Mutex<Option<JoinHandle<Infallible>>>> =
- Arc::new(Mutex::new(None));
static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc<Notify> = Arc::new(Notify::new());
static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from(
get_backup_directory_path().expect("Getting backup directory path failed")
);
}
+static BACKUP_HANDLER: LazyLock<BackupHandler> =
+ LazyLock::new(BackupHandler::new);
pub mod ffi {
use super::*;
pub fn start_backup_handler() -> Result<(), Box<dyn Error>> {
- let mut handle = UPLOAD_HANDLER.lock()?;
+ BACKUP_HANDLER.start_if_not_running(super::start_handler_routine)?;
+ Ok(())
+ }
+
+ pub fn stop_backup_handler() -> Result<(), Box<dyn Error>> {
+ BACKUP_HANDLER.stop()?;
+ Ok(())
+ }
+
+ pub fn trigger_backup_file_upload() {
+ TRIGGER_BACKUP_FILE_UPLOAD.notify_one();
+ }
+}
+
+type TaskResult<'err, T> = Result<T, Box<dyn Error + 'err>>;
+
+struct BackupHandlerTask {
+ handle: JoinHandle<()>,
+ cancel_token: tokio_util::sync::CancellationToken,
+ task_id: u32,
+}
+
+struct BackupHandler {
+ task: Arc<Mutex<Option<BackupHandlerTask>>>,
+}
+
+impl BackupHandlerTask {
+ /// True if the task hasn't finished and isn't pending cancellation
+ fn is_active(&self) -> bool {
+ !self.handle.is_finished() && !self.cancel_token.is_cancelled()
+ }
+
+ /// Requests this task routine to gracefully stop
+ fn request_stop(&self) {
+ self.cancel_token.cancel();
+ }
- if let Some(handle) = &*handle {
- if !handle.is_finished() {
- return Ok(()); // Early exit if a running future is detected
+ /// Creates a backup handler task and asynchronously starts it
+ fn start_new<'task, F>(
+ prev_task: Option<BackupHandlerTask>,
+ routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>,
+ ) -> TaskResult<'task, Self>
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let cancel_token = CancellationToken::new();
+ let task_id = prev_task.as_ref().map_or(1, |prev| prev.task_id + 1);
+
+ let routine_future = routine(task_id, cancel_token.clone())?;
+ let handle = RUNTIME.spawn(async move {
+ // previous task might still be running, wait for it to complete
+ if let Some(prev_task) = prev_task {
+ if let Err(cancel_reason) = prev_task.handle.await {
+ println!(
+ "Backup handler task {} has just been unexpectedly canceled: {:?}",
+ prev_task.task_id, cancel_reason
+ );
+ }
}
+ // start the new task
+ routine_future.await
+ });
+
+ Ok(BackupHandlerTask {
+ handle,
+ task_id,
+ cancel_token,
+ })
+ }
+}
+
+impl<'task> BackupHandler {
+ pub fn new() -> Self {
+ Self {
+ task: Arc::new(Mutex::new(None)),
}
+ }
- // No running future or the existing one is finished
- *handle = Some(RUNTIME.spawn(super::start()?));
+ pub fn start_if_not_running<F>(
+ &'task self,
+ routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>,
+ ) -> TaskResult<'task, ()>
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let mut task = self.task.lock()?;
+
+ let prev_task = if let Some(prev_task) = &*task {
+ // There might be a situation where the task is still running
+ // but is pending cancellation
+ let is_canceled = prev_task.cancel_token.is_cancelled();
+ if !prev_task.handle.is_finished() && !is_canceled {
+ return Ok(());
+ }
+
+ task.take()
+ } else {
+ None
+ };
+
+ let new_task = BackupHandlerTask::start_new(prev_task, routine)?;
+ *task = Some(new_task);
Ok(())
}
- pub fn stop_backup_handler() -> Result<(), Box<dyn Error>> {
- let Some(handler) = UPLOAD_HANDLER.lock()?.take() else {
+ fn stop(&'task self) -> TaskResult<'task, ()> {
+ let Some(task) = &*self.task.lock()? else {
return Ok(());
};
- if handler.is_finished() {
+ if !task.is_active() {
return Ok(());
}
- handler.abort();
+ task.request_stop();
Ok(())
}
-
- pub fn trigger_backup_file_upload() {
- TRIGGER_BACKUP_FILE_UPLOAD.notify_one();
- }
}
-pub fn start() -> Result<impl Future<Output = Infallible>, Box<dyn Error>> {
+pub fn start_handler_routine(
+ task_id: u32,
+ cancel_token: CancellationToken,
+) -> Result<impl Future<Output = ()>, Box<dyn Error>> {
let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?;
let user_identity = get_user_identity_from_secure_store()?;
Ok(async move {
- loop {
- let (tx, rx) = match backup_client.upload_logs(&user_identity).await {
+ println!("Backup handler task id={task_id} started.");
+ 'task_loop: loop {
+ let logs_upload_stream = tokio::select!(
+ result = backup_client.upload_logs(&user_identity) => result,
+ _ = cancel_token.cancelled() => { break 'task_loop; }
+ );
+ let (tx, rx) = match logs_upload_stream {
Ok(ws) => ws,
Err(err) => {
println!(
@@ -96,6 +193,7 @@
let err = tokio::select! {
Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err,
Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err,
+ _ = cancel_token.cancelled() => { break 'task_loop; }
};
println!("Backup handler error: '{err:?}'");
@@ -109,9 +207,13 @@
}
}
- tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await;
+ tokio::select!(
+ _ = tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY) => (),
+ _ = cancel_token.cancelled() => { break 'task_loop; }
+ );
println!("Retrying backup log upload");
}
+ println!("Backup handler task id={task_id} stopped.");
})
}

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 9, 11:33 PM (13 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5858710
Default Alt Text
D14786.1765323206.diff (7 KB)

Event Timeline