diff --git a/services/commtest/tests/blob/blob_utils.rs b/services/commtest/tests/blob/blob_utils.rs index 538519b43..d5641b12c 100644 --- a/services/commtest/tests/blob/blob_utils.rs +++ b/services/commtest/tests/blob/blob_utils.rs @@ -1,12 +1,13 @@ pub mod proto { tonic::include_proto!("blob"); } pub use proto::blob_service_client::BlobServiceClient; #[allow(dead_code)] +#[derive(Clone)] pub struct BlobData { pub holder: String, pub hash: String, pub chunks_sizes: Vec, } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs index de46cce35..1d71d82d3 100644 --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,58 +1,80 @@ #[path = "./blob/blob_utils.rs"] mod blob_utils; +#[path = "./blob/put.rs"] +mod put; #[path = "./lib/tools.rs"] mod tools; use bytesize::ByteSize; +use std::env; use tokio::runtime::Runtime; use tools::{obtain_number_of_threads, Error}; -use blob_utils::BlobData; +use blob_utils::{BlobData, BlobServiceClient}; #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { + let port = env::var("COMM_SERVICES_PORT_BLOB") + .expect("port env var expected but not received"); + let client = + BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; + let number_of_threads = obtain_number_of_threads(); println!( "Running performance tests for blob, number of threads: {}", number_of_threads ); let mut blob_data = vec![]; for i in 0..number_of_threads { let index: u64 = (i as u64) % 10; blob_data.push(BlobData { holder: format!("test_holder_{}", i), hash: format!("test_hash_{}", i), chunks_sizes: vec![ ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize, ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize, ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize, ], }) } let rt = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || { // PUT rt.block_on(async { println!("performing PUT operations"); + let mut handlers = vec![]; + for item in &blob_data { + let item_cloned = item.clone(); + let mut client_cloned = client.clone(); + handlers.push(tokio::spawn(async move { + let data_exists: bool = + put::run(&mut client_cloned, &item_cloned).await.unwrap(); + assert!(!data_exists, "test data should not exist"); + })); + } + + for handler in handlers { + handler.await.unwrap(); + } }); // GET rt.block_on(async { println!("performing GET operations"); }); // REMOVE rt.block_on(async { println!("performing REMOVE operations"); }); }) .await .expect("Task panicked"); Ok(()) }