Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3491753
D6211.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
D6211.diff
View Options
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -0,0 +1,80 @@
+use tonic::Status;
+use uuid::Uuid;
+
+use crate::{
+ blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient,
+ service::proto::SendLogResponse,
+};
+
+enum LogPersistence {
+ /// The log is stored entirely in the DynamoDB database.
+ DB,
+ /// The log contents are stored with the Blob service.
+ BLOB { holder: String },
+}
+
+pub struct SendLogHandler {
+ // flow control: when `true`, the caller stops reading the request stream
+ pub should_close_stream: bool,
+
+ // inputs (each one starts as `None` in `new`)
+ user_id: Option<String>,
+ backup_id: Option<String>,
+ log_hash: Option<String>,
+
+ // internal state (`new` starts with an empty buffer and `LogPersistence::DB`)
+ log_id: Option<String>,
+ log_buffer: Vec<u8>,
+ persistence_method: LogPersistence,
+ should_receive_data: bool,
+
+ // client instances (`db` is cloned in `new`; `blob_client` starts as `None`)
+ db: DatabaseClient,
+ blob_client: Option<PutClient>,
+}
+
+impl SendLogHandler {
+  /// Builds a fresh handler around a clone of the given database client.
+  ///
+  /// Every input starts unset, the log buffer is empty, no blob client
+  /// exists yet, both flags are `false` and the persistence method is
+  /// `LogPersistence::DB`.
+  pub fn new(db: &DatabaseClient) -> Self {
+    let db = db.clone();
+    Self {
+      should_close_stream: false,
+      user_id: None,
+      backup_id: None,
+      log_hash: None,
+      log_id: None,
+      log_buffer: Vec::new(),
+      persistence_method: LogPersistence::DB,
+      should_receive_data: false,
+      db,
+      blob_client: None,
+    }
+  }
+
+  /// Receives the user ID sent by the client.
+  ///
+  /// Not implemented yet: calling this panics via `unimplemented!()`.
+  pub async fn handle_user_id(
+    &mut self,
+    user_id: String,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+
+  /// Receives the backup ID sent by the client.
+  ///
+  /// Not implemented yet: calling this panics via `unimplemented!()`.
+  pub async fn handle_backup_id(
+    &mut self,
+    backup_id: String,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+
+  /// Receives the log hash sent by the client, as raw bytes.
+  ///
+  /// Not implemented yet: calling this panics via `unimplemented!()`.
+  pub async fn handle_log_hash(
+    &mut self,
+    log_hash: Vec<u8>,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+
+  /// Receives one chunk of log data sent by the client.
+  ///
+  /// Not implemented yet: calling this panics via `unimplemented!()`.
+  pub async fn handle_log_data(
+    &mut self,
+    data_chunk: Vec<u8>,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+
+  /// Consumes the handler; the signature returns the final
+  /// `SendLogResponse` or a `Status` error.
+  ///
+  /// Not implemented yet: calling this panics via `unimplemented!()`.
+  pub async fn finish(self) -> Result<SendLogResponse, Status> {
+    unimplemented!()
+  }
+}
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -20,12 +20,14 @@
mod handlers {
pub(super) mod add_attachments;
pub(super) mod create_backup;
+ pub(super) mod send_log;
// re-exported here so the handler submodules can import these via `super::`
pub(self) use super::handle_db_error;
pub(self) use super::proto;
}
use self::handlers::create_backup::CreateBackupHandler;
+use self::handlers::send_log::SendLogHandler;
pub struct MyBackupService {
db: DatabaseClient,
@@ -106,12 +108,50 @@
))
}
-  #[instrument(skip(self))]
+  /// Handles the client-streaming `SendLog` RPC.
+  ///
+  /// Every message read from the request stream is dispatched to the
+  /// matching `SendLogHandler` method. Reading stops when the stream ends or
+  /// when the handler sets `should_close_stream`; the outcome of
+  /// `SendLogHandler::finish` is then wrapped into the response.
+  ///
+  /// # Errors
+  ///
+  /// Returns the first `Status` produced by a handler method, a generic
+  /// `Status::unknown` for any message that matches none of the expected
+  /// variants, or the error returned by `finish`.
+  #[instrument(skip_all, fields(backup_id, log_hash, log_id))]
   async fn send_log(
     &self,
-    _request: Request<tonic::Streaming<proto::SendLogRequest>>,
+    request: Request<tonic::Streaming<proto::SendLogRequest>>,
   ) -> Result<Response<proto::SendLogResponse>, Status> {
-    Err(Status::unimplemented("unimplemented"))
+    use proto::send_log_request::Data::*;
+
+    info!("SendLog request: {:?}", request);
+    let mut handler = SendLogHandler::new(&self.db);
+
+    let mut in_stream = request.into_inner();
+    while let Some(message) = in_stream.next().await {
+      let result = match message {
+        Ok(proto::SendLogRequest {
+          data: Some(UserId(user_id)),
+        }) => handler.handle_user_id(user_id).await,
+        Ok(proto::SendLogRequest {
+          data: Some(BackupId(backup_id)),
+        }) => handler.handle_backup_id(backup_id).await,
+        Ok(proto::SendLogRequest {
+          data: Some(LogHash(log_hash)),
+        }) => handler.handle_log_hash(log_hash).await,
+        Ok(proto::SendLogRequest {
+          data: Some(LogData(chunk)),
+        }) => handler.handle_log_data(chunk).await,
+        // Everything not matched above lands here, including stream errors
+        // and messages with no `data`; details are logged, while the client
+        // only receives a generic status.
+        unexpected => {
+          error!("Received an unexpected request: {:?}", unexpected);
+          Err(Status::unknown("unknown error"))
+        }
+      };
+
+      if let Err(err) = result {
+        error!("An error occurred when processing request: {:?}", err);
+        return Err(err);
+      }
+      if handler.should_close_stream {
+        trace!("Handler requested to close request stream");
+        break;
+      }
+    }
+
+    let response = handler.finish().await;
+    debug!("Finished. Sending response: {:?}", response);
+    // `Response::new` is passed directly; wrapping it in a closure adds nothing.
+    response.map(Response::new)
   }
type RecoverBackupKeyStream = Pin<
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 8:16 PM (20 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678760
Default Alt Text
D6211.diff (4 KB)
Attached To
Mode
D6211: [services][backup] SendLog 1/4 - create handler module
Attached
Detach File
Event Timeline
Log In to Comment