Page MenuHomePhabricator

No OneTemporary

diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
index d1ddad7b7..b87ebd85d 100644
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -1,51 +1,61 @@
#[cxx::bridge]
pub mod ffi {
enum GRPCStatusCodes {
Ok,
Cancelled,
Unknown,
InvalidArgument,
DeadlineExceeded,
NotFound,
AlreadyExists,
PermissionDenied,
ResourceExhausted,
FailedPrecondition,
Aborted,
OutOfRange,
Unimplemented,
Internal,
Unavailable,
DataLoss,
Unauthenticated,
}
struct GrpcResult {
statusCode: GRPCStatusCodes,
errorText: String,
}
struct SessionSignatureResult {
toSign: String,
grpcStatus: GrpcResult,
}
struct NewSessionResult {
sessionID: String,
grpcStatus: GrpcResult,
}
+ struct SessionItem {
+ deviceID: String,
+ publicKey: String,
+ notifyToken: String,
+ deviceType: i32,
+ appVersion: String,
+ deviceOS: String,
+ isOnline: bool,
+ }
unsafe extern "C++" {
include!("tunnelbroker/src/libcpp/Tunnelbroker.h");
pub fn initialize();
pub fn getConfigParameter(parameter: &str) -> Result<String>;
pub fn isSandbox() -> Result<bool>;
pub fn sessionSignatureHandler(deviceID: &str) -> SessionSignatureResult;
pub fn newSessionHandler(
deviceID: &str,
publicKey: &str,
signature: &str,
deviceType: i32,
deviceAppVersion: &str,
deviceOS: &str,
notifyToken: &str,
) -> NewSessionResult;
+ pub fn getSessionItem(sessionID: &str) -> Result<SessionItem>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
index bebaf5e96..6be2f9ba7 100644
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -1,150 +1,172 @@
#include "Tunnelbroker.h"
#include "AmqpManager.h"
#include "AwsTools.h"
#include "ConfigManager.h"
#include "CryptoTools.h"
#include "DatabaseManager.h"
#include "GlobalTools.h"
#include "Tools.h"
#include "rust/cxx.h"
#include "tunnelbroker/src/cxx_bridge.rs.h"
#include <glog/logging.h>
void initialize() {
comm::network::tools::InitLogging("tunnelbroker");
comm::network::config::ConfigManager::getInstance().load();
Aws::InitAPI({});
// List of AWS DynamoDB tables to check if they are created and can be
// accessed before any AWS API methods
const std::list<std::string> tablesList = {
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_MESSAGES_TABLE)};
for (const std::string &table : tablesList) {
if (!comm::network::database::DatabaseManager::getInstance()
.isTableAvailable(table)) {
throw std::runtime_error(
"Error: AWS DynamoDB table '" + table + "' is not available");
}
};
comm::network::AmqpManager::getInstance().init();
}
rust::String getConfigParameter(rust::Str parameter) {
return rust::String{
comm::network::config::ConfigManager::getInstance().getParameter(
std::string{parameter})};
}
bool isSandbox() {
return comm::network::tools::isSandbox();
}
SessionSignatureResult sessionSignatureHandler(rust::Str deviceID) {
const std::string requestedDeviceID(deviceID);
if (!comm::network::tools::validateDeviceID(requestedDeviceID)) {
return SessionSignatureResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::InvalidArgument,
.errorText =
"Format validation failed for deviceID: " + requestedDeviceID}};
}
const std::string toSign = comm::network::tools::generateRandomString(
comm::network::SIGNATURE_REQUEST_LENGTH);
std::shared_ptr<comm::network::database::SessionSignItem> SessionSignItem =
std::make_shared<comm::network::database::SessionSignItem>(
toSign, requestedDeviceID);
comm::network::database::DatabaseManager::getInstance().putSessionSignItem(
*SessionSignItem);
return SessionSignatureResult{
.toSign = toSign, .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
}
NewSessionResult newSessionHandler(
rust::Str deviceID,
rust::Str publicKey,
rust::Str signature,
int32_t deviceType,
rust::Str deviceAppVersion,
rust::Str deviceOS,
rust::Str notifyToken) {
std::shared_ptr<comm::network::database::DeviceSessionItem> deviceSessionItem;
std::shared_ptr<comm::network::database::SessionSignItem> sessionSignItem;
std::shared_ptr<comm::network::database::PublicKeyItem> publicKeyItem;
const std::string stringDeviceID{deviceID};
if (!comm::network::tools::validateDeviceID(stringDeviceID)) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::InvalidArgument,
.errorText = "Format validation failed for deviceID"}};
}
const std::string stringPublicKey{publicKey};
const std::string newSessionID = comm::network::tools::generateUUID();
try {
sessionSignItem = comm::network::database::DatabaseManager::getInstance()
.findSessionSignItem(stringDeviceID);
if (sessionSignItem == nullptr) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::NotFound,
.errorText = "Session signature request not found for deviceID"}};
}
publicKeyItem = comm::network::database::DatabaseManager::getInstance()
.findPublicKeyItem(stringDeviceID);
if (publicKeyItem == nullptr) {
std::shared_ptr<comm::network::database::PublicKeyItem> newPublicKeyItem =
std::make_shared<comm::network::database::PublicKeyItem>(
stringDeviceID, stringPublicKey);
comm::network::database::DatabaseManager::getInstance().putPublicKeyItem(
*newPublicKeyItem);
} else if (stringPublicKey != publicKeyItem->getPublicKey()) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::PermissionDenied,
.errorText = "The public key doesn't match for deviceID"}};
}
const std::string verificationMessage = sessionSignItem->getSign();
if (!comm::network::crypto::rsaVerifyString(
stringPublicKey, verificationMessage, std::string{signature})) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::PermissionDenied,
.errorText =
"Signature for the verification message is not valid"}};
}
comm::network::database::DatabaseManager::getInstance()
.removeSessionSignItem(stringDeviceID);
deviceSessionItem =
std::make_shared<comm::network::database::DeviceSessionItem>(
newSessionID,
stringDeviceID,
stringPublicKey,
std::string{notifyToken},
deviceType,
std::string{deviceAppVersion},
std::string{deviceOS});
comm::network::database::DatabaseManager::getInstance().putSessionItem(
*deviceSessionItem);
} catch (std::runtime_error &e) {
LOG(ERROR) << "gRPC: "
<< "Error while processing 'NewSession' request: " << e.what();
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::Internal, .errorText = e.what()}};
}
return NewSessionResult{
.sessionID = newSessionID,
.grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
}
+
+SessionItem getSessionItem(rust::Str sessionID) {
+ const std::string stringSessionID = std::string{sessionID};
+ if (!comm::network::tools::validateSessionID(stringSessionID)) {
+ throw std::invalid_argument("Invalid format for 'sessionID'");
+ }
+ std::shared_ptr<comm::network::database::DeviceSessionItem> sessionItem =
+ comm::network::database::DatabaseManager::getInstance().findSessionItem(
+ stringSessionID);
+ if (sessionItem == nullptr) {
+ throw std::invalid_argument(
+ "No sessions found for 'sessionID': " + stringSessionID);
+ }
+ return SessionItem{
+ .deviceID = sessionItem->getDeviceID(),
+ .publicKey = sessionItem->getPubKey(),
+ .notifyToken = sessionItem->getNotifyToken(),
+ .deviceType = static_cast<int>(sessionItem->getDeviceType()),
+ .appVersion = sessionItem->getAppVersion(),
+ .deviceOS = sessionItem->getDeviceOs(),
+ .isOnline = sessionItem->getIsOnline()};
+}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
index a07f66fc8..40ddfaaf1 100644
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -1,17 +1,18 @@
#pragma once
#include "rust/cxx.h"
#include "tunnelbroker/src/cxx_bridge.rs.h"
void initialize();
rust::String getConfigParameter(rust::Str parameter);
bool isSandbox();
SessionSignatureResult sessionSignatureHandler(rust::Str deviceID);
NewSessionResult newSessionHandler(
rust::Str deviceID,
rust::Str publicKey,
rust::Str signature,
int32_t deviceType,
rust::Str deviceAppVersion,
rust::Str deviceOS,
rust::Str notifyToken);
+SessionItem getSessionItem(rust::Str sessionID);
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
index cdf267d7c..1f01e4729 100644
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,136 +1,159 @@
use super::constants;
use super::cxx_bridge::ffi::{
- newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
+ getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
};
use anyhow::Result;
use futures::Stream;
use std::pin::Pin;
-use tonic::transport::Server;
-use tonic::{Request, Response, Status, Streaming};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
use tunnelbroker::tunnelbroker_service_server::{
TunnelbrokerService, TunnelbrokerServiceServer,
};
mod tools;
mod tunnelbroker {
tonic::include_proto!("tunnelbroker");
}
#[derive(Debug, Default)]
struct TunnelbrokerServiceHandlers {}
#[tonic::async_trait]
impl TunnelbrokerService for TunnelbrokerServiceHandlers {
async fn session_signature(
&self,
request: Request<tunnelbroker::SessionSignatureRequest>,
) -> Result<Response<tunnelbroker::SessionSignatureResponse>, Status> {
let result = sessionSignatureHandler(&request.into_inner().device_id);
if result.grpcStatus.statusCode != GRPCStatusCodes::Ok {
return Err(tools::create_tonic_status(
result.grpcStatus.statusCode,
&result.grpcStatus.errorText,
));
}
Ok(Response::new(tunnelbroker::SessionSignatureResponse {
to_sign: result.toSign,
}))
}
async fn new_session(
&self,
request: Request<tunnelbroker::NewSessionRequest>,
) -> Result<Response<tunnelbroker::NewSessionResponse>, Status> {
let inner_request = request.into_inner();
let notify_token = inner_request.notify_token.unwrap_or(String::new());
if !tunnelbroker::new_session_request::DeviceTypes::is_valid(
inner_request.device_type,
) {
return Err(tools::create_tonic_status(
GRPCStatusCodes::InvalidArgument,
"Unsupported device type",
));
};
let result = newSessionHandler(
&inner_request.device_id,
&inner_request.public_key,
&inner_request.signature,
inner_request.device_type,
&inner_request.device_app_version,
&inner_request.device_os,
&notify_token,
);
if result.grpcStatus.statusCode != GRPCStatusCodes::Ok {
return Err(tools::create_tonic_status(
result.grpcStatus.statusCode,
&result.grpcStatus.errorText,
));
}
Ok(Response::new(tunnelbroker::NewSessionResponse {
session_id: result.sessionID,
}))
}
type MessagesStreamStream = Pin<
Box<
dyn Stream<Item = Result<tunnelbroker::MessageToClient, Status>> + Send,
>,
>;
+
async fn messages_stream(
&self,
- _request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
+ request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
) -> Result<Response<Self::MessagesStreamStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let session_id = match request.metadata().get("sessionID") {
+ Some(metadata_session_id) => metadata_session_id
+ .to_str()
+ .expect("metadata session id was not valid UTF8")
+ .to_string(),
+ None => {
+ return Err(Status::invalid_argument(
+ "No 'sessionID' in metadata was provided",
+ ))
+ }
+ };
+ let _session_item = match getSessionItem(&session_id) {
+ Ok(database_item) => database_item,
+ Err(err) => return Err(Status::unauthenticated(err.what())),
+ };
+
+ let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+
+ let output_stream = ReceiverStream::new(rx);
+ Ok(Response::new(
+ Box::pin(output_stream) as Self::MessagesStreamStream
+ ))
}
// These empty old API handlers are deprecated and should be removed.
// They are implemented only to fix the building process.
async fn check_if_primary_device_online(
&self,
_request: Request<tunnelbroker::CheckRequest>,
) -> Result<Response<tunnelbroker::CheckResponse>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn become_new_primary_device(
&self,
_request: Request<tunnelbroker::NewPrimaryRequest>,
) -> Result<Response<tunnelbroker::NewPrimaryResponse>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn send_pong(
&self,
_request: Request<tunnelbroker::PongRequest>,
) -> Result<Response<()>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn send(
&self,
_request: Request<tunnelbroker::SendRequest>,
) -> Result<Response<()>, Status> {
Err(Status::cancelled("Deprecated"))
}
type GetStream = Pin<
Box<dyn Stream<Item = Result<tunnelbroker::GetResponse, Status>> + Send>,
>;
async fn get(
&self,
_request: Request<tunnelbroker::GetRequest>,
) -> Result<Response<Self::GetStream>, Status> {
Err(Status::cancelled("Deprecated"))
}
}
pub async fn run_grpc_server() -> Result<()> {
let addr = format!("[::1]:{}", constants::GRPC_SERVER_PORT).parse()?;
Server::builder()
.add_service(TunnelbrokerServiceServer::new(
TunnelbrokerServiceHandlers::default(),
))
.serve(addr)
.await?;
Ok(())
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Dec 23, 7:10 AM (1 d, 6 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2690534
Default Alt Text
(15 KB)

Event Timeline