diff --git a/services/comm-services-lib/Cargo.lock b/services/comm-services-lib/Cargo.lock --- a/services/comm-services-lib/Cargo.lock +++ b/services/comm-services-lib/Cargo.lock @@ -106,7 +106,7 @@ "futures-util", "mio", "num_cpus", - "socket2", + "socket2 0.4.7", "tokio", "tracing", ] @@ -165,11 +165,26 @@ "serde_json", "serde_urlencoded", "smallvec", - "socket2", + "socket2 0.4.7", "time 0.3.20", "url", ] +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.7.6" @@ -535,6 +550,21 @@ "tracing", ] +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.21.0" @@ -649,6 +679,7 @@ "reqwest", "serde", "serde_json", + "tokio", "tracing", ] @@ -918,6 +949,12 @@ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + [[package]] name = "h2" version = "0.3.17" @@ -1015,7 +1052,7 @@ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.7", "tokio", "tower-service", "tracing", @@ -1227,6 +1264,15 @@ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.6" @@ -1286,6 +1332,15 @@ "libc", ] +[[package]] +name = "object" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.17.1" @@ -1399,9 +1454,9 @@ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -1550,6 +1605,12 @@ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustc_version" version = "0.4.0" @@ -1771,6 +1832,16 @@ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -1871,20 +1942,19 @@ [[package]] name = "tokio" -version = "1.25.0" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", - "memchr", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", - "windows-sys 0.42.0", + "socket2 0.5.3", + "windows-sys 0.48.0", ] [[package]] diff --git a/services/comm-services-lib/Cargo.toml b/services/comm-services-lib/Cargo.toml --- a/services/comm-services-lib/Cargo.toml +++ b/services/comm-services-lib/Cargo.toml @@ -11,6 +11,7 @@ "dep:futures-util", "dep:serde", "dep:serde_json", + "dep:tokio", ] http = ["dep:actix-cors"] @@ -31,5 +32,6 @@ ], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } +tokio = { version = "1.32", optional = true } # http dependencies actix-cors = { version = "0.6", optional = true } diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -251,8 +251,30 @@ return Ok(false); } trace!("Uploading blob data..."); - self.upload_blob(blob_hash, data_stream).await?; - Ok(true) + let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await else { + return Ok(true); + }; + + trace!(%blob_hash, %holder, "Revoking holder due to upload failure"); + self.schedule_revoke_holder(blob_hash, holder); + Err(upload_error) + } + + /// Revokes holder in a separate task. Useful to clean up after + /// upload failure without blocking the current task. + pub fn schedule_revoke_holder( + &self, + blob_hash: impl Into, + holder: impl Into, + ) { + let this = self.clone(); + let blob_hash: String = blob_hash.into(); + let holder: String = holder.into(); + tokio::spawn(async move { + if let Err(err) = this.revoke_holder(&blob_hash, &holder).await { + warn!("Failed to revoke holder: {0:?} - {0}", err); + } + }); } }