Page MenuHomePhabricator

D6196.diff
No OneTemporary

D6196.diff

diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -0,0 +1,87 @@
+use tonic::Status;
+
+use crate::{blob::PutClient, database::DatabaseClient, service::proto};
+
+type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;
+
+enum HandlerState {
+ /// Initial state. Handler is receiving non-data inputs
+ ReceivingParams,
+ /// Handler is receiving data chunks
+ ReceivingData { blob_client: PutClient },
+ /// A special case when Blob service claims that a blob with given
+ /// [`CreateBackupHandler::data_hash`] already exists
+ DataAlreadyExists,
+}
+
+pub struct CreateBackupHandler {
+ // flow control
+ pub should_close_stream: bool,
+
+ // inputs
+ user_id: Option<String>,
+ device_id: Option<String>,
+ key_entropy: Option<Vec<u8>>,
+ data_hash: Option<String>,
+
+ // client instances
+ db: DatabaseClient,
+
+ // internal state
+ state: HandlerState,
+ backup_id: String,
+ holder: Option<String>,
+}
+
+impl CreateBackupHandler {
+ pub fn new(db: &DatabaseClient) -> Self {
+ CreateBackupHandler {
+ should_close_stream: false,
+ user_id: None,
+ device_id: None,
+ key_entropy: None,
+ data_hash: None,
+ db: db.clone(),
+ state: HandlerState::ReceivingParams,
+ backup_id: String::new(),
+ holder: None,
+ }
+ }
+
+ pub async fn handle_user_id(
+ &mut self,
+ user_id: String,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_device_id(
+ &mut self,
+ device_id: String,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_key_entropy(
+ &mut self,
+ key_entropy: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_data_hash(
+ &mut self,
+ data_hash: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ data_chunk: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+
+ /// This function should be called after the input stream is finished.
+ pub async fn finish(self) -> Result<(), Status> {
+ unimplemented!()
+ }
+}
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -1,16 +1,28 @@
use proto::backup_service_server::BackupService;
use std::pin::Pin;
-use tokio_stream::Stream;
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
-use tracing::instrument;
+use tracing::{debug, error, info, instrument, trace, Instrument};
-use crate::database::DatabaseClient;
+use crate::{
+ constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::DatabaseClient,
+};
mod proto {
tonic::include_proto!("backup");
}
pub use proto::backup_service_server::BackupServiceServer;
+/// submodule containing gRPC endpoint handler implementations
+mod handlers {
+ pub(super) mod create_backup;
+
+ // re-exports for convenient usage in handlers
+ pub(self) use super::proto;
+}
+use self::handlers::create_backup::CreateBackupHandler;
+
pub struct MyBackupService {
db: DatabaseClient,
}
@@ -30,12 +42,64 @@
>,
>;
- #[instrument(skip(self))]
+ #[instrument(skip_all, fields(device_id, data_hash, backup_id, blob_holder))]
async fn create_new_backup(
&self,
- _request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
+ request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
) -> Result<Response<Self::CreateNewBackupStream>, Status> {
- Err(Status::unimplemented("unimplemented"))
+ use proto::create_new_backup_request::Data::*;
+
+ info!("CreateNewBackup request: {:?}", request);
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let worker = async move {
+ let mut handler = CreateBackupHandler::new(&db);
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(UserId(user_id)),
+ }) => handler.handle_user_id(user_id).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(DeviceId(device_id)),
+ }) => handler.handle_device_id(device_id).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(KeyEntropy(key_entropy)),
+ }) => handler.handle_key_entropy(key_entropy).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(NewCompactionHash(hash)),
+ }) => handler.handle_data_hash(hash).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(NewCompactionChunk(chunk)),
+ }) => handler.handle_data_chunk(chunk).await,
+ unexpected => {
+ error!("Received an unexpected request: {:?}", unexpected);
+ Err(Status::unknown("unknown error"))
+ }
+ };
+
+ trace!("Sending response: {:?}", response);
+ if let Err(e) = tx.send(response).await {
+ error!("Response was dropped: {}", e);
+ break;
+ }
+ if handler.should_close_stream {
+ trace!("Handler requested to close stream");
+ break;
+ }
+ }
+ if let Err(status) = handler.finish().await {
+ trace!("Sending error response: {:?}", status);
+ let _ = tx.send(Err(status)).await;
+ }
+ debug!("Request finished processing");
+ };
+ tokio::spawn(worker.in_current_span());
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(
+ Box::pin(out_stream) as Self::CreateNewBackupStream
+ ))
}
#[instrument(skip(self))]

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 7:52 PM (21 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678672
Default Alt Text
D6196.diff (5 KB)

Event Timeline