Page MenuHomePhabricator

D10456.id35390.diff
No OneTemporary

D10456.id35390.diff

diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,8 +1,11 @@
pub mod backup_item;
pub mod log_item;
-use self::backup_item::{BackupItem, OrderedBackupItem};
-use crate::constants::backup_table;
+use self::{
+ backup_item::{BackupItem, OrderedBackupItem},
+ log_item::LogItem,
+};
+use crate::constants::{backup_table, log_table};
use aws_sdk_dynamodb::{
operation::get_item::GetItemOutput,
types::{AttributeValue, ReturnValue},
@@ -203,3 +206,24 @@
Ok(removed_backups)
}
}
+
+/// Backup log functions
+impl DatabaseClient {
+  /// Stores `log_item` in the log table. A failed `put_item` call is logged
+  /// together with its cause and returned as `Error::AwsSdk`.
+  pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> {
+    let item = log_item.into();
+    self
+      .client
+      .put_item()
+      .table_name(log_table::TABLE_NAME)
+      .set_item(Some(item))
+      .send()
+      .await
+      .map_err(|e| {
+        error!("DynamoDB client failed to put log item: {e:?}");
+        Error::AwsSdk(e.into())
+      })?;
+    Ok(())
+  }
+}
diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs
--- a/services/backup/src/http/handlers/log.rs
+++ b/services/backup/src/http/handlers/log.rs
@@ -1,26 +1,39 @@
use crate::constants::WS_FRAME_SIZE;
-use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
+use crate::database::{log_item::LogItem, DatabaseClient};
+use actix::{Actor, ActorContext, ActorFutureExt, AsyncContext, StreamHandler};
use actix_http::ws::{CloseCode, Item};
use actix_web::{
web::{self, Bytes, BytesMut},
Error, HttpRequest, HttpResponse,
};
use actix_web_actors::ws::{self, WebsocketContext};
+use comm_lib::{
+ backup::{LogWSRequest, LogWSResponse, UploadLogRequest},
+ blob::{
+ client::{BlobServiceClient, BlobServiceError},
+ types::BlobInfo,
+ },
+ database::{self, blob::BlobOrDBContent},
+};
use std::{
sync::Arc,
time::{Duration, Instant},
};
-use tracing::{info, instrument, warn};
+use tracing::{error, info, instrument, warn};
pub async fn handle_ws(
path: web::Path<String>,
req: HttpRequest,
stream: web::Payload,
+ blob_client: web::Data<BlobServiceClient>,
+ db_client: web::Data<DatabaseClient>,
) -> Result<HttpResponse, Error> {
let backup_id = path.into_inner();
ws::WsResponseBuilder::new(
LogWSActor {
info: Arc::new(ConnectionInfo { backup_id }),
+ blob_client: blob_client.as_ref().clone(),
+ db_client: db_client.as_ref().clone(),
last_msg_time: Instant::now(),
buffer: BytesMut::new(),
},
@@ -35,8 +48,20 @@
backup_id: String,
}
+/// Errors that `LogWSActor::handle_msg` can return while processing a single
+/// WebSocket message. `derive_more::From` lets `?` convert each wrapped error.
+#[derive(
+ Debug, derive_more::From, derive_more::Display, derive_more::Error,
+)]
+enum LogWSError {
+ /// The incoming bytes could not be decoded with bincode.
+ BincodeError(bincode::Error),
+ /// A blob service call failed.
+ BlobError(BlobServiceError),
+ /// A database operation failed.
+ DBError(database::Error),
+}
+
+/// State of the actor that `handle_ws` creates for a log WebSocket request.
struct LogWSActor {
+ /// Connection info (holds the backup ID); cloned into each message future.
info: Arc<ConnectionInfo>,
+ /// Blob service client, cloned and handed to `handle_msg`.
+ blob_client: BlobServiceClient,
+ /// Database client, cloned and handed to `handle_msg`.
+ db_client: DatabaseClient,
+
+ // Set to `Instant::now()` in `handle_ws`; its use is outside this hunk.
last_msg_time: Instant,
+ // Starts out empty in `handle_ws`; its use is outside this hunk.
buffer: BytesMut,
}
@@ -50,7 +75,99 @@
ctx: &mut WebsocketContext<LogWSActor>,
bytes: Bytes,
) {
- ctx.binary(bytes);
+ let fut = Self::handle_msg(
+ self.info.clone(),
+ self.blob_client.clone(),
+ self.db_client.clone(),
+ bytes.into(),
+ );
+
+ let fut = actix::fut::wrap_future(fut).map(
+ |responses,
+ _: &mut LogWSActor,
+ ctx: &mut WebsocketContext<LogWSActor>| {
+ let responses = match responses {
+ Ok(responses) => responses,
+ Err(err) => {
+ error!("Error: {err:?}");
+ vec![LogWSResponse::ServerError]
+ }
+ };
+
+ for response in responses {
+ match bincode::serialize(&response) {
+ Ok(bytes) => ctx.binary(bytes),
+ Err(error) => {
+ error!(
+ "Error serializing a response: {response:?}. Error: {error}"
+ );
+ }
+ };
+ }
+ },
+ );
+
+ ctx.spawn(fut);
+ }
+
+  /// Decodes one bincode `LogWSRequest` and executes it. For `UploadLog`,
+  /// holders are assigned to all attachments before the log item goes through
+  /// `ensure_size_constraints` and `put_log_item`; the first failure aborts.
+  async fn handle_msg(
+    info: Arc<ConnectionInfo>,
+    blob_client: BlobServiceClient,
+    db_client: DatabaseClient,
+    bytes: Bytes,
+  ) -> Result<Vec<LogWSResponse>, LogWSError> {
+    let request: LogWSRequest = bincode::deserialize(&bytes)?;
+    match request {
+      LogWSRequest::UploadLog(upload) => {
+        let UploadLogRequest {
+          log_id,
+          content,
+          attachments,
+        } = upload;
+
+        let hashes = attachments.unwrap_or_default();
+        let mut holders = Vec::with_capacity(hashes.len());
+        for hash in hashes {
+          holders.push(Self::create_attachment(&blob_client, hash).await?);
+        }
+
+        let mut item = LogItem {
+          backup_id: info.backup_id.clone(),
+          log_id,
+          content: BlobOrDBContent::new(content),
+          attachments: holders,
+        };
+        item.ensure_size_constraints(&blob_client).await?;
+        db_client.put_log_item(item).await?;
+
+        Ok(vec![LogWSResponse::LogUploaded { log_id }])
+      }
+    }
+  }
+
+  /// Assigns a freshly generated holder to the blob named by `attachment`.
+  ///
+  /// Returns the hash/holder pair. A `false` reply from `assign_holder` is
+  /// only logged as a warning; blob service errors are propagated.
+  async fn create_attachment(
+    blob_client: &BlobServiceClient,
+    attachment: String,
+  ) -> Result<BlobInfo, BlobServiceError> {
+    let holder = uuid::Uuid::new_v4().to_string();
+    let blob_exists = blob_client.assign_holder(&attachment, &holder).await?;
+
+    // The pair is returned even when the warning below fires.
+    if !blob_exists {
+      warn!("Blob attachment with hash {:?} doesn't exist", attachment);
+    }
+
+    Ok(BlobInfo {
+      blob_hash: attachment,
+      holder,
+    })
   }
}
diff --git a/shared/comm-lib/src/backup/mod.rs b/shared/comm-lib/src/backup/mod.rs
--- a/shared/comm-lib/src/backup/mod.rs
+++ b/shared/comm-lib/src/backup/mod.rs
@@ -5,3 +5,21 @@
#[serde(rename = "backupID")]
pub backup_id: String,
}
+
+/// Payload of a `LogWSRequest::UploadLog` message.
+// NOTE(review): the backup service encodes these messages with bincode;
+// confirm wire compatibility with clients before reordering fields.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UploadLogRequest {
+ /// Identifier of the log; echoed back in `LogWSResponse::LogUploaded`.
+ pub log_id: usize,
+ /// Raw log content.
+ pub content: Vec<u8>,
+ /// Optional attachment blob hashes; the backup service treats `None` as
+ /// an empty list.
+ pub attachments: Option<Vec<String>>,
+}
+
+/// Requests accepted over the backup log WebSocket.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum LogWSRequest {
+ /// Upload a single log entry.
+ UploadLog(UploadLogRequest),
+}
+
+/// Responses sent back over the backup log WebSocket.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum LogWSResponse {
+ /// The log with the given `log_id` was stored.
+ LogUploaded { log_id: usize },
+ /// Handling the request failed; no further detail is carried.
+ ServerError,
+}

File Metadata

Mime Type
text/plain
Expires
Sun, Oct 6, 9:27 PM (22 h, 51 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2250942
Default Alt Text
D10456.id35390.diff (6 KB)

Event Timeline