Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/service.rs
use anyhow::{Context, Result}; | use anyhow::{Context, Result}; | ||||
use blob::blob_service_server::BlobService; | use blob::blob_service_server::BlobService; | ||||
use chrono::Utc; | use chrono::Utc; | ||||
use std::{pin::Pin, sync::Arc}; | use std::{pin::Pin, sync::Arc}; | ||||
use tokio::sync::mpsc; | use tokio::sync::mpsc; | ||||
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; | use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; | ||||
use tonic::{Request, Response, Status}; | use tonic::{Request, Response, Status}; | ||||
use crate::{ | use crate::{ | ||||
constants::{ | constants::{ | ||||
BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, | BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, | ||||
MPSC_CHANNEL_BUFFER_CAPACITY, | MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, | ||||
}, | }, | ||||
database::{BlobItem, DatabaseClient, ReverseIndexItem}, | database::{BlobItem, DatabaseClient, ReverseIndexItem}, | ||||
s3::{MultiPartUploadSession, S3Path}, | s3::{MultiPartUploadSession, S3Path}, | ||||
}; | }; | ||||
pub mod blob { | pub mod blob { | ||||
tonic::include_proto!("blob"); | tonic::include_proto!("blob"); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 302 Lines • ▼ Show 20 Lines | match self.db.find_blob_item(blob_hash).await { | ||||
} | } | ||||
} | } | ||||
} | } | ||||
pub async fn handle_data_chunk( | pub async fn handle_data_chunk( | ||||
&mut self, | &mut self, | ||||
mut new_data: Vec<u8>, | mut new_data: Vec<u8>, | ||||
) -> PutResult { | ) -> PutResult { | ||||
unimplemented!() | let blob_item = match &self.action { | ||||
Some(PutAction::UploadNewBlob(blob_item)) => blob_item, | |||||
_ => { | |||||
self.should_close_stream = true; | |||||
return Err(Status::invalid_argument( | |||||
"Holder and hash should be provided before data", | |||||
)); | |||||
} | |||||
}; | |||||
// create upload session if it doesn't already exist | |||||
if self.uploader.is_none() { | |||||
self.uploader = | |||||
match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await | |||||
{ | |||||
Ok(session) => Some(session), | |||||
Err(_) => { | |||||
self.should_close_stream = true; | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
} | |||||
} | |||||
let uploader = self.uploader.as_mut().unwrap(); | |||||
// New parts should be added to AWS only if they exceed minimum part size, | |||||
// Otherwise AWS returns error | |||||
self.current_chunk.append(&mut new_data); | |||||
if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE | |||||
{ | |||||
if uploader.add_part(self.current_chunk.clone()).await.is_err() { | |||||
self.should_close_stream = true; | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
self.current_chunk.clear(); | |||||
} | |||||
Ok(blob::PutResponse { data_exists: false }) | |||||
} | } | ||||
/// This function should be called after the input stream is finished. | /// This function should be called after the input stream is finished. | ||||
/// This consumes `self` so this put handler instance cannot be used | /// This consumes `self` so this put handler instance cannot be used | ||||
/// after this is called. | /// after this is called. | ||||
pub async fn finish(self) -> Result<(), Status> { | pub async fn finish(self) -> Result<(), Status> { | ||||
unimplemented!() | if self.action.is_none() { | ||||
return Ok(()); | |||||
} | |||||
let holder = self | |||||
.holder | |||||
.ok_or_else(|| Status::aborted("Internal error"))?; | |||||
let blob_hash = self | |||||
.blob_hash | |||||
.ok_or_else(|| Status::aborted("Internal error"))?; | |||||
let blob_item = match self.action { | |||||
None => return Ok(()), | |||||
Some(PutAction::AssignHolder) => { | |||||
return assign_holder_to_blob(&self.db, holder, blob_hash).await; | |||||
} | |||||
Some(PutAction::UploadNewBlob(blob_item)) => blob_item, | |||||
}; | |||||
let mut uploader = self | |||||
.uploader | |||||
.ok_or_else(|| Status::aborted("Internal error"))?; | |||||
if !self.current_chunk.is_empty() | |||||
&& uploader.add_part(self.current_chunk).await.is_err() | |||||
{ | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
if uploader.finish_upload().await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
if self.db.put_blob_item(blob_item).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
assign_holder_to_blob(&self.db, holder, blob_hash).await | |||||
} | |||||
} | |||||
async fn assign_holder_to_blob( | |||||
db: &DatabaseClient, | |||||
holder: String, | |||||
blob_hash: String, | |||||
) -> Result<(), Status> { | |||||
let reverse_index_item = ReverseIndexItem { holder, blob_hash }; | |||||
if db.put_reverse_index_item(reverse_index_item).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | } | ||||
Ok(()) | |||||
} | } |