D6196: [services][backup] CreateBackup 1/3 - create handler module
D6196.diff

diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -0,0 +1,87 @@
+use tonic::Status;
+
+use crate::{blob::PutClient, database::DatabaseClient, service::proto};
+
+type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;
+
+enum HandlerState {
+ /// Initial state. Handler is receiving non-data inputs
+ ReceivingParams,
+ /// Handler is receiving data chunks
+ ReceivingData { blob_client: PutClient },
+ /// A special case when the Blob service claims that a blob with the
+ /// given [`CreateBackupHandler::data_hash`] already exists
+ DataAlreadyExists,
+}
+
+pub struct CreateBackupHandler {
+ // flow control
+ pub should_close_stream: bool,
+
+ // inputs
+ user_id: Option<String>,
+ device_id: Option<String>,
+ key_entropy: Option<Vec<u8>>,
+ data_hash: Option<String>,
+
+ // client instances
+ db: DatabaseClient,
+
+ // internal state
+ state: HandlerState,
+ backup_id: String,
+ holder: Option<String>,
+}
+
+impl CreateBackupHandler {
+ pub fn new(db: &DatabaseClient) -> Self {
+ CreateBackupHandler {
+ should_close_stream: false,
+ user_id: None,
+ device_id: None,
+ key_entropy: None,
+ data_hash: None,
+ db: db.clone(),
+ state: HandlerState::ReceivingParams,
+ backup_id: String::new(),
+ holder: None,
+ }
+ }
+
+ pub async fn handle_user_id(
+ &mut self,
+ user_id: String,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_device_id(
+ &mut self,
+ device_id: String,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_key_entropy(
+ &mut self,
+ key_entropy: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+ pub async fn handle_data_hash(
+ &mut self,
+ data_hash: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ data_chunk: Vec<u8>,
+ ) -> CreateBackupResult {
+ unimplemented!()
+ }
+
+ /// This function should be called after the input stream is finished.
+ pub async fn finish(self) -> Result<(), Status> {
+ unimplemented!()
+ }
+}
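
The new module keeps all per-request state inside CreateBackupHandler and uses HandlerState to gate which inputs are currently legal: parameters are only accepted in ReceivingParams, data chunks only in ReceivingData, and DataAlreadyExists covers the case where the Blob service already holds a blob with the given hash. The handler bodies are intentionally left as unimplemented!() stubs in this first part of the series. A minimal, self-contained sketch of the same state-machine pattern is shown below; the SketchState and SketchHandler names are illustrative only and are not part of this diff.

// Hypothetical sketch; SketchState and SketchHandler are illustrative names,
// not taken from this diff.
enum SketchState {
  ReceivingParams,
  ReceivingData,
}

struct SketchHandler {
  data_hash: Option<String>,
  state: SketchState,
}

impl SketchHandler {
  fn new() -> Self {
    SketchHandler {
      data_hash: None,
      state: SketchState::ReceivingParams,
    }
  }

  // In this sketch the data hash is treated as the last parameter; receiving
  // it moves the handler into the data-receiving state.
  fn handle_data_hash(&mut self, hash: String) -> Result<(), String> {
    match self.state {
      SketchState::ReceivingParams => {
        self.data_hash = Some(hash);
        self.state = SketchState::ReceivingData;
        Ok(())
      }
      SketchState::ReceivingData => Err("data hash sent twice".to_string()),
    }
  }

  // Data chunks are rejected until the parameters have arrived.
  fn handle_data_chunk(&mut self, chunk: Vec<u8>) -> Result<usize, String> {
    match self.state {
      SketchState::ReceivingData => Ok(chunk.len()),
      SketchState::ReceivingParams => {
        Err("data chunk received before parameters".to_string())
      }
    }
  }
}

fn main() {
  let mut handler = SketchHandler::new();
  // A chunk sent before the hash is rejected by the state machine.
  assert!(handler.handle_data_chunk(vec![1, 2, 3]).is_err());
  handler.handle_data_hash("some-hash".to_string()).unwrap();
  assert_eq!(handler.handle_data_chunk(vec![1, 2, 3]), Ok(3));
  println!("stored hash: {:?}", handler.data_hash);
}

Encoding the phase as an enum keeps the set of inputs that are legal at each point in the stream explicit, instead of scattering that logic across boolean flags.
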
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -1,16 +1,28 @@
use proto::backup_service_server::BackupService;
use std::pin::Pin;
-use tokio_stream::Stream;
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
-use tracing::instrument;
+use tracing::{debug, error, info, instrument, trace, Instrument};

-use crate::database::DatabaseClient;
+use crate::{
+ constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::DatabaseClient,
+};

mod proto {
tonic::include_proto!("backup");
}
pub use proto::backup_service_server::BackupServiceServer;

+/// submodule containing gRPC endpoint handler implementations
+mod handlers {
+ pub(super) mod create_backup;
+
+ // re-exports for convenient usage in handlers
+ pub(self) use super::proto;
+}
+use self::handlers::create_backup::CreateBackupHandler;
+
pub struct MyBackupService {
db: DatabaseClient,
}
@@ -30,12 +42,64 @@
>,
>;

- #[instrument(skip(self))]
+ #[instrument(skip_all, fields(device_id, data_hash, backup_id, blob_holder))]
async fn create_new_backup(
&self,
- _request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
+ request: Request<tonic::Streaming<proto::CreateNewBackupRequest>>,
) -> Result<Response<Self::CreateNewBackupStream>, Status> {
- Err(Status::unimplemented("unimplemented"))
+ use proto::create_new_backup_request::Data::*;
+
+ info!("CreateNewBackup request: {:?}", request);
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let worker = async move {
+ let mut handler = CreateBackupHandler::new(&db);
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(UserId(user_id)),
+ }) => handler.handle_user_id(user_id).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(DeviceId(device_id)),
+ }) => handler.handle_device_id(device_id).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(KeyEntropy(key_entropy)),
+ }) => handler.handle_key_entropy(key_entropy).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(NewCompactionHash(hash)),
+ }) => handler.handle_data_hash(hash).await,
+ Ok(proto::CreateNewBackupRequest {
+ data: Some(NewCompactionChunk(chunk)),
+ }) => handler.handle_data_chunk(chunk).await,
+ unexpected => {
+ error!("Received an unexpected request: {:?}", unexpected);
+ Err(Status::unknown("unknown error"))
+ }
+ };
+
+ trace!("Sending response: {:?}", response);
+ if let Err(e) = tx.send(response).await {
+ error!("Response was dropped: {}", e);
+ break;
+ }
+ if handler.should_close_stream {
+ trace!("Handler requested to close stream");
+ break;
+ }
+ }
+ if let Err(status) = handler.finish().await {
+ trace!("Sending error response: {:?}", status);
+ let _ = tx.send(Err(status)).await;
+ }
+ debug!("Request finished processing");
+ };
+ tokio::spawn(worker.in_current_span());
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(
+ Box::pin(out_stream) as Self::CreateNewBackupStream
+ ))
}

#[instrument(skip(self))]
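
On the service side, create_new_backup now spawns a worker task that reads the inbound tonic::Streaming, dispatches each oneof variant of CreateNewBackupRequest to the matching CreateBackupHandler method, and forwards every response through an mpsc channel that is handed back to the caller as a ReceiverStream. A rough client-side sketch of driving the endpoint follows; it assumes the same backup.proto and tonic build setup as the service, the conventionally generated backup_service_client module, and a placeholder address, none of which appear in this diff.

// Hypothetical client sketch; the client module name follows tonic's naming
// convention and the address is a placeholder, neither taken from this diff.
use crate::proto::create_new_backup_request::Data;
use crate::proto::CreateNewBackupRequest;

mod proto {
  tonic::include_proto!("backup");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Placeholder address; the real service address is not part of this diff.
  let mut client = proto::backup_service_client::BackupServiceClient::connect(
    "http://localhost:50051",
  )
  .await?;

  // Parameters first, then the new compaction hash and its data chunks,
  // mirroring the variants matched by the server-side worker loop.
  let requests = vec![
    CreateNewBackupRequest {
      data: Some(Data::UserId("example-user".to_string())),
    },
    CreateNewBackupRequest {
      data: Some(Data::DeviceId("example-device".to_string())),
    },
    CreateNewBackupRequest {
      data: Some(Data::KeyEntropy(vec![0u8; 32])),
    },
    CreateNewBackupRequest {
      data: Some(Data::NewCompactionHash(vec![0u8; 32])),
    },
    CreateNewBackupRequest {
      data: Some(Data::NewCompactionChunk(vec![1, 2, 3])),
    },
  ];

  let mut responses = client
    .create_new_backup(tonic::Request::new(tokio_stream::iter(requests)))
    .await?
    .into_inner();

  // Each response in the outbound stream answers one inbound request.
  while let Some(response) = responses.message().await? {
    println!("response: {:?}", response);
  }
  Ok(())
}

Because the worker breaks out of its loop as soon as the handler sets should_close_stream, a client written this way should be prepared for the response stream to end before it has sent every request.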