diff --git a/services/identity/Cargo.toml b/services/identity/Cargo.toml --- a/services/identity/Cargo.toml +++ b/services/identity/Cargo.toml @@ -16,6 +16,7 @@ comm-lib = { path = "../../shared/comm-lib", features = [ "aws", "grpc_clients", + "blob-client", ] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } diff --git a/services/identity/src/comm_service/blob.rs b/services/identity/src/comm_service/blob.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/comm_service/blob.rs @@ -0,0 +1,47 @@ +use comm_lib::{ + blob::{client::BlobServiceClient, types::http::RemoveHoldersRequest}, + database::batch_operations::ExponentialBackoffConfig, +}; +use tracing::debug; + +#[tracing::instrument(skip_all)] +pub async fn remove_holders_for_devices( + blob_client: &BlobServiceClient, + device_ids: &[String], +) -> Result<(), crate::error::Error> { + if device_ids.is_empty() { + debug!("No holders to remove."); + return Ok(()); + } + + debug!( + "Attempting to remove holders for {} devices.", + device_ids.len() + ); + + let retry_config = ExponentialBackoffConfig::default(); + let mut retry_counter = retry_config.new_counter(); + + let holder_prefixes = Vec::from(device_ids); + let mut pending_request = Some(RemoveHoldersRequest::from(holder_prefixes)); + loop { + let Some(request) = pending_request.take() else { + break; + }; + + let remaining = blob_client + .remove_multiple_holders(request) + .await? + .failed_requests; + + if remaining.is_empty() { + break; + } + + debug!("Remaining {} holders not removed.", remaining.len()); + pending_request = Some(remaining.into()); + retry_counter.sleep_and_retry().await?; + } + debug!("Removed all holders"); + Ok(()) +} diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -25,6 +25,8 @@ #[display(...)] Reqwest(reqwest::Error), #[display(...)] + BlobService(comm_lib::blob::client::BlobServiceError), + #[display(...)] CannotOverwrite, #[display(...)] OneTimeKeyUploadLimitExceeded, diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -32,6 +32,7 @@ mod comm_service { pub mod backup; + pub mod blob; pub mod tunnelbroker; }