diff --git a/services/commtest/src/blob/put.rs b/services/commtest/src/blob/put.rs --- a/services/commtest/src/blob/put.rs +++ b/services/commtest/src/blob/put.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use reqwest::StatusCode; + use crate::blob::blob_utils::{BlobData, BlobServiceClient}; use crate::tools::{generate_stable_nbytes, Error}; @@ -7,10 +9,30 @@ struct AssignHolderResponse { data_exists: bool, } + +#[derive(Debug)] +pub enum PutResult { + HolderEstablished { data_exists: bool }, + HolderAlreadyExists, +} + +impl PutResult { + pub fn blob_was_uploaded(&self) -> bool { + match self { + Self::HolderEstablished { data_exists } => !data_exists, + _ => false, + } + } + + pub fn holder_already_exists(&self) -> bool { + matches!(self, Self::HolderAlreadyExists) + } +} + pub async fn run( client: &BlobServiceClient, blob_data: &BlobData, -) -> Result { +) -> Result { let url = client.blob_service_url.join("/blob")?; let holder = blob_data.holder.clone(); @@ -27,11 +49,15 @@ .send() .await?; + if assign_holder_response.status() == StatusCode::CONFLICT { + return Ok(PutResult::HolderAlreadyExists); + } + let AssignHolderResponse { data_exists } = assign_holder_response.json::<_>().await?; if data_exists { - return Ok(data_exists); + return Ok(PutResult::HolderEstablished { data_exists }); } // 2. Upload blob @@ -57,5 +83,5 @@ return Err(Error::HttpStatus(response.status())); } - Ok(data_exists) + Ok(PutResult::HolderEstablished { data_exists }) } diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -46,8 +46,8 @@ ]; for item in &blob_data { - let data_exists: bool = put::run(&client, item).await?; - assert!(!data_exists, "test data should not exist"); + let result = put::run(&client, item).await?; + assert!(result.blob_was_uploaded(), "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -47,9 +47,8 @@ let item_cloned = item.clone(); let client_cloned = client.clone(); handlers.push(tokio::spawn(async move { - let data_exists: bool = - put::run(&client_cloned, &item_cloned).await.unwrap(); - assert!(!data_exists, "test data should not exist"); + let result = put::run(&client_cloned, &item_cloned).await.unwrap(); + assert!(result.blob_was_uploaded(), "test data should not exist"); })); } diff --git a/services/commtest/tests/identity_integration_tests.rs b/services/commtest/tests/identity_integration_tests.rs --- a/services/commtest/tests/identity_integration_tests.rs +++ b/services/commtest/tests/identity_integration_tests.rs @@ -1,7 +1,14 @@ +use std::time::Duration; + +use bytesize::ByteSize; +use comm_lib::tools::base64_to_base64url; +use commtest::blob::blob_utils::{BlobData, BlobServiceClient}; use commtest::identity::device::{ register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::identity::olm_account::generate_random_olm_key; use commtest::service_addr; +use grpc_clients::identity::protos::unauth::Empty; use grpc_clients::identity::PlatformMetadata; use grpc_clients::identity::{ get_auth_client, get_unauthenticated_client, @@ -73,3 +80,64 @@ "username doesn't match" ); } + +#[tokio::test] +async fn test_removing_blob_holders_on_logout() { + let blob_url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP) + .expect("failed to parse blob service url"); + let blob_client = BlobServiceClient::new(blob_url); + + // Register user device + let user = register_user_device(None, None).await; + let mut user_identity_client = get_auth_client( + &service_addr::IDENTITY_GRPC.to_string(), + user.user_id, + user.device_id.clone(), + user.access_token, + PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), + ) + .await + .expect("Couldn't connect to identity service"); + + // 1. Upload a blob with holder belonging to the device + let blob_data = BlobData { + // holder is prefixed with device ID in base64url format + holder: format!("{}:holder1", base64_to_base64url(&user.device_id)), + hash: format!("blob_{}", generate_random_olm_key()), + chunks_sizes: vec![ByteSize::b(100).as_u64() as usize], + }; + let first_upload = commtest::blob::put::run(&blob_client, &blob_data) + .await + .expect("Failed to upload blob"); + assert!( + !first_upload.holder_already_exists(), + "Holder should not exist yet" + ); + + // 2. Check if assigned holder exists + let upload_before_logout = commtest::blob::put::run(&blob_client, &blob_data) + .await + .expect("Failed to check if holder exists (before logout)"); + assert!( + upload_before_logout.holder_already_exists(), + "Holder should exist now" + ); + + // 3. Log out device + user_identity_client + .log_out_user(Empty {}) + .await + .expect("Failed to log out user"); + + // identity runs holder removal asynchronously so wait a bit + tokio::time::sleep(Duration::from_millis(500)).await; + + // 4. Check if assigned holder doesn't exist now + let upload_after_logout = commtest::blob::put::run(&blob_client, &blob_data) + .await + .expect("Failed to check if holder exists (after logout)"); + assert!( + !upload_after_logout.holder_already_exists(), + "Holder should be removed now" + ); +} diff --git a/services/docker-compose.tests.yml b/services/docker-compose.tests.yml --- a/services/docker-compose.tests.yml +++ b/services/docker-compose.tests.yml @@ -73,6 +73,7 @@ env_file: test-commons.env environment: TUNNELBROKER_GRPC_ENDPOINT: 'http://tunnelbroker-server:50051' + BLOB_SERVICE_URL: 'http://blob-server:${COMM_SERVICES_PORT_BLOB}' build: args: - generate_keypair=true