diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs
--- a/services/blob/src/http/handlers/holders.rs
+++ b/services/blob/src/http/handlers/holders.rs
@@ -1,5 +1,6 @@
-use actix_web::error::{ErrorBadRequest, ErrorNotImplemented};
+use actix_web::error::{ErrorBadRequest, ErrorForbidden};
 use actix_web::{web, HttpResponse};
+use comm_lib::auth::AuthorizationCredential;
 use comm_lib::blob::types::http::{
   AssignHoldersRequest, AssignHoldersResponse, BlobInfo,
   HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse,
@@ -62,17 +63,26 @@
 pub async fn remove_holders_handler(
   service: web::Data,
   payload: web::Json,
+  requesting_identity: AuthorizationCredential,
 ) -> actix_web::Result {
-  let RemoveHoldersRequest::Items {
-    requests,
-    instant_delete,
-  } = payload.into_inner()
-  else {
-    return Err(ErrorNotImplemented("not implemented"));
+  let (requests, instant_delete) = match payload.into_inner() {
+    RemoveHoldersRequest::Items {
+      requests,
+      instant_delete,
+    } => (requests, instant_delete),
+    RemoveHoldersRequest::ByIndexedTags { tags } => {
+      // Only other services may remove holders by indexed tags.
+      verify_caller_is_service(&requesting_identity)?;
+
+      tracing::debug!("Querying holders for {} tags", tags.len());
+      let requests = service.query_holders_by_tags(tags).await?;
+      // Removal by tags never uses instant delete.
+      (requests, false)
+    }
   };
   info!(
     instant_delete,
-    "Remove request for {} holders.",
+    "Requested removal of {} holders.",
     requests.len()
   );
   validate_request(&requests)?;
@@ -114,3 +124,18 @@
 
   Ok(())
 }
+
+/// Ensures the caller authenticated as a Comm service.
+///
+/// Returns `Ok(())` for [`AuthorizationCredential::ServicesToken`]; any other
+/// credential is rejected with an HTTP 403 (Forbidden) error.
+fn verify_caller_is_service(
+  requesting_identity: &AuthorizationCredential,
+) -> actix_web::Result<()> {
+  match requesting_identity {
+    AuthorizationCredential::ServicesToken(_) => Ok(()),
+    _ => Err(ErrorForbidden(
+      "This endpoint can only be called by other services",
+    )),
+  }
+}
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -13,7 +13,7 @@
 use once_cell::sync::Lazy;
 use tokio_stream::StreamExt;
 use tonic::codegen::futures_core::Stream;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn, Instrument};
 
 use crate::config::{CONFIG, OFFENSIVE_INVITE_LINKS};
 use crate::constants::{
@@ -47,8 +47,11 @@
   InvalidState,
   DB(DBError),
   S3(S3Error),
+  #[from(ignore)]
   InputError(#[error(ignore)] BoxedError),
   InviteLinkError(InviteLinkError),
+  #[from(ignore)]
+  UnexpectedError(#[error(ignore)] BoxedError),
 }
 
 type BlobServiceResult = Result;
@@ -307,6 +310,52 @@
     Ok(results)
   }
 
+  /// Queries the indexed holders for each of the given `tags`.
+  ///
+  /// One task per tag is spawned on a `JoinSet`; each task calls
+  /// `query_indexed_holders` on a clone of `self.db`. Returned items are
+  /// appended to a single list in the order `join_next` yields the tasks.
+  ///
+  /// # Errors
+  ///
+  /// The first failure aborts all remaining tasks and is returned: a DB
+  /// error is converted with `into()`, while a task join failure is logged
+  /// and wrapped in `BlobServiceError::UnexpectedError`.
+  pub async fn query_holders_by_tags(
+    &self,
+    tags: Vec,
+  ) -> BlobServiceResult> {
+    let mut tasks = tokio::task::JoinSet::new();
+
+    for tag in tags {
+      let db = self.db.clone();
+      let task = async move { db.query_indexed_holders(tag).await };
+      // Attach the current tracing span to the spawned task.
+      tasks.spawn(task.in_current_span());
+    }
+
+    let mut results = Vec::new();
+    while let Some(result) = tasks.join_next().await {
+      match result {
+        Ok(Ok(items)) => results.extend(items),
+        Ok(Err(db_error)) => {
+          tasks.abort_all();
+          return Err(db_error.into());
+        }
+        Err(join_error) => {
+          error!(
+            errorType = error_types::OTHER_ERROR,
+            "Holder query task failed: {:?}", join_error
+          );
+          tasks.abort_all();
+          return Err(BlobServiceError::UnexpectedError(Box::new(join_error)));
+        }
+      }
+    }
+
+    Ok(results)
+  }
+
   pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
     info!("Starting cleanup...");
     // 1. Fetch blobs and holders marked as "unchecked"