Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3525665
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
index d1ddad7b7..b87ebd85d 100644
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -1,51 +1,61 @@
#[cxx::bridge]
pub mod ffi {
enum GRPCStatusCodes {
Ok,
Cancelled,
Unknown,
InvalidArgument,
DeadlineExceeded,
NotFound,
AlreadyExists,
PermissionDenied,
ResourceExhausted,
FailedPrecondition,
Aborted,
OutOfRange,
Unimplemented,
Internal,
Unavailable,
DataLoss,
Unauthenticated,
}
struct GrpcResult {
statusCode: GRPCStatusCodes,
errorText: String,
}
struct SessionSignatureResult {
toSign: String,
grpcStatus: GrpcResult,
}
struct NewSessionResult {
sessionID: String,
grpcStatus: GrpcResult,
}
+ struct SessionItem {
+ deviceID: String,
+ publicKey: String,
+ notifyToken: String,
+ deviceType: i32,
+ appVersion: String,
+ deviceOS: String,
+ isOnline: bool,
+ }
unsafe extern "C++" {
include!("tunnelbroker/src/libcpp/Tunnelbroker.h");
pub fn initialize();
pub fn getConfigParameter(parameter: &str) -> Result<String>;
pub fn isSandbox() -> Result<bool>;
pub fn sessionSignatureHandler(deviceID: &str) -> SessionSignatureResult;
pub fn newSessionHandler(
deviceID: &str,
publicKey: &str,
signature: &str,
deviceType: i32,
deviceAppVersion: &str,
deviceOS: &str,
notifyToken: &str,
) -> NewSessionResult;
+ pub fn getSessionItem(sessionID: &str) -> Result<SessionItem>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
index bebaf5e96..6be2f9ba7 100644
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -1,150 +1,172 @@
#include "Tunnelbroker.h"
#include "AmqpManager.h"
#include "AwsTools.h"
#include "ConfigManager.h"
#include "CryptoTools.h"
#include "DatabaseManager.h"
#include "GlobalTools.h"
#include "Tools.h"
#include "rust/cxx.h"
#include "tunnelbroker/src/cxx_bridge.rs.h"
#include <glog/logging.h>
void initialize() {
comm::network::tools::InitLogging("tunnelbroker");
comm::network::config::ConfigManager::getInstance().load();
Aws::InitAPI({});
// List of AWS DynamoDB tables to check if they are created and can be
// accessed before any AWS API methods
const std::list<std::string> tablesList = {
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE),
comm::network::config::ConfigManager::getInstance().getParameter(
comm::network::config::ConfigManager::
OPTION_DYNAMODB_MESSAGES_TABLE)};
for (const std::string &table : tablesList) {
if (!comm::network::database::DatabaseManager::getInstance()
.isTableAvailable(table)) {
throw std::runtime_error(
"Error: AWS DynamoDB table '" + table + "' is not available");
}
};
comm::network::AmqpManager::getInstance().init();
}
rust::String getConfigParameter(rust::Str parameter) {
return rust::String{
comm::network::config::ConfigManager::getInstance().getParameter(
std::string{parameter})};
}
bool isSandbox() {
return comm::network::tools::isSandbox();
}
SessionSignatureResult sessionSignatureHandler(rust::Str deviceID) {
const std::string requestedDeviceID(deviceID);
if (!comm::network::tools::validateDeviceID(requestedDeviceID)) {
return SessionSignatureResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::InvalidArgument,
.errorText =
"Format validation failed for deviceID: " + requestedDeviceID}};
}
const std::string toSign = comm::network::tools::generateRandomString(
comm::network::SIGNATURE_REQUEST_LENGTH);
std::shared_ptr<comm::network::database::SessionSignItem> SessionSignItem =
std::make_shared<comm::network::database::SessionSignItem>(
toSign, requestedDeviceID);
comm::network::database::DatabaseManager::getInstance().putSessionSignItem(
*SessionSignItem);
return SessionSignatureResult{
.toSign = toSign, .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
}
NewSessionResult newSessionHandler(
rust::Str deviceID,
rust::Str publicKey,
rust::Str signature,
int32_t deviceType,
rust::Str deviceAppVersion,
rust::Str deviceOS,
rust::Str notifyToken) {
std::shared_ptr<comm::network::database::DeviceSessionItem> deviceSessionItem;
std::shared_ptr<comm::network::database::SessionSignItem> sessionSignItem;
std::shared_ptr<comm::network::database::PublicKeyItem> publicKeyItem;
const std::string stringDeviceID{deviceID};
if (!comm::network::tools::validateDeviceID(stringDeviceID)) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::InvalidArgument,
.errorText = "Format validation failed for deviceID"}};
}
const std::string stringPublicKey{publicKey};
const std::string newSessionID = comm::network::tools::generateUUID();
try {
sessionSignItem = comm::network::database::DatabaseManager::getInstance()
.findSessionSignItem(stringDeviceID);
if (sessionSignItem == nullptr) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::NotFound,
.errorText = "Session signature request not found for deviceID"}};
}
publicKeyItem = comm::network::database::DatabaseManager::getInstance()
.findPublicKeyItem(stringDeviceID);
if (publicKeyItem == nullptr) {
std::shared_ptr<comm::network::database::PublicKeyItem> newPublicKeyItem =
std::make_shared<comm::network::database::PublicKeyItem>(
stringDeviceID, stringPublicKey);
comm::network::database::DatabaseManager::getInstance().putPublicKeyItem(
*newPublicKeyItem);
} else if (stringPublicKey != publicKeyItem->getPublicKey()) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::PermissionDenied,
.errorText = "The public key doesn't match for deviceID"}};
}
const std::string verificationMessage = sessionSignItem->getSign();
if (!comm::network::crypto::rsaVerifyString(
stringPublicKey, verificationMessage, std::string{signature})) {
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::PermissionDenied,
.errorText =
"Signature for the verification message is not valid"}};
}
comm::network::database::DatabaseManager::getInstance()
.removeSessionSignItem(stringDeviceID);
deviceSessionItem =
std::make_shared<comm::network::database::DeviceSessionItem>(
newSessionID,
stringDeviceID,
stringPublicKey,
std::string{notifyToken},
deviceType,
std::string{deviceAppVersion},
std::string{deviceOS});
comm::network::database::DatabaseManager::getInstance().putSessionItem(
*deviceSessionItem);
} catch (std::runtime_error &e) {
LOG(ERROR) << "gRPC: "
<< "Error while processing 'NewSession' request: " << e.what();
return NewSessionResult{
.grpcStatus = {
.statusCode = GRPCStatusCodes::Internal, .errorText = e.what()}};
}
return NewSessionResult{
.sessionID = newSessionID,
.grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
}
+
+SessionItem getSessionItem(rust::Str sessionID) {
+ const std::string stringSessionID = std::string{sessionID};
+ if (!comm::network::tools::validateSessionID(stringSessionID)) {
+ throw std::invalid_argument("Invalid format for 'sessionID'");
+ }
+ std::shared_ptr<comm::network::database::DeviceSessionItem> sessionItem =
+ comm::network::database::DatabaseManager::getInstance().findSessionItem(
+ stringSessionID);
+ if (sessionItem == nullptr) {
+ throw std::invalid_argument(
+ "No sessions found for 'sessionID': " + stringSessionID);
+ }
+ return SessionItem{
+ .deviceID = sessionItem->getDeviceID(),
+ .publicKey = sessionItem->getPubKey(),
+ .notifyToken = sessionItem->getNotifyToken(),
+ .deviceType = static_cast<int>(sessionItem->getDeviceType()),
+ .appVersion = sessionItem->getAppVersion(),
+ .deviceOS = sessionItem->getDeviceOs(),
+ .isOnline = sessionItem->getIsOnline()};
+}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
index a07f66fc8..40ddfaaf1 100644
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -1,17 +1,18 @@
#pragma once
#include "rust/cxx.h"
#include "tunnelbroker/src/cxx_bridge.rs.h"
void initialize();
rust::String getConfigParameter(rust::Str parameter);
bool isSandbox();
SessionSignatureResult sessionSignatureHandler(rust::Str deviceID);
NewSessionResult newSessionHandler(
rust::Str deviceID,
rust::Str publicKey,
rust::Str signature,
int32_t deviceType,
rust::Str deviceAppVersion,
rust::Str deviceOS,
rust::Str notifyToken);
+SessionItem getSessionItem(rust::Str sessionID);
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
index cdf267d7c..1f01e4729 100644
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,136 +1,159 @@
use super::constants;
use super::cxx_bridge::ffi::{
- newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
+ getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
};
use anyhow::Result;
use futures::Stream;
use std::pin::Pin;
-use tonic::transport::Server;
-use tonic::{Request, Response, Status, Streaming};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
use tunnelbroker::tunnelbroker_service_server::{
TunnelbrokerService, TunnelbrokerServiceServer,
};
mod tools;
mod tunnelbroker {
tonic::include_proto!("tunnelbroker");
}
#[derive(Debug, Default)]
struct TunnelbrokerServiceHandlers {}
#[tonic::async_trait]
impl TunnelbrokerService for TunnelbrokerServiceHandlers {
async fn session_signature(
&self,
request: Request<tunnelbroker::SessionSignatureRequest>,
) -> Result<Response<tunnelbroker::SessionSignatureResponse>, Status> {
let result = sessionSignatureHandler(&request.into_inner().device_id);
if result.grpcStatus.statusCode != GRPCStatusCodes::Ok {
return Err(tools::create_tonic_status(
result.grpcStatus.statusCode,
&result.grpcStatus.errorText,
));
}
Ok(Response::new(tunnelbroker::SessionSignatureResponse {
to_sign: result.toSign,
}))
}
async fn new_session(
&self,
request: Request<tunnelbroker::NewSessionRequest>,
) -> Result<Response<tunnelbroker::NewSessionResponse>, Status> {
let inner_request = request.into_inner();
let notify_token = inner_request.notify_token.unwrap_or(String::new());
if !tunnelbroker::new_session_request::DeviceTypes::is_valid(
inner_request.device_type,
) {
return Err(tools::create_tonic_status(
GRPCStatusCodes::InvalidArgument,
"Unsupported device type",
));
};
let result = newSessionHandler(
&inner_request.device_id,
&inner_request.public_key,
&inner_request.signature,
inner_request.device_type,
&inner_request.device_app_version,
&inner_request.device_os,
¬ify_token,
);
if result.grpcStatus.statusCode != GRPCStatusCodes::Ok {
return Err(tools::create_tonic_status(
result.grpcStatus.statusCode,
&result.grpcStatus.errorText,
));
}
Ok(Response::new(tunnelbroker::NewSessionResponse {
session_id: result.sessionID,
}))
}
type MessagesStreamStream = Pin<
Box<
dyn Stream<Item = Result<tunnelbroker::MessageToClient, Status>> + Send,
>,
>;
+
async fn messages_stream(
&self,
- _request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
+ request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
) -> Result<Response<Self::MessagesStreamStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let session_id = match request.metadata().get("sessionID") {
+ Some(metadata_session_id) => metadata_session_id
+ .to_str()
+ .expect("metadata session id was not valid UTF8")
+ .to_string(),
+ None => {
+ return Err(Status::invalid_argument(
+ "No 'sessionID' in metadata was provided",
+ ))
+ }
+ };
+ let _session_item = match getSessionItem(&session_id) {
+ Ok(database_item) => database_item,
+ Err(err) => return Err(Status::unauthenticated(err.what())),
+ };
+
+ let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+
+ let output_stream = ReceiverStream::new(rx);
+ Ok(Response::new(
+ Box::pin(output_stream) as Self::MessagesStreamStream
+ ))
}
// These empty old API handlers are deprecated and should be removed.
// They are implemented only to fix the building process.
async fn check_if_primary_device_online(
&self,
_request: Request<tunnelbroker::CheckRequest>,
) -> Result<Response<tunnelbroker::CheckResponse>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn become_new_primary_device(
&self,
_request: Request<tunnelbroker::NewPrimaryRequest>,
) -> Result<Response<tunnelbroker::NewPrimaryResponse>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn send_pong(
&self,
_request: Request<tunnelbroker::PongRequest>,
) -> Result<Response<()>, Status> {
Err(Status::cancelled("Deprecated"))
}
async fn send(
&self,
_request: Request<tunnelbroker::SendRequest>,
) -> Result<Response<()>, Status> {
Err(Status::cancelled("Deprecated"))
}
type GetStream = Pin<
Box<dyn Stream<Item = Result<tunnelbroker::GetResponse, Status>> + Send>,
>;
async fn get(
&self,
_request: Request<tunnelbroker::GetRequest>,
) -> Result<Response<Self::GetStream>, Status> {
Err(Status::cancelled("Deprecated"))
}
}
pub async fn run_grpc_server() -> Result<()> {
let addr = format!("[::1]:{}", constants::GRPC_SERVER_PORT).parse()?;
Server::builder()
.add_service(TunnelbrokerServiceServer::new(
TunnelbrokerServiceHandlers::default(),
))
.serve(addr)
.await?;
Ok(())
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Dec 25, 6:05 PM (7 h, 19 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2700657
Default Alt Text
(15 KB)
Attached To
Mode
rCOMM Comm
Attached
Detach File
Event Timeline
Log In to Comment