diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -31,6 +31,16 @@
     sessionID: String,
     grpcStatus: GrpcResult,
   }
+  /// Session fields copied out of the C++ session item by `getSessionItem`.
+  struct SessionItem {
+    deviceID: String,
+    publicKey: String,
+    notifyToken: String,
+    deviceType: i32,
+    appVersion: String,
+    deviceOS: String,
+    isOnline: bool,
+  }
 
   unsafe extern "C++" {
     include!("tunnelbroker/src/libcpp/Tunnelbroker.h");
@@ -47,5 +57,8 @@
       deviceOS: &str,
       notifyToken: &str,
     ) -> NewSessionResult;
+    /// Looks up the session for `sessionID`. The C++ side throws (seen here
+    /// as `Err`) when the ID is malformed or no session is found.
+    pub fn getSessionItem(sessionID: &str) -> Result;
   }
 }
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -15,3 +15,6 @@
     rust::Str deviceAppVersion,
     rust::Str deviceOS,
     rust::Str notifyToken);
+// Returns the session stored for `sessionID`; throws std::invalid_argument
+// when the ID is malformed or no session exists.
+SessionItem getSessionItem(rust::Str sessionID);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -148,3 +148,28 @@
       .sessionID = newSessionID,
       .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
 }
+
+// Loads the session stored for `sessionID` and copies its fields into the
+// bridge-level SessionItem. Throws std::invalid_argument when the ID fails
+// validation or when no session is found for it.
+SessionItem getSessionItem(rust::Str sessionID) {
+  const std::string stringSessionID = std::string{sessionID};
+  if (!comm::network::tools::validateSessionID(stringSessionID)) {
+    throw std::invalid_argument("Invalid format for 'sessionID'");
+  }
+  std::shared_ptr sessionItem =
+      comm::network::database::DatabaseManager::getInstance().findSessionItem(
+          stringSessionID);
+  if (sessionItem == nullptr) {
+    throw std::invalid_argument(
+        "No sessions found for 'sessionID': " + stringSessionID);
+  }
+  return SessionItem{
+      .deviceID = sessionItem->getDeviceID(),
+      .publicKey = sessionItem->getPubKey(),
+      .notifyToken = sessionItem->getNotifyToken(),
+      .deviceType = static_cast(sessionItem->getDeviceType()),
+      .appVersion = sessionItem->getAppVersion(),
+      .deviceOS = sessionItem->getDeviceOs(),
+      .isOnline = sessionItem->getIsOnline()};
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,12 +1,13 @@
 use super::constants;
 use super::cxx_bridge::ffi::{
-  newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
+  getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
 };
 use anyhow::Result;
 use futures::Stream;
 use std::pin::Pin;
-use tonic::transport::Server;
-use tonic::{Request, Response, Status, Streaming};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
 use tunnelbroker::tunnelbroker_service_server::{
   TunnelbrokerService, TunnelbrokerServiceServer,
 };
@@ -76,11 +77,38 @@
       dyn Stream> + Send,
     >,
   >;
+
+  /// Opens the messages stream for the session named by the `sessionID`
+  /// request metadata entry.
+  ///
+  /// Returns `invalid_argument` when the entry is missing or unreadable and
+  /// `unauthenticated` when `getSessionItem` fails for that ID.
   async fn messages_stream(
     &self,
-    _request: Request>,
+    request: Request>,
   ) -> Result, Status> {
-    Err(Status::unimplemented("Not implemented yet"))
+    let session_id = request
+      .metadata()
+      .get("sessionID")
+      .ok_or_else(|| {
+        Status::invalid_argument("No 'sessionID' in metadata was provided")
+      })?
+      .to_str()
+      .map_err(|_| {
+        // Client-controlled value: reject it instead of panicking.
+        Status::invalid_argument("'sessionID' in metadata is not valid ASCII")
+      })?
+      .to_string();
+    let _session_item = getSessionItem(&session_id)
+      .map_err(|err| Status::unauthenticated(err.what()))?;
+
+    // NOTE(review): `_tx` is unused, so nothing is sent on the stream yet.
+    let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+
+    let output_stream = ReceiverStream::new(rx);
+    Ok(Response::new(
+      Box::pin(output_stream) as Self::MessagesStreamStream
+    ))
   }
 
   // These empty old API handlers are deprecated and should be removed.