Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3364878
D5529.id18231.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Referenced Files
None
Subscribers
None
D5529.id18231.diff
View Options
diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -36,5 +36,9 @@
notifyToken: &str,
) -> NewSessionResult;
pub fn getSessionItem(sessionID: &str) -> Result<SessionItem>;
+ pub fn updateSessionItemIsOnline(
+ sessionID: &str,
+ isOnline: bool,
+ ) -> Result<()>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -14,3 +14,4 @@
rust::Str deviceOS,
rust::Str notifyToken);
SessionItem getSessionItem(rust::Str sessionID);
+void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -156,3 +156,8 @@
.deviceOS = sessionItem->getDeviceOs(),
.isOnline = sessionItem->getIsOnline()};
}
+
+void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline) {
+ comm::network::database::DatabaseManager::getInstance()
+ .updateSessionItemIsOnline(std::string{sessionID}, isOnline);
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,13 +1,16 @@
use super::constants;
use super::cxx_bridge::ffi::{
getSessionItem, newSessionHandler, sessionSignatureHandler,
+ updateSessionItemIsOnline,
};
use anyhow::Result;
use futures::Stream;
use std::pin::Pin;
use tokio::sync::mpsc;
+use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status, Streaming};
+use tracing::debug;
use tunnelbroker::tunnelbroker_service_server::{
TunnelbrokerService, TunnelbrokerServiceServer,
};
@@ -95,7 +98,54 @@
Err(err) => return Err(Status::unauthenticated(err.what())),
};
- let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+ let (tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+
+ // Through this function, we will write to the output stream from different Tokio
+ // tasks and update the device's online status if the write was unsuccessful
+ async fn tx_writer<T>(
+ session_id: &str,
+ channel: &tokio::sync::mpsc::Sender<T>,
+ payload: T,
+ ) -> Result<(), String> {
+ let result = channel.send(payload).await;
+ match result {
+ Ok(result) => Ok(result),
+ Err(err) => {
+ drop(channel);
+ if let Err(err) = updateSessionItemIsOnline(&session_id, false) {
+ return Err(err.what().to_string());
+ }
+ return Err(err.to_string());
+ }
+ }
+ }
+
+ if let Err(err) = updateSessionItemIsOnline(&session_id, true) {
+ return Err(Status::internal(err.what()));
+ }
+
+ // Spawning asynchronous Tokio task with the client pinging loop inside to
+ // make sure that the client is online
+ tokio::spawn({
+ let session_id = session_id.clone();
+ let tx = tx.clone();
+ async move {
+ loop {
+ sleep(Duration::from_millis(constants::GRPC_PING_INTERVAL_MS)).await;
+ let result = tx_writer(
+ &session_id,
+ &tx,
+ Ok(tunnelbroker::MessageToClient {
+ data: Some(tunnelbroker::message_to_client::Data::Ping(())),
+ }),
+ );
+ if let Err(err) = result.await {
+ debug!("Failed to write ping to a channel: {}", err);
+ break;
+ };
+ }
+ }
+ });
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Nov 26, 5:35 AM (5 h, 7 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2580769
Default Alt Text
D5529.id18231.diff (3 KB)
Attached To
Mode
D5529: [services] Tunnelbroker - Adding the pinging loop and online status of the client check
Attached
Detach File
Event Timeline
Log In to Comment