Page MenuHomePhabricator

D6213.diff
No OneTemporary

D6213.diff

diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -1,9 +1,11 @@
use tonic::Status;
-use tracing::{error, trace, warn};
+use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use crate::{
- blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient,
+ blob::PutClient,
+ constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
+ database::{DatabaseClient, LogItem},
service::proto::SendLogResponse,
};
@@ -93,7 +95,38 @@
&mut self,
data_chunk: Vec<u8>,
) -> Result<(), Status> {
- unimplemented!()
+ if !self.should_receive_data || self.log_id.is_none() {
+ self.should_close_stream = true;
+ error!("Data chunk sent before other inputs");
+ return Err(Status::invalid_argument(
+ "Data chunk sent before other inputs",
+ ));
+ }
+
+ // empty chunk ends transmission
+ if data_chunk.is_empty() {
+ self.should_close_stream = true;
+ return Ok(());
+ }
+
+ match self.persistence_method {
+ LogPersistence::DB => {
+ self.log_buffer.extend(data_chunk);
+ self.ensure_size_constraints().await?;
+ }
+ LogPersistence::BLOB { .. } => {
+ let Some(client) = self.blob_client.as_mut() else {
+ self.should_close_stream = true;
+ error!("Put client uninitialized. This should never happen!");
+ return Err(Status::failed_precondition("Internal error"));
+ };
+ client.put_data(data_chunk).await.map_err(|err| {
+ error!("Failed to upload data chunk: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+ }
+ }
+ Ok(())
}
pub async fn finish(self) -> Result<SendLogResponse, Status> {
@@ -122,6 +155,48 @@
self.should_receive_data = true;
Ok(())
}
+
+ /// Ensures log fits db size constraints. If not, it is moved to blob
+ /// persistence
+ async fn ensure_size_constraints(&mut self) -> Result<(), Status> {
+ let (Some(backup_id), Some(log_id), Some(log_hash)) = (
+ self.backup_id.as_ref(),
+ self.log_id.as_ref(),
+ self.log_hash.as_ref()
+ ) else {
+ self.should_close_stream = true;
+ error!("Log info absent in data mode. This should never happen!");
+ return Err(Status::failed_precondition("Internal error"));
+ };
+
+ let log_size = LogItem::size_from_components(
+ backup_id,
+ log_id,
+ log_hash,
+ &self.log_buffer,
+ );
+ if log_size > LOG_DATA_SIZE_DATABASE_LIMIT {
+ debug!("Log too large, switching persistence to Blob");
+ let holder =
+ crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
+ match crate::blob::start_simple_put_client(&holder, &log_hash).await? {
+ Some(mut put_client) => {
+ let blob_chunk = std::mem::take(&mut self.log_buffer);
+ put_client.put_data(blob_chunk).await.map_err(|err| {
+ error!("Failed to upload data chunk: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+ self.blob_client = Some(put_client);
+ }
+ None => {
+ debug!("Log hash already exists");
+ self.should_close_stream = true;
+ }
+ }
+ self.persistence_method = LogPersistence::BLOB { holder };
+ }
+ Ok(())
+ }
}
fn generate_log_id(backup_id: &str) -> String {

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 7:59 PM (20 h, 31 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678718
Default Alt Text
D6213.diff (3 KB)

Event Timeline