Page MenuHomePhabricator

D13480.id44584.diff
No OneTemporary

D13480.id44584.diff

diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs
--- a/services/blob/src/http/handlers/holders.rs
+++ b/services/blob/src/http/handlers/holders.rs
@@ -12,6 +12,79 @@
holder: String,
}
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct AssignHoldersPayload {
+ requests: Vec<BlobHashAndHolder>,
+}
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+struct HolderAssignmentResult {
+ #[serde(flatten)]
+ request: BlobHashAndHolder,
+ success: bool,
+ data_exists: bool,
+ holder_already_exists: bool,
+}
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+struct AssignHoldersResponse {
+ results: Vec<HolderAssignmentResult>,
+}
+
+#[instrument(name = "assign_multiple_holders", skip_all)]
+pub async fn assign_holders_handler(
+ service: web::Data<BlobService>,
+ payload: web::Json<AssignHoldersPayload>,
+) -> actix_web::Result<HttpResponse> {
+ use crate::database::DBError;
+ use crate::service::BlobServiceError;
+
+ let AssignHoldersPayload { requests } = payload.into_inner();
+ info!("Assign holder request for {} holders", requests.len());
+ validate_request(&requests)?;
+
+ let mut results = Vec::with_capacity(requests.len());
+
+ for item in requests {
+ let BlobHashAndHolder { blob_hash, holder } = &item;
+ let result = match service.assign_holder(blob_hash, holder).await {
+ Ok(data_exists) => HolderAssignmentResult {
+ request: item,
+ success: true,
+ data_exists,
+ holder_already_exists: false,
+ },
+ Err(BlobServiceError::DB(DBError::ItemAlreadyExists)) => {
+ let data_exists =
+ service.blob_hash_exists(blob_hash).await.unwrap_or(false);
+ HolderAssignmentResult {
+ request: item,
+ success: true,
+ data_exists,
+ holder_already_exists: true,
+ }
+ }
+ Err(err) => {
+ warn!("Holder assignment error: {:?}", err);
+ let data_exists =
+ service.blob_hash_exists(blob_hash).await.unwrap_or(false);
+ HolderAssignmentResult {
+ request: item,
+ success: false,
+ data_exists,
+ holder_already_exists: false,
+ }
+ }
+ };
+ results.push(result);
+ }
+
+ let response = AssignHoldersResponse { results };
+ Ok(HttpResponse::Ok().json(web::Json(response)))
+}
+
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RemoveHoldersPayload {
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -48,6 +48,7 @@
.service(
web::resource("/holders")
.wrap(auth_middleware)
+ .route(web::post().to(handlers::holders::assign_holders_handler))
.route(web::delete().to(handlers::holders::remove_holders_handler)),
)
})
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -282,6 +282,19 @@
Ok(())
}
+ pub async fn blob_hash_exists(
+ &self,
+ blob_hash: impl Into<String>,
+ ) -> BlobServiceResult<bool> {
+ match self.db.get_blob_item(blob_hash).await {
+ Ok(item) => Ok(item.is_some()),
+ Err(err) => {
+ warn!("Failed to check if blob exists: {err:?}");
+ Err(err.into())
+ }
+ }
+ }
+
pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"

File Metadata

Mime Type
text/plain
Expires
Mon, Sep 30, 8:37 AM (22 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2204068
Default Alt Text
D13480.id44584.diff (3 KB)

Event Timeline