D5703.id18988.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,17 +1,18 @@
use anyhow::{Context, Result};
use blob::blob_service_server::BlobService;
+use chrono::Utc;
use std::{pin::Pin, sync::Arc};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
use crate::{
constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
MPSC_CHANNEL_BUFFER_CAPACITY,
},
database::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::S3Path,
+ s3::{MultiPartUploadSession, S3Path},
};
pub mod blob {
@@ -64,9 +65,44 @@
Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
async fn put(
&self,
- _request: Request<tonic::Streaming<blob::PutRequest>>,
+ request: Request<tonic::Streaming<blob::PutRequest>>,
) -> Result<Response<Self::PutStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let s3 = self.s3.clone();
+ tokio::spawn(async move {
+ let mut put_handler = PutHandler::new(&db, &s3);
+
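+ // Each inbound PutRequest carries exactly one variant of the `data`
+ // oneof: a holder, a blob hash, or a data chunk. Dispatch each message
+ // to the matching handler and stream its result back to the client.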
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::Holder(new_holder)),
+ }) => put_handler.handle_holder(new_holder).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::BlobHash(new_hash)),
+ }) => put_handler.handle_blob_hash(new_hash).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::DataChunk(new_data)),
+ }) => put_handler.handle_data_chunk(new_data).await,
+ _ => Err(Status::unknown("unknown error")),
+ };
+ if let Err(e) = tx.send(response).await {
+ println!("Response was dropped: {}", e);
+ break;
+ }
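+ // The handler may request an early close, e.g. when the blob already
+ // exists and only a new holder had to be assigned to it.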
+ if put_handler.should_close_stream {
+ break;
+ }
+ }
+
+ if let Err(status) = put_handler.finish().await {
+ let _ = tx.send(Err(status)).await;
+ }
+ });
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
}
type GetStream =
@@ -191,3 +227,114 @@
Ok(Response::new(()))
}
}
+
+type PutResult = Result<blob::PutResponse, Status>;
+
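+/// The purpose of a Put request: either assigning an additional holder
+/// to an already-stored blob, or uploading a brand new blob to S3.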
+enum PutAction {
+ AssignHolder,
+ UploadNewBlob(BlobItem),
+}
+
+/// A helper for handling Put RPC requests
+struct PutHandler {
+ /// Whether the stream should be closed by the server
+ pub should_close_stream: bool,
+ action: Option<PutAction>,
+
+ holder: Option<String>,
+ blob_hash: Option<String>,
+ current_chunk: Vec<u8>,
+
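+ // S3 multipart upload session; expected to be opened lazily once data
+ // chunks start arriving (chunk handling is left unimplemented here).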
+ uploader: Option<MultiPartUploadSession>,
+ db: DatabaseClient,
+ s3: Arc<aws_sdk_s3::Client>,
+}
+
+impl PutHandler {
+ fn new(db: &DatabaseClient, s3: &Arc<aws_sdk_s3::Client>) -> Self {
+ PutHandler {
+ should_close_stream: false,
+ action: None,
+ holder: None,
+ blob_hash: None,
+ current_chunk: Vec::new(),
+ uploader: None,
+ db: db.clone(),
+ s3: s3.clone(),
+ }
+ }
+
+ pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
+ if self.holder.is_some() {
+ return Err(Status::invalid_argument("Holder already provided"));
+ }
+ self.holder = Some(new_holder);
+ self.determine_action().await
+ }
+
+ pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
+ if self.blob_hash.is_some() {
+ return Err(Status::invalid_argument("Blob hash already provided"));
+ }
+ self.blob_hash = Some(new_hash);
+ self.determine_action().await
+ }
+
+ /// Private helper that determines the purpose of this RPC call
+ async fn determine_action(&mut self) -> PutResult {
+ // This should be called only before the action has been determined;
+ // in practice, this error should never happen.
+ if self.action.is_some() {
+ return Err(Status::failed_precondition("Put action is already started"));
+ }
+
+ // Both the holder and the blob hash have to be set in order to continue;
+ // otherwise we send a standard `data_exists: false` response.
+ if self.holder.is_none() || self.blob_hash.is_none() {
+ return Ok(blob::PutResponse { data_exists: false });
+ }
+ let blob_hash = self
+ .blob_hash
+ .as_ref()
+ .ok_or_else(|| Status::failed_precondition("Internal error"))?;
+
+ match self.db.find_blob_item(blob_hash).await {
+ // Hash already exists, so we're only assigning a new holder to it
+ Ok(Some(_)) => {
+ self.action = Some(PutAction::AssignHolder);
+ self.should_close_stream = true;
+ Ok(blob::PutResponse { data_exists: true })
+ }
+ // Hash doesn't exist, so we're starting a new upload session
+ Ok(None) => {
+ self.action = Some(PutAction::UploadNewBlob(BlobItem {
+ blob_hash: blob_hash.to_string(),
+ s3_path: S3Path {
+ bucket_name: BLOB_S3_BUCKET_NAME.to_string(),
+ object_name: blob_hash.to_string(),
+ },
+ created: Utc::now(),
+ }));
+ Ok(blob::PutResponse { data_exists: false })
+ }
+ Err(_db_err) => {
+ self.should_close_stream = true;
+ Err(Status::aborted("Internal error"))
+ }
+ }
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ mut new_data: Vec<u8>,
+ ) -> PutResult {
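+ // Not implemented in this revision; presumably `new_data` will be
+ // buffered into `current_chunk` and uploaded to S3 through the
+ // `MultiPartUploadSession` once enough bytes accumulate.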
+ unimplemented!()
+ }
+
+ /// This should be called after the input stream is finished.
+ /// It consumes `self`, so the put handler instance cannot be used
+ /// after the call.
+ pub async fn finish(self) -> Result<(), Status> {
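+ // Not implemented in this revision; presumably this will flush any
+ // remaining buffered data, complete the multipart upload, and persist
+ // the new blob and reverse-index records.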
+ unimplemented!()
+ }
+}
