diff --git a/services/commtest/src/blob/constants.rs b/services/commtest/src/blob/constants.rs new file mode 100644 --- /dev/null +++ b/services/commtest/src/blob/constants.rs @@ -0,0 +1,3 @@ +use std::ops::Range; +// Hack for passing None to Option<&impl RangeBounds> typed argument +pub const NO_RANGE: Option<&Range> = None; diff --git a/services/commtest/src/blob/get.rs b/services/commtest/src/blob/get.rs --- a/services/commtest/src/blob/get.rs +++ b/services/commtest/src/blob/get.rs @@ -1,15 +1,39 @@ use crate::blob::blob_utils::{BlobData, BlobServiceClient}; use crate::tools::Error; +use reqwest::header::RANGE; +use std::ops::RangeBounds; pub async fn run( client: &BlobServiceClient, blob_data: &BlobData, + range: Option<&impl RangeBounds>, ) -> Result, Error> { println!("[{}] get", blob_data.hash); let path = format!("/blob/{}", blob_data.hash); let url = client.blob_service_url.join(&path)?; - let response = client.http_client.get(url).send().await?; + let parsed_range = match range { + Some(range) => { + let start_range = match range.start_bound() { + std::ops::Bound::Included(start) => start.to_string(), + std::ops::Bound::Excluded(start) => (start + 1).to_string(), + std::ops::Bound::Unbounded => "".to_string(), + }; + let end_range = match range.end_bound() { + std::ops::Bound::Included(end) => end.to_string(), + std::ops::Bound::Excluded(end) => (end - 1).to_string(), + std::ops::Bound::Unbounded => "".to_string(), + }; + format!("bytes={}-{}", start_range, end_range) + } + None => "".to_string(), + }; + let response = client + .http_client + .get(url) + .header(RANGE, parsed_range) + .send() + .await?; if !response.status().is_success() { return Err(Error::HttpStatus(response.status())); diff --git a/services/commtest/src/blob/mod.rs b/services/commtest/src/blob/mod.rs --- a/services/commtest/src/blob/mod.rs +++ b/services/commtest/src/blob/mod.rs @@ -1,4 +1,5 @@ pub mod blob_utils; +pub mod constants; pub mod get; pub mod put; pub mod remove; diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -1,11 +1,70 @@ use bytesize::ByteSize; use commtest::blob::{ blob_utils::{BlobData, BlobServiceClient}, + constants::NO_RANGE, get, put, remove, }; use commtest::constants; use commtest::tools::Error; -use std::env; +use std::{env, ops::RangeBounds}; + +async fn run_http_range_test( + client: &BlobServiceClient, + blob_item: &BlobData, + range: impl RangeBounds, +) -> Result<(), Error> { + let blob_size: i64 = blob_item + .chunks_sizes + .iter() + .sum::() + .try_into() + .unwrap(); + let end_range = match range.end_bound() { + std::ops::Bound::Included(end) => *end, + std::ops::Bound::Excluded(end) => *end - 1, + std::ops::Bound::Unbounded => blob_size - 1, + }; + let start_range = match range.start_bound() { + std::ops::Bound::Included(start) => *start, + std::ops::Bound::Excluded(start) => *start + 1, + std::ops::Bound::Unbounded => { + // HTTP Range with defined end_range only will take the last N bytes + // of the blob, so we need to add one byte to match the expected_data_size calculation + (end_range < blob_size - 1) as i64 + } + }; + + // For invalid ranges, the expected data size is the whole blob + let expected_data_size = if start_range >= 0 && start_range <= end_range { + end_range - start_range + 1 + } else { + blob_size + }; + + match get::run(&client, &blob_item, Some(&range)).await { + Ok(received_sizes) => { + let received_data_size: i64 = + received_sizes.iter().sum::().try_into().unwrap(); + if expected_data_size != received_data_size { + Err(Error::AssertionError(format!( + "invalid size of data for range {}-{}, expected {}, got {}", + start_range, end_range, expected_data_size, received_data_size + )))?; + } + Ok(()) + } + Err(Error::HttpStatus(reqwest::StatusCode::RANGE_NOT_SATISFIABLE)) => { + if start_range >= blob_size || end_range >= blob_size { + return Ok(()); + } + Err(Error::AssertionError(format!( + "invalid HTTP Range {}-{}", + start_range, end_range + )))? + } + Err(e) => Err(e)?, + } +} async fn run_blob_integration_test( client: &BlobServiceClient, @@ -22,7 +81,7 @@ } for (i, blob_item) in blob_data.iter().enumerate() { - let received_sizes = get::run(&client, &blob_item).await?; + let received_sizes = get::run(&client, &blob_item, NO_RANGE).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size: usize = received_sizes.iter().sum::(); if expected_data_size != received_data_size { @@ -33,9 +92,33 @@ } } + // Test HTTP Range header + let blob_item = &blob_data[0]; + let blob_size: i64 = blob_item + .chunks_sizes + .iter() + .sum::() + .try_into() + .unwrap(); + // Valid ranges + run_http_range_test(client, blob_item, 0..blob_size).await?; + run_http_range_test(client, blob_item, 5..26).await?; + run_http_range_test(client, blob_item, 5..6).await?; + run_http_range_test(client, blob_item, 40..).await?; + run_http_range_test(client, blob_item, ..40).await?; + // 416 Range Not Satisfiable + run_http_range_test(client, blob_item, 0..blob_size + 1).await?; + run_http_range_test(client, blob_item, ..blob_size + 1).await?; + run_http_range_test(client, blob_item, blob_size + 1..).await?; + // Invalid ranges (should return the whole data) + run_http_range_test(client, blob_item, 31..21).await?; + run_http_range_test(client, blob_item, -5..).await?; + run_http_range_test(client, blob_item, ..-5).await?; + run_http_range_test(client, blob_item, ..).await?; + for (i, item) in blob_data.iter().enumerate() { remove::run(&client, &item).await?; - if get::run(&client, &item).await.is_ok() { + if get::run(&client, &item, NO_RANGE).await.is_ok() { Err(Error::AssertionError(format!( "test data no. {} should no longer be available", i diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,6 +1,7 @@ use bytesize::ByteSize; use commtest::blob::{ blob_utils::{BlobData, BlobServiceClient}, + constants::NO_RANGE, get, put, remove, }; use commtest::tools::{obtain_number_of_threads, Error}; @@ -69,8 +70,9 @@ let item_cloned = item.clone(); let client_cloned = client.clone(); handlers.push(tokio::spawn(async move { - let received_sizes = - get::run(&client_cloned, &item_cloned).await.unwrap(); + let received_sizes = get::run(&client_cloned, &item_cloned, NO_RANGE) + .await + .unwrap(); let expected_data_size = item_cloned.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); @@ -98,7 +100,9 @@ handlers.push(tokio::spawn(async move { remove::run(&client_cloned, &item_cloned).await.unwrap(); assert!( - get::run(&client_cloned, &item_cloned).await.is_err(), + get::run(&client_cloned, &item_cloned, NO_RANGE) + .await + .is_err(), "item should no longer be available" ); }));