Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/grpc/mod.rs
mod proto { | mod proto { | ||||
tonic::include_proto!("tunnelbroker"); | tonic::include_proto!("tunnelbroker"); | ||||
} | } | ||||
use proto::tunnelbroker_service_server::{ | use proto::tunnelbroker_service_server::{ | ||||
TunnelbrokerService, TunnelbrokerServiceServer, | TunnelbrokerService, TunnelbrokerServiceServer, | ||||
}; | }; | ||||
use proto::Empty; | use proto::Empty; | ||||
use tonic::transport::Server; | use tonic::transport::Server; | ||||
use tonic::Status; | use tracing::{debug, error}; | ||||
use tracing::debug; | |||||
use crate::database::{handle_ddb_error, DatabaseClient}; | |||||
use crate::{constants, ACTIVE_CONNECTIONS, CONFIG}; | use crate::{constants, ACTIVE_CONNECTIONS, CONFIG}; | ||||
#[derive(Debug, Default)] | struct TunnelbrokerGRPC { | ||||
struct TunnelbrokerGRPC {} | client: DatabaseClient, | ||||
} | |||||
#[tonic::async_trait] | #[tonic::async_trait] | ||||
impl TunnelbrokerService for TunnelbrokerGRPC { | impl TunnelbrokerService for TunnelbrokerGRPC { | ||||
async fn send_message_to_device( | async fn send_message_to_device( | ||||
&self, | &self, | ||||
request: tonic::Request<proto::MessageToDevice>, | request: tonic::Request<proto::MessageToDevice>, | ||||
) -> Result<tonic::Response<proto::Empty>, tonic::Status> { | ) -> Result<tonic::Response<proto::Empty>, tonic::Status> { | ||||
let message = request.into_inner(); | let message = request.into_inner(); | ||||
debug!("Received message for {}", &message.device_id); | debug!("Received message for {}", &message.device_id); | ||||
// TODO: Persist messages for inactive connections | if let Some(tx) = ACTIVE_CONNECTIONS.get(&message.device_id) { | ||||
let tx = ACTIVE_CONNECTIONS | if let Err(_) = tx.send(message.payload) { | ||||
.get(&message.device_id) | error!("Unable to send message to device: {}", &message.device_id); | ||||
.ok_or(Status::unavailable("Device does not exist"))?; | ACTIVE_CONNECTIONS.remove(&message.device_id); | ||||
tx.send(message.payload).expect("Unable to send message"); | } | ||||
} else { | |||||
self | |||||
.client | |||||
.persist_message(&message.device_id, &message.payload) | |||||
.await | |||||
.map_err(handle_ddb_error)?; | |||||
} | |||||
let response = tonic::Response::new(Empty {}); | let response = tonic::Response::new(Empty {}); | ||||
Ok(response) | Ok(response) | ||||
} | } | ||||
} | } | ||||
pub async fn run_server() -> Result<(), tonic::transport::Error> { | pub async fn run_server( | ||||
client: DatabaseClient, | |||||
) -> Result<(), tonic::transport::Error> { | |||||
let addr = format!("[::1]:{}", CONFIG.grpc_port) | let addr = format!("[::1]:{}", CONFIG.grpc_port) | ||||
.parse() | .parse() | ||||
.expect("Unable to parse gRPC address"); | .expect("Unable to parse gRPC address"); | ||||
tracing::info!("Websocket server listening on {}", &addr); | tracing::info!("Websocket server listening on {}", &addr); | ||||
Server::builder() | Server::builder() | ||||
.http2_keepalive_interval(Some(constants::GRPC_KEEP_ALIVE_PING_INTERVAL)) | .http2_keepalive_interval(Some(constants::GRPC_KEEP_ALIVE_PING_INTERVAL)) | ||||
.http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) | .http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) | ||||
.add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC::default())) | .add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC { client })) | ||||
.serve(addr) | .serve(addr) | ||||
.await | .await | ||||
} | } |