diff --git a/services/commtest/Cargo.lock b/services/commtest/Cargo.lock --- a/services/commtest/Cargo.lock +++ b/services/commtest/Cargo.lock @@ -223,6 +223,8 @@ "openssl", "prost", "rand", + "reqwest", + "serde", "serde_json", "sha2", "tokio", @@ -245,6 +247,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" + [[package]] name = "cpufeatures" version = "0.2.5" @@ -378,6 +396,15 @@ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" +dependencies = [ + "cfg-if", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -665,6 +692,19 @@ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.3.0" @@ -694,6 +734,12 @@ "cfg-if", ] +[[package]] +name = "ipnet" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" + [[package]] name = "itertools" version = "0.10.5" @@ -709,6 +755,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +[[package]] +name = "js-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -717,9 +772,9 @@ [[package]] name = "libc" -version = "0.2.138" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "log" @@ -748,6 +803,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "0.8.5" @@ -766,6 +831,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num_cpus" version = "1.14.0" @@ -830,6 +913,12 @@ "syn 1.0.107", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.90" @@ -1059,6 +1148,44 @@ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" +dependencies = [ + "base64 0.21.2", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "mime_guess", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rustc_version" version = "0.4.0" @@ -1080,6 +1207,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "schannel" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3" +dependencies = [ + "windows-sys", +] + [[package]] name = "sec1" version = "0.3.0" @@ -1093,6 +1229,29 @@ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.16" @@ -1101,18 +1260,18 @@ [[package]] name = "serde" -version = "1.0.160" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.160" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", @@ -1130,6 +1289,18 @@ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.5" @@ -1293,6 +1464,16 @@ "syn 1.0.107", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.11" @@ -1528,6 +1709,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -1615,9 +1805,9 @@ [[package]] name = "wasm-bindgen" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1625,9 +1815,9 @@ [[package]] name = "wasm-bindgen-backend" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" dependencies = [ "bumpalo", "log", @@ -1638,11 +1828,23 @@ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1650,9 +1852,9 @@ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", @@ -1663,9 +1865,19 @@ [[package]] name = "wasm-bindgen-shared" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" + +[[package]] +name = "web-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +dependencies = [ + "js-sys", + "wasm-bindgen", +] [[package]] name = "which" @@ -1757,6 +1969,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/services/commtest/Cargo.toml b/services/commtest/Cargo.toml --- a/services/commtest/Cargo.toml +++ b/services/commtest/Cargo.toml @@ -26,6 +26,8 @@ futures-util = "0.3.28" serde_json = "1.0.96" rand = "0.8.5" +reqwest = { version = "0.11", features = ["json", "multipart"] } +serde = "1.0" [build-dependencies] tonic-build = "0.8" diff --git a/services/commtest/src/blob/blob_utils.rs b/services/commtest/src/blob/blob_utils.rs --- a/services/commtest/src/blob/blob_utils.rs +++ b/services/commtest/src/blob/blob_utils.rs @@ -1,8 +1,17 @@ -pub mod proto { - tonic::include_proto!("blob"); +#[derive(Clone)] +pub struct BlobServiceClient { + pub(super) http_client: reqwest::Client, + pub(super) blob_service_url: reqwest::Url, } -pub use proto::blob_service_client::BlobServiceClient; +impl BlobServiceClient { + pub fn new(blob_service_url: reqwest::Url) -> Self { + Self { + http_client: reqwest::Client::new(), + blob_service_url, + } + } +} #[derive(Clone)] pub struct BlobData { diff --git a/services/commtest/src/blob/get.rs b/services/commtest/src/blob/get.rs --- a/services/commtest/src/blob/get.rs +++ b/services/commtest/src/blob/get.rs @@ -1,23 +1,21 @@ -use crate::blob::blob_utils::{proto::GetRequest, BlobData, BlobServiceClient}; +use crate::blob::blob_utils::{BlobData, BlobServiceClient}; use crate::tools::Error; -use tonic::Request; pub async fn run( - client: &mut BlobServiceClient, + client: &BlobServiceClient, blob_data: &BlobData, ) -> Result, Error> { - let cloned_holder = blob_data.holder.clone(); - println!("[{}] get", cloned_holder); + println!("[{}] get", blob_data.hash); - let response = client - .get(Request::new(GetRequest { - holder: cloned_holder, - })) - .await?; - let mut inbound = response.into_inner(); - let mut sizes: Vec = Vec::new(); - while let Some(response) = inbound.message().await? { - sizes.push(response.data_chunk.len()); + let path = format!("/blob/{}", blob_data.hash); + let url = client.blob_service_url.join(&path)?; + let response = client.http_client.get(url).send().await?; + + if !response.status().is_success() { + return Err(Error::HttpStatus(response.status())); } + + let bytes = response.bytes().await?; + let sizes = vec![bytes.len()]; Ok(sizes) } diff --git a/services/commtest/src/blob/put.rs b/services/commtest/src/blob/put.rs --- a/services/commtest/src/blob/put.rs +++ b/services/commtest/src/blob/put.rs @@ -1,43 +1,61 @@ -use crate::blob::blob_utils::{ - proto::put_request::Data::*, proto::PutRequest, BlobData, BlobServiceClient, -}; +use std::collections::HashMap; + +use crate::blob::blob_utils::{BlobData, BlobServiceClient}; use crate::tools::{generate_stable_nbytes, Error}; -use tonic::Request; +#[derive(serde::Deserialize)] +struct AssignHolderResponse { + data_exists: bool, +} pub async fn run( - client: &mut BlobServiceClient, + client: &BlobServiceClient, blob_data: &BlobData, ) -> Result { - let cloned_holder = blob_data.holder.clone(); - let cloned_hash = blob_data.hash.clone(); - let cloned_chunks_sizes = blob_data.chunks_sizes.clone(); - println!("[{}] put", cloned_holder); - - let outbound = async_stream::stream! { - println!("[{}] - sending holder", cloned_holder); - let request = PutRequest { - data: Some(Holder(cloned_holder.to_string())), - }; - yield request; - println!("[{}] - sending hash", cloned_holder); - let request = PutRequest { - data: Some(BlobHash(cloned_hash.to_string())), - }; - yield request; - for chunk_size in cloned_chunks_sizes { - println!("[{}] - sending data chunk {}", cloned_holder, chunk_size); - let request = PutRequest { - data: Some(DataChunk(generate_stable_nbytes(chunk_size, None))), - }; - yield request; - } - }; - - let mut data_exists: bool = false; - let response = client.put(Request::new(outbound)).await?; - let mut inbound = response.into_inner(); - while let Some(response) = inbound.message().await? { - data_exists = data_exists || response.data_exists; + let url = client.blob_service_url.join("/blob")?; + + let holder = blob_data.holder.clone(); + let blob_hash = blob_data.hash.clone(); + println!("[{}] put holder: {}", &blob_hash, &holder); + + // 1. Assign holder + let assign_holder_payload = + HashMap::from([("holder", &holder), ("blob_hash", &blob_hash)]); + let assign_holder_response = client + .http_client + .post(url.clone()) + .json(&assign_holder_payload) + .send() + .await?; + + let AssignHolderResponse { data_exists } = + assign_holder_response.json::<_>().await?; + + if data_exists { + return Ok(data_exists); } + + // 2. Upload blob + let form = + reqwest::multipart::Form::new().text("blob_hash", blob_hash.clone()); + let parts = blob_data + .chunks_sizes + .iter() + .fold(form, |form, chunk_size| { + println!("[{}] - adding data chunk {}", &blob_hash, chunk_size); + form.part( + "blob_data", + reqwest::multipart::Part::bytes(generate_stable_nbytes( + *chunk_size, + None, + )), + ) + }); + + let response = client.http_client.put(url).multipart(parts).send().await?; + + if !response.status().is_success() { + return Err(Error::HttpStatus(response.status())); + } + Ok(data_exists) } diff --git a/services/commtest/src/blob/remove.rs b/services/commtest/src/blob/remove.rs --- a/services/commtest/src/blob/remove.rs +++ b/services/commtest/src/blob/remove.rs @@ -1,20 +1,32 @@ -use crate::blob::blob_utils::{ - proto::RemoveRequest, BlobData, BlobServiceClient, -}; +use std::collections::HashMap; + +use crate::blob::blob_utils::BlobData; use crate::tools::Error; -use tonic::Request; + +use super::blob_utils::BlobServiceClient; pub async fn run( - client: &mut BlobServiceClient, + client: &BlobServiceClient, blob_data: &BlobData, ) -> Result<(), Error> { let cloned_holder = blob_data.holder.clone(); + let cloned_blob_hash = blob_data.hash.clone(); println!("[{}] remove", cloned_holder); - client - .remove(Request::new(RemoveRequest { - holder: cloned_holder, - })) + let request_body = + HashMap::from([("holder", cloned_holder), ("blob_hash", cloned_blob_hash)]); + + let url = client.blob_service_url.join("/blob")?; + let response = client + .http_client + .delete(url) + .json(&request_body) + .send() .await?; + + if !response.status().is_success() { + return Err(Error::HttpStatus(response.status())); + } + Ok(()) } diff --git a/services/commtest/src/tools.rs b/services/commtest/src/tools.rs --- a/services/commtest/src/tools.rs +++ b/services/commtest/src/tools.rs @@ -2,6 +2,7 @@ use num_cpus; use sha2::{Digest, Sha512}; use std::env; +use url::ParseError; pub fn generate_stable_nbytes( number_of_bytes: usize, @@ -21,6 +22,12 @@ Tonic(tonic::transport::Error), #[display(...)] TonicStatus(tonic::Status), + #[display(...)] + Reqwest(reqwest::Error), + #[display(fmt = "HTTP status: {:?}.", _0)] + HttpStatus(#[error(ignore)] reqwest::StatusCode), + #[display(...)] + ParseError(ParseError), } pub fn obtain_number_of_threads() -> usize { diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -10,9 +10,12 @@ #[tokio::test] async fn blob_integration_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received"); - let mut client = - BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; + .expect("port env var expected but not received") + .parse() + .expect("port env var should be a number"); + let mut url = reqwest::Url::parse("http://localhost")?; + url.set_port(Some(port)).expect("failed to set port"); + let client = BlobServiceClient::new(url); let blob_data = vec![ BlobData { @@ -45,12 +48,12 @@ ]; for item in &blob_data { - let data_exists: bool = put::run(&mut client, &item).await?; + let data_exists: bool = put::run(&client, &item).await?; assert!(!data_exists, "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { - let received_sizes = get::run(&mut client, &blob_item).await?; + let received_sizes = get::run(&client, &blob_item).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( @@ -61,9 +64,9 @@ } for item in &blob_data { - remove::run(&mut client, &item).await?; + remove::run(&client, &item).await?; assert!( - get::run(&mut client, &item).await.is_err(), + get::run(&client, &item).await.is_err(), "item should no longer be available" ); } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -10,9 +10,12 @@ #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received"); - let client = - BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; + .expect("port env var expected but not received") + .parse() + .expect("port env var should be a number"); + let mut url = reqwest::Url::parse("http://localhost")?; + url.set_port(Some(port)).expect("failed to set port"); + let client = BlobServiceClient::new(url); let number_of_threads = obtain_number_of_threads(); @@ -44,10 +47,10 @@ let mut handlers = vec![]; for item in &blob_data { let item_cloned = item.clone(); - let mut client_cloned = client.clone(); + let client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let data_exists: bool = - put::run(&mut client_cloned, &item_cloned).await.unwrap(); + put::run(&client_cloned, &item_cloned).await.unwrap(); assert!(!data_exists, "test data should not exist"); })); } @@ -64,10 +67,10 @@ for (i, item) in blob_data.iter().enumerate() { let item_cloned = item.clone(); - let mut client_cloned = client.clone(); + let client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let received_sizes = - get::run(&mut client_cloned, &item_cloned).await.unwrap(); + get::run(&client_cloned, &item_cloned).await.unwrap(); let expected_data_size = item_cloned.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); @@ -91,11 +94,11 @@ for item in &blob_data { let item_cloned = item.clone(); - let mut client_cloned = client.clone(); + let client_cloned = client.clone(); handlers.push(tokio::spawn(async move { - remove::run(&mut client_cloned, &item_cloned).await.unwrap(); + remove::run(&client_cloned, &item_cloned).await.unwrap(); assert!( - get::run(&mut client_cloned, &item_cloned).await.is_err(), + get::run(&client_cloned, &item_cloned).await.is_err(), "item should no longer be available" ); }));