diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -31,6 +31,15 @@ sessionID: String, grpcStatus: GrpcResult, } + struct SessionItem { + deviceID: String, + publicKey: String, + notifyToken: String, + deviceType: i32, + appVersion: String, + deviceOS: String, + isOnline: bool, + } unsafe extern "C++" { include!("tunnelbroker/src/libcpp/Tunnelbroker.h"); @@ -47,15 +56,6 @@ deviceOS: &str, notifyToken: &str, ) -> NewSessionResult; - pub fn sessionSignatureHandler(deviceID: &str) -> SessionSignatureResult; - pub fn newSessionHandler( - deviceID: &str, - publicKey: &str, - signature: &str, - deviceType: i32, - deviceAppVersion: &str, - deviceOS: &str, - notifyToken: &str, - ) -> NewSessionResult; + pub fn getSessionItem(sessionID: &str) -> Result; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -24,3 +24,4 @@ rust::Str deviceAppVersion, rust::Str deviceOS, rust::Str notifyToken); +SessionItem getSessionItem(rust::Str sessionID); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -149,77 +149,24 @@ .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}}; } -NewSessionResult newSessionHandler( - rust::Str deviceID, - rust::Str publicKey, - rust::Str signature, - int32_t deviceType, - rust::Str deviceAppVersion, - rust::Str deviceOS, - rust::Str notifyToken) { - std::shared_ptr deviceSessionItem; - std::shared_ptr sessionSignItem; - std::shared_ptr publicKeyItem; - const std::string stringDeviceID{deviceID}; - if (!comm::network::tools::validateDeviceID(stringDeviceID)) { - return NewSessionResult{ - .grpcStatus = { - .statusCode = 3, - .errorText = "Format validation failed for deviceID"}}; +SessionItem getSessionItem(rust::Str sessionID) { + const std::string stringSessionID = std::string{sessionID}; + if (!comm::network::tools::validateSessionID(stringSessionID)) { + throw std::invalid_argument("Invalid format for 'sessionID'"); } - const std::string stringPublicKey{publicKey}; - const std::string newSessionID = comm::network::tools::generateUUID(); - try { - sessionSignItem = comm::network::database::DatabaseManager::getInstance() - .findSessionSignItem(stringDeviceID); - if (sessionSignItem == nullptr) { - return NewSessionResult{ - .grpcStatus = { - .statusCode = 5, - .errorText = "Session signature request not found for deviceID"}}; - } - publicKeyItem = comm::network::database::DatabaseManager::getInstance() - .findPublicKeyItem(stringDeviceID); - if (publicKeyItem == nullptr) { - std::shared_ptr newPublicKeyItem = - std::make_shared( - stringDeviceID, stringPublicKey); - comm::network::database::DatabaseManager::getInstance().putPublicKeyItem( - *newPublicKeyItem); - } else if (stringPublicKey != publicKeyItem->getPublicKey()) { - return NewSessionResult{ - .grpcStatus = { - .statusCode = 7, - .errorText = "The public key doesn't match for deviceID"}}; - } - const std::string verificationMessage = sessionSignItem->getSign(); - if (!comm::network::crypto::rsaVerifyString( - stringPublicKey, verificationMessage, std::string{signature})) { - return NewSessionResult{ - .grpcStatus = { - .statusCode = 7, - .errorText = - "Signature for the verification message is not valid"}}; - } - comm::network::database::DatabaseManager::getInstance() - .removeSessionSignItem(stringDeviceID); - - deviceSessionItem = - std::make_shared( - newSessionID, - stringDeviceID, - stringPublicKey, - std::string{notifyToken}, - deviceType, - std::string{deviceAppVersion}, - std::string{deviceOS}); - comm::network::database::DatabaseManager::getInstance().putSessionItem( - *deviceSessionItem); - } catch (std::runtime_error &e) { - LOG(ERROR) << "gRPC: " - << "Error while processing 'NewSession' request: " << e.what(); - return NewSessionResult{ - .grpcStatus = {.statusCode = 13, .errorText = e.what()}}; + std::shared_ptr sessionItem = + comm::network::database::DatabaseManager::getInstance().findSessionItem( + stringSessionID); + if (sessionItem == nullptr) { + throw std::invalid_argument( + "No sessions found for 'sessionID': " + stringSessionID); } - return NewSessionResult{.sessionID = newSessionID}; + return SessionItem{ + .deviceID = sessionItem->getDeviceID(), + .publicKey = sessionItem->getPubKey(), + .notifyToken = sessionItem->getNotifyToken(), + .deviceType = static_cast(sessionItem->getDeviceType()), + .appVersion = sessionItem->getAppVersion(), + .deviceOS = sessionItem->getDeviceOs(), + .isOnline = sessionItem->getIsOnline()}; } diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,12 +1,13 @@ use super::constants; use super::cxx_bridge::ffi::{ - newSessionHandler, sessionSignatureHandler, GRPCStatusCodes, + getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; use std::pin::Pin; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status, Streaming}; use tunnelbroker::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; @@ -76,11 +77,33 @@ dyn Stream> + Send, >, >; + async fn messages_stream( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let session_id = match request.metadata().get("sessionID") { + Some(metadata_session_id) => metadata_session_id + .to_str() + .expect("metadata session id was not valid UTF8") + .to_string(), + None => { + return Err(Status::invalid_argument( + "No 'sessionID' in metadata was provided", + )) + } + }; + let _session_item = match getSessionItem(&session_id) { + Ok(database_item) => database_item, + Err(err) => return Err(Status::unauthenticated(err.what())), + }; + + let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::MessagesStreamStream + )) } // These empty old API handlers are deprecated and should be removed.