Page MenuHomePhabricator

D10453.id34990.diff
No OneTemporary

D10453.id34990.diff

diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/http/handlers/log.rs
@@ -0,0 +1,153 @@
+use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
+use actix_http::ws::{CloseCode, Item};
+use actix_web::{
+ web::{self, Bytes, BytesMut},
+ Error, HttpRequest, HttpResponse,
+};
+use actix_web_actors::ws::{self, WebsocketContext};
+use comm_lib::auth::UserIdentity;
+use std::{
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tracing::{info, instrument, warn};
+
+pub async fn handle_ws( // HTTP handler: builds a fresh `LogWSActor` and hands the request over to `ws::start`.
+ user: UserIdentity, // user identity for this connection; stored in the actor's `ConnectionInfo`
+ path: web::Path<String>, // single path parameter, used as the backup ID. NOTE(review): http/mod.rs registers this route as "{backup_d}" - looks like a typo for "{backup_id}"; confirm
+ req: HttpRequest, // original request, passed through to `ws::start`
+ stream: web::Payload, // request payload stream, passed through to `ws::start`
+) -> Result<HttpResponse, Error> { // the result of `ws::start` is returned unchanged
+ let backup_id = path.into_inner();
+ ws::start(
+ LogWSActor {
+ info: Arc::new(ConnectionInfo { user, backup_id }),
+ last_msg: Instant::now(), // the timeout clock starts when the actor is created
+ buffer: BytesMut::new(), // empty: no fragmented message is in progress yet
+ },
+ &req,
+ stream,
+ )
+}
+
+struct ConnectionInfo { // Per-connection identity, built once in `handle_ws` and held by the actor behind an `Arc`.
+ user: UserIdentity, // the `user` argument of `handle_ws`; recorded as the `user` tracing field
+ backup_id: String, // taken from the request path in `handle_ws`; recorded as the `backup_id` tracing field
+}
+
+struct LogWSActor { // Actor state for one WebSocket connection.
+ info: Arc<ConnectionInfo>, // user and backup ID this socket belongs to
+ last_msg: Instant, // time of the last successfully received message; compared against CONNECTION_TIMEOUT by the heartbeat
+ buffer: BytesMut, // accumulates the fragments of a binary continuation until the last fragment arrives
+}
+
+impl LogWSActor { // Timing constants and the handler for complete binary messages.
+ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); // period of the timeout check + ping scheduled in `started`
+ const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); // longest allowed gap since `last_msg` before the heartbeat stops the actor
+
+ fn handle_msg_sync( // Handles one complete binary message; at present it only sends the same bytes back.
+ &self,
+ ctx: &mut WebsocketContext<LogWSActor>, // context used to write the reply
+ bytes: Bytes, // full message payload (a single frame, or the reassembled continuation)
+ ) {
+ ctx.binary(bytes); // reply with `bytes` as a binary message
+ }
+}
+
+impl Actor for LogWSActor { // Actor lifecycle hooks: start the heartbeat on open, log on close.
+ type Context = ws::WebsocketContext<Self>;
+
+ #[instrument(skip_all, fields(user = %self.info.user, backup_id = self.info.backup_id))] // span carries only the `user` and `backup_id` fields
+ fn started(&mut self, ctx: &mut Self::Context) { // Logs the open and schedules the periodic heartbeat.
+ info!("Socket opened");
+ ctx.run_interval(Self::HEARTBEAT_INTERVAL, |actor, ctx| { // closure runs every HEARTBEAT_INTERVAL
+ if Instant::now().duration_since(actor.last_msg) // time elapsed since the last received message
+ > Self::CONNECTION_TIMEOUT
+ {
+ warn!("Socket timeout, closing connection");
+ ctx.stop(); // stops the actor directly; no close frame is sent on this path
+ return; // skip the ping once the actor is stopping
+ }
+
+ ctx.ping(&[]); // ping with an empty payload
+ });
+ }
+
+ #[instrument(skip_all, fields(user = %self.info.user, backup_id = self.info.backup_id))] // same span fields as `started`
+ fn stopped(&mut self, _: &mut Self::Context) { // Only logs that the socket was closed.
+ info!("Socket closed");
+ }
+}
+
+impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for LogWSActor { // Incoming-frame handling for the socket.
+ #[instrument(skip_all, fields(user = %self.info.user, backup_id = self.info.backup_id))] // span carries only the `user` and `backup_id` fields
+ fn handle( // Dispatches one incoming item: binary payloads go to `handle_msg_sync`, everything else is heartbeat, close, or an error path.
+ &mut self,
+ msg: Result<ws::Message, ws::ProtocolError>, // decoded message, or the protocol error raised while reading it
+ ctx: &mut Self::Context,
+ ) {
+ let msg = match msg {
+ Ok(msg) => msg,
+ Err(err) => { // protocol error: log, send a close with the Error code, stop the actor
+ warn!("Error during socket message handling: {err}");
+ ctx.close(Some(CloseCode::Error.into()));
+ ctx.stop();
+ return;
+ }
+ };
+
+ self.last_msg = Instant::now(); // any successfully decoded message (including ping/pong) resets the timeout clock
+
+ match msg {
+ ws::Message::Binary(bytes) => self.handle_msg_sync(ctx, bytes), // unfragmented binary message
+ // Continuations
+ ws::Message::Continuation(Item::FirstBinary(bytes)) => { // first fragment of a fragmented binary message
+ if !self.buffer.is_empty() { // a non-empty buffer means an earlier continuation never received its last fragment
+ warn!("Socket received continuation before previous was completed");
+ ctx.close(Some(CloseCode::Error.into()));
+ ctx.stop();
+ return;
+ }
+ self.buffer.extend_from_slice(&bytes); // NOTE(review): no size cap is applied to the buffer in this block - confirm a limit exists elsewhere
+ }
+ ws::Message::Continuation(Item::Continue(bytes)) => { // middle fragment
+ if self.buffer.is_empty() { // NOTE(review): an empty buffer doubles as the "not started" marker, so a zero-length first fragment would land here - confirm whether that can occur
+ warn!("Socket received continuation message before it was started");
+ ctx.close(Some(CloseCode::Error.into()));
+ ctx.stop();
+ return;
+ }
+ self.buffer.extend_from_slice(&bytes);
+ }
+ ws::Message::Continuation(Item::Last(bytes)) => { // final fragment: finish reassembly and process the whole message
+ if self.buffer.is_empty() { // same "not started" check as for middle fragments
+ warn!(
+ "Socket received last continuation message before it was started"
+ );
+ ctx.close(Some(CloseCode::Error.into()));
+ ctx.stop();
+ return;
+ }
+ self.buffer.extend_from_slice(&bytes);
+ let bytes = self.buffer.split(); // takes the accumulated bytes out, leaving `buffer` empty for the next continuation
+
+ self.handle_msg_sync(ctx, bytes.into()); // `.into()` converts the split-off `BytesMut` into the `Bytes` the handler takes
+ }
+ // Heartbeat
+ ws::Message::Ping(message) => ctx.pong(&message), // answer a ping with a pong carrying the same payload
+ ws::Message::Pong(_) => (), // nothing to do here; `last_msg` was already refreshed above
+ // Other
+ ws::Message::Text(_) | ws::Message::Continuation(Item::FirstText(_)) => { // text messages are rejected, whether whole or the start of a fragmented one
+ warn!("Socket received unsupported message");
+ ctx.close(Some(CloseCode::Unsupported.into()));
+ ctx.stop();
+ }
+ ws::Message::Close(reason) => { // peer sent a close: reply with the same reason, then stop the actor
+ info!("Socket was closed");
+ ctx.close(reason);
+ ctx.stop();
+ }
+ ws::Message::Nop => (), // explicitly ignored
+ }
+ }
+}
diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs
--- a/services/backup/src/http/mod.rs
+++ b/services/backup/src/http/mod.rs
@@ -6,10 +6,11 @@
};
use tracing::info;
-use crate::{database::DatabaseClient, CONFIG};
+use crate::{database::DatabaseClient, http::handlers::log::handle_ws, CONFIG};
mod handlers {
pub(super) mod backup;
+ pub(super) mod log;
}
pub async fn run_http_server(
@@ -34,7 +35,7 @@
.app_data(blob.clone())
.route("/health", web::get().to(HttpResponse::Ok))
.service(
- // Services that don't require authetication
+ // Backup services that don't require authentication
web::scope("/backups/latest")
.service(
web::resource("{username}/backup_id")
@@ -45,7 +46,7 @@
)),
)
.service(
- // Services requiring authetication
+ // Backup services requiring authentication
web::scope("/backups")
.wrap(get_comm_authentication_middleware())
.service(
@@ -60,6 +61,11 @@
.route(web::get().to(handlers::backup::download_user_data)),
),
)
+ .service(
+ web::scope("/logs")
+ .wrap(get_comm_authentication_middleware())
+ .service(web::resource("{backup_d}").route(web::get().to(handle_ws))),
+ )
})
.bind(("0.0.0.0", CONFIG.http_port))?
.run()

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 21, 12:14 PM (20 h, 9 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2687640
Default Alt Text
D10453.id34990.diff (5 KB)

Event Timeline