diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -12,6 +12,15 @@ sessionID: String, grpcStatus: GrpcResult, } + struct SessionItem { + deviceID: String, + publicKey: String, + notifyToken: String, + deviceType: String, + appVersion: String, + deviceOS: String, + isOnline: bool, + } unsafe extern "C++" { include!("tunnelbroker/src/libcpp/Tunnelbroker.h"); @@ -26,5 +35,6 @@ deviceOS: &str, notifyToken: &str, ) -> NewSessionResult; + pub fn getSessionItem(sessionID: &str) -> Result; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -13,3 +13,4 @@ rust::Str deviceAppVersion, rust::Str deviceOS, rust::Str notifyToken); +SessionItem getSessionItem(rust::Str sessionID); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -134,3 +134,25 @@ } return NewSessionResult{.sessionID = newSessionID}; } + +SessionItem getSessionItem(rust::Str sessionID) { + const std::string stringSessionID = std::string{sessionID}; + if (!comm::network::tools::validateSessionID(stringSessionID)) { + throw std::invalid_argument("Invalid format for 'sessionID'"); + } + std::shared_ptr sessionItem = + comm::network::database::DatabaseManager::getInstance().findSessionItem( + stringSessionID); + if (sessionItem == nullptr) { + throw std::invalid_argument( + "No any sessions found for 'sessionID': " + stringSessionID); + } + return SessionItem{ + .deviceID = sessionItem->getDeviceID(), + .publicKey = sessionItem->getPubKey(), + .notifyToken = sessionItem->getNotifyToken(), + .deviceType = sessionItem->getDeviceType(), + .appVersion = sessionItem->getAppVersion(), + .deviceOS = sessionItem->getDeviceOs(), + .isOnline = sessionItem->getIsOnline()}; +} diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,8 +1,10 @@ +use super::constants; use super::cxx_bridge; use futures::Stream; use std::pin::Pin; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status, Streaming}; use tunnelbroker::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; @@ -75,11 +77,30 @@ dyn Stream> + Send, >, >; + async fn messages_stream( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let session_id: String; + if let Some(metadata_session_id) = request.metadata().get("sessionID") { + session_id = String::from(metadata_session_id.to_str().unwrap()); + } else { + return Err(Status::invalid_argument( + "No 'sessionID' in metadata was provided", + )); + } + + let session_item = cxx_bridge::ffi::getSessionItem(&session_id); + if let Err(err) = session_item { + return Err(Status::unauthenticated(err.what())); + } + let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::MessagesStreamStream + )) } // These empty old API handlers are deprecated and should be removed.