diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -61,5 +61,9 @@ sessionID: &str, isOnline: bool, ) -> Result<()>; + pub fn updateSessionItemDeviceToken( + sessionID: &str, + newNotifToken: &str, + ) -> Result<()>; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -17,3 +17,4 @@ rust::Str notifyToken); SessionItem getSessionItem(rust::Str sessionID); void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline); +void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -175,3 +175,11 @@ comm::network::database::DatabaseManager::getInstance() .updateSessionItemIsOnline(std::string{sessionID}, isOnline); } + +void updateSessionItemDeviceToken( + rust::Str sessionID, + rust::Str newNotifToken) { + comm::network::database::DatabaseManager::getInstance() + .updateSessionItemDeviceToken( + std::string{sessionID}, std::string{newNotifToken}); +} diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,16 +1,19 @@ use super::constants; use super::cxx_bridge::ffi::{ getSessionItem, newSessionHandler, sessionSignatureHandler, - updateSessionItemIsOnline, GRPCStatusCodes, + updateSessionItemDeviceToken, updateSessionItemIsOnline, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; use std::pin::Pin; use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; -use tracing::debug; +use tracing::{debug, error}; +use tunnelbroker::message_to_tunnelbroker::Data::{ + MessagesToSend, NewNotifyToken, ProcessedMessages, +}; use tunnelbroker::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; @@ -149,6 +152,54 @@ } }); + let mut input_stream = request.into_inner(); + // Spawning asynchronous Tokio task for handling incoming messages from the client + tokio::spawn(async move { + while let Some(result) = input_stream.next().await { + if let Err(err) = result { + debug!("Error in input stream: {}", err); + break; + } + if let Some(message_data) = result.unwrap().data { + match message_data { + NewNotifyToken(new_token) => { + if let Err(err) = + updateSessionItemDeviceToken(&session_id, &new_token) + { + error!( + "Error in updating the device notification token in the database: {}", + err.what() + ); + let writer_result = tx_writer( + &session_id, + &tx, + Err( + Status::internal( + "Error in updating the device notification token in the database" + ) + ), + ); + if let Err(err) = writer_result.await { + debug!( + "Failed to write internal error to a channel: {}", + err + ); + }; + } + } + MessagesToSend(_) => (), + ProcessedMessages(_) => (), + } + } + } + if let Err(err) = updateSessionItemIsOnline(&session_id, false) { + error!( + "Error in updating the session online state in the database: {}", + err.what() + ); + } + }); + let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::MessagesStreamStream