diff --git a/services/backup/src/config.rs b/services/backup/src/config.rs index 3294aec07..2f1f0b9ca 100644 --- a/services/backup/src/config.rs +++ b/services/backup/src/config.rs @@ -1,38 +1,59 @@ +use aws_sdk_dynamodb::{Endpoint, Region}; use clap::{builder::FalseyValueParser, Parser}; use once_cell::sync::Lazy; +use tonic::transport::Uri; +use tracing::info; use crate::constants::{ - DEFAULT_BLOB_SERVICE_URL, DEFAULT_GRPC_SERVER_PORT, DEFAULT_LOCALSTACK_URL, - SANDBOX_ENV_VAR, + AWS_REGION, DEFAULT_BLOB_SERVICE_URL, DEFAULT_GRPC_SERVER_PORT, + DEFAULT_LOCALSTACK_URL, SANDBOX_ENV_VAR, }; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { /// gRPC server listening port #[arg(long = "port", default_value_t = DEFAULT_GRPC_SERVER_PORT)] pub listening_port: u64, /// Run the service in sandbox #[arg(long = "sandbox", default_value_t = false)] // support the env var for compatibility reasons #[arg(env = SANDBOX_ENV_VAR)] #[arg(value_parser = FalseyValueParser::new())] pub is_sandbox: bool, /// AWS Localstack service URL, applicable in sandbox mode #[arg(long, default_value_t = DEFAULT_LOCALSTACK_URL.to_string())] pub localstack_url: String, /// Blob service URL #[arg(long, default_value_t = DEFAULT_BLOB_SERVICE_URL.to_string())] pub blob_service_url: String, } /// Stores configuration parsed from command-line arguments /// and environment variables pub static CONFIG: Lazy = Lazy::new(|| AppConfig::parse()); /// Processes the command-line arguments and environment variables. /// Should be called at the beginning of the `main()` function. pub(super) fn parse_cmdline_args() { // force evaluation of the lazy initialized config Lazy::force(&CONFIG); } + +/// Provides region/credentials configuration for AWS SDKs +pub async fn load_aws_config() -> aws_types::SdkConfig { + let mut config_builder = + aws_config::from_env().region(Region::new(AWS_REGION)); + + if CONFIG.is_sandbox { + info!( + "Running in sandbox environment. Localstack URL: {}", + &CONFIG.localstack_url + ); + config_builder = config_builder.endpoint_resolver(Endpoint::immutable( + Uri::from_static(&CONFIG.localstack_url), + )); + } + + config_builder.load().await +} diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs index fd04940a5..0ac0e36c1 100644 --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -1,34 +1,35 @@ // Assorted constants +pub const AWS_REGION: &str = "us-east-2"; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; // Configuration defaults pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051; pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566"; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; // Environment variable names pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; // DynamoDB constants pub const BACKUP_TABLE_NAME: &str = "backup-service-backup"; pub const BACKUP_TABLE_FIELD_USER_ID: &str = "userID"; pub const BACKUP_TABLE_FIELD_BACKUP_ID: &str = "backupID"; pub const BACKUP_TABLE_FIELD_CREATED: &str = "created"; pub const BACKUP_TABLE_FIELD_RECOVERY_DATA: &str = "recoveryData"; pub const BACKUP_TABLE_FIELD_COMPACTION_HOLDER: &str = "compactionHolder"; pub const BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders"; pub const BACKUP_TABLE_INDEX_USERID_CREATED: &str = "userID-created-index"; pub const LOG_TABLE_NAME: &str = "backup-service-log"; pub const LOG_TABLE_FIELD_BACKUP_ID: &str = "backupID"; pub const LOG_TABLE_FIELD_LOG_ID: &str = "logID"; pub const LOG_TABLE_FIELD_PERSISTED_IN_BLOB: &str = "persistedInBlob"; pub const LOG_TABLE_FIELD_VALUE: &str = "value"; pub const LOG_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders"; pub const LOG_TABLE_FIELD_DATA_HASH: &str = "dataHash"; diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs index 80f8d7c11..19073ec26 100644 --- a/services/backup/src/main.rs +++ b/services/backup/src/main.rs @@ -1,48 +1,51 @@ use anyhow::Result; use std::net::SocketAddr; use tonic::transport::Server; use tracing::{info, Level}; use tracing_subscriber::EnvFilter; use crate::service::{BackupServiceServer, MyBackupService}; pub mod blob; pub mod config; pub mod constants; pub mod database; pub mod service; // re-export this to be available as crate::CONFIG pub use config::CONFIG; fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; Ok(()) } -async fn run_grpc_server() -> Result<()> { +async fn run_grpc_server(db: database::DatabaseClient) -> Result<()> { let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?; - let backup_service = MyBackupService::default(); + let backup_service = MyBackupService::new(db); info!("Starting gRPC server listening at {}", addr.to_string()); Server::builder() .add_service(BackupServiceServer::new(backup_service)) .serve(addr) .await?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { config::parse_cmdline_args(); configure_logging()?; - run_grpc_server().await + let aws_config = config::load_aws_config().await; + let db = database::DatabaseClient::new(&aws_config); + + run_grpc_server(db).await } diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs index cc943bf01..e4e3af622 100644 --- a/services/backup/src/service/mod.rs +++ b/services/backup/src/service/mod.rs @@ -1,73 +1,82 @@ use proto::backup_service_server::BackupService; use std::pin::Pin; use tokio_stream::Stream; use tonic::{Request, Response, Status}; use tracing::instrument; +use crate::database::DatabaseClient; + mod proto { tonic::include_proto!("backup"); } pub use proto::backup_service_server::BackupServiceServer; -#[derive(Default)] -pub struct MyBackupService {} +pub struct MyBackupService { + db: DatabaseClient, +} + +impl MyBackupService { + pub fn new(db_client: DatabaseClient) -> Self { + MyBackupService { db: db_client } + } +} // gRPC implementation #[tonic::async_trait] impl BackupService for MyBackupService { type CreateNewBackupStream = Pin< Box< dyn Stream> + Send, >, >; #[instrument(skip(self))] async fn create_new_backup( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } #[instrument(skip(self))] async fn send_log( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } type RecoverBackupKeyStream = Pin< Box< dyn Stream> + Send, >, >; #[instrument(skip(self))] async fn recover_backup_key( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } type PullBackupStream = Pin< Box> + Send>, >; #[instrument(skip(self))] async fn pull_backup( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } #[instrument(skip(self))] async fn add_attachments( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } }