Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3505080
D5703.id18920.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Referenced Files
None
Subscribers
None
D5703.id18920.diff
View Options
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,17 +1,18 @@
use anyhow::{Context, Result};
use blob::blob_service_server::BlobService;
+use chrono::Utc;
use std::{pin::Pin, sync::Arc};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
use crate::{
constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
MPSC_CHANNEL_BUFFER_CAPACITY,
},
database::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::S3Path,
+ s3::{MultiPartUploadSession, S3Path},
};
pub mod blob {
@@ -64,9 +65,44 @@
Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
async fn put(
&self,
- _request: Request<tonic::Streaming<blob::PutRequest>>,
+ request: Request<tonic::Streaming<blob::PutRequest>>,
) -> Result<Response<Self::PutStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let s3 = self.s3.clone();
+ tokio::spawn(async move {
+ let mut put_handler = PutHandler::new(&db, &s3);
+
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::Holder(new_holder)),
+ }) => put_handler.handle_holder(new_holder).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::BlobHash(new_hash)),
+ }) => put_handler.handle_blob_hash(new_hash).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::DataChunk(new_data)),
+ }) => put_handler.handle_data_chunk(new_data).await,
+ _ => Err(Status::unknown("unknown error")),
+ };
+ if let Err(e) = tx.send(response).await {
+ println!("Response was dropped: {}", e);
+ break;
+ }
+ if put_handler.should_close_stream {
+ break;
+ }
+ }
+
+ if let Err(status) = put_handler.finish().await {
+ let _ = tx.send(Err(status)).await;
+ }
+ });
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
}
type GetStream =
@@ -191,3 +227,110 @@
Ok(Response::new(()))
}
}
+
+type PutResult = Result<blob::PutResponse, Status>;
+
+enum PutAction {
+ AssignHolder,
+ UploadNewBlob(BlobItem),
+}
+
+/// A helper for handling Put RPC requests
+struct PutHandler {
+ /// Should the stream be closed by server
+ pub should_close_stream: bool,
+ action: Option<PutAction>,
+
+ holder: Option<String>,
+ blob_hash: Option<String>,
+ current_chunk: Vec<u8>,
+
+ uploader: Option<MultiPartUploadSession>,
+ db: DatabaseClient,
+ s3: Arc<aws_sdk_s3::Client>,
+}
+
+impl PutHandler {
+ fn new(db: &DatabaseClient, s3: &Arc<aws_sdk_s3::Client>) -> Self {
+ PutHandler {
+ should_close_stream: false,
+ action: None,
+ holder: None,
+ blob_hash: None,
+ current_chunk: Vec::new(),
+ uploader: None,
+ db: db.clone(),
+ s3: s3.clone(),
+ }
+ }
+
+ pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
+ if self.holder.is_some() {
+ return Err(Status::invalid_argument("Holder already provided"));
+ }
+ self.holder = Some(new_holder);
+ return self.determine_action().await;
+ }
+
+ pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
+ if self.blob_hash.is_some() {
+ return Err(Status::invalid_argument("Blob hash already provided"));
+ }
+ self.blob_hash = Some(new_hash);
+ return self.determine_action().await;
+ }
+
+ /// private helper function to determine purpose of this RPC call
+ async fn determine_action(&mut self) -> PutResult {
+ // this should be called only if action isn't determined yet
+ // this error should actually never happen
+ if self.action.is_some() {
+ return Err(Status::failed_precondition("Put action is already started"));
+ }
+
+ // holder and hash need both to be set in order to continue
+ // otherwise we send a standard response
+ if self.holder.is_none() || self.blob_hash.is_none() {
+ return Ok(blob::PutResponse { data_exists: false });
+ }
+
+ let blob_hash = self.blob_hash.as_ref().unwrap();
+ return match self.db.find_blob_item(blob_hash).await {
+ // Hash already exists, so we're only assigning a new holder to it
+ Ok(Some(_)) => {
+ self.action = Some(PutAction::AssignHolder);
+ self.should_close_stream = true;
+ Ok(blob::PutResponse { data_exists: true })
+ }
+ // Hash doesn't exist, so we're starting a new upload session
+ Ok(None) => {
+ self.action = Some(PutAction::UploadNewBlob(BlobItem {
+ blob_hash: blob_hash.to_string(),
+ s3_path: S3Path {
+ bucket_name: BLOB_S3_BUCKET_NAME.to_string(),
+ object_name: blob_hash.to_string(),
+ },
+ created: Utc::now(),
+ }));
+ Ok(blob::PutResponse { data_exists: false })
+ }
+ Err(_db_err) => {
+ self.should_close_stream = true;
+ Err(Status::aborted("Internal error"))
+ }
+ };
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ mut new_data: Vec<u8>,
+ ) -> PutResult {
+ unimplemented!()
+ }
+
+ /// This consumes `self` so this put handler instance cannot be used after this is called.
+ /// It should be called after the input stream is done
+ pub async fn finish(self) -> Result<(), Status> {
+ unimplemented!()
+ }
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 12:06 PM (20 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2687619
Default Alt Text
D5703.id18920.diff (5 KB)
Attached To
Mode
D5703: [services][blob] Upload 1/2 - Introduce Put handlers
Attached
Detach File
Event Timeline
Log In to Comment