Page Menu | Home | Phabricator

D13647.diff
No One | Temporary

D13647.diff

diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs
--- a/services/blob/src/http/handlers/holders.rs
+++ b/services/blob/src/http/handlers/holders.rs
@@ -1,5 +1,6 @@
-use actix_web::error::{ErrorBadRequest, ErrorNotImplemented};
+use actix_web::error::{ErrorBadRequest, ErrorForbidden};
use actix_web::{web, HttpResponse};
+use comm_lib::auth::AuthorizationCredential;
use comm_lib::blob::types::http::{
AssignHoldersRequest, AssignHoldersResponse, BlobInfo,
HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse,
@@ -62,17 +63,24 @@
pub async fn remove_holders_handler(
service: web::Data<BlobService>,
payload: web::Json<RemoveHoldersRequest>,
+ requesting_identity: AuthorizationCredential,
) -> actix_web::Result<HttpResponse> {
- let RemoveHoldersRequest::Items {
- requests,
- instant_delete,
- } = payload.into_inner()
- else {
- return Err(ErrorNotImplemented("not implemented"));
+ let (requests, instant_delete) = match payload.into_inner() {
+ RemoveHoldersRequest::Items {
+ requests,
+ instant_delete,
+ } => (requests, instant_delete),
+ RemoveHoldersRequest::ByIndexedTags { tags } => {
+ verify_caller_is_service(&requesting_identity)?;
+
+ tracing::debug!("Querying holders for {} tags", tags.len());
+ let requests = service.query_holders_by_tags(tags).await?;
+ (requests, false)
+ }
};
info!(
instant_delete,
- "Remove request for {} holders.",
+ "Requested removal of {} holders.",
requests.len()
);
validate_request(&requests)?;
@@ -114,3 +122,15 @@
Ok(())
}
+
+/// Guards operations that only other services may perform.
+///
+/// Succeeds when `requesting_identity` is a services token; any other
+/// credential is rejected with an HTTP 403 (Forbidden) error.
+fn verify_caller_is_service(
+  requesting_identity: &AuthorizationCredential,
+) -> actix_web::Result<()> {
+  let is_service =
+    matches!(requesting_identity, AuthorizationCredential::ServicesToken(_));
+
+  if is_service {
+    Ok(())
+  } else {
+    Err(ErrorForbidden(
+      "This endpoint can only be called by other services",
+    ))
+  }
+}
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -13,7 +13,7 @@
use once_cell::sync::Lazy;
use tokio_stream::StreamExt;
use tonic::codegen::futures_core::Stream;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn, Instrument};
use crate::config::{CONFIG, OFFENSIVE_INVITE_LINKS};
use crate::constants::{
@@ -47,8 +47,11 @@
InvalidState,
DB(DBError),
S3(S3Error),
+ #[from(ignore)]
InputError(#[error(ignore)] BoxedError),
InviteLinkError(InviteLinkError),
+ #[from(ignore)]
+ UnexpectedError(#[error(ignore)] BoxedError),
}
type BlobServiceResult<T> = Result<T, BlobServiceError>;
@@ -307,6 +310,40 @@
Ok(results)
}
+  /// Collects the holders indexed under any of the given `tags`.
+  ///
+  /// One lookup task is spawned per tag, each instrumented with the
+  /// current tracing span, and the items returned by every lookup are
+  /// merged into a single list.
+  ///
+  /// # Errors
+  ///
+  /// The first failure aborts all remaining lookups and is returned:
+  /// - a database error from a lookup is converted into
+  ///   `BlobServiceError`;
+  /// - a task join failure is logged and wrapped in
+  ///   `BlobServiceError::UnexpectedError`.
+  pub async fn query_holders_by_tags(
+    &self,
+    tags: Vec<String>,
+  ) -> BlobServiceResult<Vec<BlobInfo>> {
+    let mut lookups = tokio::task::JoinSet::new();
+    for tag in tags {
+      // Each task owns its own handle to the database client.
+      let db_client = self.db.clone();
+      lookups.spawn(
+        async move { db_client.query_indexed_holders(tag).await }
+          .in_current_span(),
+      );
+    }
+
+    let mut holders = Vec::new();
+    while let Some(outcome) = lookups.join_next().await {
+      // First layer: did the task itself run to completion?
+      let query_result = match outcome {
+        Ok(query_result) => query_result,
+        Err(join_error) => {
+          error!(
+            errorType = error_types::OTHER_ERROR,
+            "Holder query task failed: {:?}", join_error
+          );
+          lookups.abort_all();
+          return Err(BlobServiceError::UnexpectedError(Box::new(join_error)));
+        }
+      };
+
+      // Second layer: did the database query succeed?
+      match query_result {
+        Ok(found) => holders.extend(found),
+        Err(db_error) => {
+          lookups.abort_all();
+          return Err(db_error.into());
+        }
+      }
+    }
+
+    Ok(holders)
+  }
+
pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 11:27 AM (17 h, 57 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2561340
Default Alt Text
D13647.diff (3 KB)

Event Timeline