diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -3,15 +3,17 @@ use futures_util::{StreamExt, TryStreamExt}; use reqwest::{ multipart::{Form, Part}, - Body, + Body, Method, RequestBuilder, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; // publicly re-export some reqwest types pub use reqwest::Error as ReqwestError; pub use reqwest::StatusCode; pub use reqwest::Url; +use crate::auth::UserIdentity; + #[derive(From, Error, Debug, Display)] pub enum BlobServiceError { /// HTTP Client errors, this includes: @@ -39,6 +41,8 @@ ServerError, #[display(...)] UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), + #[display(...)] + UnexpectedError, } /// A client interface to Blob service. @@ -46,23 +50,60 @@ /// The `BlobServiceClient` holds a connection pool internally, so it is advised that /// you create one and **reuse** it, by **cloning**. /// +/// A clone is recommended for each individual client identity that has +/// a different `UserIdentity`. +/// +/// # Example +/// ```ignore +/// // create client for user A +/// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA); +/// +/// // reuse the client connection with different credentials +/// let clientB = clientA.with_user_identity(userB); +/// +/// // clientA still uses userA credentials +/// ``` +/// +/// A clone is recommended when the client concurrently handles multiple identities - +/// e.g. an HTTP service handling requests from different users. The `with_user_identity()` +/// method should be used for this purpose. +/// /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it, /// because it already uses an `Arc` internally. 
#[derive(Clone)] pub struct BlobServiceClient { http_client: reqwest::Client, blob_service_url: reqwest::Url, + user_identity: Option, } impl BlobServiceClient { + /// Creates a new Blob Service client instance. It is unauthenticated by default + /// so you need to call `with_user_identity()` afterwards: + /// ```ignore + /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user); + /// ``` + /// + /// **Note**: It is advised to create this client once and reuse it by **cloning**. + /// See [`BlobServiceClient`] docs for details. pub fn new(blob_service_url: reqwest::Url) -> Self { debug!("Creating BlobServiceClient. URL: {}", blob_service_url); Self { http_client: reqwest::Client::new(), blob_service_url, + user_identity: None, } } + /// Clones the client and sets the [`UserIdentity`] for the new instance. + /// This allows the client to reuse the same connection pool for different users. + pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self { + trace!("Set user_identity: {:?}", &user_identity); + let mut this = self.clone(); + this.user_identity = Some(user_identity); + this + } + /// Downloads blob with given [`blob_hash`]. /// /// @returns a stream of blob bytes @@ -89,8 +130,7 @@ let url = self.get_blob_url(Some(blob_hash))?; let response = self - .http_client - .get(url) + .request(Method::GET, url)? .send() .await .map_err(BlobServiceError::ClientError)?; @@ -130,7 +170,11 @@ blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); - let response = self.http_client.post(url).json(&payload).send().await?; + let response = self + .request(Method::POST, url)? + .json(&payload) + .send() + .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { @@ -163,7 +207,11 @@ }; debug!("Request payload: {:?}", payload); - let response = self.http_client.delete(url).json(&payload).send().await?; + let response = self + .request(Method::DELETE, url)? 
+ .json(&payload) + .send() + .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { @@ -215,7 +263,11 @@ .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); - let response = self.http_client.put(url).multipart(form).send().await?; + let response = self + .request(Method::PUT, url)? + .multipart(form) + .send() + .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { @@ -296,6 +348,24 @@ trace!("Constructed request URL: {}", url); Ok(url) } + + fn request( + &self, + http_method: Method, + url: Url, + ) -> BlobResult { + let request = self.http_client.request(http_method, url); + match &self.user_identity { + Some(user) => { + let token = user.as_authorization_token().map_err(|e| { + error!("Failed to parse authorization token: {}", e); + BlobServiceError::UnexpectedError + })?; + Ok(request.bearer_auth(token)) + } + None => Ok(request), + } + } } fn handle_http_error(status_code: StatusCode) -> BlobServiceError {