diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,8 +1,11 @@
 pub mod backup_item;
 pub mod log_item;
 
-use self::backup_item::{BackupItem, OrderedBackupItem};
-use crate::constants::backup_table;
+use self::{
+  backup_item::{BackupItem, OrderedBackupItem},
+  log_item::LogItem,
+};
+use crate::constants::{backup_table, log_table};
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput,
   types::{AttributeValue, ReturnValue},
@@ -203,3 +206,28 @@
     Ok(removed_backups)
   }
 }
+
+/// Backup log functions
+impl DatabaseClient {
+  /// Writes `log_item` to the DynamoDB table named by `log_table::TABLE_NAME`.
+  ///
+  /// Any failure of the `put_item` call is logged and returned as
+  /// `Error::AwsSdk`.
+  pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> {
+    let item = log_item.into();
+
+    self
+      .client
+      .put_item()
+      .table_name(log_table::TABLE_NAME)
+      .set_item(Some(item))
+      .send()
+      .await
+      .map_err(|e| {
+        error!("DynamoDB client failed to put log item");
+        Error::AwsSdk(e.into())
+      })?;
+
+    Ok(())
+  }
+}
diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs
--- a/services/backup/src/http/handlers/log.rs
+++ b/services/backup/src/http/handlers/log.rs
@@ -1,26 +1,39 @@
 use crate::constants::WS_FRAME_SIZE;
-use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
+use crate::database::{log_item::LogItem, DatabaseClient};
+use actix::{Actor, ActorContext, ActorFutureExt, AsyncContext, StreamHandler};
 use actix_http::ws::{CloseCode, Item};
 use actix_web::{
   web::{self, Bytes, BytesMut},
   Error, HttpRequest, HttpResponse,
 };
 use actix_web_actors::ws::{self, WebsocketContext};
+use comm_lib::{
+  backup::{LogWSRequest, LogWSResponse, UploadLogRequest},
+  blob::{
+    client::{BlobServiceClient, BlobServiceError},
+    types::BlobInfo,
+  },
+  database::{self, blob::BlobOrDBContent},
+};
 use std::{
   sync::Arc,
   time::{Duration, Instant},
 };
-use tracing::{info, instrument, warn};
+use tracing::{error, info, instrument, warn};
 
 pub async fn handle_ws(
   path: web::Path<String>,
   req: HttpRequest,
   stream: web::Payload,
+  blob_client: web::Data<BlobServiceClient>,
+  db_client: web::Data<DatabaseClient>,
 ) -> Result<HttpResponse, Error> {
   let backup_id = path.into_inner();
   ws::WsResponseBuilder::new(
     LogWSActor {
       info: Arc::new(ConnectionInfo { backup_id }),
+      blob_client: blob_client.as_ref().clone(),
+      db_client: db_client.as_ref().clone(),
       last_msg_time: Instant::now(),
       buffer: BytesMut::new(),
     },
@@ -35,8 +48,21 @@
   backup_id: String,
 }
 
+/// Failures that can occur while handling a single log websocket message.
+#[derive(
+  Debug, derive_more::From, derive_more::Display, derive_more::Error,
+)]
+enum LogWSError {
+  BincodeError(bincode::Error),
+  BlobError(BlobServiceError),
+  DBError(database::Error),
+}
+
 struct LogWSActor {
   info: Arc<ConnectionInfo>,
+  blob_client: BlobServiceClient,
+  db_client: DatabaseClient,
+
   last_msg_time: Instant,
   buffer: BytesMut,
 }
@@ -50,7 +76,112 @@
     ctx: &mut WebsocketContext<LogWSActor>,
     bytes: Bytes,
   ) {
-    ctx.binary(bytes);
+    // Hand owned clones of the shared state to the async handler so the
+    // resulting future can be spawned on the actor context.
+    let fut = Self::handle_msg(
+      self.info.clone(),
+      self.blob_client.clone(),
+      self.db_client.clone(),
+      bytes.into(),
+    );
+
+    let fut = actix::fut::wrap_future(fut).map(
+      |responses,
+       _: &mut LogWSActor,
+       ctx: &mut WebsocketContext<LogWSActor>| {
+        let responses = match responses {
+          Ok(responses) => responses,
+          Err(err) => {
+            error!("Error: {err:?}");
+            vec![LogWSResponse::ServerError]
+          }
+        };
+
+        for response in responses {
+          match bincode::serialize(&response) {
+            Ok(bytes) => ctx.binary(bytes),
+            Err(error) => {
+              error!(
+                "Error serializing a response: {response:?}. Error: {error}"
+              );
+            }
+          };
+        }
+      },
+    );
+
+    ctx.spawn(fut);
+  }
+
+  /// Decodes one bincode-encoded `LogWSRequest` and processes it.
+  ///
+  /// For `UploadLog`, a holder is assigned for every attachment, then the
+  /// log item goes through `ensure_size_constraints` and is stored via
+  /// `db_client`; on success a single `LogUploaded` response is returned.
+  async fn handle_msg(
+    info: Arc<ConnectionInfo>,
+    blob_client: BlobServiceClient,
+    db_client: DatabaseClient,
+    bytes: Bytes,
+  ) -> Result<Vec<LogWSResponse>, LogWSError> {
+    let request = bincode::deserialize(&bytes)?;
+
+    match request {
+      LogWSRequest::UploadLog(UploadLogRequest {
+        log_id,
+        content,
+        attachments,
+      }) => {
+        let mut attachment_blob_infos = Vec::new();
+
+        // NOTE(review): holders assigned here are not released if a later
+        // step fails; confirm whether cleanup is needed.
+        for attachment in attachments.unwrap_or_default() {
+          let blob_info =
+            Self::create_attachment(&blob_client, attachment).await?;
+
+          attachment_blob_infos.push(blob_info);
+        }
+
+        let mut log_item = LogItem {
+          backup_id: info.backup_id.clone(),
+          log_id,
+          content: BlobOrDBContent::new(content),
+          attachments: attachment_blob_infos,
+        };
+
+        log_item.ensure_size_constraints(&blob_client).await?;
+        db_client.put_log_item(log_item).await?;
+
+        Ok(vec![LogWSResponse::LogUploaded { log_id }])
+      }
+    }
+  }
+
+  /// Assigns a freshly generated UUID holder to the blob named by `attachment`.
+  ///
+  /// When `assign_holder` returns `false`, only a warning is logged; the
+  /// `BlobInfo` is returned either way.
+  async fn create_attachment(
+    blob_client: &BlobServiceClient,
+    attachment: String,
+  ) -> Result<BlobInfo, LogWSError> {
+    let blob_info = BlobInfo {
+      blob_hash: attachment,
+      holder: uuid::Uuid::new_v4().to_string(),
+    };
+
+    if !blob_client
+      .assign_holder(&blob_info.blob_hash, &blob_info.holder)
+      .await?
+    {
+      warn!(
+        "Blob attachment with hash {:?} doesn't exist",
+        blob_info.blob_hash
+      );
+    }
+
+    Ok(blob_info)
   }
 }
 
diff --git a/shared/comm-lib/src/backup/mod.rs b/shared/comm-lib/src/backup/mod.rs
--- a/shared/comm-lib/src/backup/mod.rs
+++ b/shared/comm-lib/src/backup/mod.rs
@@ -5,3 +5,24 @@
   #[serde(rename = "backupID")]
   pub backup_id: String,
 }
+
+/// Body of the `LogWSRequest::UploadLog` message.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UploadLogRequest {
+  pub log_id: usize,
+  pub content: Vec<u8>,
+  pub attachments: Option<Vec<String>>,
+}
+
+/// Messages a client can send over the log websocket.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum LogWSRequest {
+  UploadLog(UploadLogRequest),
+}
+
+/// Messages the server sends back over the log websocket.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum LogWSResponse {
+  LogUploaded { log_id: usize },
+  ServerError,
+}