diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,10 +1,15 @@
 use aws_sdk_dynamodb::{
   operation::put_item::PutItemOutput,
-  types::{AttributeValue, Delete, TransactWriteItem, Update},
+  types::{
+    AttributeValue, Delete, DeleteRequest, PutRequest, TransactWriteItem,
+    Update, WriteRequest,
+  },
   Error as DynamoDBError,
 };
 use chrono::Utc;
-use comm_services_lib::database::TryFromAttribute;
+use comm_services_lib::database::{
+  self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
+};
 use std::collections::HashMap;
 use tracing::{debug, error, trace};
 
@@ -190,7 +195,9 @@
         AttributeValue::S(blob_hash.into()),
       )
       .consistent_read(true)
-      .set_limit(limit)
+      // Increase the limit by 1 because the query can also match the blob
+      // item row itself; the caller filters it out later.
+      .set_limit(limit.map(|it| it.saturating_add(1)))
       .send()
       .await
       .map_err(|err| {
@@ -213,6 +220,123 @@
       .collect::<Result<Vec<_>, _>>()
       .map_err(DBError::Attribute)
   }
+
+  /// Returns the subset of the given primary keys whose rows already exist
+  pub async fn list_existing_keys(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<Vec<PrimaryKey>> {
+    database::batch_operations::batch_get(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      keys,
+      Some(format!("{}, {}", ATTR_BLOB_HASH, ATTR_HOLDER)), // key-only projection
+      ExponentialBackoffConfig::default(),
+    )
+    .await?
+    .into_iter()
+    .map(PrimaryKey::try_from)
+    .collect::<Result<Vec<_>, _>>()
+  }
+
+  /// Returns primary keys of "unchecked" items of the given `kind`
+  /// (blob or holder) whose `last_modified` timestamp (millis) is older
+  /// than `min_age` relative to now, queried via the "unchecked" index.
+  pub async fn find_unchecked_items(
+    &self,
+    kind: UncheckedKind,
+    min_age: chrono::Duration,
+  ) -> DBResult<Vec<PrimaryKey>> {
+    let created_until = Utc::now() - min_age;
+    let timestamp = created_until.timestamp_millis();
+
+    let response = self
+      .ddb
+      .query()
+      .table_name(BLOB_TABLE_NAME)
+      .index_name(UNCHECKED_INDEX_NAME)
+      .key_condition_expression(
+        "#unchecked = :kind AND #last_modified < :timestamp",
+      )
+      .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
+      .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
+      .expression_attribute_values(":kind", kind.into())
+      .expression_attribute_values(
+        ":timestamp",
+        AttributeValue::N(timestamp.to_string()),
+      )
+      .send() // NOTE(review): fetches a single page only — no LastEvaluatedKey pagination; confirm result sets stay small
+      .await
+      .map_err(|err| {
+        error!("DynamoDB client failed to query unchecked items: {:?}", err);
+        DBError::AwsSdk(err.into())
+      })?;
+
+    let Some(items) = response.items else { return Ok(vec![]); };
+    items
+      .into_iter()
+      .map(PrimaryKey::try_from)
+      .collect::<Result<Vec<_>, _>>()
+  }
+
+  /// Removes the "unchecked" attribute from all rows with the given
+  /// primary keys, by re-putting each fetched row in a batched write.
+  pub async fn batch_mark_checked(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<()> {
+    let items_to_mark = database::batch_operations::batch_get(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      keys,
+      None,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    let write_requests = items_to_mark
+      .into_iter()
+      .filter_map(|mut row| {
+        // `remove` yields None for rows lacking the attribute (already
+        // checked), so those rows are skipped to save write capacity
+        row.remove(ATTR_UNCHECKED)?;
+        let put_request = PutRequest::builder().set_item(Some(row)).build();
+        let request = WriteRequest::builder().put_request(put_request).build();
+        Some(request)
+      })
+      .collect();
+
+    database::batch_operations::batch_write(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      write_requests,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+    Ok(())
+  }
+
+  /// Deletes rows with the given primary keys, as batched DeleteRequests
+  pub async fn batch_delete_rows(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<()> {
+    let write_requests = keys
+      .into_iter()
+      .map(|key| DeleteRequest::builder().set_key(Some(key.into())).build())
+      .map(|request| WriteRequest::builder().delete_request(request).build())
+      .collect::<Vec<_>>();
+
+    database::batch_operations::batch_write(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      write_requests,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    Ok(())
+  }
 }
 
 // private helpers
diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs
--- a/services/blob/src/database/errors.rs
+++ b/services/blob/src/database/errors.rs
@@ -18,6 +18,18 @@
   Blob(BlobDBError),
   #[display(...)]
   ItemAlreadyExists,
+  MaxRetriesExceeded,
+}
+
+impl From<comm_services_lib::database::Error> for Error {
+  fn from(value: comm_services_lib::database::Error) -> Self {
+    use comm_services_lib::database::Error as E;
+    match value { // each shared-lib variant maps 1:1 onto a local variant
+      E::AwsSdk(err) => Self::AwsSdk(err),
+      E::Attribute(err) => Self::Attribute(err),
+      E::MaxRetriesExceeded => Self::MaxRetriesExceeded,
+    }
+  }
+}
 
 #[derive(Debug)]