diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -3,6 +3,7 @@ pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const ID_SEPARATOR: &str = ":"; pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";"; +pub const WS_FRAME_SIZE: usize = 1_048_576; // 1MiB // Configuration defaults pub const DEFAULT_HTTP_PORT: u16 = 50052; diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs new file mode 100644 --- /dev/null +++ b/services/backup/src/http/handlers/log.rs @@ -0,0 +1,154 @@ +use crate::constants::WS_FRAME_SIZE; +use actix::{Actor, ActorContext, AsyncContext, StreamHandler}; +use actix_http::ws::{CloseCode, Item}; +use actix_web::{ + web::{self, Bytes, BytesMut}, + Error, HttpRequest, HttpResponse, +}; +use actix_web_actors::ws::{self, WebsocketContext}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tracing::{info, instrument, warn}; + +pub async fn handle_ws( + path: web::Path, + req: HttpRequest, + stream: web::Payload, +) -> Result { + let backup_id = path.into_inner(); + ws::WsResponseBuilder::new( + LogWSActor { + info: Arc::new(ConnectionInfo { backup_id }), + last_msg_time: Instant::now(), + buffer: BytesMut::new(), + }, + &req, + stream, + ) + .frame_size(WS_FRAME_SIZE) + .start() +} + +struct ConnectionInfo { + backup_id: String, +} + +struct LogWSActor { + info: Arc, + last_msg_time: Instant, + buffer: BytesMut, +} + +impl LogWSActor { + const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); + + fn handle_msg_sync( + &self, + ctx: &mut WebsocketContext, + bytes: Bytes, + ) { + ctx.binary(bytes); + } +} + +impl Actor for LogWSActor { + type Context = ws::WebsocketContext; + + #[instrument(skip_all, fields(backup_id = self.info.backup_id))] + fn started(&mut self, ctx: &mut Self::Context) { + info!("Socket opened"); + ctx.run_interval(Self::HEARTBEAT_INTERVAL, |actor, ctx| { + if Instant::now().duration_since(actor.last_msg_time) + > Self::CONNECTION_TIMEOUT + { + warn!("Socket timeout, closing connection"); + ctx.stop(); + return; + } + + ctx.ping(&[]); + }); + } + + #[instrument(skip_all, fields(backup_id = self.info.backup_id))] + fn stopped(&mut self, _: &mut Self::Context) { + info!("Socket closed"); + } +} + +impl StreamHandler> for LogWSActor { + #[instrument(skip_all, fields(backup_id = self.info.backup_id))] + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { + let msg = match msg { + Ok(msg) => msg, + Err(err) => { + warn!("Error during socket message handling: {err}"); + ctx.close(Some(CloseCode::Error.into())); + ctx.stop(); + return; + } + }; + + self.last_msg_time = Instant::now(); + + match msg { + ws::Message::Binary(bytes) => self.handle_msg_sync(ctx, bytes), + // Continuations - this is mostly boilerplate code. Some websocket + // clients may split a message into these ones + ws::Message::Continuation(Item::FirstBinary(bytes)) => { + if !self.buffer.is_empty() { + warn!("Socket received continuation before previous was completed"); + ctx.close(Some(CloseCode::Error.into())); + ctx.stop(); + return; + } + self.buffer.extend_from_slice(&bytes); + } + ws::Message::Continuation(Item::Continue(bytes)) => { + if self.buffer.is_empty() { + warn!("Socket received continuation message before it was started"); + ctx.close(Some(CloseCode::Error.into())); + ctx.stop(); + return; + } + self.buffer.extend_from_slice(&bytes); + } + ws::Message::Continuation(Item::Last(bytes)) => { + if self.buffer.is_empty() { + warn!( + "Socket received last continuation message before it was started" + ); + ctx.close(Some(CloseCode::Error.into())); + ctx.stop(); + return; + } + self.buffer.extend_from_slice(&bytes); + let bytes = self.buffer.split(); + + self.handle_msg_sync(ctx, bytes.into()); + } + // Heartbeat + ws::Message::Ping(message) => ctx.pong(&message), + ws::Message::Pong(_) => (), + // Other + ws::Message::Text(_) | ws::Message::Continuation(Item::FirstText(_)) => { + warn!("Socket received unsupported message"); + ctx.close(Some(CloseCode::Unsupported.into())); + ctx.stop(); + } + ws::Message::Close(reason) => { + info!("Socket was closed"); + ctx.close(reason); + ctx.stop(); + } + ws::Message::Nop => (), + } + } +} diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs --- a/services/backup/src/http/mod.rs +++ b/services/backup/src/http/mod.rs @@ -6,10 +6,11 @@ }; use tracing::info; -use crate::{database::DatabaseClient, CONFIG}; +use crate::{database::DatabaseClient, http::handlers::log::handle_ws, CONFIG}; mod handlers { pub(super) mod backup; + pub(super) mod log; } pub async fn run_http_server( @@ -34,7 +35,7 @@ .app_data(blob.clone()) .route("/health", web::get().to(HttpResponse::Ok)) .service( - // Services that don't require authetication + // Backup services that don't require authetication web::scope("/backups/latest") .service( web::resource("{username}/backup_id") @@ -45,7 +46,7 @@ )), ) .service( - // Services requiring authetication + // Backup services requiring authetication web::scope("/backups") .wrap(get_comm_authentication_middleware()) .service( @@ -60,6 +61,11 @@ .route(web::get().to(handlers::backup::download_user_data)), ), ) + .service( + web::scope("/logs") + .wrap(get_comm_authentication_middleware()) + .service(web::resource("{backup_d}").route(web::get().to(handle_ws))), + ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run()