Page MenuHomePhabricator

D5534.id18659.diff
No OneTemporary

D5534.id18659.diff

diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -61,5 +61,9 @@
sessionID: &str,
isOnline: bool,
) -> Result<()>;
+ pub fn updateSessionItemDeviceToken(
+ sessionID: &str,
+ newNotifToken: &str,
+ ) -> Result<()>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -17,3 +17,4 @@
rust::Str notifyToken);
SessionItem getSessionItem(rust::Str sessionID);
void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline);
+void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -175,3 +175,11 @@
comm::network::database::DatabaseManager::getInstance()
.updateSessionItemIsOnline(std::string{sessionID}, isOnline);
}
+
+void updateSessionItemDeviceToken(
+ rust::Str sessionID,
+ rust::Str newNotifToken) {
+ comm::network::database::DatabaseManager::getInstance()
+ .updateSessionItemDeviceToken(
+ std::string{sessionID}, std::string{newNotifToken});
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,16 +1,19 @@
use super::constants;
use super::cxx_bridge::ffi::{
getSessionItem, newSessionHandler, sessionSignatureHandler,
- updateSessionItemIsOnline, GRPCStatusCodes,
+ updateSessionItemDeviceToken, updateSessionItemIsOnline, GRPCStatusCodes,
};
use anyhow::Result;
use futures::Stream;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
-use tokio_stream::wrappers::ReceiverStream;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
-use tracing::debug;
+use tracing::{debug, error};
+use tunnelbroker::message_to_tunnelbroker::Data::{
+ MessagesToSend, NewNotifyToken, ProcessedMessages,
+};
use tunnelbroker::tunnelbroker_service_server::{
TunnelbrokerService, TunnelbrokerServiceServer,
};
@@ -149,6 +152,54 @@
}
});
+ let mut input_stream = request.into_inner();
+ // Spawning asynchronous Tokio task for handling incoming messages from the client
+ tokio::spawn(async move {
+ while let Some(result) = input_stream.next().await {
+ if let Err(err) = result {
+ debug!("Error in input stream: {}", err);
+ break;
+ }
+ if let Some(message_data) = result.unwrap().data {
+ match message_data {
+ NewNotifyToken(new_token) => {
+ if let Err(err) =
+ updateSessionItemDeviceToken(&session_id, &new_token)
+ {
+ error!(
+ "Error in updating the device notification token in the database: {}",
+ err.what()
+ );
+ let writer_result = tx_writer(
+ &session_id,
+ &tx,
+ Err(
+ Status::internal(
+ "Error in updating the device notification token in the database"
+ )
+ ),
+ );
+ if let Err(err) = writer_result.await {
+ debug!(
+ "Failed to write internal error to a channel: {}",
+ err
+ );
+ };
+ }
+ }
+ MessagesToSend(_) => (),
+ ProcessedMessages(_) => (),
+ }
+ }
+ }
+ if let Err(err) = updateSessionItemIsOnline(&session_id, false) {
+ error!(
+ "Error in updating the session online state in the database: {}",
+ err.what()
+ );
+ }
+ });
+
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::MessagesStreamStream

File Metadata

Mime Type
text/plain
Expires
Mon, Dec 23, 2:38 AM (18 h, 36 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2693268
Default Alt Text
D5534.id18659.diff (4 KB)

Event Timeline