diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs index 90c168135..28f7ed636 100644 --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -1,322 +1,322 @@ use derive_more::{Display, Error, From}; use futures_core::Stream; use futures_util::{StreamExt, TryStreamExt}; use reqwest::{ multipart::{Form, Part}, Body, Url, }; use tracing::{debug, trace, warn}; // publicly re-export some reqwest types pub use reqwest::Error as ReqwestError; pub use reqwest::StatusCode; #[derive(From, Error, Debug, Display)] pub enum BlobServiceError { /// HTTP Client errors, this includes: /// - connection failures /// - request errors (e.g. upload input stream) #[display(...)] ClientError(ReqwestError), /// Invalid Blob service URL provided #[display(...)] URLError(#[error(ignore)] String), /// Blob service returned HTTP 404 /// - blob or holder not found #[display(...)] NotFound, /// Blob service returned HTTP 409 /// - blob or holder already exists #[display(...)] AlreadyExists, /// Blob service returned HTTP 400 /// - invalid holder or blob_hash format #[display(...)] InvalidArguments, /// Blob service returned HTTP 50x #[display(...)] ServerError, #[display(...)] UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), } /// A client interface to Blob service. // /// The `BlobServiceClient` holds a connection pool internally, so it is advised that /// you create one and **reuse** it, by **cloning**. /// /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it, /// because it already uses an `Arc` internally. #[derive(Clone)] pub struct BlobServiceClient { http_client: reqwest::Client, blob_service_url: reqwest::Url, } impl BlobServiceClient { pub fn new(blob_service_url: reqwest::Url) -> Self { debug!("Creating BlobServiceClient. URL: {}", blob_service_url); Self { http_client: reqwest::Client::new(), blob_service_url, } } /// Downloads blob with given [`blob_hash`]. /// /// @returns a stream of blob bytes /// /// # Errors thrown /// - [BlobServiceError::NotFound] if blob with given hash does not exist /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format /// /// # Example - /// ```rust + /// ```ignore /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let mut stream = client.get("hello").await?; /// while let Some(data) = stream.try_next().await? { /// println!("Got data: {:?}", data); /// } /// ``` pub async fn get( &self, blob_hash: &str, ) -> BlobResult>>> { debug!(?blob_hash, "Get blob request"); let url = self.get_blob_url(Some(blob_hash))?; let response = self .http_client .get(url) .send() .await .map_err(BlobServiceError::ClientError)?; debug!("Response status: {}", response.status()); if response.status().is_success() { let stream = response.bytes_stream().map(|result| match result { Ok(bytes) => Ok(bytes.into()), Err(error) => { warn!("Error while streaming response: {}", error); Err(BlobServiceError::ClientError(error)) } }); return Ok(stream); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Assigns a new holder to a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::AlreadyExists` if blob already has /// a holder with given [`holder`] name. pub async fn assign_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult { debug!("Assign holder request"); let url = self.get_blob_url(None)?; let payload = AssignHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); let response = self.http_client.post(url).json(&payload).send().await?; debug!("Response status: {}", response.status()); if response.status().is_success() { let AssignHolderResponse { data_exists } = response.json().await?; trace!("Data exists: {}", data_exists); return Ok(data_exists); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Revokes given holder from a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist /// or it does not have such holder pub async fn revoke_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult<()> { debug!("Revoke holder request"); let url = self.get_blob_url(None)?; let payload = RevokeHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); let response = self.http_client.delete(url).json(&payload).send().await?; debug!("Response status: {}", response.status()); if response.status().is_success() { trace!("Revoke holder request successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash /// already exists. /// /// # Example - /// ```rust + /// ```ignore /// use std::io::{Error, ErrorKind}; /// /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let stream = async_stream::stream! { /// yield Ok(vec![1, 2, 3]); /// yield Ok(vec![4, 5, 6]); /// yield Err(Error::new(ErrorKind::Other, "Oops")); /// }; /// client.upload_blob(&blob_hash, stream).await?; /// ``` pub async fn upload_blob( &self, blob_hash: H, data_stream: S, ) -> BlobResult<()> where H: Into, S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Vec: From, { debug!("Upload blob request"); let url = self.get_blob_url(None)?; let stream = data_stream.map_ok(Vec::from); let streaming_body = Body::wrap_stream(stream); let form = Form::new() .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); let response = self.http_client.put(url).multipart(form).send().await?; debug!("Response status: {}", response.status()); if response.status().is_success() { trace!("Blob upload successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`]. /// /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist, /// uploads the data from [`data_stream`]. pub async fn simple_put( &self, blob_hash: &str, holder: &str, data_stream: S, ) -> BlobResult where S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Vec: From, { trace!("Begin simple put. Assigning holder..."); let data_exists = self.assign_holder(blob_hash, holder).await?; if data_exists { trace!("Blob data already exists. Skipping upload."); return Ok(false); } trace!("Uploading blob data..."); let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await else { return Ok(true); }; trace!(%blob_hash, %holder, "Revoking holder due to upload failure"); self.schedule_revoke_holder(blob_hash, holder); Err(upload_error) } /// Revokes holder in a separate task. Useful to clean up after /// upload failure without blocking the current task. pub fn schedule_revoke_holder( &self, blob_hash: impl Into, holder: impl Into, ) { let this = self.clone(); let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); tokio::spawn(async move { if let Err(err) = this.revoke_holder(&blob_hash, &holder).await { warn!("Failed to revoke holder: {0:?} - {0}", err); } }); } } // private helper methods impl BlobServiceClient { fn get_blob_url( &self, blob_hash: Option<&str>, ) -> Result { let path = match blob_hash { Some(hash) => format!("/blob/{}", hash), None => "/blob".to_string(), }; let url = self .blob_service_url .join(&path) .map_err(|err| BlobServiceError::URLError(err.to_string()))?; trace!("Constructed request URL: {}", url); Ok(url) } } fn handle_http_error(status_code: StatusCode) -> BlobServiceError { match status_code { StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments, StatusCode::NOT_FOUND => BlobServiceError::NotFound, StatusCode::CONFLICT => BlobServiceError::AlreadyExists, code if code.is_server_error() => BlobServiceError::ServerError, code => BlobServiceError::UnexpectedHttpStatus(code), } } type BlobResult = Result; #[derive(serde::Deserialize)] struct AssignHolderResponse { data_exists: bool, } #[derive(Debug, serde::Serialize)] struct AssignHolderRequest { blob_hash: String, holder: String, } // they have the same layout so we can simply alias type RevokeHolderRequest = AssignHolderRequest; diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs index 85aeccabe..1040a3cae 100644 --- a/services/comm-services-lib/src/database.rs +++ b/services/comm-services-lib/src/database.rs @@ -1,366 +1,367 @@ use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::{DateTime, Utc}; use std::fmt::{Display, Formatter}; use std::num::ParseIntError; use std::str::FromStr; // # Useful type aliases // Rust exports `pub type` only into the so-called "type namespace", but in // order to use them e.g. with the `TryFromAttribute` trait, they also need // to be exported into the "value namespace" which is what `pub use` does. // // To overcome that, a dummy module is created and aliases are re-exported // with `pub use` construct mod aliases { use aws_sdk_dynamodb::types::AttributeValue; use std::collections::HashMap; pub type AttributeMap = HashMap; } pub use self::aliases::AttributeMap; // # Error handling #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), } #[derive(Debug)] pub enum Value { AttributeValue(Option), String(String), } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { attribute_name: String, attribute_value: Value, attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] TimestampOutOfRange, #[display(...)] InvalidTimestamp(chrono::ParseError), #[display(...)] InvalidNumberFormat(ParseIntError), } /// Conversion trait for [`AttributeValue`] /// /// Types implementing this trait are able to do the following: -/// ```rust +/// ```ignore /// use comm_services_lib::database::{TryFromAttribute, AttributeTryInto}; /// -/// let foo = SomeType::try_from_attr("MyAttribute", Some(attribute)); +/// let foo = SomeType::try_from_attr("MyAttribute", Some(attribute))?; /// /// // if `AttributeTryInto` is imported, also: -/// let bar = Some(attribute).attr_try_into("MyAttribute"); +/// let bar = Some(attribute).attr_try_into("MyAttribute")?; +/// ``` pub trait TryFromAttribute: Sized { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result; } /// Do NOT implement this trait directly. Implement [`TryFromAttribute`] instead pub trait AttributeTryInto { fn attr_try_into( self, attribute_name: impl Into, ) -> Result; } // Automatic attr_try_into() for all attribute values // that have TryFromAttribute implemented impl AttributeTryInto for Option { fn attr_try_into( self, attribute_name: impl Into, ) -> Result { T::try_from_attr(attribute_name, self) } } impl TryFromAttribute for String { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for bool { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::Bool(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for DateTime { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { match &attribute { Some(AttributeValue::S(datetime)) => datetime.parse().map_err(|e| { DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::InvalidTimestamp(e), ) }), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for AttributeMap { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for Vec { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::B(data)) => Ok(data.into_inner()), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } #[deprecated = "Use `String::try_from_attr()` instead"] pub fn parse_string_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { String::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `bool::try_from_attr()` instead"] pub fn parse_bool_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { bool::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `DateTime::::try_from_attr()` instead"] pub fn parse_datetime_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result, DBItemError> { DateTime::::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `AttributeMap::try_from_attr()` instead"] pub fn parse_map_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { attribute_value.attr_try_into(attribute_name) } pub fn parse_int_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result where T: FromStr, { match &attribute_value { Some(AttributeValue::N(numeric_str)) => { parse_integer(attribute_name, numeric_str) } Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } /// Parses the UTC timestamp in milliseconds from a DynamoDB numeric attribute pub fn parse_timestamp_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result, DBItemError> { let attribute_name: String = attribute_name.into(); let timestamp = parse_int_attribute::( attribute_name.clone(), attribute_value.clone(), )?; let naive_datetime = chrono::NaiveDateTime::from_timestamp_millis(timestamp) .ok_or_else(|| { DBItemError::new( attribute_name, Value::AttributeValue(attribute_value), DBItemAttributeError::TimestampOutOfRange, ) })?; Ok(DateTime::from_utc(naive_datetime, Utc)) } pub fn parse_integer( attribute_name: impl Into, attribute_value: &str, ) -> Result where T: FromStr, { attribute_value.parse::().map_err(|e| { DBItemError::new( attribute_name.into(), Value::String(attribute_value.into()), DBItemAttributeError::InvalidNumberFormat(e), ) }) } #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_integer() { assert!(parse_integer::("some_attr", "123").is_ok()); assert!(parse_integer::("negative", "-123").is_ok()); assert!(parse_integer::("float", "3.14").is_err()); assert!(parse_integer::("NaN", "foo").is_err()); assert!(parse_integer::("negative_uint", "-123").is_err()); assert!(parse_integer::("too_large", "65536").is_err()); } #[test] fn test_parse_timestamp() { let timestamp = Utc::now().timestamp_millis(); let attr = AttributeValue::N(timestamp.to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_ok()); assert_eq!(parsed_timestamp.unwrap().timestamp_millis(), timestamp); } #[test] fn test_parse_invalid_timestamp() { let attr = AttributeValue::N("foo".to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_err()); } #[test] fn test_parse_timestamp_out_of_range() { let attr = AttributeValue::N(i64::MAX.to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_err()); assert!(matches!( parsed_timestamp.unwrap_err().attribute_error, DBItemAttributeError::TimestampOutOfRange )); } }