diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -31,6 +31,15 @@
     sessionID: String,
     grpcStatus: GrpcResult,
   }
+  struct SessionItem {
+    deviceID: String,
+    publicKey: String,
+    notifyToken: String,
+    deviceType: i32,
+    appVersion: String,
+    deviceOS: String,
+    isOnline: bool,
+  }
 
   unsafe extern "C++" {
     include!("tunnelbroker/src/libcpp/Tunnelbroker.h");
@@ -47,5 +56,6 @@
       deviceOS: &str,
       notifyToken: &str,
     ) -> NewSessionResult;
+    pub fn getSessionItem(sessionID: &str) -> Result<SessionItem>;
   }
 }
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -15,3 +15,4 @@
     rust::Str deviceAppVersion,
     rust::Str deviceOS,
     rust::Str notifyToken);
+SessionItem getSessionItem(rust::Str sessionID);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -148,3 +148,25 @@
       .sessionID = newSessionID,
       .grpcStatus = {.statusCode = GRPCStatusCodes::Ok}};
 }
+
+SessionItem getSessionItem(rust::Str sessionID) {
+  const std::string stringSessionID = std::string{sessionID};
+  if (!comm::network::tools::validateSessionID(stringSessionID)) {
+    throw std::invalid_argument("Invalid format for 'sessionID'");
+  }
+  std::shared_ptr<comm::network::database::DeviceSessionItem> sessionItem =
+      comm::network::database::DatabaseManager::getInstance().findSessionItem(
+          stringSessionID);
+  if (sessionItem == nullptr) {
+    throw std::invalid_argument(
+        "No sessions found for 'sessionID': " + stringSessionID);
+  }
+  return SessionItem{
+      .deviceID = sessionItem->getDeviceID(),
+      .publicKey = sessionItem->getPubKey(),
+      .notifyToken = sessionItem->getNotifyToken(),
+      .deviceType = static_cast<int>(sessionItem->getDeviceType()),
+      .appVersion = sessionItem->getAppVersion(),
+      .deviceOS = sessionItem->getDeviceOs(),
+      .isOnline = sessionItem->getIsOnline()};
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,12 +1,13 @@
 use super::constants;
 use super::cxx_bridge::ffi::{
-  newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
+  getSessionItem, newSessionHandler, sessionSignatureHandler, GRPCStatusCodes,
 };
 use anyhow::Result;
 use futures::Stream;
 use std::pin::Pin;
-use tonic::transport::Server;
-use tonic::{Request, Response, Status, Streaming};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
 use tunnelbroker::tunnelbroker_service_server::{
   TunnelbrokerService, TunnelbrokerServiceServer,
 };
@@ -76,11 +77,33 @@
       dyn Stream<Item = Result<tunnelbroker::MessageToClient, Status>> + Send,
     >,
   >;
+
   async fn messages_stream(
     &self,
-    _request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
+    request: Request<Streaming<tunnelbroker::MessageToTunnelbroker>>,
   ) -> Result<Response<Self::MessagesStreamStream>, Status> {
-    Err(Status::unimplemented("Not implemented yet"))
+    let session_id = match request.metadata().get("sessionID") {
+      Some(metadata_session_id) => metadata_session_id
+        .to_str()
+        .expect("metadata session id was not valid UTF8")
+        .to_string(),
+      None => {
+        return Err(Status::invalid_argument(
+          "No 'sessionID' in metadata was provided",
+        ))
+      }
+    };
+    let _session_item = match getSessionItem(&session_id) {
+      Ok(database_item) => database_item,
+      Err(err) => return Err(Status::unauthenticated(err.what())),
+    };
+
+    let (_tx, rx) = mpsc::channel(constants::GRPC_TX_QUEUE_SIZE);
+
+    let output_stream = ReceiverStream::new(rx);
+    Ok(Response::new(
+      Box::pin(output_stream) as Self::MessagesStreamStream
+    ))
   }
 
   // These empty old API handlers are deprecated and should be removed.