diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -11,7 +11,7 @@
 use tracing::debug;
 use tunnelbroker_messages::MessageToDevice;
 
-use crate::constants::CLIENT_RMQ_MSG_PRIORITY;
+use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
 use crate::database::{handle_ddb_error, DatabaseClient};
 use crate::{constants, CONFIG};
 
@@ -72,6 +72,45 @@
     Ok(response)
   }
 
+  /// Forces the connection with a device to close.
+  ///
+  /// Publishes `WS_SESSION_CLOSE_AMQP_MSG` through the default exchange (`""`)
+  /// using the device ID as the routing key, then replies with `Empty`.
+  ///
+  /// # Errors
+  ///
+  /// A failed publish is converted to a `tonic::Status` by
+  /// `handle_amqp_error`.
+  async fn force_close_device_connection(
+    &self,
+    request: tonic::Request<DeviceConnectionCloseRequest>,
+  ) -> Result<tonic::Response<Empty>, tonic::Status> {
+    let message = request.into_inner();
+
+    debug!("Connection close request for device {}", &message.device_id);
+
+    self
+      .amqp_channel
+      .basic_publish(
+        "",
+        &message.device_id,
+        BasicPublishOptions::default(),
+        WS_SESSION_CLOSE_AMQP_MSG.as_bytes(),
+        BasicProperties::default()
+          // Connection close request should have higher priority
+          .with_priority(CLIENT_RMQ_MSG_PRIORITY + 1)
+          // The message should expire quickly (the TTL is in milliseconds).
+          // If the device isn't connected (there's no consumer), there's no
+          // point in keeping this message.
+          .with_expiration("1000".into()),
+      )
+      .await
+      .map_err(handle_amqp_error)?;
+
+    let response = tonic::Response::new(Empty {});
+    Ok(response)
+  }
+
   async fn delete_device_data(
     &self,
     request: tonic::Request<DeleteDeviceDataRequest>,
diff --git a/shared/protos/tunnelbroker.proto b/shared/protos/tunnelbroker.proto
--- a/shared/protos/tunnelbroker.proto
+++ b/shared/protos/tunnelbroker.proto
@@ -16,6 +16,10 @@
 
   // Deleting data associated with device.
   rpc DeleteDeviceData(DeleteDeviceDataRequest) returns (Empty) {}
+
+  // Force close connection with device.
+  rpc ForceCloseDeviceConnection(DeviceConnectionCloseRequest) returns
+      (Empty) {}
 }
 
 message Empty {}
@@ -31,3 +35,8 @@
   // The primary identity key of a device
   string device_id = 1;
 }
+
+message DeviceConnectionCloseRequest {
+  // The primary identity key of a device
+  string device_id = 1;
+}