diff --git a/services/backup/build.rs b/services/backup/build.rs index 3edbbb5a2..4dcab23fd 100644 --- a/services/backup/build.rs +++ b/services/backup/build.rs @@ -1,7 +1,10 @@ fn main() { println!("cargo:rerun-if-changed=src/main.rs"); println!("cargo:rerun-if-changed=../../shared/protos/backup.proto"); + println!("cargo:rerun-if-changed=../../shared/protos/blob.proto"); tonic_build::compile_protos("../../shared/protos/backup.proto") - .expect("Failed to compile protobuf file"); + .expect("Failed to compile Backup protobuf file"); + tonic_build::compile_protos("../../shared/protos/blob.proto") + .expect("Failed to compile Blob protobuf file"); } diff --git a/services/backup/src/blob/get_client.rs b/services/backup/src/blob/get_client.rs new file mode 100644 index 000000000..6e25f166f --- /dev/null +++ b/services/backup/src/blob/get_client.rs @@ -0,0 +1,38 @@ +use anyhow::Result; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tracing::instrument; + +use super::proto; +pub use proto::put_request::Data as PutRequestData; +pub use proto::{PutRequest, PutResponse}; + +pub struct GetClient { + rx: Receiver>, + handle: JoinHandle>, +} + +impl GetClient { + /// Connects to the Blob service and keeps the client connection open + /// in a separate Tokio task. + #[instrument(name = "get_client")] + pub async fn start(holder: String) -> Result { + todo!() + } + + /// Receives the next chunk of blob data if ready or sleeps + /// until the data is available. + /// + /// Returns `None` when the transmission is finished, but this doesn't + /// determine if it was successful. After receiving `None`, the client + /// should be consumed by calling [`GetClient::terminate`] to handle + /// possible errors. + pub async fn get(&mut self) -> Option> { + todo!() + } + + /// Stops receiving messages and awaits the client thread to exit + /// and returns its status. + pub async fn terminate(mut self) -> Result<()> { + todo!() + } +} diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs new file mode 100644 index 000000000..f20dca0e2 --- /dev/null +++ b/services/backup/src/blob/mod.rs @@ -0,0 +1,10 @@ +mod proto { + tonic::include_proto!("blob"); +} +pub use proto::put_request::Data as PutRequestData; +pub use proto::{PutRequest, PutResponse}; + +mod get_client; +mod put_client; +pub use get_client::*; +pub use put_client::*; diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/put_client.rs new file mode 100644 index 000000000..daa54d9fb --- /dev/null +++ b/services/backup/src/blob/put_client.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; +use tracing::instrument; + +use super::proto; +pub use proto::put_request::Data as PutRequestData; +pub use proto::{PutRequest, PutResponse}; + +pub struct PutClient { + req_tx: Sender, + res_rx: Receiver, + handle: JoinHandle>, +} + +impl PutClient { + /// Connects to the Blob service and keeps the client connection open + /// in a separate Tokio task. + #[instrument(name = "put_client")] + pub async fn start() -> Result { + todo!() + } + + /// Sends a [`PutRequest`] to the stream and waits for blob service + /// to send a response. After all data is sent, the [`PutClient::terminate`] + /// should be called to end the transmission and handle possible errors. + pub async fn put(&mut self, req: PutRequest) -> Result { + todo!() + } + + /// Closes the connection and awaits the blob client task to finish. + pub async fn terminate(self) -> Result<()> { + todo!() + } +} diff --git a/services/backup/src/main.rs b/services/backup/src/main.rs index 3c3a62658..a397a3a6c 100644 --- a/services/backup/src/main.rs +++ b/services/backup/src/main.rs @@ -1,46 +1,47 @@ use anyhow::Result; use std::net::SocketAddr; use tonic::transport::Server; use tracing::{info, Level}; use tracing_subscriber::EnvFilter; use crate::service::{BackupServiceServer, MyBackupService}; +pub mod blob; pub mod config; pub mod constants; pub mod service; // re-export this to be available as crate::CONFIG pub use config::CONFIG; fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; Ok(()) } async fn run_grpc_server() -> Result<()> { let addr: SocketAddr = format!("[::]:{}", CONFIG.listening_port).parse()?; let backup_service = MyBackupService::default(); info!("Starting gRPC server listening at {}", addr.to_string()); Server::builder() .add_service(BackupServiceServer::new(backup_service)) .serve(addr) .await?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { config::parse_cmdline_args(); configure_logging()?; run_grpc_server().await }