Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32365245
D14786.1765323206.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
7 KB
Referenced Files
None
Subscribers
None
D14786.1765323206.diff
View Options
diff --git a/native/native_rust_library/Cargo.lock b/native/native_rust_library/Cargo.lock
--- a/native/native_rust_library/Cargo.lock
+++ b/native/native_rust_library/Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4
[[package]]
name = "addr2line"
@@ -269,9 +269,9 @@
[[package]]
name = "bytes"
-version = "1.2.1"
+version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
+checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
@@ -1259,6 +1259,7 @@
"serde_json",
"siwe",
"tokio",
+ "tokio-util",
"tonic",
"tracing",
]
@@ -2241,16 +2242,15 @@
[[package]]
name = "tokio-util"
-version = "0.7.4"
+version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
+checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
- "tracing",
]
[[package]]
diff --git a/native/native_rust_library/Cargo.toml b/native/native_rust_library/Cargo.toml
--- a/native/native_rust_library/Cargo.toml
+++ b/native/native_rust_library/Cargo.toml
@@ -7,6 +7,7 @@
[dependencies]
cxx = "1.0"
tokio = { version = "1.24", features = ["macros", "rt-multi-thread"] }
+tokio-util = "0.7.15"
tonic = "0.9.1"
lazy_static = "1.4"
tracing = "0.1"
diff --git a/native/native_rust_library/src/backup/upload_handler.rs b/native/native_rust_library/src/backup/upload_handler.rs
--- a/native/native_rust_library/src/backup/upload_handler.rs
+++ b/native/native_rust_library/src/backup/upload_handler.rs
@@ -22,61 +22,158 @@
use std::io::ErrorKind;
use std::path::PathBuf;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, LazyLock, Mutex};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
lazy_static! {
- pub static ref UPLOAD_HANDLER: Arc<Mutex<Option<JoinHandle<Infallible>>>> =
- Arc::new(Mutex::new(None));
static ref TRIGGER_BACKUP_FILE_UPLOAD: Arc<Notify> = Arc::new(Notify::new());
static ref BACKUP_FOLDER_PATH: PathBuf = PathBuf::from(
get_backup_directory_path().expect("Getting backup directory path failed")
);
}
+static BACKUP_HANDLER: LazyLock<BackupHandler> =
+ LazyLock::new(BackupHandler::new);
pub mod ffi {
use super::*;
pub fn start_backup_handler() -> Result<(), Box<dyn Error>> {
- let mut handle = UPLOAD_HANDLER.lock()?;
+ BACKUP_HANDLER.start_if_not_running(super::start_handler_routine)?;
+ Ok(())
+ }
+
+ pub fn stop_backup_handler() -> Result<(), Box<dyn Error>> {
+ BACKUP_HANDLER.stop()?;
+ Ok(())
+ }
+
+ pub fn trigger_backup_file_upload() {
+ TRIGGER_BACKUP_FILE_UPLOAD.notify_one();
+ }
+}
+
+type TaskResult<'err, T> = Result<T, Box<dyn Error + 'err>>;
+
+struct BackupHandlerTask {
+ handle: JoinHandle<()>,
+ cancel_token: tokio_util::sync::CancellationToken,
+ task_id: u32,
+}
+
+struct BackupHandler {
+ task: Arc<Mutex<Option<BackupHandlerTask>>>,
+}
+
+impl BackupHandlerTask {
+ /// True if task hasn't finished and isn't pending cancelation
+ fn is_active(&self) -> bool {
+ !self.handle.is_finished() && !self.cancel_token.is_cancelled()
+ }
+
+ /// Requests this task routine to gracefully stop
+ fn request_stop(&self) {
+ self.cancel_token.cancel();
+ }
- if let Some(handle) = &*handle {
- if !handle.is_finished() {
- return Ok(()); // Early exit if a running future is detected
+ /// Creates a backup handler task and asynchronously starts it
+ fn start_new<'task, F>(
+ prev_task: Option<BackupHandlerTask>,
+ routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>,
+ ) -> TaskResult<'task, Self>
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let cancel_token = CancellationToken::new();
+ let task_id = prev_task.as_ref().map_or(1, |prev| prev.task_id + 1);
+
+ let routine_future = routine(task_id, cancel_token.clone())?;
+ let handle = RUNTIME.spawn(async move {
+ // previous task might still be running, wait for it to complete
+ if let Some(prev_task) = prev_task {
+ if let Err(cancel_reason) = prev_task.handle.await {
+ println!(
+ "Backup handler task {} has just been unexpectedly canceled: {:?}",
+ prev_task.task_id, cancel_reason
+ );
+ }
}
+ // start the new task
+ routine_future.await
+ });
+
+ Ok(BackupHandlerTask {
+ handle,
+ task_id,
+ cancel_token,
+ })
+ }
+}
+
+impl<'task> BackupHandler {
+ pub fn new() -> Self {
+ Self {
+ task: Arc::new(Mutex::new(None)),
}
+ }
- // No running future or the existing one is finished
- *handle = Some(RUNTIME.spawn(super::start()?));
+ pub fn start_if_not_running<F>(
+ &'task self,
+ routine: impl FnOnce(u32, CancellationToken) -> TaskResult<'task, F>,
+ ) -> TaskResult<'task, ()>
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let mut task = self.task.lock()?;
+
+ let prev_task = if let Some(prev_task) = &*task {
+ // There might be situation that task is still running
+ // but is pending cancelation
+ let is_canceled = prev_task.cancel_token.is_cancelled();
+ if !prev_task.handle.is_finished() && !is_canceled {
+ return Ok(());
+ }
+
+ task.take()
+ } else {
+ None
+ };
+
+ let new_task = BackupHandlerTask::start_new(prev_task, routine)?;
+ *task = Some(new_task);
Ok(())
}
- pub fn stop_backup_handler() -> Result<(), Box<dyn Error>> {
- let Some(handler) = UPLOAD_HANDLER.lock()?.take() else {
+ fn stop(&'task self) -> TaskResult<'task, ()> {
+ let Some(task) = &*self.task.lock()? else {
return Ok(());
};
- if handler.is_finished() {
+ if !task.is_active() {
return Ok(());
}
- handler.abort();
+ task.request_stop();
Ok(())
}
-
- pub fn trigger_backup_file_upload() {
- TRIGGER_BACKUP_FILE_UPLOAD.notify_one();
- }
}
-pub fn start() -> Result<impl Future<Output = Infallible>, Box<dyn Error>> {
+pub fn start_handler_routine(
+ task_id: u32,
+ cancel_token: CancellationToken,
+) -> Result<impl Future<Output = ()>, Box<dyn Error>> {
let backup_client = BackupClient::new(BACKUP_SOCKET_ADDR)?;
let user_identity = get_user_identity_from_secure_store()?;
Ok(async move {
- loop {
- let (tx, rx) = match backup_client.upload_logs(&user_identity).await {
+ println!("Backup handler task id={task_id} started.");
+ 'task_loop: loop {
+ let logs_upload_stream = tokio::select!(
+ result = backup_client.upload_logs(&user_identity) => result,
+ _ = cancel_token.cancelled() => { break 'task_loop; }
+ );
+ let (tx, rx) = match logs_upload_stream {
Ok(ws) => ws,
Err(err) => {
println!(
@@ -96,6 +193,7 @@
let err = tokio::select! {
Err(err) = watch_and_upload_files(&backup_client, &user_identity, &mut tx, &logs_waiting_for_confirmation) => err,
Err(err) = delete_confirmed_logs(&mut rx, &logs_waiting_for_confirmation) => err,
+ _ = cancel_token.cancelled() => { break 'task_loop; }
};
println!("Backup handler error: '{err:?}'");
@@ -109,9 +207,13 @@
}
}
- tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY).await;
+ tokio::select!(
+ _ = tokio::time::sleep(BACKUP_SERVICE_CONNECTION_RETRY_DELAY) => (),
+ _ = cancel_token.cancelled() => { break 'task_loop; }
+ );
println!("Retrying backup log upload");
}
+ println!("Backup handler task id={task_id} stopped.");
})
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 9, 11:33 PM (13 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5858710
Default Alt Text
D14786.1765323206.diff (7 KB)
Attached To
Mode
D14786: [native_rust_lib] Make backup upload handler thread safe
Attached
Detach File
Event Timeline
Log In to Comment