diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
index e0fcf2c82..160687708 100644
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,295 +1,289 @@
-// TODO: Remove this when possible
-#![allow(unused)]
-
 use aws_sdk_dynamodb::{
   operation::put_item::PutItemOutput,
-  types::{
-    AttributeValue, Delete, DeleteRequest, KeysAndAttributes, PutRequest,
-    TransactWriteItem, Update, WriteRequest,
-  },
+  types::{AttributeValue, Delete, TransactWriteItem, Update},
   Error as DynamoDBError,
 };
 use chrono::Utc;
 use comm_services_lib::database::parse_string_attribute;
 use std::{collections::HashMap, sync::Arc};
 use tracing::{debug, error, trace};

 use crate::constants::db::*;

 use super::errors::{BlobDBError, Error as DBError};
 use super::types::*;

 #[derive(Clone)]
 pub struct DatabaseClient {
   ddb: Arc<aws_sdk_dynamodb::Client>,
 }

 /// public interface implementation
 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       ddb: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
     }
   }

   /// Gets a blob item row from the database by its blob hash.
   /// Returns None if the blob item is not found.
   pub async fn get_blob_item(
     &self,
     blob_hash: impl Into<String>,
   ) -> DBResult<Option<BlobItemRow>> {
     let key = PrimaryKey::for_blob_item(blob_hash);
     self
       .get_raw_item(key.clone())
       .await?
       .map(BlobItemRow::try_from)
       .transpose()
   }

   /// Inserts a new blob item row into the database. Returns Error
   /// if the item already exists.
   pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> {
     let item = HashMap::from([
       (
         ATTR_BLOB_HASH.to_string(),
         AttributeValue::S(blob_item.blob_hash),
       ),
       (
         ATTR_HOLDER.to_string(),
         AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()),
       ),
       (
         ATTR_S3_PATH.to_string(),
         AttributeValue::S(blob_item.s3_path.to_full_path()),
       ),
       (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()),
     ]);

     self.insert_item(item).await?;
     Ok(())
   }

   /// Deletes a blob item row. Doesn't delete its holders.
   pub async fn delete_blob_item(
     &self,
     blob_hash: impl Into<String>,
   ) -> DBResult<()> {
     let key = PrimaryKey::for_blob_item(blob_hash);
     self
       .ddb
       .delete_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(key.into()))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to delete blob item: {:?}", err);
         DBError::AwsSdk(err.into())
       })?;
     Ok(())
   }

   /// Inserts a new holder assignment row into the database. Returns Error
   /// if the item already exists or the holder format is invalid.
   pub async fn put_holder_assignment(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> DBResult<()> {
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     validate_holder(&holder)?;

     let item = HashMap::from([
       (ATTR_BLOB_HASH.to_string(), AttributeValue::S(blob_hash)),
       (ATTR_HOLDER.to_string(), AttributeValue::S(holder)),
       (ATTR_UNCHECKED.to_string(), UncheckedKind::Holder.into()),
     ]);

     self.insert_item(item).await?;
     Ok(())
   }

   /// Deletes a holder assignment row from the table.
   /// If the blob item for the given holder assignment exists, it will be
   /// marked as unchecked.
   ///
   /// Returns Error if the holder format is invalid or a race condition
   /// happened. Doesn't fail if the holder assignment didn't exist before.
   pub async fn delete_holder_assignment(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> DBResult<()> {
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     validate_holder(&holder)?;
     let mut transaction = Vec::new();

     // delete the holder row
     let assignment_key = PrimaryKey {
       blob_hash: blob_hash.clone(),
       holder: holder.into(),
     };
     let delete_request = Delete::builder()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(assignment_key.into()))
       .build();
     transaction
       .push(TransactWriteItem::builder().delete(delete_request).build());

     // mark the blob item as unchecked if it exists
     let blob_primary_key = PrimaryKey::for_blob_item(blob_hash);
     if self.get_raw_item(blob_primary_key.clone()).await?.is_some() {
       let update_request = Update::builder()
         .table_name(BLOB_TABLE_NAME)
         .set_key(Some(blob_primary_key.into()))
         // even though we checked that the blob item exists, we still need to
         // check it again using DDB built-in conditions in case it was
         // deleted in the meantime
         .condition_expression(
           "attribute_exists(#blob_hash) AND attribute_exists(#holder)",
         )
         .update_expression("SET #unchecked = :unchecked, #last_modified = :now")
         .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
         .expression_attribute_names("#holder", ATTR_HOLDER)
         .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
         .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
         .expression_attribute_values(":unchecked", UncheckedKind::Blob.into())
         .expression_attribute_values(
           ":now",
           AttributeValue::N(Utc::now().timestamp_millis().to_string()),
         )
         .build();
       transaction
         .push(TransactWriteItem::builder().update(update_request).build());
     }

     self
       .ddb
       .transact_write_items()
       .set_transact_items(Some(transaction))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to run transaction: {:?}", err);
         DBError::AwsSdk(err.into())
       })?;

     Ok(())
   }

   /// Queries the table for a list of holders for a given blob hash.
   /// Optionally limits the number of results.
   pub async fn list_blob_holders(
     &self,
     blob_hash: impl Into<String>,
     limit: Option<i32>,
   ) -> DBResult<Vec<String>> {
     let response = self
       .ddb
       .query()
       .table_name(BLOB_TABLE_NAME)
       .projection_expression("#holder")
       .key_condition_expression("#blob_hash = :blob_hash")
       .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
       .expression_attribute_names("#holder", ATTR_HOLDER)
       .expression_attribute_values(
         ":blob_hash",
         AttributeValue::S(blob_hash.into()),
       )
       .consistent_read(true)
       .set_limit(limit)
       .send()
       .await
       .map_err(|err| {
         error!("DynamoDB client failed to query holders: {:?}", err);
         DBError::AwsSdk(err.into())
       })?;

     let Some(items) = response.items else {
       return Ok(vec![]);
     };
     items
       .into_iter()
       .filter_map(|mut row| {
         // filter out rows that are blob items
         // we cannot do it in a key condition expression - it doesn't
         // support the <> operator
         // a filter expression doesn't work either - it doesn't support
         // filtering by sort key
         match parse_string_attribute(ATTR_HOLDER, row.remove(ATTR_HOLDER)) {
           Ok(value) if value.as_str() == BLOB_ITEM_ROW_HOLDER_VALUE => None,
           holder => Some(holder),
         }
       })
       .collect::<Result<Vec<_>, _>>()
       .map_err(|err| DBError::Attribute(err))
   }
 }
 // private helpers
 impl DatabaseClient {
   /// Inserts a new item into the table using PutItem. Returns
   /// error if the item already exists.
   async fn insert_item(
     &self,
     mut item: RawAttributes,
   ) -> DBResult<PutItemOutput> {
     // add metadata attributes common for all types of rows
     let now = Utc::now().timestamp_millis();
     item.insert(
       ATTR_CREATED_AT.to_string(),
       AttributeValue::N(now.to_string()),
     );
     item.insert(
       ATTR_LAST_MODIFIED.to_string(),
       AttributeValue::N(now.to_string()),
     );

     self
       .ddb
       .put_item()
       .table_name(BLOB_TABLE_NAME)
       .set_item(Some(item))
       // make sure we don't accidentally overwrite an existing row
       .condition_expression(
         "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)",
       )
       .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
       .expression_attribute_names("#holder", ATTR_HOLDER)
       .send()
       .await
       .map_err(|err| match DynamoDBError::from(err) {
         DynamoDBError::ConditionalCheckFailedException(e) => {
           debug!("DynamoDB client failed to insert: item already exists");
           trace!("Conditional check failed with error: {}", e);
           DBError::ItemAlreadyExists
         }
         err => {
           debug!("DynamoDB client failed to insert: {:?}", err);
           DBError::AwsSdk(err)
         }
       })
   }

   /// Gets a single row from the table using GetItem, without parsing it
   async fn get_raw_item(
     &self,
     key: PrimaryKey,
   ) -> DBResult<Option<RawAttributes>> {
     self
       .ddb
       .get_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(key.into()))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to get item: {:?}", err);
         DBError::AwsSdk(err.into())
       })
       .map(|response| response.item)
   }
 }

 fn validate_holder(holder: &str) -> DBResult<()> {
   if holder == BLOB_ITEM_ROW_HOLDER_VALUE {
     debug!("Invalid holder: {}", holder);
     return Err(DBError::Blob(BlobDBError::InvalidInput(holder.to_string())));
   }
   Ok(())
 }
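For context on the new `DatabaseClient` API above, a minimal caller sketch (the blob hash and holder values are made up) showing how the `attribute_not_exists` condition in `insert_item` surfaces a duplicate insert as `DBError::ItemAlreadyExists` instead of silently overwriting the row:

```rust
use crate::database::{errors::Error as DBError, DatabaseClient};

// Hypothetical caller: returns true if the holder was newly assigned.
async fn assign_if_absent(db: &DatabaseClient) -> Result<bool, DBError> {
  // The conditional PutItem makes this a safe "insert once" operation:
  // a concurrent duplicate fails the condition instead of overwriting.
  match db.put_holder_assignment("deadbeef", "device:abc").await {
    Ok(()) => Ok(true),                           // newly assigned
    Err(DBError::ItemAlreadyExists) => Ok(false), // already held
    Err(err) => Err(err),                         // genuine failure
  }
}
```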
diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/errors.rs
similarity index 58%
rename from services/blob/src/http/context.rs
rename to services/blob/src/http/errors.rs
index 741dc49d5..3c0cb5724 100644
--- a/services/blob/src/http/context.rs
+++ b/services/blob/src/http/errors.rs
@@ -1,133 +1,73 @@
-use crate::database::errors::{BlobDBError, Error as DBError};
-use crate::database::old::{BlobItem, DatabaseClient, ReverseIndexItem};
-use crate::s3::{Error as S3Error, S3Client, S3Path};
-use crate::service::BlobServiceError;
 use actix_web::error::{
   ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
   ErrorServiceUnavailable,
 };
 use actix_web::{Error as HttpError, HttpResponse, ResponseError};
-use anyhow::Result;
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use http::StatusCode;
 use tracing::{debug, error, trace, warn};

-/// This structure is passed to every HTTP request handler
-/// It should be cloneable because each HTTP worker thread receives a copy
-#[derive(Clone)]
-pub struct AppContext {
-  pub db: DatabaseClient,
-  pub s3: S3Client,
-}
-
-impl AppContext {
-  pub async fn find_s3_path_by_reverse_index(
-    &self,
-    reverse_index_item: &ReverseIndexItem,
-  ) -> Result<S3Path, HttpError> {
-    let blob_hash = &reverse_index_item.blob_hash;
-    match self.db.find_blob_item(&blob_hash).await {
-      Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
-      Ok(None) => {
-        debug!("No blob found for {:?}", reverse_index_item);
-        Err(ErrorNotFound("blob not found"))
-      }
-      Err(err) => Err(handle_db_error(err)),
-    }
-  }
-}
-
-pub fn handle_db_error(db_error: DBError) -> HttpError {
-  match db_error {
-    DBError::AwsSdk(DynamoDBError::InternalServerError(_))
-    | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
-      _,
-    ))
-    | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
-      warn!("AWS transient error occurred");
-      ErrorServiceUnavailable("please retry")
-    }
-    DBError::Blob(blob_err) => {
-      error!("Encountered Blob database error: {}", blob_err);
-      ErrorInternalServerError("Internal error")
-    }
-    err => {
-      error!("Encountered an unexpected error: {}", err);
-      ErrorInternalServerError("unexpected error")
-    }
-  }
-}
-
-pub fn handle_s3_error(s3_error: S3Error) -> HttpError {
-  match s3_error {
-    S3Error::EmptyUpload => {
-      warn!("Empty upload. Aborting");
-      ErrorBadRequest("Empty upload")
-    }
-    err => {
-      error!("Encountered S3 error: {:?}", err);
-      ErrorInternalServerError("Internal error")
-    }
-  }
-}
+use crate::database::errors::{BlobDBError, Error as DBError};
+use crate::s3::Error as S3Error;
+use crate::service::BlobServiceError;

 pub(super) fn handle_blob_service_error(err: &BlobServiceError) -> HttpError {
   trace!("Handling blob service error: {:?}", err);
   match err {
     BlobServiceError::BlobNotFound => ErrorNotFound("not found"),
     BlobServiceError::BlobAlreadyExists
     | BlobServiceError::DB(DBError::ItemAlreadyExists) => {
       ErrorConflict("blob already exists")
     }
     BlobServiceError::DB(db_err) => match db_err {
       DBError::AwsSdk(DynamoDBError::InternalServerError(_))
       | DBError::AwsSdk(
         DynamoDBError::ProvisionedThroughputExceededException(_),
       )
       | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
         warn!("AWS transient error occurred");
         ErrorServiceUnavailable("please retry")
       }
       DBError::Blob(BlobDBError::InvalidInput(_)) => {
         ErrorBadRequest("bad request")
       }
       unexpected => {
         error!("Received an unexpected DB error: {0:?} - {0}", unexpected);
         ErrorInternalServerError("server error")
       }
     },
     BlobServiceError::S3(s3_err) => match s3_err {
       S3Error::AwsSdk(aws_sdk_s3::Error::NotFound(_))
       | S3Error::AwsSdk(aws_sdk_s3::Error::NoSuchKey(_)) => {
         error!(
           "Data inconsistency! Blob is present in database but not present in S3!"
         );
         ErrorInternalServerError("server error")
       }
       S3Error::EmptyUpload => ErrorBadRequest("empty upload"),
       unexpected => {
         error!("Received an unexpected S3 error: {0:?} - {0}", unexpected);
         ErrorInternalServerError("server error")
       }
     },
     BlobServiceError::InputError(err) => {
       debug!("Received request input error: {0:?} - {0}", err);
       ErrorBadRequest("bad request")
     }
     err => {
       error!("Received an unexpected error: {0:?} - {0}", err);
       ErrorInternalServerError("server error")
     }
   }
 }

 /// This allows us to `await?` blob service calls in HTTP handlers
 impl ResponseError for BlobServiceError {
   fn error_response(&self) -> HttpResponse {
     handle_blob_service_error(self).error_response()
   }

   fn status_code(&self) -> StatusCode {
     handle_blob_service_error(self)
       .as_response_error()
       .status_code()
   }
 }
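The `ResponseError` impl above is what lets the rewritten handlers in `handlers/blob.rs` below propagate `BlobServiceError` with a plain `?`. A minimal sketch of a hypothetical handler relying on it (the `create_download` call and `blob_size` field are assumed from their use in `get_blob_handler` below):

```rust
use actix_web::{web, HttpResponse};

use crate::service::{BlobService, BlobServiceError};

// Hypothetical handler: no manual error mapping needed - the `?`
// converts BlobServiceError into an HTTP response (404, 409, 503, ...)
// through the ResponseError impl above.
async fn blob_size_handler(
  service: web::Data<BlobService>,
) -> Result<HttpResponse, BlobServiceError> {
  let download = service.create_download(String::from("some-hash")).await?;
  Ok(HttpResponse::Ok().body(download.blob_size.to_string()))
}
```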
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
index c4a848191..25ca401a7 100644
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,224 +1,218 @@
-#![allow(unused)]
-
-use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
-use crate::database::old::{BlobItem, ReverseIndexItem};
-use crate::http::context::{handle_blob_service_error, handle_s3_error};
+use crate::http::errors::handle_blob_service_error;
 use crate::service::BlobService;
 use crate::tools::BoxedError;
 use crate::validate_identifier;

-use super::{handle_db_error, AppContext};
 use actix_web::error::{
-  ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
-  ErrorRangeNotSatisfiable,
+  ErrorBadRequest, ErrorInternalServerError, ErrorRangeNotSatisfiable,
 };
 use actix_web::{
   http::header::{ByteRangeSpec, Range},
   web, Error as HttpError, HttpResponse,
 };
 use anyhow::Result;
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
 use tracing::{debug, info, instrument, trace, warn};
 use tracing_futures::Instrument;

 /// Returns a tuple of the first and last byte number (inclusive)
 /// represented by the given range header.
 fn parse_range_header(
   range_header: &Option<web::Header<Range>>,
   file_size: u64,
 ) -> actix_web::Result<(u64, u64)> {
   let (range_start, range_end): (u64, u64) = match range_header {
     Some(web::Header(Range::Bytes(ranges))) => {
       if ranges.len() > 1 {
         return Err(ErrorBadRequest("Multiple ranges not supported"));
       }
       match ranges[0] {
         ByteRangeSpec::FromTo(start, end) => {
           if end >= file_size || start > end {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, end)
         }
         ByteRangeSpec::From(start) => {
           if start >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, file_size - 1)
         }
         ByteRangeSpec::Last(length) => {
           if length >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (file_size - length, file_size - 1)
         }
       }
     }
     Some(web::Header(Range::Unregistered(..))) => {
       return Err(ErrorBadRequest("Use ranges registered at IANA"));
     }
     None => (0, file_size - 1),
   };
   Ok((range_start, range_end))
 }

 #[instrument(
   name = "get_blob",
   skip_all,
   fields(blob_hash = %params.as_ref().as_str(), s3_path)
 )]
 pub async fn get_blob_handler(
   service: web::Data<BlobService>,
   params: web::Path<String>,
   range_header: Option<web::Header<Range>>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Get blob request");
   let blob_hash = params.into_inner();
   validate_identifier!(blob_hash);

   trace!("Initializing download session");
   let mut download = service.create_download(blob_hash).await?;

   let total_size = download.blob_size;
   let (range_start, range_end): (u64, u64) =
     parse_range_header(&range_header, total_size)?;
   download.set_byte_range(range_start..=range_end);
   let content_length = download.download_size();

   let stream = download
     .into_stream()
     .map(|data| match data {
       Ok(bytes) => Ok(web::Bytes::from(bytes)),
       Err(err) => {
         warn!("Error during download stream: {:?}", err);
         Err(handle_blob_service_error(&err))
       }
     })
     .in_current_span();

   if range_header.is_some() {
     return Ok(
       HttpResponse::PartialContent()
         .content_type("application/octet-stream")
         .append_header(("Content-Length", content_length))
         .append_header((
           "Content-Range",
           format!("bytes {}-{}/{}", range_start, range_end, total_size),
         ))
         .streaming(Box::pin(stream)),
     );
   }
   Ok(
     HttpResponse::Ok()
       .content_type("application/octet-stream")
       .append_header(("Content-Length", content_length))
       .streaming(Box::pin(stream)),
   )
 }
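To make the inclusive-range semantics of `parse_range_header` concrete, a sketch of unit tests one could add next to it (test names and values are illustrative, not part of this diff):

```rust
#[cfg(test)]
mod range_tests {
  use super::*;
  use actix_web::http::header::{ByteRangeSpec, Range};

  #[test]
  fn range_bounds_are_inclusive() {
    // 100-byte file: bytes 0..=99 are all satisfiable
    let header =
      Some(web::Header(Range::Bytes(vec![ByteRangeSpec::FromTo(0, 99)])));
    assert_eq!(parse_range_header(&header, 100).unwrap(), (0, 99));

    // `Last(n)` selects the final n bytes
    let header =
      Some(web::Header(Range::Bytes(vec![ByteRangeSpec::Last(10)])));
    assert_eq!(parse_range_header(&header, 100).unwrap(), (90, 99));

    // no Range header selects the whole file
    assert_eq!(parse_range_header(&None, 100).unwrap(), (0, 99));
  }
}
```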
 #[derive(Deserialize, Debug)]
 pub struct AssignHolderPayload {
   holder: String,
   blob_hash: String,
 }

 #[derive(Serialize)]
 struct AssignHolderResponse {
   data_exists: bool,
 }

 #[instrument(name = "assign_holder", skip(service))]
 pub async fn assign_holder_handler(
   service: web::Data<BlobService>,
   payload: web::Json<AssignHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Assign holder request");
   let AssignHolderPayload { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);

   let data_exists = service.assign_holder(blob_hash, holder).await?;
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponse { data_exists })))
 }

 async fn get_blob_hash_field(
   multipart_payload: &mut actix_multipart::Multipart,
 ) -> Result<String, HttpError> {
   let Some(mut field) = multipart_payload.try_next().await? else {
     debug!("Malformed multipart request");
     return Err(ErrorBadRequest("Bad request"));
   };

   if field.name() != "blob_hash" {
     warn!("Blob hash is required as the first form field");
     return Err(ErrorBadRequest("Bad request"));
   }

   let mut buf = Vec::new();
   while let Some(chunk) = field.try_next().await? {
     buf.extend_from_slice(&chunk);
   }

   let blob_hash = String::from_utf8(buf)
     .map_err(|_| ErrorInternalServerError("Internal error"))?;
   validate_identifier!(blob_hash);
   Ok(blob_hash)
 }

 #[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
 pub async fn upload_blob_handler(
   service: web::Data<BlobService>,
   mut payload: actix_multipart::Multipart,
 ) -> actix_web::Result<HttpResponse> {
   info!("Upload blob request");

   let blob_hash = get_blob_hash_field(&mut payload).await?;
   debug!("Received blob_hash: {}", &blob_hash);
   tracing::Span::current().record("blob_hash", &blob_hash);

   trace!("Receiving blob data");
   let stream: AsyncStream<Result<Vec<u8>, BoxedError>, _> = try_stream! {
     while let Some(mut field) = payload.try_next().await.map_err(Box::new)? {
       let field_name = field.name();
       if field_name != "blob_data" {
         warn!(
           field_name,
           "Malformed request: 'blob_data' multipart field expected."
         );
         Err(ErrorBadRequest("Bad request"))?;
       }

       while let Some(chunk) = field.try_next().await.map_err(Box::new)? {
         yield chunk.to_vec();
       }
     }
     trace!("Stream done");
   };

   service.put_blob(blob_hash, stream).await?;
   Ok(HttpResponse::NoContent().finish())
 }

 #[derive(Deserialize, Debug)]
 pub struct RemoveHolderPayload {
   holder: String,
   blob_hash: String,
 }

 #[instrument(name = "remove_holder", skip(service))]
 pub async fn remove_holder_handler(
   service: web::Data<BlobService>,
   payload: web::Json<RemoveHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Revoke holder request");
   let RemoveHolderPayload { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);

   service.revoke_holder(blob_hash, holder).await?;
   Ok(HttpResponse::NoContent().finish())
 }
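The upload handler expects `blob_hash` as the first multipart field, followed by one or more `blob_data` fields. A hedged client-side sketch using `reqwest` with its `multipart` feature (the URL, port, and payload are made up for illustration):

```rust
use reqwest::multipart::{Form, Part};

async fn upload_example() -> Result<(), reqwest::Error> {
  // Field order matters: the server reads `blob_hash` first,
  // then streams every `blob_data` field to storage.
  let form = Form::new()
    .text("blob_hash", "deadbeef")
    .part("blob_data", Part::bytes(b"hello blob".to_vec()));

  let response = reqwest::Client::new()
    .put("http://localhost:50053/blob")
    .multipart(form)
    .send()
    .await?;
  assert_eq!(response.status(), 204); // NoContent on success
  Ok(())
}
```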
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
index 5688dc66f..a58c6b66a 100644
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -1,73 +1,57 @@
-use crate::database::old::DatabaseClient;
-use crate::s3::S3Client;
 use crate::{config::CONFIG, service::BlobService};
 use actix_cors::Cors;
 use actix_web::{web, App, HttpServer};
 use anyhow::Result;
 use tracing::info;

-mod context;
-use context::AppContext;
+mod errors;
 mod utils;

 mod handlers {
   pub(super) mod blob;
-
-  // convenience exports to be used in handlers
-  use super::context::{handle_db_error, AppContext};
 }

 fn cors_config() -> Cors {
   if CONFIG.is_sandbox {
     // All origins, methods, request headers and exposed headers allowed.
     // Credentials supported. Max age 1 hour. Does not send wildcard.
     return Cors::permissive();
   }

   Cors::default()
     .allowed_origin("https://web.comm.app")
     // for local development using prod service
     .allowed_origin("http://localhost:3000")
     .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
     .allow_any_header()
     .expose_any_header()
 }

-pub async fn run_http_server(
-  db_client: DatabaseClient,
-  s3_client: S3Client,
-  blob_service: BlobService,
-) -> Result<()> {
+pub async fn run_http_server(blob_service: BlobService) -> Result<()> {
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
   HttpServer::new(move || {
-    // context that is passed to every handler
-    let ctx = AppContext {
-      db: db_client.to_owned(),
-      s3: s3_client.to_owned(),
-    };
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .wrap(cors_config())
-      .app_data(web::Data::new(ctx))
       .app_data(web::Data::new(blob_service.to_owned()))
       .service(
         web::resource("/blob/{holder}")
           .route(web::get().to(handlers::blob::get_blob_handler)),
       )
       .service(
         web::resource("/blob")
           .route(web::put().to(handlers::blob::upload_blob_handler))
           .route(web::post().to(handlers::blob::assign_holder_handler))
           .route(web::delete().to(handlers::blob::remove_holder_handler)),
       )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
   Ok(())
 }
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
index fde55bece..15e048220 100644
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -1,49 +1,49 @@
 pub mod config;
 pub mod constants;
 pub mod database;
 pub mod grpc;
 pub mod http;
 pub mod s3;
 pub mod service;
 pub mod tools;

 use anyhow::Result;
 use tracing_subscriber::filter::{EnvFilter, LevelFilter};

 use crate::service::BlobServiceConfig;

 fn configure_logging() -> Result<()> {
   let filter = EnvFilter::builder()
     .with_default_directive(LevelFilter::INFO.into())
     .with_env_var(constants::LOG_LEVEL_ENV_VAR)
     .from_env_lossy();

   let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish();
   tracing::subscriber::set_global_default(subscriber)?;
   Ok(())
 }

 #[tokio::main]
 async fn main() -> Result<()> {
   configure_logging()?;
   config::parse_cmdline_args()?;

   let aws_config = config::load_aws_config().await;
   let db = database::old::DatabaseClient::new(&aws_config);
   let s3 = s3::S3Client::new(&aws_config);
   let new_db = database::DatabaseClient::new(&aws_config);

   let service = service::BlobService::new(
     new_db,
     s3.clone(),
     BlobServiceConfig {
       instant_delete_orphaned_blobs: true,
       ..Default::default()
     },
   );

   tokio::select! {
-    http_result = crate::http::run_http_server(db.clone(), s3.clone(), service) => http_result,
+    http_result = crate::http::run_http_server(service) => http_result,
     grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
   }
 }
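Finally, a sketch of the holder lifecycle against the route table above, with JSON bodies matching `AssignHolderPayload`/`RemoveHolderPayload` (reqwest with the `json` feature; base URL, port, and values assumed):

```rust
use serde_json::json;

async fn holder_lifecycle() -> Result<(), reqwest::Error> {
  let client = reqwest::Client::new();
  let body = json!({ "holder": "device:abc", "blob_hash": "deadbeef" });

  // POST /blob assigns a holder; `data_exists` in the response tells
  // the client whether the blob bytes were already uploaded.
  let assign = client
    .post("http://localhost:50053/blob")
    .json(&body)
    .send()
    .await?;
  println!("assign response: {}", assign.text().await?);

  // DELETE /blob revokes the holder; with the service configured with
  // instant_delete_orphaned_blobs: true, an orphaned blob is cleaned up.
  client
    .delete("http://localhost:50053/blob")
    .json(&body)
    .send()
    .await?;
  Ok(())
}
```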