diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -13,18 +13,21 @@ use tokio::io::AsyncWrite; use tracing::{debug, error, info, trace}; +use crate::amqp_client::AmqpClient; +use crate::database::{self, DatabaseClient}; +use crate::farcaster::FarcasterClient; +use crate::notifs::NotifClient; +use crate::{farcaster, identity}; +use tunnelbroker_messages::farcaster::{ + FarcasterAPIRequest, FarcasterAPIResponse, FarcasterAPIResponseData, + FarcasterAPIResponseError, +}; use tunnelbroker_messages::{ message_to_device_request_status::MessageSentStatus, session::DeviceTypes, DeviceToTunnelbrokerMessage, Heartbeat, MessageToTunnelbroker, }; use tunnelbroker_messages::{DeviceToTunnelbrokerRequestStatus, Platform}; -use crate::amqp_client::AmqpClient; -use crate::database::{self, DatabaseClient}; -use crate::farcaster::FarcasterClient; -use crate::identity; -use crate::notifs::NotifClient; - #[derive(Clone)] pub struct DeviceInfo { pub device_id: String, @@ -255,13 +258,21 @@ } }; - let message_status = - self.handle_message_from_device(serialized_message).await?; + match serialized_message { + DeviceToTunnelbrokerMessage::FarcasterAPIRequest(request) => { + let response = self.handle_websocket_farcaster_request(request).await; + serde_json::to_string(&response).ok() + } + message_from_device => { + let message_status = + self.handle_message_from_device(message_from_device).await?; - let request_status = DeviceToTunnelbrokerRequestStatus { - client_message_ids: vec![message_status], - }; - serde_json::to_string(&request_status).ok() + let request_status = DeviceToTunnelbrokerRequestStatus { + client_message_ids: vec![message_status], + }; + serde_json::to_string(&request_status).ok() + } + } } pub async fn handle_message_from_device( @@ -400,6 +411,49 @@ } } + async fn handle_websocket_farcaster_request( + &mut self, + request: FarcasterAPIRequest, + ) -> FarcasterAPIResponse { + let request_id = request.request_id.clone(); + + if !self.device_info.is_authenticated { + debug!( + "Unauthenticated device {} tried to call Farcaster API. Aborting.", + self.device_info.device_id + ); + return FarcasterAPIResponse { + request_id, + response: FarcasterAPIResponseData::Unauthenticated, + }; + } + + let response = match self.farcaster_client.api_request(request).await { + Ok((status, response)) => { + if status.is_success() { + FarcasterAPIResponseData::Success(response) + } else { + FarcasterAPIResponseData::ErrorResponse(FarcasterAPIResponseError { + status: status.as_u16() as u32, + message: response, + }) + } + } + Err(farcaster::error::Error::MissingFarcasterToken) => { + FarcasterAPIResponseData::MissingFarcasterDCsToken + } + Err(farcaster::error::Error::InvalidRequest) => { + FarcasterAPIResponseData::InvalidRequest + } + Err(err) => FarcasterAPIResponseData::Error(err.to_string()), + }; + + FarcasterAPIResponse { + request_id, + response, + } + } + pub async fn next_amqp_message( &mut self, ) -> Option> {