diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index 1b8d5ad69..410f906b7 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,345 +1,339 @@ use actix_web::{ error::{ErrorBadRequest, ErrorInternalServerError}, web::{self, Bytes}, HttpRequest, HttpResponse, Responder, }; use comm_lib::{ auth::{AuthService, UserIdentity}, backup::LatestBackupIDResponse, blob::{client::BlobServiceClient, types::BlobInfo}, http::multipart::{get_named_text_field, get_text_field}, tools::Defer, }; use std::convert::Infallible; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, blob_client: web::Data, db_client: web::Data, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result { let backup_id = get_named_text_field("backup_id", &mut multipart).await?; let blob_client = blob_client.with_user_identity(user.clone()); tracing::Span::current().record("backup_id", &backup_id); info!("Backup data upload started"); let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let (user_data_blob_info, user_data_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; let attachments_hashes: Vec = match get_text_field(&mut multipart).await? { Some((name, attachments)) => { if name != "attachments" { warn!( name, "Malformed request: 'attachments' text field expected." ); return Err(ErrorBadRequest("Bad request")); } attachments.lines().map(ToString::to_string).collect() } None => Vec::new(), }; let mut attachments = Vec::new(); let mut attachments_revokes = Vec::new(); for attachment_hash in attachments_hashes { let (holder, revoke) = create_attachment_holder(&attachment_hash, &blob_client).await?; attachments.push(BlobInfo { blob_hash: attachment_hash, holder, }); attachments_revokes.push(revoke); } let siwe_backup_msg_option: Option = match get_text_field(&mut multipart).await? { Some((name, siwe_backup_msg)) => { if name == "siwe_backup_msg" { Some(siwe_backup_msg) } else { None } } _ => None, }; let item = BackupItem::new( user.user_id.clone(), backup_id, user_keys_blob_info, user_data_blob_info, attachments, siwe_backup_msg_option, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; user_keys_revoke.cancel(); user_data_revoke.cancel(); for attachment_revoke in attachments_revokes { attachment_revoke.cancel(); } db_client .remove_old_backups(&user.user_id, &blob_client) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument(skip_all, fields(hash_field_name, data_field_name))] async fn forward_field_to_blob<'revoke, 'blob: 'revoke>( multipart: &mut actix_multipart::Multipart, blob_client: &'blob BlobServiceClient, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name, multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error. break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; let revoke_info = blob_info.clone(); let revoke_holder = Defer::new(|| { blob_client .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder) }); Ok((blob_info, revoke_holder)) } #[instrument(skip_all)] async fn create_attachment_holder<'revoke, 'blob: 'revoke>( attachment: &str, blob_client: &'blob BlobServiceClient, ) -> Result<(String, Defer<'revoke>), BackupError> { let holder = uuid::Uuid::new_v4().to_string(); if !blob_client .assign_holder(attachment, &holder) .await .map_err(BackupError::from)? { warn!("Blob attachment with hash {attachment:?} doesn't exist"); } let revoke_hash = attachment.to_string(); let revoke_holder = holder.clone(); let revoke_holder = Defer::new(|| { blob_client.schedule_revoke_holder(revoke_hash, revoke_holder) }); Ok((holder, revoke_holder)) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, blob_client: web::Data, db_client: web::Data, ) -> actix_web::Result { let blob_client = blob_client.with_user_identity(user.clone()); info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, blob_client, db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path, blob_client: web::Data, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); let blob_client = blob_client.with_user_identity(user.clone()); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id, &backup_id, blob_client, db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, blob_client: BlobServiceClient, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_id( path: web::Path, db_client: web::Data, ) -> actix_web::Result { let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupIDResponse { backup_id: backup_item.backup_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path, db_client: web::Data, blob_client: web::Data, - req: HttpRequest, + auth_service: AuthService, ) -> actix_web::Result { - let auth_service = req.app_data::().ok_or_else(|| { - tracing::error!( - "Failed to get AuthService from request. Check HTTP server config." - ); - ErrorInternalServerError("internal error") - })?; let services_token = auth_service .get_services_token() .await .map_err(BackupError::from)?; let blob_client = blob_client.with_authentication( comm_lib::auth::AuthorizationCredential::ServicesToken(services_token), ); let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } diff --git a/services/reports/src/http/service.rs b/services/reports/src/http/service.rs index f0086ed7a..64e2ccafd 100644 --- a/services/reports/src/http/service.rs +++ b/services/reports/src/http/service.rs @@ -1,90 +1,82 @@ use actix_web::FromRequest; use comm_lib::auth::{ is_csat_verification_disabled, AuthService, AuthorizationCredential, }; use std::{future::Future, pin::Pin}; use tracing::{error, warn}; use crate::service::ReportsService; impl FromRequest for ReportsService { type Error = actix_web::Error; type Future = Pin>>>; #[inline] fn from_request( req: &actix_web::HttpRequest, payload: &mut actix_web::dev::Payload, ) -> Self::Future { use actix_web::error::{ErrorForbidden, ErrorInternalServerError}; let base_service = req.app_data::().cloned().ok_or_else(|| { tracing::error!( "FATAL! Failed to extract ReportsService from actix app_data. \ Check HTTP server configuration" ); ErrorInternalServerError("Internal server error") }); - let auth_service = - req.app_data::().cloned().ok_or_else(|| { - tracing::error!( - "FATAL! Failed to extract AuthService from actix app_data. \ - Check HTTP server configuration" - ); - ErrorInternalServerError("Internal server error") - }); - + let auth_service = AuthService::from_request(req, payload).into_inner(); let request_auth_value = - AuthorizationCredential::from_request(req, payload); + AuthorizationCredential::from_request(req, payload).into_inner(); Box::pin(async move { let auth_service = auth_service?; let base_service = base_service?; - let credential = request_auth_value.await.ok(); + let credential = request_auth_value.ok(); // This is Some if the request contains valid Authorization header let auth_token = match credential { Some(token @ AuthorizationCredential::UserToken(_)) => { let token_valid = auth_service .verify_auth_credential(&token) .await .map_err(|err| { error!("Failed to verify access token: {err}"); ErrorInternalServerError("Internal server error") })?; if token_valid || is_csat_verification_disabled() { token } else { warn!("Posting report with invalid credentials! Defaulting to ServicesToken..."); get_services_token_credential(&auth_service).await? } } Some(_) => { // Reports service shouldn't be called by other services warn!("Reports service requires user authorization"); return Err(ErrorForbidden("Forbidden")); } None => { // Unauthenticated requests get a service-to-service token get_services_token_credential(&auth_service).await? } }; let service = base_service.with_authentication(auth_token); Ok(service) }) } } async fn get_services_token_credential( auth_service: &AuthService, ) -> Result { let services_token = auth_service.get_services_token().await.map_err(|err| { error!("Failed to get services token: {err}"); actix_web::error::ErrorInternalServerError("Internal server error") })?; Ok(AuthorizationCredential::ServicesToken(services_token)) } diff --git a/shared/comm-lib/src/http/auth.rs b/shared/comm-lib/src/http/auth.rs index 22e7c5897..271e78d84 100644 --- a/shared/comm-lib/src/http/auth.rs +++ b/shared/comm-lib/src/http/auth.rs @@ -1,194 +1,215 @@ use actix_web::{ body::{EitherBody, MessageBody}, dev::{Service, ServiceRequest, ServiceResponse, Transform}, error::{ErrorForbidden, ErrorInternalServerError}, FromRequest, HttpMessage, }; use actix_web_httpauth::{ extractors::{bearer::BearerAuth, AuthenticationError}, headers::www_authenticate::bearer::Bearer, middleware::HttpAuthentication, }; use futures_util::future::{ready, Ready}; use http::StatusCode; use std::str::FromStr; use tracing::debug; use crate::auth::{ is_csat_verification_disabled, AuthService, AuthorizationCredential, UserIdentity, }; +impl FromRequest for AuthService { + type Error = actix_web::Error; + type Future = Ready>; + + fn from_request( + req: &actix_web::HttpRequest, + _: &mut actix_web::dev::Payload, + ) -> Self::Future { + let auth_service = + req.app_data::().cloned().ok_or_else(|| { + tracing::error!( + "FATAL! Failed to get AuthService from request for `{}` handler. + Check HTTP server config - make sure it's passed to App::app_data().", + req.match_name().unwrap_or_else(|| req.path()) + ); + ErrorInternalServerError("internal error") + }); + ready(auth_service) + } +} + impl FromRequest for AuthorizationCredential { type Error = actix_web::Error; type Future = Ready>; fn from_request( req: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload, ) -> Self::Future { if let Some(credential) = req.extensions().get::() { return ready(Ok(credential.clone())); } let f = || { let bearer = BearerAuth::extract(req).into_inner()?; let credential = match AuthorizationCredential::from_str(bearer.token()) { Ok(credential) => credential, Err(err) => { debug!("HTTP authorization error: {err}"); return Err(AuthenticationError::new(Bearer::default()).into()); } }; Ok(credential) }; ready(f()) } } impl FromRequest for UserIdentity { type Error = actix_web::Error; type Future = Ready>; fn from_request( req: &actix_web::HttpRequest, payload: &mut actix_web::dev::Payload, ) -> Self::Future { use futures_util::future::{err, ok}; match AuthorizationCredential::from_request(req, payload).into_inner() { Ok(AuthorizationCredential::UserToken(user)) => ok(user.clone()), Ok(_) => { debug!("Authorization provided, but it's not UserIdentity"); let mut error = AuthenticationError::new(Bearer::default()); *error.status_code_mut() = StatusCode::FORBIDDEN; err(error.into()) } Err(e) => err(e), } } } /// Counterpart of [`actix_web_httpauth::extractors::bearer::BearerAuth`] that /// handles parsing Authorization header into [`AuthorizationCredential`]. /// The value can be `None` when CSAT verification is disabled. #[derive(Clone, Debug)] struct CommServicesBearerAuth { credential: Option, } impl FromRequest for CommServicesBearerAuth { type Error = actix_web::Error; type Future = Ready>; fn from_request( req: &actix_web::HttpRequest, _payload: &mut actix_web::dev::Payload, ) -> Self::Future { use futures_util::future::{err, ok}; if is_csat_verification_disabled() { return ok(Self { credential: None }); } match AuthorizationCredential::extract(req).into_inner() { Ok(credential) => ok(Self { credential: Some(credential), }), Err(e) => err(e), } } } /// Function used by auth middleware to validate authenticated requests. async fn middleware_validation_function( req: ServiceRequest, auth: CommServicesBearerAuth, ) -> Result { let Some(credential) = auth.credential else { return if is_csat_verification_disabled() { Ok(req) } else { // This branch should be normally unreachable. If this happens, // it means that `MiddlewareCredentialExtractor::from_request()` // implementation is incorrect. tracing::error!( "CSAT verification enabled, but no credential was extracted!" ); let mut error = AuthenticationError::new(Bearer::default()); *error.status_code_mut() = StatusCode::INTERNAL_SERVER_ERROR; Err((error.into(), req)) }; }; let auth_service = req .app_data::() .expect("FATAL: missing AuthService app data. Check HTTP server config."); match auth_service.verify_auth_credential(&credential).await { Ok(true) => tracing::trace!("Request is authenticated with {credential}"), Ok(false) => { tracing::trace!("Request is not authenticated. Token: {credential:?}"); // allow for invalid tokens if verification is disabled if !is_csat_verification_disabled() { return Err((ErrorForbidden("invalid credentials"), req)); } } Err(err) => { tracing::error!("Error verifying auth credential: {err}"); return Err((ErrorInternalServerError("internal error"), req)); } }; req.extensions_mut().insert(credential); Ok(req) } /// Use this to add Authentication Middleware. It's going to parse Authorization /// header and call the identity service to check if the provided credentials /// are correct. If not it's going to reject the request. /// Note that this requires `AuthService` to be present in the app data. /// /// # Example /// ```ignore /// let auth_service = AuthService::new(&aws_config, &config.identity_endpoint); /// let auth_middleware = get_comm_authentication_middleware(); /// App::new() /// .app_data(auth_service.clone()) /// .wrap(auth_middleware) /// ``` /// If you don't want all of the routes to require authentication you can wrap /// individual resources or scopes: /// ```ignore /// App::new().service( /// web::resource("/endpoint").route(web::get().to(handler)).wrap(auth_middleware), /// ) /// ``` // This type is very complicated, but unfortunately typing this directly // requires https://github.com/rust-lang/rust/issues/99697 to be merged. // The issue is that we can't specify the second generic argument of // HttpAuthentication, because it look something like this: // ``` // impl Fn(ServiceRequest, BearerAuth) -> impl Future< // Output = Result, // > // `` // which isn't valid (until the linked issue is merged). pub fn get_comm_authentication_middleware() -> impl Transform< S, ServiceRequest, Response = ServiceResponse>, Error = actix_web::Error, InitError = (), > + Clone + 'static where B: MessageBody + 'static, S: Service< ServiceRequest, Response = ServiceResponse, Error = actix_web::Error, > + 'static, { HttpAuthentication::with_fn(middleware_validation_function) }