Page MenuHomePhabricator

D12301.id40912.diff
No OneTemporary

D12301.id40912.diff

diff --git a/services/identity/src/http/mod.rs b/services/identity/src/http/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/identity/src/http/mod.rs
@@ -0,0 +1,23 @@
+use http::StatusCode;
+use hyper::{Body, Request, Response};
+
+type HttpRequest = Request<Body>;
+type HttpResponse = Response<Body>;
+
+/// Main router for plain HTTP requests: answers `/health` with a 200 "OK" body and every other path with 404 "Not found".
+#[tracing::instrument(skip_all, name = "http_request", fields(request_id))]
+pub(super) async fn handle_http_request(
+ req: HttpRequest,
+ _db_client: crate::DatabaseClient, // not read anywhere in this body yet
+) -> Result<HttpResponse, crate::websockets::errors::BoxedError> {
+ tracing::Span::current()
+ .record("request_id", uuid::Uuid::new_v4().to_string()); // fill the span's `request_id` field with a fresh v4 UUID
+
+ let response = match req.uri().path() {
+ "/health" => Response::new(Body::from("OK")),
+ _ => Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Not found"))?, // a builder error is propagated to the caller via `?`
+ };
+ Ok(response)
+}
diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs
--- a/services/identity/src/main.rs
+++ b/services/identity/src/main.rs
@@ -14,6 +14,7 @@
pub mod error;
mod grpc_services;
mod grpc_utils;
+mod http;
mod id;
mod keygen;
mod nonce;
@@ -78,9 +79,10 @@
);
let inner_auth_service =
AuthenticatedService::new(database_client.clone());
+ let db_client = database_client.clone();
let auth_service =
AuthServer::with_interceptor(inner_auth_service, move |req| {
- grpc_services::authenticated::auth_interceptor(req, &database_client)
+ grpc_services::authenticated::auth_interceptor(req, &db_client)
.and_then(grpc_services::shared::version_interceptor)
});
@@ -100,7 +102,7 @@
.add_service(auth_service)
.serve(addr);
- let websocket_server = websockets::run_server();
+ let websocket_server = websockets::run_server(database_client);
return tokio::select! {
websocket_result = websocket_server => websocket_result,
diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs
--- a/services/identity/src/websockets/mod.rs
+++ b/services/identity/src/websockets/mod.rs
@@ -5,7 +5,7 @@
use futures::lock::Mutex;
use futures_util::{SinkExt, StreamExt};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Body, Request, Response};
use hyper_tungstenite::tungstenite::Message;
use hyper_tungstenite::HyperWebsocket;
use identity_search_messages::{
@@ -48,6 +48,7 @@
struct WebsocketService {
addr: SocketAddr,
+ db_client: crate::DatabaseClient,
}
impl hyper::service::Service<Request<Body>> for WebsocketService {
@@ -65,6 +66,7 @@
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let addr = self.addr;
+ let db_client = self.db_client.clone();
let future = async move {
debug!(
@@ -83,20 +85,17 @@
return Ok(response);
}
- let response = match req.uri().path() {
- "/health" => Response::new(Body::from("OK")),
- _ => Response::builder()
- .status(StatusCode::NOT_FOUND)
- .body(Body::from("Not found"))?,
- };
- Ok(response)
+ // If not a websocket upgrade, treat it as a regular HTTP request
+ crate::http::handle_http_request(req, db_client).await
};
Box::pin(future)
}
}
#[tracing::instrument(skip_all)]
-pub async fn run_server() -> Result<(), errors::BoxedError> {
+pub async fn run_server(
+ db_client: crate::DatabaseClient,
+) -> Result<(), errors::BoxedError> {
let addr: SocketAddr = IDENTITY_SERVICE_WEBSOCKET_ADDR.parse()?;
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
@@ -107,8 +106,9 @@
http.http1_keep_alive(true);
while let Ok((stream, addr)) = listener.accept().await {
+ let db_client = db_client.clone();
let connection = http
- .serve_connection(stream, WebsocketService { addr })
+ .serve_connection(stream, WebsocketService { addr, db_client })
.with_upgrades();
tokio::spawn(async move {

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 1, 7:07 AM (21 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2604155
Default Alt Text
D12301.id40912.diff (3 KB)

Event Timeline