diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -6096,6 +6096,7 @@ "tracing", "tracing-subscriber", "tunnelbroker_messages", + "urlencoding", "uuid", "web-push", ] diff --git a/services/docker-compose.tests.yml b/services/docker-compose.tests.yml --- a/services/docker-compose.tests.yml +++ b/services/docker-compose.tests.yml @@ -42,6 +42,8 @@ restart: on-failure env_file: test-commons.env environment: + BLOB_SERVICE_URL: 'http://blob-server:50053' + BLOB_SERVICE_PUBLIC_URL: 'http://blob-server:50053' COMM_TUNNELBROKER_IDENTITY_ENDPOINT: 'http://identity-server:50054' AMQP_URI: 'amqp://comm:comm@rabbitmq:5672' RUST_LOG: tunnelbroker=trace @@ -61,7 +63,7 @@ image: blob pull_policy: build # Until blob cleanup is supported in tests, enable auto-deletion - command: [ 'blob', 'server', '--instant-delete' ] + command: ['blob', 'server', '--instant-delete'] platform: '${PLATFORM:-linux/amd64}' env_file: test-commons.env environment: diff --git a/services/terraform/remote/service_tunnelbroker.tf b/services/terraform/remote/service_tunnelbroker.tf --- a/services/terraform/remote/service_tunnelbroker.tf +++ b/services/terraform/remote/service_tunnelbroker.tf @@ -108,6 +108,14 @@ name = "COMM_TUNNELBROKER_IDENTITY_ENDPOINT", value = local.identity_local_url }, + { + name = "BLOB_SERVICE_URL", + value = local.blob_local_url + }, + { + name = "BLOB_SERVICE_PUBLIC_URL", + value = "https://${local.blob_service_domain_name}" + }, { name = "COMM_SERVICES_USE_JSON_LOGS", value = local.comm_services_use_json_logs diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -11,8 +11,9 @@ anyhow = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } comm-lib = { path = "../../shared/comm-lib", features = [ - "aws", - "grpc_clients", + "aws", + "blob-client", + "grpc_clients", ] } futures-util = { workspace = true } grpc_clients = { 
path = "../../shared/grpc_clients" } @@ -33,13 +34,14 @@ uuid = { workspace = true, features = ["v4"] } jsonwebtoken = "9.3.0" web-push = { version = "0.11.0", features = [ - "hyper-client", + "hyper-client", ], default-features = false } reqwest = { workspace = true, features = ["json", "native-tls", "rustls-tls"] } serde.workspace = true tokio-executor-trait = "2.1" tokio-reactor-trait = "1.1" tokio-util = { workspace = true } +urlencoding = { workspace = true } [build-dependencies] tonic-build = "0.8" diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs --- a/services/tunnelbroker/src/config.rs +++ b/services/tunnelbroker/src/config.rs @@ -34,6 +34,14 @@ #[arg(env = "COMM_TUNNELBROKER_IDENTITY_ENDPOINT")] #[arg(long, default_value = "http://localhost:50054")] pub identity_endpoint: String, + /// Blob service URL + #[arg(env = "BLOB_SERVICE_URL")] + #[arg(long, default_value = "http://localhost:50053")] + pub blob_service_url: reqwest::Url, + /// Blob service public URL + #[arg(env = "BLOB_SERVICE_PUBLIC_URL")] + #[arg(long, default_value = "http://localhost:50053")] + pub blob_service_public_url: reqwest::Url, /// APNs secrets #[arg(env = ENV_APNS_CONFIG)] #[arg(long)] diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -26,6 +26,7 @@ pub const PUSH_SERVICE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); pub const FARCASTER_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); +pub const MEDIA_MIRROR_TIMEOUT: Duration = Duration::from_secs(5); pub mod dynamodb { // This table holds messages which could not be immediately delivered to diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs --- a/services/tunnelbroker/src/database/mod.rs +++ b/services/tunnelbroker/src/database/mod.rs @@ -4,7 +4,7 @@ }; use 
comm_lib::aws::ddb::operation::put_item::PutItemError;
 use comm_lib::aws::ddb::operation::query::QueryError;
-use comm_lib::aws::ddb::types::AttributeValue;
+use comm_lib::aws::ddb::types::{AttributeValue, PutRequest, WriteRequest};
 use comm_lib::aws::{AwsConfig, DynamoDBClient};
 use comm_lib::database::{
   AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, Error,
@@ -299,6 +299,59 @@
     Ok(())
   }
 
+  /// Schedules all undelivered messages addressed to `device_id` for removal.
+  ///
+  /// Every stored message is written back with an `expirationTimeUnix`
+  /// attribute set 24 hours from now; nothing is deleted by this call itself
+  /// (presumably the table's TTL setting removes the items later).
+  ///
+  /// # Errors
+  /// Returns [`Error::AwsSdk`] when the messages cannot be retrieved and
+  /// propagates any failure of the batch write.
+  pub async fn mark_messages_to_device_for_deletion(
+    &self,
+    device_id: &str,
+  ) -> Result<(), Error> {
+    const MESSAGE_TTL_AFTER_DELETION_REQUEST: chrono::Duration =
+      chrono::Duration::hours(24);
+    const EXPIRATION_TIME: &str = "expirationTimeUnix";
+
+    let messages = self.retrieve_messages(device_id).await.map_err(|e| {
+      error!("DynamoDB client failed to retrieve messages: {:?}", e);
+      Error::AwsSdk(e.into())
+    })?;
+
+    // Nothing is queued for this device, so there is nothing to write back.
+    if messages.is_empty() {
+      return Ok(());
+    }
+
+    let expires_at = chrono::Utc::now() + MESSAGE_TTL_AFTER_DELETION_REQUEST;
+    let expiration_attr = AttributeValue::N(expires_at.timestamp().to_string());
+
+    let update_requests = messages
+      .into_iter()
+      .map(|mut attrs| {
+        attrs.insert(EXPIRATION_TIME.to_string(), expiration_attr.clone());
+        let put_request = PutRequest::builder()
+          .set_item(Some(attrs))
+          .build()
+          .expect("PutRequest item is always set right above");
+        WriteRequest::builder().put_request(put_request).build()
+      })
+      .collect();
+
+    comm_lib::database::batch_operations::batch_write(
+      &self.client,
+      undelivered_messages::TABLE_NAME,
+      update_requests,
+      Default::default(),
+    )
+    .await?;
+
+    Ok(())
+  }
+
   pub async fn mark_device_token_as_invalid(
     &self,
     device_id: &str,
diff --git a/services/tunnelbroker/src/farcaster/error.rs b/services/tunnelbroker/src/farcaster/error.rs
--- a/services/tunnelbroker/src/farcaster/error.rs
+++ b/services/tunnelbroker/src/farcaster/error.rs
@@ -8,4 +8,7 @@
   MissingFarcasterToken,
   InvalidRequest,
   DatabaseError(comm_lib::database::Error),
+  BlobError(comm_lib::blob::client::BlobServiceError),
+  AuthError(comm_lib::auth::AuthServiceError),
+  Timeout,
 }
diff --git
a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs
--- a/services/tunnelbroker/src/farcaster/mod.rs
+++ b/services/tunnelbroker/src/farcaster/mod.rs
@@ -1,10 +1,13 @@
-use crate::amqp_client::amqp::AmqpConnection;
-use crate::constants::FARCASTER_REQUEST_TIMEOUT;
+use crate::constants::{FARCASTER_REQUEST_TIMEOUT, MEDIA_MIRROR_TIMEOUT};
 use crate::database::DatabaseClient;
 use crate::farcaster::error::Error::MissingFarcasterToken;
+use crate::{amqp_client::amqp::AmqpConnection, config::CONFIG};
+use comm_lib::auth::AuthService;
+use comm_lib::blob::client::S2SAuthedBlobClient;
+use comm_lib::blob::types::http::MirroredMediaInfo;
 use lapin::{BasicProperties, ExchangeKind};
 use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
-use tracing::{debug, error};
+use tracing::{debug, info, warn};
 use tunnelbroker_messages::farcaster::{APIMethod, FarcasterAPIRequest};
 
 pub mod error;
@@ -15,6 +18,7 @@
   http_client: reqwest::Client,
   db_client: DatabaseClient,
   amqp_connection: AmqpConnection,
+  blob_client: S2SAuthedBlobClient,
 }
 
 impl FarcasterClient {
@@ -22,6 +26,7 @@
     farcaster_api_url: reqwest::Url,
     db_client: DatabaseClient,
     amqp_connection: &AmqpConnection,
+    auth_service: &AuthService,
   ) -> Result {
     let http_client = reqwest::Client::builder()
       .timeout(FARCASTER_REQUEST_TIMEOUT)
@@ -32,12 +37,172 @@
       http_client,
       db_client,
       amqp_connection: amqp_connection.clone(),
+      blob_client: S2SAuthedBlobClient::new(
+        auth_service,
+        CONFIG.blob_service_url.clone(),
+      ),
     })
   }
 
+  /// Performs `request` through `api_request_inner` and returns the status
+  /// code and body it produced.
+  ///
+  /// For the `direct-cast-conversation-messages` endpoint the body is also
+  /// scanned for media URLs, which are rewritten to point at Blob Service
+  /// while the originals are mirrored there. The body is returned unchanged
+  /// when it has an unexpected shape, holds nothing to mirror, or when
+  /// mirroring fails or times out.
   pub async fn api_request(
     &self,
     request: FarcasterAPIRequest,
+  ) -> Result<(reqwest::StatusCode, String), error::Error> {
+    let endpoint = request.endpoint.clone();
+    let (status, response_text) = self.api_request_inner(request).await?;
+
+    if endpoint != "direct-cast-conversation-messages" {
+      return Ok((status, response_text));
+    }
+
+    let Some((updated_response, medias)) = Self::replace_urls(&response_text)
+    else {
+      return Ok((status, response_text));
+    };
+    if medias.is_empty() {
+      // Nothing was rewritten, so keep the original payload byte-for-byte.
+      return Ok((status, response_text));
+    }
+    info!(
+      "Found {} medias in payload. Attempting to mirror them to Blob Service.",
+      medias.len()
+    );
+
+    if let Err(err) = self.mirror_media_to_blob(medias).await {
+      if matches!(err, error::Error::Timeout) {
+        info!("Timeout when mirroring multimedia. Falling back to originals.");
+      } else {
+        warn!("Failed to mirror multimedia to blob: {err:?}");
+      }
+      return Ok((status, response_text));
+    }
+
+    Ok((status, updated_response))
+  }
+
+  /// Asks Blob Service to mirror `medias`, waiting at most
+  /// [`MEDIA_MIRROR_TIMEOUT`] for the request to finish.
+  ///
+  /// The request runs on a spawned task, so when the wait times out only the
+  /// `JoinHandle` is dropped and the task keeps running detached.
+  ///
+  /// Returns [`error::Error::Timeout`] when the deadline passes or the task
+  /// does not run to completion; auth and Blob Service errors raised inside
+  /// the task are propagated.
+  async fn mirror_media_to_blob(
+    &self,
+    medias: Vec<MirroredMediaInfo>,
+  ) -> Result<(), error::Error> {
+    if medias.is_empty() {
+      return Ok(());
+    }
+    let blob_client = self.blob_client.clone();
+    let task = tokio::spawn(async move {
+      blob_client.auth().await?.mirror_multimedia(medias).await?;
+      Ok::<_, error::Error>(())
+    });
+
+    match tokio::time::timeout(MEDIA_MIRROR_TIMEOUT, task).await {
+      Ok(Ok(task_result)) => task_result,
+      Ok(Err(join_error)) => {
+        // The task panicked or was cancelled. It is still reported as a
+        // timeout, as before, but logged so the two can be told apart.
+        warn!("Media mirroring task did not complete: {join_error:?}");
+        Err(error::Error::Timeout)
+      }
+      Err(_elapsed) => Err(error::Error::Timeout),
+    }
+  }
+
+  // This fn expects a response from FC `direct-cast-conversation-messages`;
+  // specifically, it expects a JSON in form:
+  // ```
+  // {
+  //   result: {
+  //     messages: [
+  //       {
+  //         metadata?: {
+  //           medias?: [
+  //             { staticRaster: string, ... },
+  //             ...
+  //           ]
+  //         },
+  //         ...
+  //       },
+  //       ...
+  //     ]
+  //   }
+  // }
+  // ```
+  // It returns `None` when response doesn't match in any way
+  fn replace_urls(
+    response_text: &str,
+  ) -> Option<(String, Vec<MirroredMediaInfo>)> {
+    let media_base_url = CONFIG.blob_service_public_url.join("media/").ok()?;
+
+    let mut found_medias: Vec<MirroredMediaInfo> = Vec::new();
+
+    let mut response: serde_json::Value =
+      serde_json::from_str(response_text).ok()?;
+    let messages = response
+      .get_mut("result")?
+      .get_mut("messages")?
+      .as_array_mut()?;
+
+    for message in messages {
+      let Some(medias) = message
+        .get_mut("metadata")
+        .and_then(|metadata| metadata.get_mut("medias"))
+        .and_then(|m| m.as_array_mut())
+      else {
+        continue;
+      };
+
+      for media in medias {
+        let original_media = media.clone();
+        let Some(url) = media.get_mut("staticRaster") else {
+          continue;
+        };
+        let Some(original_url) = url.as_str() else {
+          continue;
+        };
+        if original_url.starts_with(media_base_url.as_str()) {
+          continue;
+        }
+
+        let original_metadata = serde_json::to_string(&original_media).ok()?;
+        found_medias.push(MirroredMediaInfo {
+          url: original_url.to_string(),
+          original_metadata,
+        });
+
+        let urlencoded_original = urlencoding::encode(original_url);
+        let new_url =
+          media_base_url.join(&urlencoded_original).ok()?.to_string();
+        tracing::trace!(
+          "Replaced media URL '{}' with '{}'",
+          original_url,
+          &new_url
+        );
+        *url = serde_json::Value::String(new_url);
+      }
+    }
+
+    let serialized_response = serde_json::to_string(&response).ok()?;
+    Some((serialized_response, found_medias))
+  }
+
+  pub async fn api_request_inner(
+    &self,
+    request: FarcasterAPIRequest,
   ) -> Result<(reqwest::StatusCode, String), error::Error> {
     debug!(
       "Received Farcaster {:?} {} {} request from {}",
@@ -90,7 +255,9 @@
         .headers(headers)
         .body(request.payload),
       APIMethod::STREAM => {
-        error!("STREAM method should be handled earlier, not in api_request");
+        tracing::error!(
+          "STREAM method should be handled earlier, not in api_request"
+        );
         return Err(error::Error::InvalidRequest);
       }
     };
@@ -137,3 +304,37 @@
     Ok(())
   }
 }
+
+#[cfg(test)]
+mod test {
+  #[test]
+  fn test_media_blob_url_creation() {
+    // part normally handled by service config
+    let base_url_str = "https://blob.service.com";
+    let base_url: reqwest::Url = base_url_str.parse().unwrap();
+
+    // media url part
+    let media_base_url = base_url.join("media/").unwrap();
+    assert_eq!(
+      media_base_url.as_str(),
+      "https://blob.service.com/media/",
+      "Media base URL is
incorrect" + ); + + // test urlencoding of the original + let original_url = "https://example.com/image.jpg"; + let original_url_urlencoded = urlencoding::encode(original_url); + assert_eq!( + original_url_urlencoded, + "https%3A%2F%2Fexample.com%2Fimage.jpg", + ); + + // test replaced URL part + let expected_url = + format!("{base_url_str}/media/{original_url_urlencoded}"); + let new_url = media_base_url + .join(&original_url_urlencoded) + .expect("join failed"); + assert_eq!(expected_url, new_url.as_str(), "URLs don't match"); + } +} diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -108,6 +108,12 @@ .await .map_err(|_| tonic::Status::failed_precondition("unexpected error"))?; + self + .client + .mark_messages_to_device_for_deletion(&message.device_id) + .await + .map_err(|_| tonic::Status::failed_precondition("unexpected error"))?; + let response = tonic::Response::new(Empty {}); Ok(response) } diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -59,6 +59,8 @@ config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; let db_client = database::DatabaseClient::new(&aws_config); + let auth_service = + comm_lib::auth::AuthService::new(&aws_config, &CONFIG.identity_endpoint); let amqp_connection = amqp::AmqpConnection::connect() .await .expect("Failed to create AMQP connection"); @@ -70,6 +72,7 @@ farcaster_api_url, db_client.clone(), &amqp_connection, + &auth_service, ) .expect("Unable to create Farcaster client"); @@ -96,6 +99,7 @@ token_config, &amqp_connection, grpc_client, + &auth_service, ); tokio::select! 
{ diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -7,6 +7,7 @@ pub(crate) use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::token_connection::TokenConnection; use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data}; +use comm_lib::auth::AuthService; use comm_lib::database::Error; use futures_util::future; use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; @@ -21,6 +22,7 @@ connections: HashMap, amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, + auth_service: AuthService, } impl TokenDistributor { @@ -29,6 +31,7 @@ config: TokenDistributorConfig, amqp_connection: &AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, + auth_service: &AuthService, ) -> Self { info!( "Initializing TokenDistributor - max_connections: {}, \ @@ -57,6 +60,7 @@ connections: HashMap::new(), amqp_connection: amqp_connection.clone(), grpc_client, + auth_service: auth_service.clone(), } } @@ -197,6 +201,7 @@ self.amqp_connection.clone(), cancel_token.clone(), self.grpc_client.clone(), + &self.auth_service, ); // Store the cancellation token diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -1,10 +1,15 @@ use crate::amqp_client::amqp::{ send_message_to_device, AmqpChannel, AmqpConnection, }; +use crate::config::CONFIG; +use crate::constants::MEDIA_MIRROR_TIMEOUT; use crate::database::DatabaseClient; use crate::log::redact_sensitive_data; use crate::token_distributor::config::TokenDistributorConfig; use 
crate::token_distributor::error::TokenConnectionError; +use comm_lib::auth::AuthService; +use comm_lib::blob::client::S2SAuthedBlobClient; +use comm_lib::blob::types::http::MirroredMediaInfo; use futures_util::{SinkExt, StreamExt}; use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; use grpc_clients::identity::protos::auth::PeersDeviceListsRequest; @@ -15,7 +20,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; use tunnelbroker_messages::farcaster::{ - FarcasterMessage, FarcasterPayload, NewFarcasterMessage, + DirectCastMessage, FarcasterMessage, FarcasterPayload, NewFarcasterMessage, RefreshDirectCastConversationPayload, }; @@ -27,6 +32,7 @@ amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, amqp_channel: AmqpChannel, + blob_client: S2SAuthedBlobClient, } impl TokenConnection { @@ -38,6 +44,7 @@ amqp_connection: AmqpConnection, cancellation_token: CancellationToken, grpc_client: ChainedInterceptedServicesAuthClient, + auth_service: &AuthService, ) { let connection = Self { db: db.clone(), @@ -47,6 +54,10 @@ amqp_connection: amqp_connection.clone(), grpc_client, amqp_channel: AmqpChannel::new(&amqp_connection), + blob_client: S2SAuthedBlobClient::new( + auth_service, + CONFIG.blob_service_url.clone(), + ), }; tokio::spawn(async move { @@ -489,13 +500,28 @@ ); let conversation_id = &payload.conversation_id; - let direct_cast_message = &payload.message; debug!( "Processing refresh for conversation ID: {}", conversation_id ); + let mut direct_cast_message = payload.message.clone(); + if let Some(medias) = replace_media_urls(&mut direct_cast_message) { + if let Err(err) = self.mirror_media_to_blob(medias).await { + if matches!(err, crate::farcaster::error::Error::Timeout) { + info!( + "Timeout when mirroring multimedia. Falling back to originals." 
+          );
+        } else {
+          warn!("Failed to mirror multimedia to blob: {err:?}");
+        }
+        direct_cast_message = payload.message.clone();
+      }
+    } else {
+      direct_cast_message = payload.message.clone();
+    }
+
     let request = PeersDeviceListsRequest {
       user_ids: vec![self.user_id.clone()],
     };
@@ -514,7 +540,7 @@
       .users_devices_platform_details
       .get(&self.user_id);
     let message = NewFarcasterMessage {
-      message: direct_cast_message.clone(),
+      message: direct_cast_message,
     };
     if let Some(user_devices) = user_devices_option {
       for (device_id, platform_details) in
@@ -544,4 +570,90 @@
 
     Ok(())
   }
+
+  /// Asks Blob Service to mirror `medias`, waiting at most
+  /// [`MEDIA_MIRROR_TIMEOUT`] for the request to finish.
+  ///
+  /// The request runs on a spawned task, so when the wait times out only the
+  /// `JoinHandle` is dropped and the task keeps running detached.
+  ///
+  /// Returns `Error::Timeout` when the deadline passes or the task does not
+  /// run to completion, which lets the caller fall back to the original
+  /// media URLs; auth and Blob Service errors are propagated.
+  async fn mirror_media_to_blob(
+    &self,
+    medias: Vec<MirroredMediaInfo>,
+  ) -> Result<(), crate::farcaster::error::Error> {
+    if medias.is_empty() {
+      return Ok(());
+    }
+    let blob_client = self.blob_client.clone();
+    let task = tokio::spawn(async move {
+      blob_client.auth().await?.mirror_multimedia(medias).await?;
+      Ok::<_, crate::farcaster::error::Error>(())
+    });
+
+    match tokio::time::timeout(MEDIA_MIRROR_TIMEOUT, task).await {
+      Ok(Ok(task_result)) => task_result,
+      Ok(Err(join_error)) => {
+        // The task panicked or was cancelled. Report it like a timeout so
+        // the caller does not send URLs that were never mirrored.
+        warn!("Media mirroring task did not complete: {join_error:?}");
+        Err(crate::farcaster::error::Error::Timeout)
+      }
+      Err(_elapsed) => Err(crate::farcaster::error::Error::Timeout),
+    }
+  }
+}
+
+/// Rewrites every `metadata.medias[].staticRaster` URL found in `message` so
+/// that it points at the public Blob Service media endpoint, and returns the
+/// originals that need to be mirrored.
+///
+/// URLs already starting with the media base URL are left alone. Returns
+/// `None` when the message carries no `metadata.medias` array or a URL
+/// cannot be built; `message` may have been partially rewritten by then, so
+/// callers should discard it in that case.
+fn replace_media_urls(
+  message: &mut DirectCastMessage,
+) -> Option<Vec<MirroredMediaInfo>> {
+  let media_base_url = CONFIG.blob_service_public_url.join("media/").ok()?;
+
+  let mut found_medias: Vec<MirroredMediaInfo> = Vec::new();
+
+  let medias = message
+    .extra
+    .get_mut("metadata")
+    .and_then(|metadata| metadata.get_mut("medias"))
+    .and_then(|m| m.as_array_mut())?;
+
+  for media in medias {
+    let original_media = media.clone();
+    let Some(url) = media.get_mut("staticRaster") else {
+      continue;
+    };
+    let Some(original_url) = url.as_str() else {
+      continue;
+    };
+    if original_url.starts_with(media_base_url.as_str()) {
+      continue;
+    }
+
+    let original_metadata = serde_json::to_string(&original_media).ok()?;
+    found_medias.push(MirroredMediaInfo {
+      url: original_url.to_string(),
+      original_metadata,
+    });
+
+    let urlencoded_original = urlencoding::encode(original_url);
+    let new_url = media_base_url.join(&urlencoded_original).ok()?.to_string();
+
tracing::trace!(
+      "Replaced media URL '{}' with '{}'",
+      original_url,
+      &new_url
+    );
+    *url = serde_json::Value::String(new_url);
+  }
+
+  Some(found_medias)
 }
diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs
--- a/shared/comm-lib/src/blob/client.rs
+++ b/shared/comm-lib/src/blob/client.rs
@@ -14,7 +14,9 @@
 pub use reqwest::Url;
 
 use crate::{
-  auth::{AuthorizationCredential, UserIdentity},
+  auth::{
+    AuthService, AuthServiceError, AuthorizationCredential, UserIdentity,
+  },
   blob::types::http::{
     AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest,
     RemoveHoldersRequest,
@@ -27,7 +29,8 @@
 use super::types::{
   http::{
     AssignHoldersRequest, AssignHoldersResponse, BlobSizesRequest,
-    BlobSizesResponse, RemoveHoldersResponse,
+    BlobSizesResponse, MirrorMultimediaRequest, MirroredMediaInfo,
+    RemoveHoldersResponse,
   },
   BlobInfo,
 };
@@ -71,6 +74,48 @@
   }
 }
 
+/// Service-to-service token authenticated [`BlobServiceClient`].
+/// Authentication is performed lazily each time when
+/// [`S2SAuthedBlobClient::auth()`] is called.
+#[cfg(feature = "aws")]
+#[derive(Clone)]
+pub struct S2SAuthedBlobClient {
+  blob_client: BlobServiceClient,
+  auth_service: AuthService,
+}
+
+// Gated exactly like the struct above so that builds without the `aws`
+// feature do not see an impl for a type that was compiled out.
+#[cfg(feature = "aws")]
+impl S2SAuthedBlobClient {
+  /// Creates a client for the Blob service at `blob_service_url` that will
+  /// obtain its tokens from a clone of `auth_service`.
+  pub fn new(
+    auth_service: &AuthService,
+    blob_service_url: reqwest::Url,
+  ) -> Self {
+    Self {
+      blob_client: BlobServiceClient::new(blob_service_url),
+      auth_service: auth_service.clone(),
+    }
+  }
+
+  /// Returns a clone of the underlying client without attaching a token.
+  pub fn unauth(&self) -> BlobServiceClient {
+    self.blob_client.clone()
+  }
+
+  /// Requests a services token and returns a client authenticated with it.
+  ///
+  /// `get_services_token` is invoked on every call; any error it reports is
+  /// returned to the caller.
+  pub async fn auth(&self) -> Result<BlobServiceClient, AuthServiceError> {
+    let token = self.auth_service.get_services_token().await?;
+    let blob_client = self.blob_client.with_authentication(token.into());
+    Ok(blob_client)
+  }
+}
+
 /// A client interface to Blob service.
 //
 /// The `BlobServiceClient` holds a connection pool internally, so it is advised that
@@ -539,6 +584,44 @@
       }
     });
   }
+
+  /// Asks the Blob service to mirror the given external media.
+  ///
+  /// Sends `medias` as a [`MirrorMultimediaRequest`] in a `POST` to
+  /// `/media/mirror`. The call is rejected up front unless
+  /// `ensure_caller_is_service` accepts the current credentials, and any
+  /// non-success HTTP status is turned into an error.
+  pub async fn mirror_multimedia(
+    &self,
+    medias: Vec<MirroredMediaInfo>,
+  ) -> BlobResult<()> {
+    self.ensure_caller_is_service("mirror_multimedia")?;
+    debug!("Mirror multimedia request");
+
+    let url = self
+      .blob_service_url
+      .join("/media/mirror")
+      .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
+
+    let payload = MirrorMultimediaRequest { medias };
+    trace!("Request payload: {:?}", payload);
+
+    let response = self
+      .request(Method::POST, url)?
+      .json(&payload)
+      .send()
+      .await?;
+
+    if !response.status().is_success() {
+      return error_response_result(response).await;
+    }
+
+    debug!(
+      "Request successful. Mirrored {} multimedia.",
+      payload.medias.len()
+    );
+    Ok(())
+  }
 }
 
 // private helper methods