diff --git a/services/backup/Cargo.lock b/services/backup/Cargo.lock --- a/services/backup/Cargo.lock +++ b/services/backup/Cargo.lock @@ -955,6 +955,7 @@ "aws-sdk-dynamodb", "aws-types", "base64 0.21.2", + "bytes", "chrono", "derive_more", "futures-core", diff --git a/services/comm-services-lib/Cargo.lock b/services/comm-services-lib/Cargo.lock --- a/services/comm-services-lib/Cargo.lock +++ b/services/comm-services-lib/Cargo.lock @@ -809,6 +809,7 @@ "aws-sdk-dynamodb", "aws-types", "base64 0.21.0", + "bytes", "chrono", "derive_more", "futures-core", diff --git a/services/comm-services-lib/Cargo.toml b/services/comm-services-lib/Cargo.toml --- a/services/comm-services-lib/Cargo.toml +++ b/services/comm-services-lib/Cargo.toml @@ -5,7 +5,12 @@ license = "BSD-3-Clause" [features] -blob-client = ["dep:reqwest", "dep:futures-core", "dep:futures-util"] +blob-client = [ + "dep:bytes", + "dep:reqwest", + "dep:futures-core", + "dep:futures-util", +] http = [ "dep:actix-cors", "dep:actix-web", @@ -30,6 +35,7 @@ tracing = "0.1" anyhow = "1.0.74" # blob client dependencies +bytes = { version = "1.4", optional = true } futures-core = { version = "0.3", optional = true } futures-util = { version = "0.3", optional = true } reqwest = { version = "0.11", features = [ diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -1,6 +1,7 @@ +use bytes::Bytes; use derive_more::{Display, Error, From}; use futures_core::Stream; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::StreamExt; use reqwest::{ multipart::{Form, Part}, Body, Method, RequestBuilder, @@ -125,7 +126,7 @@ pub async fn get( &self, blob_hash: &str, - ) -> BlobResult>>> { + ) -> BlobResult>> { debug!(?blob_hash, "Get blob request"); let url = self.get_blob_url(Some(blob_hash))?; @@ -138,7 +139,7 @@ debug!("Response status: {}", response.status()); if response.status().is_success() { let stream = response.bytes_stream().map(|result| match result { - Ok(bytes) => Ok(bytes.into()), + Ok(bytes) => Ok(bytes), Err(error) => { warn!("Error while streaming response: {}", error); Err(BlobServiceError::ClientError(error)) @@ -252,13 +253,12 @@ H: Into, S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, - Vec: From, + Bytes: From, { debug!("Upload blob request"); let url = self.get_blob_url(None)?; - let stream = data_stream.map_ok(Vec::from); - let streaming_body = Body::wrap_stream(stream); + let streaming_body = Body::wrap_stream(data_stream); let form = Form::new() .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); @@ -295,7 +295,7 @@ where S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, - Vec: From, + Bytes: From, { trace!("Begin simple put. Assigning holder..."); let data_exists = self.assign_holder(blob_hash, holder).await?; diff --git a/services/comm-services-lib/src/lib.rs b/services/comm-services-lib/src/lib.rs --- a/services/comm-services-lib/src/lib.rs +++ b/services/comm-services-lib/src/lib.rs @@ -5,3 +5,12 @@ #[cfg(feature = "http")] pub mod http; pub mod tools; + +mod reexports { + #[cfg(feature = "blob-client")] + pub use {bytes, reqwest}; + + #[cfg(feature = "http")] + pub use {actix_web, http}; +} +pub use reexports::*; diff --git a/services/reports/Cargo.lock b/services/reports/Cargo.lock --- a/services/reports/Cargo.lock +++ b/services/reports/Cargo.lock @@ -908,6 +908,7 @@ "aws-sdk-dynamodb", "aws-types", "base64 0.21.3", + "bytes", "chrono", "derive_more", "futures-core",