diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -40,5 +40,9 @@ sessionID: &str, isOnline: bool, ) -> Result<()>; + pub fn updateSessionItemDeviceToken( + sessionID: &str, + newNotifToken: &str, + ) -> Result<()>; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -15,3 +15,4 @@ rust::Str notifyToken); SessionItem getSessionItem(rust::Str sessionID); void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline); +void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -161,3 +161,11 @@ comm::network::database::DatabaseManager::getInstance() .updateSessionItemIsOnline(std::string{sessionID}, isOnline); } + +void updateSessionItemDeviceToken( + rust::Str sessionID, + rust::Str newNotifToken) { + comm::network::database::DatabaseManager::getInstance() + .updateSessionItemDeviceToken( + std::string{sessionID}, std::string{newNotifToken}); +} diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,15 +1,18 @@ use super::constants; use super::cxx_bridge::ffi::{ getSessionItem, newSessionHandler, sessionSignatureHandler, - updateSessionItemIsOnline, + updateSessionItemDeviceToken, updateSessionItemIsOnline, }; use futures::Stream; use std::pin::Pin; use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; use tracing::debug; +use tunnelbroker::message_to_tunnelbroker::Data::{ + MessagesToSend, NewNotifyToken, ProcessedMessages, +}; use tunnelbroker::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; @@ -147,6 +150,36 @@ } }); + let mut input_stream = request.into_inner(); + // Spawning asynchronous Tokio task for handling incoming messages from the client + tokio::spawn(async move { + while let Some(result) = input_stream.next().await { + if let Err(err) = result { + debug!("Error in input stream: {}", err); + break; + } + if let Some(message_data) = result.unwrap().data { + match message_data { + NewNotifyToken(new_token) => { + if let Err(err) = + updateSessionItemDeviceToken(&session_id, &new_token) + { + eprintln!( + "Error on updating the device notification token: {}", + err.what() + ); + } + } + MessagesToSend(_) => (), + ProcessedMessages(_) => (), + } + } + } + if let Err(err) = updateSessionItemIsOnline(&session_id, false) { + eprintln!("Error on updating the session online state: {}", err.what()); + } + }); + let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::MessagesStreamStream