diff --git a/shared/tunnelbroker-client/src/lib.rs b/shared/tunnelbroker-client/src/lib.rs --- a/shared/tunnelbroker-client/src/lib.rs +++ b/shared/tunnelbroker-client/src/lib.rs @@ -1,6 +1,9 @@ +use anyhow::Result; +use futures_util::stream; use lazy_static::lazy_static; use std::sync::Arc; use tokio::runtime::{Builder, Runtime}; +use tokio::sync::mpsc; use tonic::transport::Channel; use tunnelbroker::tunnelbroker_service_client::TunnelbrokerServiceClient; @@ -26,3 +29,46 @@ .block_on(TunnelbrokerServiceClient::connect(addr)) .expect("Failed to create Tokio runtime for the Tunnelbroker client") } + +pub async fn publish_( + client: &mut TunnelbrokerServiceClient, + to_device_id: String, + payload: String, +) -> anyhow::Result<()> { + let messages = vec![tunnelbroker::MessageToTunnelbroker { + data: Some(tunnelbroker::message_to_tunnelbroker::Data::MessagesToSend( + tunnelbroker::MessagesToSend { + messages: vec![tunnelbroker::MessageToTunnelbrokerStruct { + to_device_id, + payload, + blob_hashes: vec![], + }], + }, + )), + }]; + client + .messages_stream(stream::iter(messages)) + .await + .expect("Failed to send messages to the Tunnelbroker stream"); + Ok(()) +} + +pub async fn publish_message( + tx: &mpsc::Sender, + to_device_id: String, + payload: String, +) -> Result<()> { + let messages = tunnelbroker::MessageToTunnelbroker { + data: Some(tunnelbroker::message_to_tunnelbroker::Data::MessagesToSend( + tunnelbroker::MessagesToSend { + messages: vec![tunnelbroker::MessageToTunnelbrokerStruct { + to_device_id, + payload, + blob_hashes: vec![], + }], + }, + )), + }; + tx.send(messages).await?; + Ok(()) +}