diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs
index b1d4de8cc..c43baa8c8 100644
--- a/shared/comm-lib/src/blob/client.rs
+++ b/shared/comm-lib/src/blob/client.rs
@@ -1,416 +1,474 @@
 use bytes::Bytes;
 use derive_more::{Display, Error, From};
 use futures_core::Stream;
 use futures_util::StreamExt;
 use reqwest::{
   multipart::{Form, Part},
   Body, Method, RequestBuilder,
 use tracing::{debug, error, trace, warn};
 // publicly re-export some reqwest types
 pub use reqwest::Error as ReqwestError;
 pub use reqwest::StatusCode;
 pub use reqwest::Url;
 use crate::{
   auth::{AuthorizationCredential, UserIdentity},
     AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest,
+    RemoveHoldersRequest,
+use super::types::{http::RemoveHoldersResponse, BlobInfo};
 #[derive(From, Error, Debug, Display)]
 pub enum BlobServiceError {
   /// HTTP Client errors, this includes:
   /// - connection failures
   /// - request errors (e.g. upload input stream)
   /// Invalid Blob service URL provided
   URLError(#[error(ignore)] String),
   /// Blob service returned HTTP 404
   /// - blob or holder not found
   /// Blob service returned HTTP 409
   /// - blob or holder already exists
   /// Blob service returned HTTP 400
   /// - invalid holder or blob_hash format
   /// Blob service returned HTTP 50x
   UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
 /// A client interface to Blob service.
 /// The `BlobServiceClient` holds a connection pool internally, so it is advised that
 /// you create one and **reuse** it, by **cloning**.
 /// A clone is recommended for each individual client identity, that has
 /// a different `UserIdentity`.
 /// # Example
 /// ```ignore
 /// // create client for user A
 /// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA);
 /// // reuse the client connection with different credentials
 /// let clientB = clientA.with_user_identity(userB);
 /// // clientA still uses userA credentials
 /// ```
 /// A clone is recommended when the client concurrently handles multiple identities -
 /// e.g. a HTTP service handling requests from different users. The `with_user_identity()`
 /// method should be used for this purpose.
 /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
 /// because it already uses an `Arc` internally.
 pub struct BlobServiceClient {
   http_client: reqwest::Client,
   blob_service_url: reqwest::Url,
   auth_credential: Option<AuthorizationCredential>,
 impl BlobServiceClient {
   /// Creates a new Blob Service client instance. It is unauthenticated by default
   /// so you need to call `with_user_identity()` afterwards:
   /// ```ignore
   /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user);
   /// ```
   /// **Note**: It is advised to create this client once and reuse by **cloning**.
   /// See [`BlobServiceClient`] docs for details
   pub fn new(blob_service_url: reqwest::Url) -> Self {
     debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
     Self {
       http_client: reqwest::Client::new(),
       auth_credential: None,
   /// Clones the client and sets the [`UserIdentity`] for the new instance.
   /// This allows the client to reuse the same connection pool for different users.
   /// This is the same as calling
   /// ```ignore
   /// client.with_authentication(AuthorizationCredential::UserToken(user_identity))
   /// ````
   pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self {
   /// Clones the client and sets the [`AuthorizationCredential`] for the new instance.
   /// This allows the client to reuse the same connection pool for different users.
   pub fn with_authentication(
     auth_credential: AuthorizationCredential,
   ) -> Self {
     trace!("Set auth_credential: {:?}", &auth_credential);
     let mut this = self.clone();
     this.auth_credential = Some(auth_credential);
   /// Downloads blob with given [`blob_hash`].
   /// @returns a stream of blob bytes
   /// # Errors thrown
   /// - [BlobServiceError::NotFound] if blob with given hash does not exist
   /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
   /// # Example
   /// ```ignore
   /// let client =
   ///   BlobServiceClient::new("http://localhost:50053".parse()?);
   /// let mut stream = client.get("hello").await?;
   /// while let Some(data) = stream.try_next().await? {
   ///   println!("Got data: {:?}", data);
   /// }
   /// ```
   pub async fn get(
     blob_hash: &str,
   ) -> BlobResult<impl Stream<Item = BlobResult<Bytes>>> {
     debug!(?blob_hash, "Get blob request");
     let url = self.get_blob_url(Some(blob_hash))?;
     let response = self
       .request(Method::GET, url)?
     debug!("Response status: {}", response.status());
     if response.status().is_success() {
       let stream = response.bytes_stream().map(|result| match result {
         Ok(bytes) => Ok(bytes),
         Err(error) => {
           warn!("Error while streaming response: {}", error);
       return Ok(stream);
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
   /// Assigns a new holder to a blob represented by [`blob_hash`].
   /// Returns `BlobServiceError::AlreadyExists` if blob already has
   /// a holder with given [`holder`] name.
   pub async fn assign_holder(
     blob_hash: &str,
     holder: &str,
   ) -> BlobResult<bool> {
     debug!("Assign holder request");
     let url = self.get_blob_url(None)?;
     let payload = AssignHolderRequest {
       holder: holder.to_string(),
       blob_hash: blob_hash.to_string(),
     debug!("Request payload: {:?}", payload);
     let response = self
       .request(Method::POST, url)?
     debug!("Response status: {}", response.status());
     if response.status().is_success() {
       let AssignHolderResponse { data_exists } = response.json().await?;
       trace!("Data exists: {}", data_exists);
       return Ok(data_exists);
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
   /// Revokes given holder from a blob represented by [`blob_hash`].
   /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist
   /// or it does not have such holder
   pub async fn revoke_holder(
     blob_hash: &str,
     holder: &str,
   ) -> BlobResult<()> {
     debug!("Revoke holder request");
     let url = self.get_blob_url(None)?;
     let payload = RemoveHolderRequest {
       holder: holder.to_string(),
       blob_hash: blob_hash.to_string(),
       instant_delete: false,
     debug!("Request payload: {:?}", payload);
     let response = self
       .request(Method::DELETE, url)?
     debug!("Response status: {}", response.status());
     if response.status().is_success() {
       trace!("Revoke holder request successful");
       return Ok(());
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
+  /// Removes multiple holders.
+  /// - Holders don't have to own the same blob item. For each item
+  ///   a (blob hash; holder) pair is specified.
+  /// - Operation is idempotent. Not existing holders are treated as
+  ///   successfully removed.
+  /// - If one or more removal failed server-side, these will be returned
+  ///   in the `failed_requests` response field. It has the same format
+  ///   as this function input and can be directly used to retry removal.
+  ///
+  /// For single holder removal, see [`BlobServiceClient::revoke_holder`].
+  pub async fn remove_multiple_holders(
+    &self,
+    blob_infos: Vec<BlobInfo>,
+  ) -> BlobResult<RemoveHoldersResponse> {
+    let num_holders = blob_infos.len();
+    debug!(num_holders, "Revoke multiple holders request.");
+    let url = self.get_holders_url()?;
+    let payload = RemoveHoldersRequest {
+      requests: blob_infos,
+      instant_delete: false,
+    };
+    trace!("Request payload: {:?}", payload);
+    let response = self
+      .request(Method::DELETE, url)?
+      .json(&payload)
+      .send()
+      .await?;
+    debug!("Response status: {}", response.status());
+    if response.status().is_success() {
+      let result: RemoveHoldersResponse = response.json().await?;
+      debug!(
+        "Request successful. {} holders failed to be removed.",
+        result.failed_requests.len()
+      );
+      return Ok(result);
+    }
+    let error = handle_http_error(response.status());
+    if let Ok(message) = response.text().await {
+      debug!("Error response message: {}", message);
+    }
+    Err(error)
+  }
   /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash
   /// already exists.
   /// # Example
   /// ```ignore
   /// use std::io::{Error, ErrorKind};
   /// let client =
   ///   BlobServiceClient::new("http://localhost:50053".parse()?);
   /// let stream = async_stream::stream! {
   ///   yield Ok(vec![1, 2, 3]);
   ///   yield Ok(vec![4, 5, 6]);
   ///   yield Err(Error::new(ErrorKind::Other, "Oops"));
   /// };
   /// client.upload_blob(&blob_hash, stream).await?;
   /// ```
   pub async fn upload_blob<H, S>(
     blob_hash: H,
     data_stream: S,
   ) -> BlobResult<()>
     H: Into<String>,
     S: futures_core::stream::TryStream + Send + Sync + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
     Bytes: From<S::Ok>,
     debug!("Upload blob request");
     let url = self.get_blob_url(None)?;
     let streaming_body = Body::wrap_stream(data_stream);
     let form = Form::new()
       .text("blob_hash", blob_hash.into())
       .part("blob_data", Part::stream(streaming_body));
     let response = self
       .request(Method::PUT, url)?
     debug!("Response status: {}", response.status());
     if response.status().is_success() {
       trace!("Blob upload successful");
       return Ok(());
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
   /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`].
   /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist,
   /// uploads the data from [`data_stream`].
   pub async fn simple_put<S>(
     blob_hash: &str,
     holder: &str,
     data_stream: S,
   ) -> BlobResult<bool>
     S: futures_core::stream::TryStream + Send + Sync + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
     Bytes: From<S::Ok>,
     trace!("Begin simple put. Assigning holder...");
     let data_exists = self.assign_holder(blob_hash, holder).await?;
     if data_exists {
       trace!("Blob data already exists. Skipping upload.");
       return Ok(false);
     trace!("Uploading blob data...");
     let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await
     else {
       return Ok(true);
     trace!(%blob_hash, %holder, "Revoking holder due to upload failure");
     self.schedule_revoke_holder(blob_hash, holder);
   /// Revokes holder in a separate task. Useful to clean up after
   /// upload failure without blocking the current task.
   pub fn schedule_revoke_holder(
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) {
     let this = self.clone();
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     tokio::spawn(async move {
       if let Err(err) = this.revoke_holder(&blob_hash, &holder).await {
         warn!("Failed to revoke holder: {0:?} - {0}", err);
 // private helper methods
 impl BlobServiceClient {
   fn get_blob_url(
     blob_hash: Option<&str>,
   ) -> Result<Url, BlobServiceError> {
     let path = match blob_hash {
       Some(hash) => format!("/blob/{}", hash),
       None => "/blob".to_string(),
     let url = self
       .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
     trace!("Constructed request URL: {}", url);
+  fn get_holders_url(&self) -> Result<Url, BlobServiceError> {
+    let url = self
+      .blob_service_url
+      .join("/holders")
+      .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
+    trace!("Constructed request URL: {}", url);
+    Ok(url)
+  }
   fn request(
     http_method: Method,
     url: Url,
   ) -> BlobResult<RequestBuilder> {
     let request = self.http_client.request(http_method, url);
     match &self.auth_credential {
       Some(credential) => {
         let token = credential.as_authorization_token().map_err(|e| {
           error!("Failed to parse authorization token: {}", e);
       None => Ok(request),
 fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
   match status_code {
     StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments,
     StatusCode::NOT_FOUND => BlobServiceError::NotFound,
     StatusCode::CONFLICT => BlobServiceError::AlreadyExists,
     code if code.is_server_error() => BlobServiceError::ServerError,
     code => BlobServiceError::UnexpectedHttpStatus(code),
 type BlobResult<T> = Result<T, BlobServiceError>;
 #[cfg(feature = "http")]
 impl crate::http::auth_service::HttpAuthenticatedService for BlobServiceClient {
   fn make_authenticated(
     auth_credential: AuthorizationCredential,
   ) -> Self {
   fn accepts_services_token(&self, _req: &actix_web::HttpRequest) -> bool {
diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs
index b3aaa2a3c..d62fbcf78 100644
--- a/shared/comm-lib/src/blob/types.rs
+++ b/shared/comm-lib/src/blob/types.rs
@@ -1,141 +1,141 @@
 use derive_more::Constructor;
 use hex::ToHex;
 use sha2::{Digest, Sha256};
 /// This module defines structures for HTTP requests and responses
 /// for the Blob Service. The definitions in this file should remain in sync
 /// with the types and validators defined in the corresponding
 /// JavaScript file at `lib/types/blob-service-types.js`.
 /// If you edit the definitions in one file,
 /// please make sure to update the corresponding definitions in the other.
 pub mod http {
   use serde::{Deserialize, Serialize};
   pub use super::BlobInfo;
   // Assign multiple holders
   #[derive(Serialize, Deserialize, Debug)]
   #[serde(rename_all = "camelCase")]
   pub struct AssignHoldersRequest {
     pub requests: Vec<BlobInfo>,
   #[derive(Serialize, Deserialize, Debug)]
   #[serde(rename_all = "camelCase")]
   pub struct HolderAssignmentResult {
     pub request: BlobInfo,
     pub success: bool,
     pub data_exists: bool,
     pub holder_already_exists: bool,
   #[derive(Serialize, Deserialize, Debug)]
   #[serde(rename_all = "camelCase")]
   pub struct AssignHoldersResponse {
     pub results: Vec<HolderAssignmentResult>,
   // Remove multiple holders
-  #[derive(Deserialize, Debug)]
+  #[derive(Serialize, Deserialize, Debug)]
   #[serde(rename_all = "camelCase")]
   pub struct RemoveHoldersRequest {
     pub requests: Vec<BlobInfo>,
     pub instant_delete: bool,
-  #[derive(Serialize, Debug)]
+  #[derive(Serialize, Deserialize, Debug)]
   #[serde(rename_all = "camelCase")]
   pub struct RemoveHoldersResponse {
     pub failed_requests: Vec<BlobInfo>,
   // Single holder endpoint types
   #[derive(Serialize, Deserialize, Debug)]
   pub struct AssignHolderRequest {
     pub blob_hash: String,
     pub holder: String,
   #[derive(Serialize, Deserialize, Debug)]
   pub struct AssignHolderResponse {
     pub data_exists: bool,
   #[derive(Serialize, Deserialize, Debug)]
   pub struct RemoveHolderRequest {
     pub blob_hash: String,
     pub holder: String,
     /// If true, the blob will be deleted intantly
     /// after the last holder is revoked.
     pub instant_delete: bool,
 /// Blob owning information - stores both blob_hash and holder
 #[derive(Clone, Debug, Constructor, serde::Serialize, serde::Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct BlobInfo {
   pub blob_hash: String,
   pub holder: String,
 impl BlobInfo {
   pub fn from_bytes(data: &[u8]) -> Self {
     Self {
       blob_hash: Sha256::digest(data).encode_hex(),
       holder: uuid::Uuid::new_v4().to_string(),
 #[cfg(feature = "aws")]
 mod db_conversions {
   use super::*;
   use crate::database::{AttributeTryInto, DBItemError, TryFromAttribute};
   use aws_sdk_dynamodb::types::AttributeValue;
   use std::collections::HashMap;
   const BLOB_HASH_DDB_MAP_KEY: &str = "blob_hash";
   const HOLDER_DDB_MAP_KEY: &str = "holder";
   impl From<BlobInfo> for AttributeValue {
     fn from(value: BlobInfo) -> Self {
       let map = HashMap::from([
   impl From<&BlobInfo> for AttributeValue {
     fn from(value: &BlobInfo) -> Self {
   impl TryFromAttribute for BlobInfo {
     fn try_from_attr(
       attribute_name: impl Into<String>,
       attribute: Option<AttributeValue>,
     ) -> Result<Self, DBItemError> {
       let attr_name: String = attribute_name.into();
       let mut inner_map: HashMap<String, AttributeValue> =
       let blob_hash = inner_map
       let holder = inner_map
       Ok(BlobInfo { blob_hash, holder })