diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -12,7 +12,6 @@ pub mod websockets; use crate::farcaster::FarcasterClient; -use crate::notifs::NotifClient; use crate::token_distributor::{TokenDistributor, TokenDistributorConfig}; use amqp_client::amqp; use anyhow::{anyhow, Result}; @@ -65,8 +64,6 @@ .await .expect("Failed to create AMQP connection"); - let notif_client = NotifClient::new(db_client.clone()); - let farcaster_api_url = CONFIG.farcaster_api_url.clone(); let farcaster_client = FarcasterClient::new( farcaster_api_url, @@ -80,18 +77,23 @@ let websocket_server = websockets::run_server( db_client.clone(), &amqp_connection, - notif_client.clone(), farcaster_client.clone(), ); let auth_service = AuthService::new(&aws_config, &CONFIG.identity_endpoint); - let services_token = auth_service.get_services_token().await?; + let services_token = + auth_service.get_services_token().await.inspect_err(|err| { + tracing::error!("Failed to get services auth token: {:?}", err); + })?; let grpc_client = get_services_auth_client( &CONFIG.identity_endpoint, services_token.as_str().to_owned(), PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), ) - .await?; + .await + .inspect_err(|err| { + tracing::error!("Failed to create Identity gRPC client: {:?}", err); + })?; let token_config = TokenDistributorConfig::default(); let mut token_distributor = TokenDistributor::new( diff --git a/services/tunnelbroker/src/notifs/base.rs b/services/tunnelbroker/src/notifs/base.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/notifs/base.rs @@ -0,0 +1,147 @@ +use crate::config::CONFIG; +use crate::constants::error_types; +use crate::notifs::apns::{APNsClient, APNsNotif}; +use crate::notifs::fcm::firebase_message::FCMMessage; +use crate::notifs::fcm::FCMClient; +use crate::notifs::web_push::{WebPushClient, WebPushNotif}; +use 
crate::notifs::wns::{WNSClient, WNSNotif}; +use ::web_push::WebPushError; +use tracing::{error, info}; + +#[derive(derive_more::From)] +pub enum Notif { + APNs(APNsNotif), + Fcm(FCMMessage), + WebPush(WebPushNotif), + Wns(WNSNotif), +} + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +pub enum NotifClientError { + APNs(super::apns::error::Error), + Fcm(super::fcm::error::Error), + WebPush(super::web_push::error::Error), + Wns(super::wns::error::Error), + #[display(fmt = "Notif client not initialized: {}", _0)] + MissingClient(#[error(ignore)] super::NotifType), +} + +impl NotifClientError { + pub fn should_invalidate_token(&self) -> bool { + use super::apns::error::Error as APNS; + use super::fcm::error::Error as FCM; + use super::web_push::error::Error as WEB; + use super::wns::error::Error as WNS; + match self { + Self::APNs(error) => { + matches!(error, APNS::ResponseError(body) if body.reason.should_invalidate_token()) + } + Self::Fcm(FCM::FCMError(e)) => e.should_invalidate_token(), + Self::WebPush(WEB::WebPush(web_err)) => { + use WebPushError::{EndpointNotFound, EndpointNotValid}; + matches!(web_err, EndpointNotValid(_) | EndpointNotFound(_)) + } + Self::Wns(WNS::WNSNotification(e)) => e.should_invalidate_token(), + _ => false, + } + } +} + +/// Base internal structure for direct communication with notif providers. +/// Does not handle device token management. +#[derive(Clone)] +pub(super) struct BaseNotifClient { + apns: Option, + fcm: Option, + web_push: Option, + wns: Option, +} + +macro_rules! 
create_client { + ($client:ident, $config:expr, $name:expr, $error_type: ident) => {{ + let created_client = match $config { + Some(config) => match $client::new(&config) { + Ok(client_instance) => { + info!("{} client created successfully", $name); + Some(client_instance) + } + Err(err) => { + error!( + errorType = $error_type, + "Error creating {} client: {}", $name, err + ); + None + } + }, + None => { + error!(errorType = $error_type, "{} config is missing", $name); + None + } + }; + + created_client + }}; +} + +impl BaseNotifClient { + pub(super) fn new() -> Self { + use error_types::{APNS_ERROR, FCM_ERROR, WEB_PUSH_ERROR, WNS_ERROR}; + + let apns_cfg = &CONFIG.apns_config; + let fcm_cfg = &CONFIG.fcm_config; + let web_cfg = &CONFIG.web_push_config; + let wns_cfg = &CONFIG.wns_config; + + let apns = create_client!(APNsClient, apns_cfg, "APNs", APNS_ERROR); + let fcm = create_client!(FCMClient, fcm_cfg, "FCM", FCM_ERROR); + let wns = create_client!(WNSClient, wns_cfg, "WNS", WNS_ERROR); + let web_push = + create_client!(WebPushClient, web_cfg, "Web Push", WEB_PUSH_ERROR); + + Self { + apns, + fcm, + web_push, + wns, + } + } + + pub(super) async fn send_notif( + &self, + notif: Notif, + ) -> Result<(), NotifClientError> { + use super::NotifType as ClientType; + use NotifClientError::MissingClient; + + match notif { + Notif::APNs(apns_notif) => { + let Some(apns) = &self.apns else { + return Err(MissingClient(ClientType::APNs)); + }; + apns.send(apns_notif).await?; + } + Notif::Fcm(fcm_message) => { + let Some(fcm) = &self.fcm else { + return Err(MissingClient(ClientType::FCM)); + }; + fcm.send(fcm_message).await?; + } + Notif::WebPush(web_push_notif) => { + let Some(web_client) = &self.web_push else { + return Err(MissingClient(ClientType::WebPush)); + }; + web_client.send(web_push_notif).await?; + } + + Notif::Wns(wns_notif) => { + let Some(wns) = &self.wns else { + return Err(MissingClient(ClientType::WNS)); + }; + wns.send(wns_notif).await?; + } + }; + Ok(()) 
+ } +} diff --git a/services/tunnelbroker/src/notifs/fcm/firebase_message.rs b/services/tunnelbroker/src/notifs/fcm/firebase_message.rs --- a/services/tunnelbroker/src/notifs/fcm/firebase_message.rs +++ b/services/tunnelbroker/src/notifs/fcm/firebase_message.rs @@ -38,7 +38,7 @@ } impl AndroidMessagePriority { - pub fn from_str(value: &str) -> Option { + pub fn from_raw(value: &str) -> Option { match value { "NORMAL" => Some(AndroidMessagePriority::Normal), "HIGH" => Some(AndroidMessagePriority::High), diff --git a/services/tunnelbroker/src/notifs/mod.rs b/services/tunnelbroker/src/notifs/mod.rs --- a/services/tunnelbroker/src/notifs/mod.rs +++ b/services/tunnelbroker/src/notifs/mod.rs @@ -1,467 +1,18 @@ -use crate::amqp_client::AmqpClient; -use crate::config::CONFIG; -use crate::constants::error_types; -use crate::database::DatabaseClient; -use crate::notifs::apns::headers::NotificationHeaders; -use crate::notifs::apns::{APNsClient, APNsNotif}; -use crate::notifs::fcm::firebase_message::{ - AndroidConfig, AndroidMessagePriority, FCMMessage, -}; -use crate::notifs::fcm::FCMClient; -use crate::notifs::web_push::error::Error::WebPush as NotifsWebPushError; -use crate::notifs::web_push::{WebPushClient, WebPushNotif}; -use crate::notifs::wns::error::Error::WNSNotification as NotifsWNSError; -use crate::notifs::wns::{WNSClient, WNSNotif}; -use crate::websockets::session::SessionError; -use ::web_push::WebPushError; -use tracing::{debug, error, info}; -use tunnelbroker_messages::bad_device_token::BadDeviceToken; -use tunnelbroker_messages::MessageSentStatus; -use tunnelbroker_messages::{MessageToDeviceRequest, Platform}; - pub mod apns; pub mod fcm; pub mod web_push; pub mod wns; -#[derive(PartialEq)] -pub enum NotifClientType { +mod base; +mod session_client; + +pub use base::{Notif, NotifClientError}; +pub use session_client::SessionNotifClient; + +#[derive(Debug, derive_more::Display, PartialEq)] +pub enum NotifType { APNs, FCM, WebPush, WNS, } - -impl NotifClientType 
{ - pub fn supported_platform(&self, platform: Platform) -> bool { - match self { - NotifClientType::APNs => { - platform == Platform::IOS || platform == Platform::MacOS - } - NotifClientType::FCM => platform == Platform::Android, - NotifClientType::WebPush => platform == Platform::Web, - NotifClientType::WNS => platform == Platform::Windows, - } - } -} - -#[derive(Clone)] -pub struct NotifClient { - apns: Option, - fcm: Option, - web_push: Option, - wns: Option, - db_client: DatabaseClient, -} - -impl NotifClient { - pub fn new(db_client: DatabaseClient) -> NotifClient { - let apns_config = CONFIG.apns_config.clone(); - - let apns = match apns_config { - Some(config) => match APNsClient::new(&config) { - Ok(apns_client) => { - info!("APNs client created successfully"); - Some(apns_client) - } - Err(err) => { - error!( - errorType = error_types::APNS_ERROR, - "Error creating APNs client: {}", err - ); - None - } - }, - None => { - error!( - errorType = error_types::APNS_ERROR, - "APNs config is missing" - ); - None - } - }; - - let fcm_config = CONFIG.fcm_config.clone(); - let fcm = match fcm_config { - Some(config) => match FCMClient::new(&config) { - Ok(fcm_client) => { - info!("FCM client created successfully"); - Some(fcm_client) - } - Err(err) => { - error!( - errorType = error_types::FCM_ERROR, - "Error creating FCM client: {}", err - ); - None - } - }, - None => { - error!(errorType = error_types::FCM_ERROR, "FCM config is missing"); - None - } - }; - - let web_push_config = CONFIG.web_push_config.clone(); - let web_push = match web_push_config { - Some(config) => match WebPushClient::new(&config) { - Ok(web_client) => { - info!("Web Push client created successfully"); - Some(web_client) - } - Err(err) => { - error!( - errorType = error_types::WEB_PUSH_ERROR, - "Error creating Web Push client: {}", err - ); - None - } - }, - None => { - error!( - errorType = error_types::WEB_PUSH_ERROR, - "Web Push config is missing" - ); - None - } - }; - - let wns_config = 
CONFIG.wns_config.clone(); - let wns = match wns_config { - Some(config) => match WNSClient::new(&config) { - Ok(wns_client) => { - info!("WNS client created successfully"); - Some(wns_client) - } - Err(err) => { - error!( - errorType = error_types::WNS_ERROR, - "Error creating WNS client: {}", err - ); - None - } - }, - None => { - error!(errorType = error_types::WNS_ERROR, "WNS config is missing"); - None - } - }; - - NotifClient { - apns, - fcm, - web_push, - wns, - db_client, - } - } - - async fn invalidate_device_token( - &self, - device_id: String, - invalidated_token: String, - amqp_client: &mut AmqpClient, - ) -> Result<(), SessionError> { - let bad_device_token_message = BadDeviceToken { invalidated_token }; - let payload = serde_json::to_string(&bad_device_token_message)?; - let message_request = MessageToDeviceRequest { - client_message_id: uuid::Uuid::new_v4().to_string(), - device_id: device_id.to_string(), - payload, - }; - - amqp_client - .handle_message_to_device(&message_request) - .await?; - - self - .db_client - .mark_device_token_as_invalid(&device_id) - .await - .map_err(SessionError::DatabaseError)?; - - Ok(()) - } - - async fn get_device_token( - &self, - device_id: String, - client: NotifClientType, - ) -> Result { - let db_token = self - .db_client - .get_device_token(&device_id) - .await - .map_err(SessionError::DatabaseError)?; - - match db_token { - Some(token) => { - if let Some(platform) = token.platform { - if !client.supported_platform(platform) { - return Err(SessionError::InvalidNotifProvider); - } - } - if token.token_invalid { - Err(SessionError::InvalidDeviceToken) - } else { - Ok(token.device_token) - } - } - None => Err(SessionError::MissingDeviceToken), - } - } - - pub async fn send_apns_notif( - &self, - notif: tunnelbroker_messages::notif::APNsNotif, - amqp_client: &mut AmqpClient, - ) -> Option { - debug!("Received APNs notif for {}", notif.device_id); - - let Ok(headers) = - serde_json::from_str::(¬if.headers) - else { - 
return Some(MessageSentStatus::SerializationError(notif.headers)); - }; - - let device_token = match self - .get_device_token(notif.device_id.clone(), NotifClientType::APNs) - .await - { - Ok(token) => token, - Err(e) => { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(e), - )); - } - }; - - let apns_notif = APNsNotif { - device_token: device_token.clone(), - headers, - payload: notif.payload, - }; - - if let Some(apns) = self.apns.clone() { - let response = apns.send(apns_notif).await; - if let Err(apns::error::Error::ResponseError(body)) = &response { - if body.reason.should_invalidate_token() { - if let Err(e) = self - .invalidate_device_token( - notif.device_id, - device_token.clone(), - amqp_client, - ) - .await - { - error!( - errorType = error_types::DDB_ERROR, - "Error invalidating device token {}: {:?}", device_token, e - ); - }; - } - } - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - response, - )); - } - - Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(SessionError::MissingAPNsClient), - )) - } - - pub async fn send_fcm_notif( - &self, - notif: tunnelbroker_messages::notif::FCMNotif, - amqp_client: &mut AmqpClient, - ) -> Option { - debug!("Received FCM notif for {}", notif.device_id); - - let Some(priority) = AndroidMessagePriority::from_str(¬if.priority) - else { - return Some(MessageSentStatus::SerializationError(notif.priority)); - }; - - let Ok(data) = serde_json::from_str(¬if.data) else { - return Some(MessageSentStatus::SerializationError(notif.data)); - }; - - let device_token = match self - .get_device_token(notif.device_id.clone(), NotifClientType::FCM) - .await - { - Ok(token) => token, - Err(e) => { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(e), - )) - } - }; - - let fcm_message = FCMMessage { - data, - token: device_token.to_string(), - android: AndroidConfig { priority }, - }; - - if let Some(fcm) = self.fcm.clone() { - let result = 
fcm.send(fcm_message).await; - - if let Err(crate::notifs::fcm::error::Error::FCMError(fcm_error)) = - &result - { - if fcm_error.should_invalidate_token() { - if let Err(e) = self - .invalidate_device_token( - notif.device_id, - device_token.clone(), - amqp_client, - ) - .await - { - error!( - errorType = error_types::DDB_ERROR, - "Error invalidating device token {}: {:?}", device_token, e - ); - }; - } - } - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - result, - )); - } - - Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(SessionError::MissingFCMClient), - )) - } - - pub async fn send_web_notif( - &self, - notif: tunnelbroker_messages::notif::WebPushNotif, - amqp_client: &mut AmqpClient, - ) -> Option { - debug!("Received WebPush notif for {}", notif.device_id); - - let Some(web_push_client) = self.web_push.clone() else { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(SessionError::MissingWebPushClient), - )); - }; - - let device_token = match self - .get_device_token(notif.device_id.clone(), NotifClientType::WebPush) - .await - { - Ok(token) => token, - Err(e) => { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(e), - )) - } - }; - - let web_push_notif = WebPushNotif { - device_token: device_token.clone(), - payload: notif.payload, - }; - - let result = web_push_client.send(web_push_notif).await; - if let Err(NotifsWebPushError(web_push_error)) = &result { - if matches!( - web_push_error, - WebPushError::EndpointNotValid(_) | WebPushError::EndpointNotFound(_) - ) { - if let Err(e) = self - .invalidate_device_token( - notif.device_id, - device_token.clone(), - amqp_client, - ) - .await - { - error!( - errorType = error_types::DDB_ERROR, - "Error invalidating device token {}: {:?}", device_token, e - ); - }; - } else { - error!( - errorType = error_types::WEB_PUSH_ERROR, - "Failed sending Web Push notification to: {}. 
Error: {}", - device_token, - web_push_error - ); - } - } - Some(MessageSentStatus::from_result( - ¬if.client_message_id, - result, - )) - } - - pub async fn send_wns_notif( - &self, - notif: tunnelbroker_messages::notif::WNSNotif, - amqp_client: &mut AmqpClient, - ) -> Option { - debug!("Received WNS notif for {}", notif.device_id); - - let Some(wns_client) = self.wns.clone() else { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(SessionError::MissingWNSClient), - )); - }; - - let device_token = match self - .get_device_token(notif.device_id.clone(), NotifClientType::WNS) - .await - { - Ok(token) => token, - Err(e) => { - return Some(MessageSentStatus::from_result( - ¬if.client_message_id, - Err(e), - )) - } - }; - - let wns_notif = WNSNotif { - device_token: device_token.clone(), - payload: notif.payload, - }; - - let result = wns_client.send(wns_notif).await; - if let Err(NotifsWNSError(err)) = &result { - if err.should_invalidate_token() { - if let Err(e) = self - .invalidate_device_token( - notif.device_id, - device_token.clone(), - amqp_client, - ) - .await - { - error!( - errorType = error_types::DDB_ERROR, - "Error invalidating device token {}: {:?}", device_token, e - ); - }; - } - } - Some(MessageSentStatus::from_result( - ¬if.client_message_id, - result, - )) - } -} diff --git a/services/tunnelbroker/src/notifs/session_client.rs b/services/tunnelbroker/src/notifs/session_client.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/notifs/session_client.rs @@ -0,0 +1,299 @@ +use crate::amqp_client::AmqpClient; +use crate::constants::error_types; +use crate::database::DatabaseClient; +use crate::notifs::apns::headers::NotificationHeaders; +use crate::notifs::apns::APNsNotif; +use crate::notifs::fcm::firebase_message::{ + AndroidConfig, AndroidMessagePriority, FCMMessage, +}; +use crate::notifs::web_push::WebPushNotif; +use crate::notifs::wns::WNSNotif; +use crate::websockets::session::SessionError; +use 
tracing::{debug, error}; +use tunnelbroker_messages::bad_device_token::BadDeviceToken; +use tunnelbroker_messages::MessageToDeviceRequest; +use tunnelbroker_messages::{MessageSentStatus, Platform}; + +use super::base::BaseNotifClient; +use super::{NotifClientError, NotifType}; + +impl NotifType { + pub fn supported_platform(&self, platform: Platform) -> bool { + match self { + NotifType::APNs => { + platform == Platform::IOS || platform == Platform::MacOS + } + NotifType::FCM => platform == Platform::Android, + NotifType::WebPush => platform == Platform::Web, + NotifType::WNS => platform == Platform::Windows, + } + } +} + +/// Notification client instance intended to be used in +/// websocket sessions with client devices. +#[derive(Clone)] +pub struct SessionNotifClient { + inner: BaseNotifClient, + db_client: DatabaseClient, +} + +impl SessionNotifClient { + pub fn new(db_client: DatabaseClient) -> SessionNotifClient { + SessionNotifClient { + inner: BaseNotifClient::new(), + db_client, + } + } + + async fn invalidate_device_token( + &self, + device_id: String, + invalidated_token: String, + amqp_client: &mut AmqpClient, + ) -> Result<(), SessionError> { + let bad_device_token_message = BadDeviceToken { invalidated_token }; + let payload = serde_json::to_string(&bad_device_token_message)?; + let message_request = MessageToDeviceRequest { + client_message_id: uuid::Uuid::new_v4().to_string(), + device_id: device_id.to_string(), + payload, + }; + + amqp_client + .handle_message_to_device(&message_request) + .await?; + + self + .db_client + .mark_device_token_as_invalid(&device_id) + .await + .map_err(SessionError::DatabaseError)?; + + Ok(()) + } + + async fn get_device_token( + &self, + device_id: String, + client: NotifType, + ) -> Result { + let db_token = self + .db_client + .get_device_token(&device_id) + .await + .map_err(SessionError::DatabaseError)?; + + match db_token { + Some(token) => { + if let Some(platform) = token.platform { + if 
!client.supported_platform(platform) { + return Err(SessionError::InvalidNotifProvider); + } + } + if token.token_invalid { + Err(SessionError::InvalidDeviceToken) + } else { + Ok(token.device_token) + } + } + None => Err(SessionError::MissingDeviceToken), + } + } + + async fn handle_invalid_token( + &self, + result: &Result, + amqp_client: &mut AmqpClient, + device_id: String, + device_token: String, + ) { + let Err(e) = &result else { + return; + }; + + if !e.should_invalidate_token() { + return; + } + + if let Err(e) = self + .invalidate_device_token(device_id, device_token.clone(), amqp_client) + .await + { + error!( + errorType = error_types::DDB_ERROR, + "Error invalidating device token {}: {:?}", device_token, e + ); + }; + } + + pub async fn send_apns_notif( + &self, + notif: tunnelbroker_messages::notif::APNsNotif, + amqp_client: &mut AmqpClient, + ) -> Option { + debug!("Received APNs notif for {}", notif.device_id); + + let Ok(headers) = + serde_json::from_str::(¬if.headers) + else { + return Some(MessageSentStatus::SerializationError(notif.headers)); + }; + + let device_token = match self + .get_device_token(notif.device_id.clone(), NotifType::APNs) + .await + { + Ok(token) => token, + Err(e) => { + return Some(MessageSentStatus::from_result( + ¬if.client_message_id, + Err(e), + )); + } + }; + + let apns_notif = APNsNotif { + device_token: device_token.clone(), + headers, + payload: notif.payload, + }; + + let result = self.inner.send_notif(apns_notif.into()).await; + self + .handle_invalid_token(&result, amqp_client, notif.device_id, device_token) + .await; + + Self::return_notif_sent_status(¬if.client_message_id, result) + } + + pub async fn send_fcm_notif( + &self, + notif: tunnelbroker_messages::notif::FCMNotif, + amqp_client: &mut AmqpClient, + ) -> Option { + debug!("Received FCM notif for {}", notif.device_id); + + let Some(priority) = AndroidMessagePriority::from_raw(¬if.priority) + else { + return 
Some(MessageSentStatus::SerializationError(notif.priority)); + }; + + let Ok(data) = serde_json::from_str(¬if.data) else { + return Some(MessageSentStatus::SerializationError(notif.data)); + }; + + let device_token = match self + .get_device_token(notif.device_id.clone(), NotifType::FCM) + .await + { + Ok(token) => token, + Err(e) => { + return Some(MessageSentStatus::from_result( + ¬if.client_message_id, + Err(e), + )) + } + }; + + let fcm_message = FCMMessage { + data, + token: device_token.to_string(), + android: AndroidConfig { priority }, + }; + + let result = self.inner.send_notif(fcm_message.into()).await; + self + .handle_invalid_token(&result, amqp_client, notif.device_id, device_token) + .await; + + Self::return_notif_sent_status(¬if.client_message_id, result) + } + + pub async fn send_web_notif( + &self, + notif: tunnelbroker_messages::notif::WebPushNotif, + amqp_client: &mut AmqpClient, + ) -> Option { + debug!("Received WebPush notif for {}", notif.device_id); + + let device_token = match self + .get_device_token(notif.device_id.clone(), NotifType::WebPush) + .await + { + Ok(token) => token, + Err(e) => { + return Some(MessageSentStatus::from_result( + ¬if.client_message_id, + Err(e), + )) + } + }; + + let web_push_notif = WebPushNotif { + device_token: device_token.clone(), + payload: notif.payload, + }; + + let result = self.inner.send_notif(web_push_notif.into()).await; + self + .handle_invalid_token(&result, amqp_client, notif.device_id, device_token) + .await; + + Self::return_notif_sent_status(¬if.client_message_id, result) + } + + pub async fn send_wns_notif( + &self, + notif: tunnelbroker_messages::notif::WNSNotif, + amqp_client: &mut AmqpClient, + ) -> Option { + debug!("Received WNS notif for {}", notif.device_id); + + let device_token = match self + .get_device_token(notif.device_id.clone(), NotifType::WNS) + .await + { + Ok(token) => token, + Err(e) => { + return Some(MessageSentStatus::from_result( + ¬if.client_message_id, + Err(e), + )) + 
} + }; + + let wns_notif = WNSNotif { + device_token: device_token.clone(), + payload: notif.payload, + }; + + let result = self.inner.send_notif(wns_notif.into()).await; + self + .handle_invalid_token(&result, amqp_client, notif.device_id, device_token) + .await; + + Self::return_notif_sent_status(¬if.client_message_id, result) + } + + fn return_notif_sent_status( + client_message_id: &str, + result: Result<(), NotifClientError>, + ) -> Option { + let Err(NotifClientError::MissingClient(client_type)) = &result else { + return Some(MessageSentStatus::from_result(client_message_id, result)); + }; + + let session_error = match client_type { + NotifType::APNs => SessionError::MissingAPNsClient, + NotifType::FCM => SessionError::MissingFCMClient, + NotifType::WebPush => SessionError::MissingWebPushClient, + NotifType::WNS => SessionError::MissingWNSClient, + }; + + Some(MessageSentStatus::from_result( + client_message_id, + Err(session_error), + )) + } +} diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -3,7 +3,7 @@ use crate::amqp::AmqpConnection; use crate::constants::{SOCKET_HEARTBEAT_TIMEOUT, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::DatabaseClient; -use crate::notifs::NotifClient; +use crate::notifs::SessionNotifClient; use crate::websockets::session::SessionError; use crate::FarcasterClient; use crate::CONFIG; @@ -43,7 +43,7 @@ addr: SocketAddr, amqp: AmqpConnection, db_client: DatabaseClient, - notif_client: NotifClient, + notif_client: SessionNotifClient, farcaster_client: FarcasterClient, } @@ -113,7 +113,6 @@ pub async fn run_server( db_client: DatabaseClient, amqp_connection: &AmqpConnection, - notif_client: NotifClient, farcaster_client: FarcasterClient, ) -> Result<(), BoxedError> { let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") @@ -128,6 +127,7 @@ while let Ok((stream, addr)) = 
listener.accept().await { let amqp = amqp_connection.clone(); + let notif_client = SessionNotifClient::new(db_client.clone()); let connection = http .serve_connection( stream, @@ -135,7 +135,7 @@ amqp, db_client: db_client.clone(), addr, - notif_client: notif_client.clone(), + notif_client, farcaster_client: farcaster_client.clone(), }, ) @@ -180,7 +180,7 @@ addr: SocketAddr, db_client: DatabaseClient, amqp_connection: AmqpConnection, - notif_client: NotifClient, + notif_client: SessionNotifClient, farcaster_client: FarcasterClient, ) { debug!("Incoming connection from: {}", addr); @@ -337,7 +337,7 @@ frame: Message, db_client: DatabaseClient, amqp: AmqpConnection, - notif_client: NotifClient, + notif_client: SessionNotifClient, farcaster_client: FarcasterClient, ) -> Result, ErrorWithStreamHandle> { let device_info = match get_device_info_from_frame(frame).await { diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -18,7 +18,7 @@ use crate::amqp_client::AmqpClient; use crate::database::{self, DatabaseClient}; use crate::farcaster::FarcasterClient; -use crate::notifs::NotifClient; +use crate::notifs::SessionNotifClient; use crate::{farcaster, identity}; use tunnelbroker_messages::farcaster::{ FarcasterAPIRequest, FarcasterAPIResponse, FarcasterAPIResponseData, @@ -46,7 +46,7 @@ pub device_info: DeviceInfo, // Each websocket has an AMQP connection associated with a particular device amqp_client: AmqpClient, - notif_client: NotifClient, + notif_client: SessionNotifClient, farcaster_client: FarcasterClient, } @@ -168,7 +168,7 @@ db_client: DatabaseClient, device_info: DeviceInfo, amqp: AmqpConnection, - notif_client: NotifClient, + notif_client: SessionNotifClient, farcaster_client: FarcasterClient, ) -> Result> { let amqp_client =