Page MenuHomePhorge

D5730.1768387187.diff
No OneTemporary

Size
13 KB
Referenced Files
None
Subscribers
None

D5730.1768387187.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -5,6 +5,7 @@
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
+use tracing::{debug, error, info, instrument, trace, warn, Instrument};
use crate::{
constants::{
@@ -39,8 +40,14 @@
let blob_hash = &reverse_index_item.blob_hash;
match self.db.find_blob_item(&blob_hash).await {
Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
- Ok(None) => Err(Status::not_found("blob not found")),
- Err(_) => Err(Status::aborted("internal error")),
+ Ok(None) => {
+ debug!("No blob found for {:?}", reverse_index_item);
+ Err(Status::not_found("blob not found"))
+ }
+ Err(err) => {
+ error!("Failed to find blob item: {:?}", err);
+ Err(Status::aborted("internal error"))
+ }
}
}
@@ -52,8 +59,14 @@
Ok(Some(reverse_index)) => {
self.find_s3_path_by_reverse_index(&reverse_index).await
}
- Ok(None) => Err(Status::not_found("blob not found")),
- Err(_) => Err(Status::aborted("internal error")),
+ Ok(None) => {
+ debug!("No db entry found for holder {:?}", holder);
+ Err(Status::not_found("blob not found"))
+ }
+ Err(err) => {
+ error!("Failed to find reverse index: {:?}", err);
+ Err(Status::aborted("internal error"))
+ }
}
}
}
@@ -63,15 +76,18 @@
impl BlobService for MyBlobService {
type PutStream =
Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
+
+ #[instrument(skip_all, fields(holder, blob_hash))]
async fn put(
&self,
request: Request<tonic::Streaming<blob::PutRequest>>,
) -> Result<Response<Self::PutStream>, Status> {
+ info!("Put blob request: {:?}", request);
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let db = self.db.clone();
let s3 = self.s3.clone();
- tokio::spawn(async move {
+ let worker = async move {
let mut put_handler = PutHandler::new(&db, &s3);
while let Some(message) = in_stream.next().await {
@@ -85,21 +101,28 @@
Ok(blob::PutRequest {
data: Some(blob::put_request::Data::DataChunk(new_data)),
}) => put_handler.handle_data_chunk(new_data).await,
- _ => Err(Status::unknown("unknown error")),
+ unexpected => {
+ error!("Received an unexpected Result: {:?}", unexpected);
+ Err(Status::unknown("unknown error"))
+ }
};
+ trace!("Sending response: {:?}", response);
if let Err(e) = tx.send(response).await {
- println!("Response was dropped: {}", e);
+ error!("Response was dropped: {}", e);
break;
}
if put_handler.should_close_stream {
+ trace!("Put handler requested to close stream");
break;
}
}
if let Err(status) = put_handler.finish().await {
+ trace!("Sending error response: {:?}", status);
let _ = tx.send(Err(status)).await;
}
- });
+ };
+ tokio::spawn(worker.in_current_span());
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
@@ -107,12 +130,16 @@
type GetStream =
Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
+
+ #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))]
async fn get(
&self,
request: Request<blob::GetRequest>,
) -> Result<Response<Self::GetStream>, Status> {
+ info!("Get blob request: {:?}", request);
let message: blob::GetRequest = request.into_inner();
let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
+ tracing::Span::current().record("s3_path", s3_path.to_full_path());
let object_metadata = self
.s3
@@ -121,23 +148,28 @@
.key(s3_path.object_name.clone())
.send()
.await
- .map_err(|_| Status::aborted("server error"))?;
-
- let file_size: u64 = object_metadata
- .content_length()
- .try_into()
- .map_err(|_| Status::aborted("server error"))?;
+ .map_err(|err| {
+ error!("Failed to get S3 object metadata: {:?}", err);
+ Status::aborted("server error")
+ })?;
+
+ let file_size: u64 =
+ object_metadata.content_length().try_into().map_err(|err| {
+ error!("Failed to get S3 object content length: {:?}", err);
+ Status::aborted("server error")
+ })?;
let chunk_size: u64 =
GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let s3 = self.s3.clone();
- tokio::spawn(async move {
+ let worker = async move {
let mut offset: u64 = 0;
while offset < file_size {
let next_size = std::cmp::min(chunk_size, file_size - offset);
let range = format!("bytes={}-{}", offset, offset + next_size - 1);
+ trace!(range, "Getting {} bytes of data", next_size);
let data = match s3
.get_object()
@@ -158,38 +190,51 @@
Ok(data) => Ok(blob::GetResponse {
data_chunk: data.into_bytes().to_vec(),
}),
- Err(_) => Err(Status::aborted("download failed")),
+ Err(err) => {
+ error!("Failed to download data chunk: {:?}", err);
+ Err(Status::aborted("download failed"))
+ }
};
let should_abort = response.is_err();
if let Err(e) = tx.send(response).await {
- println!("Response was dropped: {}", e);
+ error!("Response was dropped: {}", e);
break;
}
if should_abort {
+ trace!("Error response, aborting");
break;
}
offset += chunk_size;
}
- });
+ };
+ tokio::spawn(worker.in_current_span());
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
}
+ #[instrument(skip_all, fields(holder = %request.get_ref().holder))]
async fn remove(
&self,
request: Request<blob::RemoveRequest>,
) -> Result<Response<()>, Status> {
+ info!("Remove blob request: {:?}", request);
let message = request.into_inner();
let holder = message.holder.as_str();
let reverse_index_item = self
.db
.find_reverse_index_by_holder(holder)
.await
- .map_err(|_| Status::aborted("Internal error"))?
- .ok_or_else(|| Status::not_found("Blob not found"))?;
+ .map_err(|err| {
+ error!("Failed to find reverse index: {:?}", err);
+ Status::aborted("Internal error")
+ })?
+ .ok_or_else(|| {
+ debug!("Blob not found");
+ Status::not_found("Blob not found")
+ })?;
let blob_hash = &reverse_index_item.blob_hash;
if self.db.remove_reverse_index_item(holder).await.is_err() {
@@ -203,7 +248,10 @@
.db
.find_reverse_index_by_hash(blob_hash)
.await
- .map_err(|_| Status::aborted("Internal error"))?
+ .map_err(|err| {
+ error!("Failed to find reverse index: {:?}", err);
+ Status::aborted("Internal error")
+ })?
.is_empty()
{
let s3_path = self
@@ -217,9 +265,13 @@
.key(&s3_path.object_name)
.send()
.await
- .map_err(|_| Status::aborted("Internal error"))?;
+ .map_err(|err| {
+ error!("Failed to delete S3 object: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
- if self.db.remove_blob_item(blob_hash).await.is_err() {
+ if let Err(err) = self.db.remove_blob_item(blob_hash).await {
+ error!("Failed to remove blob item from database: {:?}", err);
return Err(Status::aborted("Internal error"));
}
}
@@ -266,16 +318,20 @@
pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
if self.holder.is_some() {
+ warn!("Holder already provided");
return Err(Status::invalid_argument("Holder already provided"));
}
+ tracing::Span::current().record("holder", &new_holder);
self.holder = Some(new_holder);
return self.determine_action().await;
}
pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
if self.blob_hash.is_some() {
+ warn!("Blob hash already provided");
return Err(Status::invalid_argument("Blob hash already provided"));
}
+ tracing::Span::current().record("blob_hash", &new_hash);
self.blob_hash = Some(new_hash);
return self.determine_action().await;
}
@@ -285,6 +341,7 @@
// this should be called only if action isn't determined yet
// this error should actually never happen
if self.action.is_some() {
+ error!("Put action is already started");
return Err(Status::failed_precondition("Put action is already started"));
}
@@ -298,12 +355,14 @@
return match self.db.find_blob_item(blob_hash).await {
// Hash already exists, so we're only assigning a new holder to it
Ok(Some(_)) => {
+ debug!("Blob found, assigning holder");
self.action = Some(PutAction::AssignHolder);
self.should_close_stream = true;
Ok(blob::PutResponse { data_exists: true })
}
// Hash doesn't exist, so we're starting a new upload session
Ok(None) => {
+ debug!("Blob not found, starting upload action");
self.action = Some(PutAction::UploadNewBlob(BlobItem {
blob_hash: blob_hash.to_string(),
s3_path: S3Path {
@@ -314,8 +373,12 @@
}));
Ok(blob::PutResponse { data_exists: false })
}
- Err(_db_err) => {
+ Err(db_err) => {
self.should_close_stream = true;
+ error!(
+ "Error when finding BlobItem by hash {}: {:?}",
+ blob_hash, db_err
+ );
Err(Status::aborted("Internal error"))
}
};
@@ -329,20 +392,24 @@
Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
_ => {
self.should_close_stream = true;
+ error!("Data chunk sent before upload action is started");
return Err(Status::invalid_argument(
"Holder and hash should be provided before data",
));
}
};
+ trace!("Received {} bytes of data", new_data.len());
// create upload session if it doesn't already exist
if self.uploader.is_none() {
+ debug!("Uploader doesn't exist, starting new session");
self.uploader =
match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
{
Ok(session) => Some(session),
- Err(_) => {
+ Err(err) => {
self.should_close_stream = true;
+ error!("Failed to create upload session: {:?}", err);
return Err(Status::aborted("Internal error"));
}
}
@@ -354,8 +421,10 @@
self.current_chunk.append(&mut new_data);
if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
{
- if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+ trace!("Chunk size exceeded, adding new S3 part");
+ if let Err(err) = uploader.add_part(self.current_chunk.clone()).await {
self.should_close_stream = true;
+ error!("Failed to upload S3 part: {:?}", err);
return Err(Status::aborted("Internal error"));
}
self.current_chunk.clear();
@@ -368,10 +437,17 @@
/// after this is called. It should be called after the input stream is done.
pub async fn finish(self) -> Result<(), Status> {
if self.action.is_none() {
+ debug!("No action to perform, finishing now");
return Ok(());
}
- let holder = self.holder.ok_or(Status::aborted("Internal error"))?;
- let blob_hash = self.blob_hash.ok_or(Status::aborted("Internal error"))?;
+ let holder = self.holder.ok_or_else(|| {
+ error!("Cannot finish action. No holder provided!");
+ Status::aborted("Internal error")
+ })?;
+ let blob_hash = self.blob_hash.ok_or_else(|| {
+ error!("Cannot finish action. No blob hash provided!");
+ Status::aborted("Internal error")
+ })?;
let blob_item = match self.action {
None => return Ok(()),
Some(PutAction::AssignHolder) => {
@@ -380,24 +456,33 @@
Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
};
- let mut uploader =
- self.uploader.ok_or(Status::aborted("Internal error"))?;
+ let mut uploader = self.uploader.ok_or_else(|| {
+ // This also happens when client cancels before sending any data chunk
+ warn!("No uploader was created, finishing now");
+ Status::aborted("Internal error")
+ })?;
if !self.current_chunk.is_empty() {
- if uploader.add_part(self.current_chunk).await.is_err() {
+ if let Err(err) = uploader.add_part(self.current_chunk).await {
+ error!("Failed to upload final part: {:?}", err);
return Err(Status::aborted("Internal error"));
}
}
- if uploader.finish_upload().await.is_err() {
+ if let Err(err) = uploader.finish_upload().await {
+ error!("Failed to finish upload session: {:?}", err);
return Err(Status::aborted("Internal error"));
}
- if self.db.put_blob_item(blob_item).await.is_err() {
+ if let Err(err) = self.db.put_blob_item(blob_item).await {
+ error!("Failed to save BlobItem: {:?}", err);
return Err(Status::aborted("Internal error"));
}
- assign_holder_to_blob(&self.db, holder, blob_hash).await
+ assign_holder_to_blob(&self.db, holder, blob_hash).await?;
+
+ debug!("Upload finished successfully");
+ Ok(())
}
}
@@ -408,7 +493,8 @@
) -> Result<(), Status> {
let reverse_index_item = ReverseIndexItem { holder, blob_hash };
- if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+ if let Err(err) = db.put_reverse_index_item(reverse_index_item).await {
+ error!("Failed to put reverse index: {:?}", err);
return Err(Status::aborted("Internal error"));
}
Ok(())

File Metadata

Mime Type
text/plain
Expires
Wed, Jan 14, 10:39 AM (6 h, 18 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5931585
Default Alt Text
D5730.1768387187.diff (13 KB)

Event Timeline