diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs index d1ddad7b7..b87ebd85d 100644 --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -1,51 +1,61 @@ #[cxx::bridge] pub mod ffi { enum GRPCStatusCodes { Ok, Cancelled, Unknown, InvalidArgument, DeadlineExceeded, NotFound, AlreadyExists, PermissionDenied, ResourceExhausted, FailedPrecondition, Aborted, OutOfRange, Unimplemented, Internal, Unavailable, DataLoss, Unauthenticated, } struct GrpcResult { statusCode: GRPCStatusCodes, errorText: String, } struct SessionSignatureResult { toSign: String, grpcStatus: GrpcResult, } struct NewSessionResult { sessionID: String, grpcStatus: GrpcResult, } + struct SessionItem { + deviceID: String, + publicKey: String, + notifyToken: String, + deviceType: i32, + appVersion: String, + deviceOS: String, + isOnline: bool, + } unsafe extern "C++" { include!("tunnelbroker/src/libcpp/Tunnelbroker.h"); pub fn initialize(); pub fn getConfigParameter(parameter: &str) -> Result; pub fn isSandbox() -> Result; pub fn sessionSignatureHandler(deviceID: &str) -> SessionSignatureResult; pub fn newSessionHandler( deviceID: &str, publicKey: &str, signature: &str, deviceType: i32, deviceAppVersion: &str, deviceOS: &str, notifyToken: &str, ) -> NewSessionResult; + pub fn getSessionItem(sessionID: &str) -> Result; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp index bebaf5e96..6be2f9ba7 100644 --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -1,150 +1,172 @@ #include "Tunnelbroker.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include "rust/cxx.h" #include "tunnelbroker/src/cxx_bridge.rs.h" #include void initialize() { comm::network::tools::InitLogging("tunnelbroker"); comm::network::config::ConfigManager::getInstance().load(); Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { comm::network::config::ConfigManager::getInstance().getParameter( comm::network::config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), comm::network::config::ConfigManager::getInstance().getParameter( comm::network::config::ConfigManager:: OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), comm::network::config::ConfigManager::getInstance().getParameter( comm::network::config::ConfigManager:: OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), comm::network::config::ConfigManager::getInstance().getParameter( comm::network::config::ConfigManager:: OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!comm::network::database::DatabaseManager::getInstance() .isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; comm::network::AmqpManager::getInstance().init(); } rust::String getConfigParameter(rust::Str parameter) { return rust::String{ comm::network::config::ConfigManager::getInstance().getParameter( std::string{parameter})}; } bool isSandbox() { return comm::network::tools::isSandbox(); } SessionSignatureResult sessionSignatureHandler(rust::Str deviceID) { const std::string requestedDeviceID(deviceID); if (!comm::network::tools::validateDeviceID(requestedDeviceID)) { return SessionSignatureResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::InvalidArgument, .errorText = "Format validation failed for deviceID: " + requestedDeviceID}}; } const std::string toSign = comm::network::tools::generateRandomString( comm::network::SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared( toSign, requestedDeviceID); comm::network::database::DatabaseManager::getInstance().putSessionSignItem( *SessionSignItem); return SessionSignatureResult{ .toSign = toSign, .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}}; } NewSessionResult newSessionHandler( rust::Str deviceID, rust::Str publicKey, rust::Str signature, int32_t deviceType, rust::Str deviceAppVersion, rust::Str deviceOS, rust::Str notifyToken) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string stringDeviceID{deviceID}; if (!comm::network::tools::validateDeviceID(stringDeviceID)) { return NewSessionResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::InvalidArgument, .errorText = "Format validation failed for deviceID"}}; } const std::string stringPublicKey{publicKey}; const std::string newSessionID = comm::network::tools::generateUUID(); try { sessionSignItem = comm::network::database::DatabaseManager::getInstance() .findSessionSignItem(stringDeviceID); if (sessionSignItem == nullptr) { return NewSessionResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::NotFound, .errorText = "Session signature request not found for deviceID"}}; } publicKeyItem = comm::network::database::DatabaseManager::getInstance() .findPublicKeyItem(stringDeviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared( stringDeviceID, stringPublicKey); comm::network::database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (stringPublicKey != publicKeyItem->getPublicKey()) { return NewSessionResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::PermissionDenied, .errorText = "The public key doesn't match for deviceID"}}; } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( stringPublicKey, verificationMessage, std::string{signature})) { return NewSessionResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::PermissionDenied, .errorText = "Signature for the verification message is not valid"}}; } comm::network::database::DatabaseManager::getInstance() .removeSessionSignItem(stringDeviceID); deviceSessionItem = std::make_shared( newSessionID, stringDeviceID, stringPublicKey, std::string{notifyToken}, deviceType, std::string{deviceAppVersion}, std::string{deviceOS}); comm::network::database::DatabaseManager::getInstance().putSessionItem( *deviceSessionItem); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'NewSession' request: " << e.what(); return NewSessionResult{ .grpcStatus = { .statusCode = GRPCStatusCodes::Internal, .errorText = e.what()}}; } return NewSessionResult{ .sessionID = newSessionID, .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}}; } + +SessionItem getSessionItem(rust::Str sessionID) { + const std::string stringSessionID = std::string{sessionID}; + if (!comm::network::tools::validateSessionID(stringSessionID)) { + throw std::invalid_argument("Invalid format for 'sessionID'"); + } + std::shared_ptr sessionItem = + comm::network::database::DatabaseManager::getInstance().findSessionItem( + stringSessionID); + if (sessionItem == nullptr) { + throw std::invalid_argument( + "No sessions found for 'sessionID': " + stringSessionID); + } + return SessionItem{ + .deviceID = sessionItem->getDeviceID(), + .publicKey = sessionItem->getPubKey(), + .notifyToken = sessionItem->getNotifyToken(), + .deviceType = static_cast(sessionItem->getDeviceType()), + .appVersion = sessionItem->getAppVersion(), + .deviceOS = sessionItem->getDeviceOs(), + .isOnline = sessionItem->getIsOnline()}; +} diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h index a07f66fc8..40ddfaaf1 100644 --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -1,17 +1,18 @@ #pragma once #include "rust/cxx.h" #include "tunnelbroker/src/cxx_bridge.rs.h" void initialize(); rust::String getConfigParameter(rust::Str parameter); bool isSandbox(); SessionSignatureResult sessionSignatureHandler(rust::Str deviceID); NewSessionResult newSessionHandler( rust::Str deviceID, rust::Str publicKey, rust::Str signature, int32_t deviceType, rust::Str deviceAppVersion, rust::Str deviceOS, rust::Str notifyToken); +SessionItem getSessionItem(rust::Str sessionID); diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs index cdf267d7c..1f01e4729 100644 --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,136 +1,159 @@ use super::constants; use super::cxx_bridge::ffi::{ - newSessionHandler, sessionSignatureHandler, GRPCStatusCodes, + getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; use std::pin::Pin; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status, Streaming}; use tunnelbroker::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; mod tools; mod tunnelbroker { tonic::include_proto!("tunnelbroker"); } #[derive(Debug, Default)] struct TunnelbrokerServiceHandlers {} #[tonic::async_trait] impl TunnelbrokerService for TunnelbrokerServiceHandlers { async fn session_signature( &self, request: Request, ) -> Result, Status> { let result = sessionSignatureHandler(&request.into_inner().device_id); if result.grpcStatus.statusCode != GRPCStatusCodes::Ok { return Err(tools::create_tonic_status( result.grpcStatus.statusCode, &result.grpcStatus.errorText, )); } Ok(Response::new(tunnelbroker::SessionSignatureResponse { to_sign: result.toSign, })) } async fn new_session( &self, request: Request, ) -> Result, Status> { let inner_request = request.into_inner(); let notify_token = inner_request.notify_token.unwrap_or(String::new()); if !tunnelbroker::new_session_request::DeviceTypes::is_valid( inner_request.device_type, ) { return Err(tools::create_tonic_status( GRPCStatusCodes::InvalidArgument, "Unsupported device type", )); }; let result = newSessionHandler( &inner_request.device_id, &inner_request.public_key, &inner_request.signature, inner_request.device_type, &inner_request.device_app_version, &inner_request.device_os, ¬ify_token, ); if result.grpcStatus.statusCode != GRPCStatusCodes::Ok { return Err(tools::create_tonic_status( result.grpcStatus.statusCode, &result.grpcStatus.errorText, )); } Ok(Response::new(tunnelbroker::NewSessionResponse { session_id: result.sessionID, })) } type MessagesStreamStream = Pin< Box< dyn Stream> + Send, >, >; + async fn messages_stream( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let session_id = match request.metadata().get("sessionID") { + Some(metadata_session_id) => metadata_session_id + .to_str() + .expect("metadata session id was not valid UTF8") + .to_string(), + None => { + return Err(Status::invalid_argument( + "No 'sessionID' in metadata was provided", + )) + } + }; + let _session_item = match getSessionItem(&session_id) { + Ok(database_item) => database_item, + Err(err) => return Err(Status::unauthenticated(err.what())), + }; + + let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::MessagesStreamStream + )) } // These empty old API handlers are deprecated and should be removed. // They are implemented only to fix the building process. async fn check_if_primary_device_online( &self, _request: Request, ) -> Result, Status> { Err(Status::cancelled("Deprecated")) } async fn become_new_primary_device( &self, _request: Request, ) -> Result, Status> { Err(Status::cancelled("Deprecated")) } async fn send_pong( &self, _request: Request, ) -> Result, Status> { Err(Status::cancelled("Deprecated")) } async fn send( &self, _request: Request, ) -> Result, Status> { Err(Status::cancelled("Deprecated")) } type GetStream = Pin< Box> + Send>, >; async fn get( &self, _request: Request, ) -> Result, Status> { Err(Status::cancelled("Deprecated")) } } pub async fn run_grpc_server() -> Result<()> { let addr = format!("[::1]:{}", constants::GRPC_SERVER_PORT).parse()?; Server::builder() .add_service(TunnelbrokerServiceServer::new( TunnelbrokerServiceHandlers::default(), )) .serve(addr) .await?; Ok(()) }