diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs index 0cb2c4e92..f1eb28756 100644 --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -1,45 +1,54 @@ // Assorted constants pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const ID_SEPARATOR: &str = ":"; pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";"; pub const WS_FRAME_SIZE: usize = 1_048_576; // 1MiB pub const LOG_DEFAULT_PAGE_SIZE: i32 = 20; pub const LOG_BACKUP_ID_SEPARATOR: &str = "#"; // Configuration defaults pub const DEFAULT_HTTP_PORT: u16 = 50052; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; // Environment variable names pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; // DynamoDB constants pub mod backup_table { pub const TABLE_NAME: &str = "backup-service-backup"; pub const CREATED_INDEX: &str = "userID-created-index"; pub mod attr { pub const USER_ID: &str = "userID"; pub const BACKUP_ID: &str = "backupID"; pub const CREATED: &str = "created"; pub const USER_DATA: &str = "userData"; pub const USER_KEYS: &str = "userKeys"; pub const ATTACHMENTS: &str = "attachments"; pub const SIWE_BACKUP_MSG: &str = "siweBackupMsg"; } } pub mod log_table { pub const TABLE_NAME: &str = "backup-service-log"; pub mod attr { pub const BACKUP_ID: &str = "backupID"; pub const LOG_ID: &str = "logID"; pub const CONTENT_DB: &str = "content"; pub const CONTENT_BLOB_INFO: &str = "blobInfo"; pub const ATTACHMENTS: &str = "attachments"; } } + +// Error Types + +pub mod error_types { + pub const DDB_ERROR: &str = "DDB Error"; + pub const AUTH_ERROR: &str = "Auth Error"; + pub const BLOB_ERROR: &str = "Blob Error"; + pub const WS_ERROR: &str = "WS Error"; +} diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs index 9dab455ec..22c63e0a0 100644 --- a/services/backup/src/database/mod.rs +++ b/services/backup/src/database/mod.rs @@ -1,369 +1,389 @@ pub mod backup_item; pub mod log_item; use self::{ backup_item::{BackupItem, OrderedBackupItem}, log_item::LogItem, }; -use crate::constants::{backup_table, log_table, LOG_DEFAULT_PAGE_SIZE}; +use crate::constants::{ + backup_table, error_types, log_table, LOG_DEFAULT_PAGE_SIZE, +}; use aws_sdk_dynamodb::{ operation::get_item::GetItemOutput, types::{AttributeValue, DeleteRequest, ReturnValue, WriteRequest}, }; use comm_lib::{ blob::client::BlobServiceClient, database::{ self, batch_operations::ExponentialBackoffConfig, parse_int_attribute, Error, }, }; use tracing::{error, trace, warn}; #[derive(Clone)] pub struct DatabaseClient { client: aws_sdk_dynamodb::Client, } impl DatabaseClient { pub fn new(aws_config: &aws_config::SdkConfig) -> Self { DatabaseClient { client: aws_sdk_dynamodb::Client::new(aws_config), } } } /// Backup functions impl DatabaseClient { pub async fn put_backup_item( &self, backup_item: BackupItem, ) -> Result<(), Error> { let item = backup_item.into(); self .client .put_item() .table_name(backup_table::TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { - error!("DynamoDB client failed to put backup item"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to put backup item" + ); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_backup_item( &self, user_id: &str, backup_id: &str, ) -> Result, Error> { let item_key = BackupItem::item_key(user_id, backup_id); let output = self .client .get_item() .table_name(backup_table::TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { - error!("DynamoDB client failed to find backup item"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to find backup item" + ); Error::AwsSdk(e.into()) })?; let GetItemOutput { item: Some(item), .. } = output else { return Ok(None); }; let backup_item = item.try_into()?; Ok(Some(backup_item)) } pub async fn find_last_backup_item( &self, user_id: &str, ) -> Result, Error> { let mut found_backups = self.query_ordered_backups_index(user_id, Some(1)).await?; let latest_backup = found_backups.pop(); Ok(latest_backup) } pub async fn remove_backup_item( &self, user_id: &str, backup_id: &str, blob_client: &BlobServiceClient, ) -> Result, Error> { let item_key = BackupItem::item_key(user_id, backup_id); let response = self .client .delete_item() .table_name(backup_table::TABLE_NAME) .set_key(Some(item_key)) .return_values(ReturnValue::AllOld) .send() .await .map_err(|e| { - error!("DynamoDB client failed to remove backup item"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to remove backup item" + ); Error::AwsSdk(e.into()) })?; let result = response .attributes .map(BackupItem::try_from) .transpose() .map_err(Error::from)?; if let Some(backup_item) = &result { backup_item.revoke_user_keys_holders(blob_client); backup_item.revoke_user_data_holders(blob_client); } self .remove_log_items_for_backup(user_id, backup_id, blob_client) .await?; Ok(result) } /// For the purposes of the initial backup version this function /// removes all backups except for the latest one pub async fn remove_old_backups( &self, user_id: &str, blob_client: &BlobServiceClient, ) -> Result, Error> { let items = self.query_ordered_backups_index(user_id, None).await?; let mut removed_backups = vec![]; let Some(latest) = items.iter().map(|item| item.created).max() else { return Ok(removed_backups); }; for item in items { if item.created == latest { trace!( "Skipping removal of the latest backup item: {}", item.backup_id ); continue; } trace!("Removing backup item: {item:?}"); if let Some(backup) = self .remove_backup_item(user_id, &item.backup_id, blob_client) .await? { removed_backups.push(backup); } else { warn!("Backup was found during query, but wasn't found when deleting") }; } Ok(removed_backups) } } /// Backup log functions impl DatabaseClient { pub async fn put_log_item( &self, log_item: LogItem, blob_client: &BlobServiceClient, ) -> Result<(), Error> { let item = log_item.into(); let result = self .client .put_item() .table_name(log_table::TABLE_NAME) .set_item(Some(item)) .return_values(ReturnValue::AllOld) .send() .await .map_err(|e| { - error!("DynamoDB client failed to put log item"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to put log item" + ); Error::AwsSdk(e.into()) })?; let Some(replaced_log_attrs) = result.attributes else { return Ok(()); }; let Ok(replaced_log) = LogItem::try_from(replaced_log_attrs) else { warn!("Couldn't parse replaced log item"); return Ok(()); }; replaced_log.revoke_holders(blob_client); Ok(()) } pub async fn fetch_log_items( &self, user_id: &str, backup_id: &str, from_id: Option, ) -> Result<(Vec, Option), Error> { let id = LogItem::partition_key(user_id, backup_id); let mut query = self .client .query() .table_name(log_table::TABLE_NAME) .key_condition_expression("#backupID = :valueToMatch") .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(id.clone()), ) .limit(LOG_DEFAULT_PAGE_SIZE); if let Some(from_id) = from_id { query = query .exclusive_start_key(log_table::attr::BACKUP_ID, AttributeValue::S(id)) .exclusive_start_key( log_table::attr::LOG_ID, AttributeValue::N(from_id.to_string()), ); } let response = query.send().await.map_err(|e| { - error!("DynamoDB client failed to fetch logs"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to fetch logs" + ); Error::AwsSdk(e.into()) })?; let last_id = response .last_evaluated_key() .map(|key| { parse_int_attribute( log_table::attr::LOG_ID, key.get(log_table::attr::LOG_ID).cloned(), ) }) .transpose()?; let items = response .items .unwrap_or_default() .into_iter() .map(LogItem::try_from) .collect::, _>>()?; Ok((items, last_id)) } pub async fn remove_log_items_for_backup( &self, user_id: &str, backup_id: &str, blob_client: &BlobServiceClient, ) -> Result<(), Error> { let (mut items, mut last_id) = self.fetch_log_items(user_id, backup_id, None).await?; while last_id.is_some() { let (mut new_items, new_last_id) = self.fetch_log_items(user_id, backup_id, last_id).await?; items.append(&mut new_items); last_id = new_last_id; } for log_item in &items { log_item.revoke_holders(blob_client); } let write_requests = items .into_iter() .map(|key| { DeleteRequest::builder() .set_key(Some(LogItem::item_key(user_id, key.backup_id, key.log_id))) .build() .expect("key not set in DeleteRequest builder") }) .map(|request| WriteRequest::builder().delete_request(request).build()) .collect::>(); database::batch_operations::batch_write( &self.client, log_table::TABLE_NAME, write_requests, ExponentialBackoffConfig::default(), ) .await?; Ok(()) } } // general functions impl DatabaseClient { pub async fn delete_user_data( &self, user_id: &str, blob_client: &BlobServiceClient, ) -> Result<(), Error> { // query the index to avoid unnecessarily querying backup data let items = self.query_ordered_backups_index(user_id, None).await?; for item in items { trace!("Removing backup item: {item:?}"); self .remove_backup_item(user_id, &item.backup_id, blob_client) .await?; } Ok(()) } async fn query_ordered_backups_index( &self, user_id: &str, limit: Option, ) -> Result, Error> { let response = self .client .query() .table_name(backup_table::TABLE_NAME) .index_name(backup_table::CREATED_INDEX) .key_condition_expression("#userID = :valueToMatch") .expression_attribute_names("#userID", backup_table::attr::USER_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(user_id.to_string()), ) .scan_index_forward(false) .set_limit(limit) .send() .await .map_err(|e| { - error!("DynamoDB client failed to fetch backups"); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to fetch backups" + ); Error::AwsSdk(e.into()) })?; if response.last_evaluated_key().is_some() { // In the intial version of the backup service this function will be run // for every new backup (each user only has one backup), so this shouldn't // happen warn!("Not all backups have been retrieved from the index"); } let items = response .items .unwrap_or_default() .into_iter() .map(OrderedBackupItem::try_from) .collect::, _>>()?; Ok(items) } } diff --git a/services/backup/src/error.rs b/services/backup/src/error.rs index 8783cb3a1..fce24055d 100644 --- a/services/backup/src/error.rs +++ b/services/backup/src/error.rs @@ -1,100 +1,111 @@ use actix_web::{ error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, HttpError, }, HttpResponse, ResponseError, }; pub use aws_sdk_dynamodb::Error as DynamoDBError; use comm_lib::database::Error as DBError; use comm_lib::{auth::AuthServiceError, blob::client::BlobServiceError}; use grpc_clients::error::Error as IdentityClientError; use reqwest::StatusCode; use tracing::{error, trace, warn}; +use crate::constants::error_types; + #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupError { NoBackup, NoUserID, BlobError(BlobServiceError), AuthError(AuthServiceError), DB(comm_lib::database::Error), IdentityClientError(IdentityClientError), BadRequest, NoUserData, } impl From<&BackupError> for actix_web::Error { fn from(value: &BackupError) -> Self { trace!("Handling backup service error: {value}"); match value { BackupError::NoBackup => ErrorNotFound("not found"), BackupError::BlobError( err @ (BlobServiceError::ClientError(_) | BlobServiceError::UnexpectedHttpStatus(_) | BlobServiceError::ServerError | BlobServiceError::UnexpectedError), ) => { warn!("Transient blob error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::BlobError(BlobServiceError::AlreadyExists) => { ErrorConflict("blob already exists") } BackupError::BlobError(BlobServiceError::InvalidArguments) => { ErrorBadRequest("bad request") } BackupError::BlobError( err @ (BlobServiceError::URLError(_) | BlobServiceError::NotFound), ) => { - error!("Unexpected blob error: {err}"); + error!( + errorType = error_types::BLOB_ERROR, + "Unexpected blob error: {err}" + ); ErrorInternalServerError("server error") } BackupError::AuthError(err) => { - error!("Unexpected auth error: {err}"); + error!( + errorType = error_types::AUTH_ERROR, + "Unexpected auth error: {err}" + ); ErrorInternalServerError("server error") } BackupError::DB(err) => match err { DBError::AwsSdk( err @ (DynamoDBError::InternalServerError(_) | DynamoDBError::ProvisionedThroughputExceededException(_) | DynamoDBError::RequestLimitExceeded(_)), ) => { warn!("AWS transient error occurred: {err}"); ErrorServiceUnavailable("please retry") } unexpected => { - error!("Received an unexpected DB error: {0:?} - {0}", unexpected); + error!( + errorType = error_types::DDB_ERROR, + "Received an unexpected DB error: {0:?} - {0}", unexpected + ); ErrorInternalServerError("server error") } }, BackupError::IdentityClientError(err) => { warn!("Transient identity error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::NoUserID => ErrorBadRequest("bad request"), BackupError::BadRequest => ErrorBadRequest("bad request"), BackupError::NoUserData => ErrorNotFound("not found"), } } } impl From for HttpError { fn from(value: BackupError) -> Self { value.into() } } impl ResponseError for BackupError { fn error_response(&self) -> HttpResponse { actix_web::Error::from(self).error_response() } fn status_code(&self) -> StatusCode { actix_web::Error::from(self) .as_response_error() .status_code() } } diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs index 043f7db92..53d88ad5d 100644 --- a/services/backup/src/http/handlers/log.rs +++ b/services/backup/src/http/handlers/log.rs @@ -1,372 +1,373 @@ -use crate::constants::WS_FRAME_SIZE; +use crate::constants::{error_types, WS_FRAME_SIZE}; use crate::database::{log_item::LogItem, DatabaseClient}; use actix::fut::ready; use actix::{Actor, ActorContext, ActorFutureExt, AsyncContext, StreamHandler}; use actix_http::ws::{CloseCode, Item}; use actix_web::{ web::{self, Bytes, BytesMut}, Error, HttpRequest, HttpResponse, }; use actix_web_actors::ws::{self, WebsocketContext}; use comm_lib::auth::{AuthService, AuthServiceError, UserIdentity}; use comm_lib::{ backup::{ DownloadLogsRequest, LogWSRequest, LogWSResponse, UploadLogRequest, }, blob::{ client::{BlobServiceClient, BlobServiceError}, types::BlobInfo, }, database::{self, blob::BlobOrDBContent}, }; use std::future::Future; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tracing::{error, info, instrument, warn}; pub async fn handle_ws( req: HttpRequest, stream: web::Payload, blob_client: web::Data, db_client: web::Data, auth_service: AuthService, ) -> Result { ws::WsResponseBuilder::new( LogWSActor { user: Arc::new(Mutex::new(None)), blob_client: blob_client.as_ref().clone(), db_client: db_client.as_ref().clone(), auth_service, last_msg_time: Instant::now(), buffer: BytesMut::new(), }, &req, stream, ) .frame_size(WS_FRAME_SIZE) .start() } #[derive( Debug, derive_more::From, derive_more::Display, derive_more::Error, )] enum LogWSError { Bincode(bincode::Error), Blob(BlobServiceError), DB(database::Error), Auth(AuthServiceError), } struct LogWSActor { user: Arc>>, blob_client: BlobServiceClient, db_client: DatabaseClient, auth_service: AuthService, last_msg_time: Instant, buffer: BytesMut, } impl LogWSActor { const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); fn handle_msg_sync( &mut self, ctx: &mut WebsocketContext, bytes: Bytes, ) { match bincode::deserialize(&bytes) { Ok(request) => { if let LogWSRequest::Authenticate(user) = request { Self::spawn_response_future( ctx, Self::handle_auth_msg( self.auth_service.clone(), Arc::clone(&self.user), user, ), ); return; } let user_guard = self.user.lock().expect("user mutex poisoned"); let Some(user) = user_guard.as_ref() else { Self::spawn_response_future( ctx, ready(Ok(vec![LogWSResponse::Unauthenticated])), ); return; }; Self::spawn_response_future( ctx, Self::handle_msg( user.user_id.clone(), self.blob_client.clone().with_user_identity(user.clone()), self.db_client.clone(), request, ), ); } Err(err) => { - error!("Error: {err:?}"); + error!(errorType = error_types::WS_ERROR, "Error: {err:?}"); Self::spawn_response_future( ctx, ready(Ok(vec![LogWSResponse::ServerError])), ); } }; } fn spawn_response_future( ctx: &mut WebsocketContext, future: impl Future, LogWSError>> + 'static, ) { let fut = actix::fut::wrap_future(future).map( |responses, _: &mut LogWSActor, ctx: &mut WebsocketContext| { let responses = match responses { Ok(responses) => responses, Err(err) => { - error!("Error: {err:?}"); + error!(errorType = error_types::WS_ERROR, "Error: {err:?}"); vec![LogWSResponse::ServerError] } }; for response in responses { match bincode::serialize(&response) { Ok(bytes) => ctx.binary(bytes), Err(error) => { error!( + errorType = error_types::WS_ERROR, "Error serializing a response: {response:?}. Error: {error}" ); } }; } }, ); ctx.spawn(fut); } async fn handle_auth_msg( auth_service: AuthService, current_user: Arc>>, user_to_verify: UserIdentity, ) -> Result, LogWSError> { use comm_lib::auth::AuthorizationCredential; let credential = AuthorizationCredential::UserToken(user_to_verify.clone()); let user_valid = auth_service.verify_auth_credential(&credential).await?; if user_valid { *current_user.lock().expect("mutex poisoned") = Some(user_to_verify); Ok(vec![LogWSResponse::AuthSuccess]) } else { tracing::debug!("Invalid credentials"); Ok(vec![LogWSResponse::Unauthenticated]) } } async fn handle_msg( user_id: String, blob_client: BlobServiceClient, db_client: DatabaseClient, request: LogWSRequest, ) -> Result, LogWSError> { match request { LogWSRequest::UploadLog(UploadLogRequest { backup_id, log_id, content, attachments, }) => { let mut attachment_blob_infos = Vec::new(); for attachment in attachments.unwrap_or_default() { let blob_info = Self::create_attachment(&blob_client, attachment).await?; attachment_blob_infos.push(blob_info); } let mut log_item = LogItem { user_id, backup_id: backup_id.clone(), log_id, content: BlobOrDBContent::new(content), attachments: attachment_blob_infos, }; log_item.ensure_size_constraints(&blob_client).await?; db_client.put_log_item(log_item, &blob_client).await?; Ok(vec![LogWSResponse::LogUploaded { backup_id, log_id }]) } LogWSRequest::DownloadLogs(DownloadLogsRequest { backup_id, from_id, }) => { let (log_items, last_id) = db_client .fetch_log_items(&user_id, &backup_id, from_id) .await?; let mut messages = vec![]; for LogItem { log_id, content, attachments, .. } in log_items { let content = content.fetch_bytes(&blob_client).await?; let attachments: Vec = attachments.into_iter().map(|att| att.blob_hash).collect(); let attachments = if attachments.is_empty() { None } else { Some(attachments) }; messages.push(LogWSResponse::LogDownload { log_id, content, attachments, }) } messages.push(LogWSResponse::LogDownloadFinished { last_log_id: last_id, }); Ok(messages) } LogWSRequest::Authenticate(_) => { warn!("LogWSRequest::Authenticate should have been handled earlier."); Ok(Vec::new()) } } } async fn create_attachment( blob_client: &BlobServiceClient, attachment: String, ) -> Result { let blob_info = BlobInfo { blob_hash: attachment, holder: uuid::Uuid::new_v4().to_string(), }; if !blob_client .assign_holder(&blob_info.blob_hash, &blob_info.holder) .await? { warn!( "Blob attachment with hash {:?} doesn't exist", blob_info.blob_hash ); } Ok(blob_info) } } impl Actor for LogWSActor { type Context = ws::WebsocketContext; #[instrument(skip_all)] fn started(&mut self, ctx: &mut Self::Context) { info!("Socket opened"); ctx.run_interval(Self::HEARTBEAT_INTERVAL, |actor, ctx| { if Instant::now().duration_since(actor.last_msg_time) > Self::CONNECTION_TIMEOUT { warn!("Socket timeout, closing connection"); ctx.stop(); return; } ctx.ping(&[]); }); } #[instrument(skip_all)] fn stopped(&mut self, _: &mut Self::Context) { info!("Socket closed"); } } impl StreamHandler> for LogWSActor { #[instrument(skip_all)] fn handle( &mut self, msg: Result, ctx: &mut Self::Context, ) { let msg = match msg { Ok(msg) => msg, Err(err) => { warn!("Error during socket message handling: {err}"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } }; self.last_msg_time = Instant::now(); match msg { ws::Message::Binary(bytes) => self.handle_msg_sync(ctx, bytes), // Continuations - this is mostly boilerplate code. Some websocket // clients may split a message into these ones ws::Message::Continuation(Item::FirstBinary(bytes)) => { if !self.buffer.is_empty() { warn!("Socket received continuation before previous was completed"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); } ws::Message::Continuation(Item::Continue(bytes)) => { if self.buffer.is_empty() { warn!("Socket received continuation message before it was started"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); } ws::Message::Continuation(Item::Last(bytes)) => { if self.buffer.is_empty() { warn!( "Socket received last continuation message before it was started" ); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); let bytes = self.buffer.split(); self.handle_msg_sync(ctx, bytes.into()); } // Heartbeat ws::Message::Ping(message) => ctx.pong(&message), ws::Message::Pong(_) => (), // Other ws::Message::Text(_) | ws::Message::Continuation(Item::FirstText(_)) => { warn!("Socket received unsupported message"); ctx.close(Some(CloseCode::Unsupported.into())); ctx.stop(); } ws::Message::Close(reason) => { info!("Socket was closed"); ctx.close(reason); ctx.stop(); } ws::Message::Nop => (), } } }