Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3346746
D8786.id30353.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Referenced Files
None
Subscribers
None
D8786.id30353.diff
View Options
diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -3,14 +3,16 @@
use futures_util::{StreamExt, TryStreamExt};
use reqwest::{
multipart::{Form, Part},
- Body, Url,
+ Body, Method, RequestBuilder, Url,
};
-use tracing::{debug, trace, warn};
+use tracing::{debug, error, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
pub use reqwest::StatusCode;
+use crate::auth::UserIdentity;
+
#[derive(From, Error, Debug, Display)]
pub enum BlobServiceError {
/// HTTP Client errors, this includes:
@@ -38,6 +40,8 @@
ServerError,
#[display(...)]
UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
+ #[display(...)]
+ UnexpectedError,
}
/// A client interface to Blob service.
@@ -45,23 +49,60 @@
/// The `BlobServiceClient` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it, by **cloning**.
///
+/// A separate clone is recommended for each client identity, i.e. for each
+/// distinct `UserIdentity`.
+///
+/// # Example
+/// ```ignore
+/// // create client for user A
+/// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA);
+///
+/// // reuse the client connection with different credentials
+/// let clientB = clientA.with_user_identity(userB);
+///
+/// // clientA still uses userA credentials
+/// ```
+///
+/// A clone is recommended when the client concurrently handles multiple identities,
+/// e.g. an HTTP service handling requests from different users. The `with_user_identity()`
+/// method should be used for this purpose.
+///
/// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
/// because it already uses an `Arc` internally.
#[derive(Clone)]
pub struct BlobServiceClient {
http_client: reqwest::Client,
blob_service_url: reqwest::Url,
+ user_identity: Option<UserIdentity>,
}
impl BlobServiceClient {
+ /// Creates a new Blob Service client instance. It is unauthenticated by default,
+ /// so call `with_user_identity()` afterwards to attach user credentials:
+ /// ```ignore
+ /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user);
+ /// ```
+ ///
+ /// **Note**: It is advised to create this client once and reuse it by **cloning**.
+ /// See the [`BlobServiceClient`] docs for details.
pub fn new(blob_service_url: reqwest::Url) -> Self {
debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
Self {
http_client: reqwest::Client::new(),
blob_service_url,
+ user_identity: None,
}
}
+ /// Clones the client and sets the [`UserIdentity`] for the new instance.
+ /// This allows the client to reuse the same connection pool for different users.
+ pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self {
+ trace!("Set user_identity: {:?}", &user_identity);
+ let mut this = self.clone();
+ this.user_identity = Some(user_identity);
+ this
+ }
+
/// Downloads blob with given [`blob_hash`].
///
/// @returns a stream of blob bytes
@@ -88,8 +129,7 @@
let url = self.get_blob_url(Some(blob_hash))?;
let response = self
- .http_client
- .get(url)
+ .request(Method::GET, url)?
.send()
.await
.map_err(BlobServiceError::ClientError)?;
@@ -129,7 +169,11 @@
blob_hash: blob_hash.to_string(),
};
debug!("Request payload: {:?}", payload);
- let response = self.http_client.post(url).json(&payload).send().await?;
+ let response = self
+ .request(Method::POST, url)?
+ .json(&payload)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -162,7 +206,11 @@
};
debug!("Request payload: {:?}", payload);
- let response = self.http_client.delete(url).json(&payload).send().await?;
+ let response = self
+ .request(Method::DELETE, url)?
+ .json(&payload)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -214,7 +262,11 @@
.text("blob_hash", blob_hash.into())
.part("blob_data", Part::stream(streaming_body));
- let response = self.http_client.put(url).multipart(form).send().await?;
+ let response = self
+ .request(Method::PUT, url)?
+ .multipart(form)
+ .send()
+ .await?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
@@ -295,6 +347,24 @@
trace!("Constructed request URL: {}", url);
Ok(url)
}
+
+ fn request(
+ &self,
+ http_method: Method,
+ url: Url,
+ ) -> BlobResult<RequestBuilder> {
+ let request = self.http_client.request(http_method, url);
+ match &self.user_identity {
+ Some(user) => {
+ let token = user.as_authorization_token().map_err(|e| {
+ error!("Failed to parse authorization token: {}", e);
+ BlobServiceError::UnexpectedError
+ })?;
+ Ok(request.bearer_auth(token))
+ }
+ None => Ok(request),
+ }
+ }
}
fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 23, 9:42 AM (18 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2570100
Default Alt Text
D8786.id30353.diff (5 KB)
Attached To
Mode
D8786: [services-lib] Add HTTP client auth support
Attached
Detach File
Event Timeline
Log In to Comment