diff --git a/services/commtest/src/blob/put.rs b/services/commtest/src/blob/put.rs
index a1166b470..42928b8a0 100644
--- a/services/commtest/src/blob/put.rs
+++ b/services/commtest/src/blob/put.rs
@@ -1,61 +1,87 @@
 use std::collections::HashMap;

+use reqwest::StatusCode;
+
 use crate::blob::blob_utils::{BlobData, BlobServiceClient};
 use crate::tools::{generate_stable_nbytes, Error};

 #[derive(serde::Deserialize)]
 struct AssignHolderResponse {
   data_exists: bool,
 }
+
+#[derive(Debug)]
+pub enum PutResult {
+  HolderEstablished { data_exists: bool },
+  HolderAlreadyExists,
+}
+
+impl PutResult {
+  pub fn blob_was_uploaded(&self) -> bool {
+    match self {
+      Self::HolderEstablished { data_exists } => !data_exists,
+      _ => false,
+    }
+  }
+
+  pub fn holder_already_exists(&self) -> bool {
+    matches!(self, Self::HolderAlreadyExists)
+  }
+}
+
 pub async fn run(
   client: &BlobServiceClient,
   blob_data: &BlobData,
-) -> Result<bool, Error> {
+) -> Result<PutResult, Error> {
   let url = client.blob_service_url.join("/blob")?;

   let holder = blob_data.holder.clone();
   let blob_hash = blob_data.hash.clone();
   println!("[{}] put holder: {}", &blob_hash, &holder);

   // 1. Assign holder
   let assign_holder_payload =
     HashMap::from([("holder", &holder), ("blob_hash", &blob_hash)]);
   let assign_holder_response = client
     .http_client
     .post(url.clone())
     .json(&assign_holder_payload)
     .send()
     .await?;

+  if assign_holder_response.status() == StatusCode::CONFLICT {
+    return Ok(PutResult::HolderAlreadyExists);
+  }
+
   let AssignHolderResponse { data_exists } =
     assign_holder_response.json::<_>().await?;
   if data_exists {
-    return Ok(data_exists);
+    return Ok(PutResult::HolderEstablished { data_exists });
   }

   // 2. Upload blob
   let form = reqwest::multipart::Form::new()
     .text("blob_hash", blob_hash.clone());
   let parts = blob_data
     .chunks_sizes
     .iter()
     .fold(form, |form, chunk_size| {
       println!("[{}] - adding data chunk {}", &blob_hash, chunk_size);
       form.part(
         "blob_data",
         reqwest::multipart::Part::bytes(generate_stable_nbytes(
           *chunk_size,
           None,
         )),
       )
     });
   let response = client.http_client.put(url).multipart(parts).send().await?;

   if !response.status().is_success() {
     return Err(Error::HttpStatus(response.status()));
   }
-  Ok(data_exists)
+  Ok(PutResult::HolderEstablished { data_exists })
 }
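With this change, put::run distinguishes three outcomes instead of collapsing everything into a bool: the holder was newly assigned and the blob uploaded, the holder was assigned but the blob already existed (deduplicated), or the (hash, holder) pair already existed (409 CONFLICT). A minimal sketch of how a caller might branch on the new PutResult — the upload_if_missing helper is hypothetical and not part of this diff:

use commtest::blob::blob_utils::{BlobData, BlobServiceClient};
use commtest::blob::put::{self, PutResult};
use commtest::tools::Error;

// Hypothetical helper, not part of this diff: runs a PUT and reports
// what actually happened on the blob service side.
async fn upload_if_missing(
  client: &BlobServiceClient,
  data: &BlobData,
) -> Result<(), Error> {
  match put::run(client, data).await? {
    // Holder assigned and the blob bytes were just uploaded
    PutResult::HolderEstablished { data_exists: false } => {
      println!("uploaded new blob");
    }
    // Holder assigned, but the blob data was already stored (deduplicated)
    PutResult::HolderEstablished { data_exists: true } => {
      println!("blob already present, holder added");
    }
    // 409 CONFLICT: this (blob_hash, holder) pair already exists
    PutResult::HolderAlreadyExists => {
      println!("holder already assigned, nothing to do");
    }
  }
  Ok(())
}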
diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs
index f3f9b82e0..ff264a6b8 100644
--- a/services/commtest/tests/blob_integration_test.rs
+++ b/services/commtest/tests/blob_integration_test.rs
@@ -1,73 +1,73 @@
 use bytesize::ByteSize;
 use commtest::constants;
 use commtest::tools::Error;
 use commtest::{
   blob::{
     blob_utils::{BlobData, BlobServiceClient},
     get, put, remove,
   },
   service_addr,
 };

 #[tokio::test]
 async fn blob_integration_test() -> Result<(), Error> {
   let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP)
     .expect("failed to parse blob service url");
   let client = BlobServiceClient::new(url);

   let blob_data = vec![
     BlobData {
       holder: "test_holder001".to_string(),
       hash: "test_hash001".to_string(),
       chunks_sizes: vec![
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder002".to_string(),
       hash: "test_hash002".to_string(),
       chunks_sizes: vec![
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(10).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder003".to_string(),
       hash: "test_hash003".to_string(),
       chunks_sizes: vec![
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(100).as_u64() as usize,
         *constants::GRPC_CHUNK_SIZE_LIMIT,
       ],
     },
   ];

   for item in &blob_data {
-    let data_exists: bool = put::run(&client, item).await?;
-    assert!(!data_exists, "test data should not exist");
+    let result = put::run(&client, item).await?;
+    assert!(result.blob_was_uploaded(), "test data should not exist");
   }

   for (i, blob_item) in blob_data.iter().enumerate() {
     let received_sizes = get::run(&client, blob_item).await?;
     let expected_data_size = blob_item.chunks_sizes.iter().sum::<usize>();
     let received_data_size = received_sizes.iter().sum::<usize>();
     assert_eq!(
       expected_data_size, received_data_size,
       "invalid size of data for index {}, expected {}, got {}",
       i, expected_data_size, received_data_size
     );
   }

   for item in &blob_data {
     remove::run(&client, item).await?;
     assert!(
       get::run(&client, item).await.is_err(),
       "item should no longer be available"
     );
   }

   Ok(())
 }
diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs
index b27f1b9af..6418117e4 100644
--- a/services/commtest/tests/blob_performance_test.rs
+++ b/services/commtest/tests/blob_performance_test.rs
@@ -1,114 +1,113 @@
 use bytesize::ByteSize;
 use commtest::tools::{obtain_number_of_threads, Error};
 use commtest::{
   blob::{
     blob_utils::{BlobData, BlobServiceClient},
     get, put, remove,
   },
   service_addr,
 };
 use tokio::runtime::Runtime;

 #[tokio::test]
 async fn blob_performance_test() -> Result<(), Error> {
   let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP)
     .expect("failed to parse blob service url");
   let client = BlobServiceClient::new(url);

   let number_of_threads = obtain_number_of_threads();

   println!(
     "Running performance tests for blob, number of threads: {}",
     number_of_threads
   );

   let mut blob_data = vec![];

   for i in 0..number_of_threads {
     let index: u64 = (i as u64) % 10;
     blob_data.push(BlobData {
       holder: format!("test_holder_{}", i),
       hash: format!("test_hash_{}", i),
       chunks_sizes: vec![
         ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize,
         ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize,
         ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize,
       ],
     })
   }

   let rt = Runtime::new().unwrap();
   tokio::task::spawn_blocking(move || {
     // PUT
     rt.block_on(async {
       println!("performing PUT operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
-          let data_exists: bool =
-            put::run(&client_cloned, &item_cloned).await.unwrap();
-          assert!(!data_exists, "test data should not exist");
+          let result = put::run(&client_cloned, &item_cloned).await.unwrap();
+          assert!(result.blob_was_uploaded(), "test data should not exist");
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });

     // GET
     rt.block_on(async {
       println!("performing GET operations");
       let mut handlers = vec![];
       for (i, item) in blob_data.iter().enumerate() {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           let received_sizes =
             get::run(&client_cloned, &item_cloned).await.unwrap();
           let expected_data_size =
             item_cloned.chunks_sizes.iter().sum::<usize>();
           let received_data_size = received_sizes.iter().sum::<usize>();
           assert_eq!(
             expected_data_size, received_data_size,
             "invalid size of data for index {}, expected {}, got {}",
             i, expected_data_size, received_data_size
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });

     // REMOVE
     rt.block_on(async {
       println!("performing REMOVE operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           remove::run(&client_cloned, &item_cloned).await.unwrap();
           assert!(
             get::run(&client_cloned, &item_cloned).await.is_err(),
             "item should no longer be available"
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });
   })
   .await
   .expect("Task panicked");

   Ok(())
 }
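The handler-collection pattern above (push JoinHandles into a Vec, then await each) could also be expressed with tokio::task::JoinSet, assuming a tokio version that provides it (1.21+). A sketch of the PUT phase only, with a hypothetical put_all helper; behavior is equivalent, the set just owns and drains the tasks for you:

use commtest::blob::blob_utils::{BlobData, BlobServiceClient};
use commtest::blob::put;
use tokio::task::JoinSet;

// Hypothetical helper: spawn one PUT per blob and fail if any task panics.
async fn put_all(client: &BlobServiceClient, blob_data: &[BlobData]) {
  let mut tasks = JoinSet::new();
  for item in blob_data {
    let item = item.clone();
    let client = client.clone();
    tasks.spawn(async move {
      let result = put::run(&client, &item).await.unwrap();
      assert!(result.blob_was_uploaded(), "test data should not exist");
    });
  }
  // join_next yields each task's result until the set is empty
  while let Some(result) = tasks.join_next().await {
    result.expect("PUT task panicked");
  }
}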
diff --git a/services/commtest/tests/identity_integration_tests.rs b/services/commtest/tests/identity_integration_tests.rs
index 0ef8185b1..3d68c243e 100644
--- a/services/commtest/tests/identity_integration_tests.rs
+++ b/services/commtest/tests/identity_integration_tests.rs
@@ -1,75 +1,143 @@
+use std::time::Duration;
+
+use bytesize::ByteSize;
+use comm_lib::tools::base64_to_base64url;
+use commtest::blob::blob_utils::{BlobData, BlobServiceClient};
 use commtest::identity::device::{
   register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION,
 };
+use commtest::identity::olm_account::generate_random_olm_key;
 use commtest::service_addr;
+use grpc_clients::identity::protos::unauth::Empty;
 use grpc_clients::identity::PlatformMetadata;
 use grpc_clients::identity::{
   get_auth_client, get_unauthenticated_client,
   protos::auth::{Identity, UserIdentitiesRequest},
   protos::unauthenticated::{
     find_user_id_request::Identifier, FindUserIdRequest,
   },
 };

 #[tokio::test]
 async fn find_user_id_by_username() {
   let device_info = register_user_device(None, None).await;

   let mut client = get_unauthenticated_client(
     &service_addr::IDENTITY_GRPC.to_string(),
     PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE),
   )
   .await
   .expect("Couldn't connect to identity service");

   let request = FindUserIdRequest {
     identifier: Some(Identifier::Username(device_info.username)),
   };

   let response = client
     .find_user_id(request)
     .await
     .expect("Request failed")
     .into_inner();

   assert_eq!(
     response.user_id,
     Some(device_info.user_id),
     "User ID should match"
   );
 }

 #[tokio::test]
 async fn find_username_for_user() {
   let device_info = register_user_device(None, None).await;

   let mut client = get_auth_client(
     &service_addr::IDENTITY_GRPC.to_string(),
     device_info.user_id.clone(),
     device_info.device_id,
     device_info.access_token,
     PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE),
   )
   .await
   .expect("Couldn't connect to identity service");

   let request = UserIdentitiesRequest {
     user_ids: vec![device_info.user_id.clone()],
   };
   let response = client
     .find_user_identities(request)
     .await
     .expect("request failed")
     .into_inner();

   let expected_username = device_info.username;
   assert!(
     matches!(
       response.identities.get(&device_info.user_id),
       Some(Identity { username, .. }) if *username == expected_username
     ),
     "username doesn't match"
   );
 }
+
+#[tokio::test]
+async fn test_removing_blob_holders_on_logout() {
+  let blob_url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP)
+    .expect("failed to parse blob service url");
+  let blob_client = BlobServiceClient::new(blob_url);
+
+  // Register user device
+  let user = register_user_device(None, None).await;
+  let mut user_identity_client = get_auth_client(
+    &service_addr::IDENTITY_GRPC.to_string(),
+    user.user_id,
+    user.device_id.clone(),
+    user.access_token,
+    PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE),
+  )
+  .await
+  .expect("Couldn't connect to identity service");
+
+  // 1. Upload a blob with holder belonging to the device
+  let blob_data = BlobData {
+    // holder is prefixed with device ID in base64url format
+    holder: format!("{}:holder1", base64_to_base64url(&user.device_id)),
+    hash: format!("blob_{}", generate_random_olm_key()),
+    chunks_sizes: vec![ByteSize::b(100).as_u64() as usize],
+  };
+  let first_upload = commtest::blob::put::run(&blob_client, &blob_data)
+    .await
+    .expect("Failed to upload blob");
+  assert!(
+    !first_upload.holder_already_exists(),
+    "Holder should not exist yet"
+  );
+
+  // 2. Check if assigned holder exists
+  let upload_before_logout = commtest::blob::put::run(&blob_client, &blob_data)
+    .await
+    .expect("Failed to check if holder exists (before logout)");
+  assert!(
+    upload_before_logout.holder_already_exists(),
+    "Holder should exist now"
+  );
+
+  // 3. Log out device
+  user_identity_client
+    .log_out_user(Empty {})
+    .await
+    .expect("Failed to log out user");

+  // identity runs holder removal asynchronously, so wait a bit
+  tokio::time::sleep(Duration::from_millis(500)).await;
+
+  // 4. Check that the assigned holder no longer exists
+  let upload_after_logout = commtest::blob::put::run(&blob_client, &blob_data)
+    .await
+    .expect("Failed to check if holder exists (after logout)");
+  assert!(
+    !upload_after_logout.holder_already_exists(),
+    "Holder should be removed now"
+  );
+}
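The fixed 500 ms sleep before step 4 is a potential source of flakiness if holder removal takes longer under load. One way to harden the wait is to poll until the holder disappears. A sketch with an illustrative wait_until_holder_removed helper — the name, attempt count, and interval are assumptions, not part of this diff:

use std::time::Duration;

use commtest::blob::blob_utils::{BlobData, BlobServiceClient};
use commtest::blob::put;

// Illustrative helper: poll the blob service until the holder disappears
// or the attempt budget runs out, instead of sleeping a fixed 500ms.
async fn wait_until_holder_removed(
  client: &BlobServiceClient,
  data: &BlobData,
) -> bool {
  for _ in 0..10 {
    let result = put::run(client, data).await.expect("put failed");
    if !result.holder_already_exists() {
      // Holder is gone. Note this re-assigns it as a side effect, which
      // matches the check the test's final assertion already performs.
      return true;
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
  }
  false
}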
diff --git a/services/docker-compose.tests.yml b/services/docker-compose.tests.yml
index e1c27b26f..141cacad5 100644
--- a/services/docker-compose.tests.yml
+++ b/services/docker-compose.tests.yml
@@ -1,88 +1,89 @@
 version: '3.9'

 volumes:
   localstack:
   commtest_build_artifacts:

 services:
   commtest:
     depends_on:
       - tunnelbroker-server
       - backup-server
       - blob-server
       - identity-server
       # There are no tests for these services:
       # - feature-flags-server
       # - reports-server
     build:
       dockerfile: services/commtest/Dockerfile
       context: ../
     platform: '${PLATFORM:-linux/amd64}'
     volumes:
       # This one caches build directory and allows to run tests multiple times without rebuilding
       - commtest_build_artifacts:/home/root/app/commtest/target
     env_file: test-commons.env
     environment:
       # tested services endpoints
       TUNNELBROKER_WS_ENDPOINT: 'ws://tunnelbroker-server:51001'
       TUNNELBROKER_GRPC_ENDPOINT: 'http://tunnelbroker-server:${COMM_SERVICES_PORT_TUNNELBROKER}'
       BACKUP_SERVICE_URL: 'http://backup-server:${COMM_SERVICES_PORT_BACKUP}'
       BLOB_SERVICE_URL: 'http://blob-server:${COMM_SERVICES_PORT_BLOB}'
       IDENTITY_GRPC_ENDPOINT: 'http://identity-server:${COMM_SERVICES_PORT_IDENTITY}'
       # override localstack endpoint in terraform setup
       TF_VAR_localstack_endpoint: 'http://localstack:4566'
       # others
       COMM_NUMBER_OF_THREADS: '4'
       BLOB_SERVICE_EXECUTABLE: /shared/bin/blob
       RUST_LOG: blob=trace,comm_lib=debug
   tunnelbroker-server:
     image: tunnelbroker
     pull_policy: build
     platform: '${PLATFORM:-linux/amd64}'
     restart: on-failure
     env_file: test-commons.env
     environment:
       COMM_TUNNELBROKER_IDENTITY_ENDPOINT: 'http://identity-server:50054'
       AMQP_URI: 'amqp://comm:comm@rabbitmq:5672'
       RUST_LOG: tunnelbroker=trace
   backup-server:
     image: backup
     pull_policy: build
     platform: '${PLATFORM:-linux/amd64}'
     env_file: test-commons.env
     environment:
       BLOB_SERVICE_URL: 'http://blob-server:50053'
       COMM_SERVICES_DISABLE_CSAT_VERIFICATION: 'true'
   blob-server:
     image: blob
     pull_policy: build
     # Until blob cleanup is supported in tests, enable auto-deletion
     command: ['blob', 'server', '--instant-delete']
     platform: '${PLATFORM:-linux/amd64}'
     env_file: test-commons.env
     environment:
       RUST_LOG: blob=trace,comm_lib=debug
       COMM_SERVICES_DISABLE_CSAT_VERIFICATION: 'true'
   identity-server:
     image: identity
     pull_policy: build
     platform: '${PLATFORM:-linux/amd64}'
     env_file: test-commons.env
     environment:
       TUNNELBROKER_GRPC_ENDPOINT: 'http://tunnelbroker-server:50051'
+      BLOB_SERVICE_URL: 'http://blob-server:${COMM_SERVICES_PORT_BLOB}'
     build:
       args:
         - generate_keypair=true
   localstack:
     environment:
       - PERSISTENCE=0
   rabbitmq:
     healthcheck:
       test: rabbitmq-diagnostics -q ping
       interval: 15s
       timeout: 10s
       retries: 5