Page MenuHomePhabricator

D8786.id30353.diff
No OneTemporary

D8786.id30353.diff

diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -3,14 +3,16 @@
use futures_util::{StreamExt, TryStreamExt};
use reqwest::{
multipart::{Form, Part},
- Body, Url,
+ Body, Method, RequestBuilder, Url,
};
-use tracing::{debug, trace, warn};
+use tracing::{debug, error, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
pub use reqwest::StatusCode;
+use crate::auth::UserIdentity;
+
#[derive(From, Error, Debug, Display)]
pub enum BlobServiceError {
/// HTTP Client errors, this includes:
@@ -38,6 +40,8 @@
ServerError,
#[display(...)]
UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
+ #[display(...)]
+ UnexpectedError,
}
/// A client interface to Blob service.
@@ -45,23 +49,60 @@
/// The `BlobServiceClient` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it, by **cloning**.
///
+/// A clone is recommended for each individual client identity, that has
+/// a different `UserIdentity`.
+///
+/// # Example
+/// ```ignore
+/// // create client for user A
+/// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA);
+///
+/// // reuse the client connection with different credentials
+/// let clientB = clientA.with_user_identity(userB);
+///
+/// // clientA still uses userA credentials
+/// ```
+///
+/// A clone is recommended when the client concurrently handles multiple identities -
+/// e.g. a HTTP service handling requests from different users. The `with_user_identity()`
+/// method should be used for this purpose.
+///
/// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
/// because it already uses an `Arc` internally.
#[derive(Clone)]
pub struct BlobServiceClient {
http_client: reqwest::Client,
blob_service_url: reqwest::Url,
+ user_identity: Option<UserIdentity>,
}
impl BlobServiceClient {
+ /// Creates a new Blob Service client instance. It is unauthenticated by default
+ /// so you need to call `with_user_identity()` afterwards:
+ /// ```ignore
+ /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user);
+ /// ```
+ ///
+ /// **Note**: It is advised to create this client once and reuse by **cloning**.
+ /// See [`BlobServiceClient`] docs for details
pub fn new(blob_service_url: reqwest::Url) -> Self {
debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
Self {
http_client: reqwest::Client::new(),
blob_service_url,
+ user_identity: None,
}
}
+ /// Clones the client and sets the [`UserIdentity`] for the new instance.
+ /// This allows the client to reuse the same connection pool for different users.
+ pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self {
+ trace!("Set user_identity: {:?}", &user_identity);
+ let mut this = self.clone();
+ this.user_identity = Some(user_identity);
+ this
+ }
+
/// Downloads blob with given [`blob_hash`].
///
/// @returns a stream of blob bytes
@@ -88,8 +129,7 @@
let url = self.get_blob_url(Some(blob_hash))?;
let response = self
- .http_client
- .get(url)
+ .request(Method::GET, url)?
.send()
.await
.map_err(BlobServiceError::ClientError)?;
@@ -129,7 +169,11 @@
blob_hash: blob_hash.to_string(),
};
debug!("Request payload: {:?}", payload);
- let response = self.http_client.post(url).json(&payload).send().await?;
+ let response = self
+ .request(Method::POST, url)?
+ .json(&payload)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -162,7 +206,11 @@
};
debug!("Request payload: {:?}", payload);
- let response = self.http_client.delete(url).json(&payload).send().await?;
+ let response = self
+ .request(Method::DELETE, url)?
+ .json(&payload)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -214,7 +262,11 @@
.text("blob_hash", blob_hash.into())
.part("blob_data", Part::stream(streaming_body));
- let response = self.http_client.put(url).multipart(form).send().await?;
+ let response = self
+ .request(Method::PUT, url)?
+ .multipart(form)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -295,6 +347,24 @@
trace!("Constructed request URL: {}", url);
Ok(url)
}
+
+ fn request(
+ &self,
+ http_method: Method,
+ url: Url,
+ ) -> BlobResult<RequestBuilder> {
+ let request = self.http_client.request(http_method, url);
+ match &self.user_identity {
+ Some(user) => {
+ let token = user.as_authorization_token().map_err(|e| {
+ error!("Failed to parse authorization token: {}", e);
+ BlobServiceError::UnexpectedError
+ })?;
+ Ok(request.bearer_auth(token))
+ }
+ None => Ok(request),
+ }
+ }
}
fn handle_http_error(status_code: StatusCode) -> BlobServiceError {

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 9:42 AM (18 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2570100
Default Alt Text
D8786.id30353.diff (5 KB)

Event Timeline