diff --git a/services/backup/src/error.rs b/services/backup/src/error.rs --- a/services/backup/src/error.rs +++ b/services/backup/src/error.rs @@ -37,6 +37,7 @@ err @ (BlobServiceError::ClientError(_) | BlobServiceError::UnexpectedHttpStatus(_) | BlobServiceError::ServerError + | BlobServiceError::MaxRetriesExceeded | BlobServiceError::UnexpectedError), ) => { warn!("Transient blob error occurred: {err}"); diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs --- a/shared/comm-lib/src/blob/client.rs +++ b/shared/comm-lib/src/blob/client.rs @@ -19,11 +19,17 @@ AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest, RemoveHoldersRequest, }, + tools::exponential_backoff::{ + ExponentialBackoffConfig, MaxRetriesExceededError, + }, }; -use super::types::http::{ - AssignHoldersRequest, AssignHoldersResponse, BlobSizesRequest, - BlobSizesResponse, RemoveHoldersResponse, +use super::types::{ + http::{ + AssignHoldersRequest, AssignHoldersResponse, BlobSizesRequest, + BlobSizesResponse, RemoveHoldersResponse, + }, + BlobInfo, }; #[derive(From, Error, Debug, Display)] @@ -55,6 +61,14 @@ UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), #[display(...)] UnexpectedError, + #[display(fmt = "Maximum retires exceeded")] + MaxRetriesExceeded, +} + +impl From for BlobServiceError { + fn from(_: MaxRetriesExceededError) -> Self { + Self::MaxRetriesExceeded + } } /// A client interface to Blob service. @@ -292,6 +306,34 @@ Ok(response) } + pub async fn assign_multiple_holders_with_retries( + &self, + blob_infos: Vec, + retry_config: ExponentialBackoffConfig, + ) -> BlobResult> { + let mut exponential_backoff = retry_config.new_counter(); + let mut blob_requests = blob_infos; + let mut established_holders = Vec::new(); + + while !blob_requests.is_empty() { + let request = AssignHoldersRequest { + requests: std::mem::take(&mut blob_requests), + }; + let response = self.assign_multiple_holders(request).await?; + + let holders_added = response.established_new_holders(); + established_holders.extend(holders_added); + + let failed_requests = response.failed_blob_infos(); + if !failed_requests.is_empty() { + blob_requests = failed_requests; + exponential_backoff.sleep_and_retry().await?; + } + } + + Ok(established_holders) + } + /// Removes multiple holders. /// - Holders don't have to own the same blob item. For each item /// a (blob hash; holder) pair is specified. diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs --- a/shared/comm-lib/src/blob/types.rs +++ b/shared/comm-lib/src/blob/types.rs @@ -105,6 +105,18 @@ .map(|result| result.request.clone()) .collect() } + + /// Returns a vec of [`BlobInfo`]s which have just been successfully + /// established. Doesn't include entries for which `holder_already_exists` + /// is true. + pub fn established_new_holders(&self) -> Vec { + self + .results + .iter() + .filter(|result| result.success && !result.holder_already_exists) + .map(|result| result.request.clone()) + .collect() + } } // Blob metadata endpoint types