diff --git a/services/identity/Cargo.toml b/services/identity/Cargo.toml --- a/services/identity/Cargo.toml +++ b/services/identity/Cargo.toml @@ -16,6 +16,7 @@ comm-lib = { path = "../../shared/comm-lib", features = [ "aws", "grpc_clients", + "blob-client", ] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } diff --git a/services/identity/src/comm_service/blob.rs b/services/identity/src/comm_service/blob.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/comm_service/blob.rs @@ -0,0 +1,58 @@ +use comm_lib::{ + blob::{ + client::BlobServiceClient, + types::http::{RemoveHoldersRequest, RemoveHoldersResponse}, + }, + database::batch_operations::ExponentialBackoffConfig, + tools::base64_to_base64url, +}; +use tracing::{debug, warn}; + +#[tracing::instrument(skip_all)] +pub async fn remove_holders_for_devices( + blob_client: &BlobServiceClient, + device_ids: &[String], +) -> Result<(), crate::error::Error> { + if device_ids.is_empty() { + debug!("No holders to remove."); + return Ok(()); + } + + debug!( + "Attempting to remove holders for {} devices.", + device_ids.len() + ); + + let retry_config = ExponentialBackoffConfig::default(); + let mut retry_counter = retry_config.new_counter(); + + // holders are prefixed with deviceID in base64url format + // to escape forbidden characters + let holder_prefixes: Vec = device_ids + .iter() + .map(|device_id| base64_to_base64url(device_id)) + .collect(); + + let mut request = RemoveHoldersRequest::ByIndexedTags { + tags: holder_prefixes, + }; + loop { + request = match blob_client.remove_multiple_holders(request.clone()).await { + Ok(response) if response.failed_requests.is_empty() => break, + Ok(RemoveHoldersResponse { failed_requests }) => { + warn!( + "Remaining {} holders not removed. Retrying...", + failed_requests.len() + ); + RemoveHoldersRequest::from(failed_requests) + } + Err(err) => { + warn!(?err, "Removing holders failed due to error. Retrying..."); + request + } + }; + retry_counter.sleep_and_retry().await?; + } + debug!("Removed all holders"); + Ok(()) +} diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -25,6 +25,8 @@ #[display(...)] Reqwest(reqwest::Error), #[display(...)] + BlobService(comm_lib::blob::client::BlobServiceError), + #[display(...)] CannotOverwrite, #[display(...)] OneTimeKeyUploadLimitExceeded, diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -32,6 +32,7 @@ mod comm_service { pub mod backup; + pub mod blob; pub mod tunnelbroker; } diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs --- a/shared/comm-lib/src/blob/types.rs +++ b/shared/comm-lib/src/blob/types.rs @@ -38,7 +38,7 @@ } // Remove multiple holders - #[derive(Serialize, Deserialize, Debug)] + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] pub enum RemoveHoldersRequest { // remove holders with given (hash, holder) pairs diff --git a/shared/comm-lib/src/tools.rs b/shared/comm-lib/src/tools.rs --- a/shared/comm-lib/src/tools.rs +++ b/shared/comm-lib/src/tools.rs @@ -20,6 +20,12 @@ .all(|c| c.is_ascii_alphanumeric() || VALID_IDENTIFIER_CHARS.contains(&c)) } +/// Converts base64 string to base64url format. See RFC 4648 ยง 5 for details. +#[inline] +pub fn base64_to_base64url(base64_string: &str) -> String { + base64_string.replace('/', "_").replace('+', "-") +} + pub type BoxedError = Box; /// Defers call of the provided function to when [Defer] goes out of scope.