diff --git a/services/identity/src/http/mod.rs b/services/identity/src/http/mod.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/http/mod.rs @@ -0,0 +1,23 @@ +use http::StatusCode; +use hyper::{Body, Request, Response}; + +type HttpRequest = Request; +type HttpResponse = Response; + +/// Main router for HTTP requests +#[tracing::instrument(skip_all, name = "http_request", fields(request_id))] +pub(super) async fn handle_http_request( + req: HttpRequest, + _db_client: crate::DatabaseClient, +) -> Result { + tracing::Span::current() + .record("request_id", uuid::Uuid::new_v4().to_string()); + + let response = match req.uri().path() { + "/health" => Response::new(Body::from("OK")), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found"))?, + }; + Ok(response) +} diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -14,6 +14,7 @@ pub mod error; mod grpc_services; mod grpc_utils; +mod http; mod id; mod keygen; mod nonce; @@ -78,9 +79,10 @@ ); let inner_auth_service = AuthenticatedService::new(database_client.clone()); + let db_client = database_client.clone(); let auth_service = AuthServer::with_interceptor(inner_auth_service, move |req| { - grpc_services::authenticated::auth_interceptor(req, &database_client) + grpc_services::authenticated::auth_interceptor(req, &db_client) .and_then(grpc_services::shared::version_interceptor) }); @@ -100,7 +102,7 @@ .add_service(auth_service) .serve(addr); - let websocket_server = websockets::run_server(); + let websocket_server = websockets::run_server(database_client); return tokio::select! 
{ websocket_result = websocket_server => websocket_result, diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs --- a/services/identity/src/websockets/mod.rs +++ b/services/identity/src/websockets/mod.rs @@ -5,7 +5,7 @@ use futures::lock::Mutex; use futures_util::{SinkExt, StreamExt}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Body, Request, Response}; use hyper_tungstenite::tungstenite::Message; use hyper_tungstenite::HyperWebsocket; use identity_search_messages::{ @@ -48,6 +48,7 @@ struct WebsocketService { addr: SocketAddr, + db_client: crate::DatabaseClient, } impl hyper::service::Service> for WebsocketService { @@ -65,6 +66,7 @@ fn call(&mut self, mut req: Request) -> Self::Future { let addr = self.addr; + let db_client = self.db_client.clone(); let future = async move { debug!( @@ -83,20 +85,17 @@ return Ok(response); } - let response = match req.uri().path() { - "/health" => Response::new(Body::from("OK")), - _ => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("Not found"))?, - }; - Ok(response) + // If not a websocket upgrade, treat it as a regular HTTP request + crate::http::handle_http_request(req, db_client).await }; Box::pin(future) } } #[tracing::instrument(skip_all)] -pub async fn run_server() -> Result<(), errors::BoxedError> { +pub async fn run_server( + db_client: crate::DatabaseClient, +) -> Result<(), errors::BoxedError> { let addr: SocketAddr = IDENTITY_SERVICE_WEBSOCKET_ADDR.parse()?; let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); @@ -107,8 +106,9 @@ http.http1_keep_alive(true); while let Ok((stream, addr)) = listener.accept().await { + let db_client = db_client.clone(); let connection = http - .serve_connection(stream, WebsocketService { addr }) + .serve_connection(stream, WebsocketService { addr, db_client }) .with_upgrades(); tokio::spawn(async move {