Page MenuHomePhabricator

D6211.id20752.diff
No OneTemporary

D6211.id20752.diff

diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -0,0 +1,80 @@
+use tonic::Status;
+use uuid::Uuid;
+
+use crate::{
+ blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient,
+ service::proto::SendLogResponse,
+};
+
+/// Where the contents of the log handled by [`SendLogHandler`] are kept.
+enum LogPersistence {
+ /// Log entirely stored in DynamoDB database
+ Db,
+ /// Log contents stored with Blob service
+ Blob { holder: String },
+}
+
+/// Per-request state for the streaming `SendLog` RPC.
+///
+/// Each message of the request stream is passed to the matching `handle_*`
+/// method; [`SendLogHandler::finish`] consumes the handler and produces the
+/// response.
+pub struct SendLogHandler {
+ // flow control
+ /// When `true`, the caller should stop reading the request stream.
+ pub should_close_stream: bool,
+
+ // inputs
+ user_id: Option<String>,
+ backup_id: Option<String>,
+ log_hash: Option<String>,
+
+ // internal state
+ log_id: Option<String>,
+ log_buffer: Vec<u8>,
+ persistence_method: LogPersistence,
+ should_receive_data: bool,
+
+ // client instances
+ db: DatabaseClient,
+ blob_client: Option<PutClient>,
+}
+
+impl SendLogHandler {
+ /// Creates a handler with no inputs received yet.
+ ///
+ /// The database client is cloned, so the caller keeps ownership of `db`.
+ /// Persistence starts as [`LogPersistence::Db`] and no blob client is
+ /// attached.
+ pub fn new(db: &DatabaseClient) -> Self {
+ SendLogHandler {
+ db: db.clone(),
+ blob_client: None,
+ user_id: None,
+ backup_id: None,
+ log_hash: None,
+ log_id: None,
+ log_buffer: Vec::new(),
+ persistence_method: LogPersistence::Db,
+ should_receive_data: false,
+ should_close_stream: false,
+ }
+ }
+
+ /// Handles the `UserId` message of the request stream.
+ ///
+ /// # Panics
+ ///
+ /// Always panics: not implemented yet.
+ pub async fn handle_user_id(
+ &mut self,
+ user_id: String,
+ ) -> Result<(), Status> {
+ unimplemented!()
+ }
+ /// Handles the `BackupId` message of the request stream.
+ ///
+ /// # Panics
+ ///
+ /// Always panics: not implemented yet.
+ pub async fn handle_backup_id(
+ &mut self,
+ backup_id: String,
+ ) -> Result<(), Status> {
+ unimplemented!()
+ }
+ /// Handles the `LogHash` message of the request stream.
+ ///
+ /// # Panics
+ ///
+ /// Always panics: not implemented yet.
+ pub async fn handle_log_hash(
+ &mut self,
+ log_hash: Vec<u8>,
+ ) -> Result<(), Status> {
+ unimplemented!()
+ }
+ /// Handles one `LogData` chunk of the request stream.
+ ///
+ /// # Panics
+ ///
+ /// Always panics: not implemented yet.
+ pub async fn handle_log_data(
+ &mut self,
+ data_chunk: Vec<u8>,
+ ) -> Result<(), Status> {
+ unimplemented!()
+ }
+
+ /// Consumes the handler and builds the RPC response.
+ ///
+ /// # Panics
+ ///
+ /// Always panics: not implemented yet.
+ pub async fn finish(self) -> Result<SendLogResponse, Status> {
+ unimplemented!()
+ }
+}
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -20,12 +20,14 @@
mod handlers {
pub(super) mod add_attachments;
pub(super) mod create_backup;
+ pub(super) mod send_log;
// re-exports for convenient usage in handlers
pub(self) use super::handle_db_error;
pub(self) use super::proto;
}
use self::handlers::create_backup::CreateBackupHandler;
+use self::handlers::send_log::SendLogHandler;
pub struct MyBackupService {
db: DatabaseClient,
@@ -109,12 +111,51 @@
))
}
- #[instrument(skip(self))]
+ /// Drives a [`SendLogHandler`] with the messages of a client-streamed
+ /// `SendLog` request and returns the handler's final response.
+ ///
+ /// # Errors
+ ///
+ /// Returns the first error produced by the request stream or by a handler
+ /// method. A message without a payload yields `Status::unknown`.
+ #[instrument(skip_all, fields(user_id, backup_id, log_hash, log_id))]
async fn send_log(
&self,
- _request: Request<tonic::Streaming<proto::SendLogRequest>>,
+ request: Request<tonic::Streaming<proto::SendLogRequest>>,
) -> Result<Response<proto::SendLogResponse>, Status> {
- Err(Status::unimplemented("unimplemented"))
+ use proto::send_log_request::Data::*;
+
+ info!("SendLog request: {:?}", request);
+ // `SendLogHandler::new` clones the client itself, so a borrow is enough.
+ let mut handler = SendLogHandler::new(&self.db);
+
+ let mut in_stream = request.into_inner();
+ while let Some(message) = in_stream.next().await {
+ let result = match message {
+ Ok(proto::SendLogRequest {
+ data: Some(UserId(user_id)),
+ }) => handler.handle_user_id(user_id).await,
+ Ok(proto::SendLogRequest {
+ data: Some(BackupId(backup_id)),
+ }) => handler.handle_backup_id(backup_id).await,
+ Ok(proto::SendLogRequest {
+ data: Some(LogHash(log_hash)),
+ }) => handler.handle_log_hash(log_hash).await,
+ Ok(proto::SendLogRequest {
+ data: Some(LogData(chunk)),
+ }) => handler.handle_log_data(chunk).await,
+ // Stream failure: keep the original status instead of masking it as
+ // an "unexpected request" with an unknown error code.
+ Err(status) => Err(status),
+ unexpected => {
+ error!("Received an unexpected request: {:?}", unexpected);
+ Err(Status::unknown("unknown error"))
+ }
+ };
+
+ if let Err(err) = result {
+ error!("An error occurred when processing request: {:?}", err);
+ return Err(err);
+ }
+ if handler.should_close_stream {
+ trace!("Handler requested to close request stream");
+ break;
+ }
+ }
+
+ let response = handler.finish().await;
+ debug!("Finished. Sending response: {:?}", response);
+ response.map(Response::new)
}
type RecoverBackupKeyStream = Pin<

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 17, 2:40 PM (19 h, 15 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2527718
Default Alt Text
D6211.id20752.diff (4 KB)

Event Timeline