D5730: [services][blob] Add meaningful logs in gRPC handler
D5730.1768428521.diff (14 KB)
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -5,6 +5,7 @@
 use tokio::sync::mpsc;
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tonic::{Request, Response, Status};
+use tracing::{debug, error, info, instrument, trace, warn, Instrument};
 
 use crate::{
   constants::{
@@ -39,8 +40,14 @@
     let blob_hash = &reverse_index_item.blob_hash;
     match self.db.find_blob_item(&blob_hash).await {
       Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
-      Ok(None) => Err(Status::not_found("blob not found")),
-      Err(_) => Err(Status::aborted("internal error")),
+      Ok(None) => {
+        debug!("No blob found for {:?}", reverse_index_item);
+        Err(Status::not_found("blob not found"))
+      }
+      Err(err) => {
+        error!("Failed to find blob item: {:?}", err);
+        Err(Status::aborted("internal error"))
+      }
     }
   }
 
@@ -52,8 +59,14 @@
       Ok(Some(reverse_index)) => {
         self.find_s3_path_by_reverse_index(&reverse_index).await
       }
-      Ok(None) => Err(Status::not_found("blob not found")),
-      Err(_) => Err(Status::aborted("internal error")),
+      Ok(None) => {
+        debug!("No db entry found for holder {:?}", holder);
+        Err(Status::not_found("blob not found"))
+      }
+      Err(err) => {
+        error!("Failed to find reverse index: {:?}", err);
+        Err(Status::aborted("internal error"))
+      }
     }
   }
 }
@@ -63,15 +76,18 @@
 impl BlobService for MyBlobService {
   type PutStream =
     Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
+
+  #[instrument(skip_all, fields(holder, blob_hash))]
   async fn put(
     &self,
     request: Request<tonic::Streaming<blob::PutRequest>>,
   ) -> Result<Response<Self::PutStream>, Status> {
+    info!("Put blob request: {:?}", request);
     let mut in_stream = request.into_inner();
     let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let db = self.db.clone();
     let s3 = self.s3.clone();
-    tokio::spawn(async move {
+    let worker = async move {
       let mut put_handler = PutHandler::new(&db, &s3);
 
       while let Some(message) = in_stream.next().await {
@@ -85,21 +101,28 @@
           Ok(blob::PutRequest {
             data: Some(blob::put_request::Data::DataChunk(new_data)),
           }) => put_handler.handle_data_chunk(new_data).await,
-          _ => Err(Status::unknown("unknown error")),
+          unexpected => {
+            error!("Received an unexpected Result: {:?}", unexpected);
+            Err(Status::unknown("unknown error"))
+          }
         };
+        trace!("Sending response: {:?}", response);
         if let Err(e) = tx.send(response).await {
-          println!("Response was dropped: {}", e);
+          error!("Response was dropped: {}", e);
           break;
         }
 
         if put_handler.should_close_stream {
+          trace!("Put handler requested to close stream");
          break;
        }
      }
       if let Err(status) = put_handler.finish().await {
+        trace!("Sending error response: {:?}", status);
         let _ = tx.send(Err(status)).await;
       }
-    });
+    };
+    tokio::spawn(worker.in_current_span());
 
     let out_stream = ReceiverStream::new(rx);
     Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
@@ -107,12 +130,16 @@
   type GetStream =
     Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
+
+  #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))]
   async fn get(
     &self,
     request: Request<blob::GetRequest>,
   ) -> Result<Response<Self::GetStream>, Status> {
+    info!("Get blob request: {:?}", request);
     let message: blob::GetRequest = request.into_inner();
 
     let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
+    tracing::Span::current().record("s3_path", s3_path.to_full_path());
 
     let object_metadata = self
       .s3
@@ -121,23 +148,28 @@
       .key(s3_path.object_name.clone())
       .send()
       .await
-      .map_err(|_| Status::aborted("server error"))?;
-
-    let file_size: u64 = object_metadata
-      .content_length()
-      .try_into()
-      .map_err(|_| Status::aborted("server error"))?;
+      .map_err(|err| {
+        error!("Failed to get S3 object metadata: {:?}", err);
+        Status::aborted("server error")
+      })?;
+
+    let file_size: u64 =
+      object_metadata.content_length().try_into().map_err(|err| {
+        error!("Failed to get S3 object content length: {:?}", err);
+        Status::aborted("server error")
+      })?;
 
     let chunk_size: u64 =
       GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
     let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let s3 = self.s3.clone();
 
-    tokio::spawn(async move {
+    let worker = async move {
       let mut offset: u64 = 0;
       while offset < file_size {
         let next_size = std::cmp::min(chunk_size, file_size - offset);
         let range = format!("bytes={}-{}", offset, offset + next_size - 1);
+        trace!(range, "Getting {} bytes of data", next_size);
 
         let data = match s3
           .get_object()
@@ -158,38 +190,51 @@
           Ok(data) => Ok(blob::GetResponse {
             data_chunk: data.into_bytes().to_vec(),
           }),
-          Err(_) => Err(Status::aborted("download failed")),
+          Err(err) => {
+            error!("Failed to download data chunk: {:?}", err);
+            Err(Status::aborted("download failed"))
+          }
         };
 
         let should_abort = response.is_err();
         if let Err(e) = tx.send(response).await {
-          println!("Response was dropped: {}", e);
+          error!("Response was dropped: {}", e);
           break;
         }
 
         if should_abort {
+          trace!("Error response, aborting");
          break;
        }
         offset += chunk_size;
       }
-    });
+    };
+    tokio::spawn(worker.in_current_span());
 
     let out_stream = ReceiverStream::new(rx);
     Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
   }
 
+  #[instrument(skip_all, fields(holder = %request.get_ref().holder))]
   async fn remove(
     &self,
     request: Request<blob::RemoveRequest>,
   ) -> Result<Response<()>, Status> {
+    info!("Remove blob request: {:?}", request);
     let message = request.into_inner();
     let holder = message.holder.as_str();
     let reverse_index_item = self
       .db
       .find_reverse_index_by_holder(holder)
       .await
-      .map_err(|_| Status::aborted("Internal error"))?
-      .ok_or_else(|| Status::not_found("Blob not found"))?;
+      .map_err(|err| {
+        error!("Failed to find reverse index: {:?}", err);
+        Status::aborted("Internal error")
+      })?
+      .ok_or_else(|| {
+        debug!("Blob not found");
+        Status::not_found("Blob not found")
+      })?;
 
     let blob_hash = &reverse_index_item.blob_hash;
     if self.db.remove_reverse_index_item(holder).await.is_err() {
@@ -203,7 +248,10 @@
       .db
       .find_reverse_index_by_hash(blob_hash)
       .await
-      .map_err(|_| Status::aborted("Internal error"))?
+      .map_err(|err| {
+        error!("Failed to find reverse index: {:?}", err);
+        Status::aborted("Internal error")
+      })?
       .is_empty()
     {
       let s3_path = self
@@ -217,9 +265,13 @@
         .key(&s3_path.object_name)
         .send()
         .await
-        .map_err(|_| Status::aborted("Internal error"))?;
+        .map_err(|err| {
+          error!("Failed to delete S3 object: {:?}", err);
+          Status::aborted("Internal error")
+        })?;
 
-      if self.db.remove_blob_item(blob_hash).await.is_err() {
+      if let Err(err) = self.db.remove_blob_item(blob_hash).await {
+        error!("Failed to remove blob item from database: {:?}", err);
         return Err(Status::aborted("Internal error"));
       }
     }
@@ -266,16 +318,20 @@
 
   pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
     if self.holder.is_some() {
+      warn!("Holder already provided");
       return Err(Status::invalid_argument("Holder already provided"));
     }
+    tracing::Span::current().record("holder", &new_holder);
     self.holder = Some(new_holder);
     self.determine_action().await
   }
 
   pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
     if self.blob_hash.is_some() {
+      warn!("Blob hash already provided");
       return Err(Status::invalid_argument("Blob hash already provided"));
     }
+    tracing::Span::current().record("blob_hash", &new_hash);
     self.blob_hash = Some(new_hash);
     self.determine_action().await
   }
@@ -285,6 +341,7 @@
     // this should be called only if action isn't determined yet
     // this error should actually never happen
     if self.action.is_some() {
+      error!("Put action is already started");
       return Err(Status::failed_precondition("Put action is already started"));
     }
 
@@ -301,12 +358,14 @@
     match self.db.find_blob_item(blob_hash).await {
       // Hash already exists, so we're only assigning a new holder to it
       Ok(Some(_)) => {
+        debug!("Blob found, assigning holder");
         self.action = Some(PutAction::AssignHolder);
         self.should_close_stream = true;
         Ok(blob::PutResponse { data_exists: true })
       }
       // Hash doesn't exist, so we're starting a new upload session
       Ok(None) => {
+        debug!("Blob not found, starting upload action");
         self.action = Some(PutAction::UploadNewBlob(BlobItem {
           blob_hash: blob_hash.to_string(),
           s3_path: S3Path {
@@ -317,8 +376,12 @@
         }));
         Ok(blob::PutResponse { data_exists: false })
       }
-      Err(_db_err) => {
+      Err(db_err) => {
         self.should_close_stream = true;
+        error!(
+          "Error when finding BlobItem by hash {}: {:?}",
+          blob_hash, db_err
+        );
         Err(Status::aborted("Internal error"))
       }
     }
@@ -332,20 +395,24 @@
       Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
       _ => {
         self.should_close_stream = true;
+        error!("Data chunk sent before upload action is started");
         return Err(Status::invalid_argument(
           "Holder and hash should be provided before data",
         ));
       }
     };
+    trace!("Received {} bytes of data", new_data.len());
 
     // create upload session if it doesn't already exist
     if self.uploader.is_none() {
+      debug!("Uploader doesn't exist, starting new session");
       self.uploader =
         match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
         {
           Ok(session) => Some(session),
-          Err(_) => {
+          Err(err) => {
             self.should_close_stream = true;
+            error!("Failed to create upload session: {:?}", err);
             return Err(Status::aborted("Internal error"));
           }
         }
@@ -357,8 +424,10 @@
     self.current_chunk.append(&mut new_data);
     if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
     {
-      if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+      trace!("Chunk size exceeded, adding new S3 part");
+      if let Err(err) = uploader.add_part(self.current_chunk.clone()).await {
         self.should_close_stream = true;
+        error!("Failed to upload S3 part: {:?}", err);
         return Err(Status::aborted("Internal error"));
       }
       self.current_chunk.clear();
@@ -372,14 +441,17 @@
   /// after this is called.
   pub async fn finish(self) -> Result<(), Status> {
     if self.action.is_none() {
+      debug!("No action to perform, finishing now");
       return Ok(());
     }
-    let holder = self
-      .holder
-      .ok_or_else(|| Status::aborted("Internal error"))?;
-    let blob_hash = self
-      .blob_hash
-      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let holder = self.holder.ok_or_else(|| {
+      error!("Cannot finish action. No holder provided!");
+      Status::aborted("Internal error")
+    })?;
+    let blob_hash = self.blob_hash.ok_or_else(|| {
+      error!("Cannot finish action. No blob hash provided!");
+      Status::aborted("Internal error")
+    })?;
     let blob_item = match self.action {
       None => return Ok(()),
       Some(PutAction::AssignHolder) => {
@@ -388,25 +460,33 @@
       Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
     };
 
-    let mut uploader = self
-      .uploader
-      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let mut uploader = self.uploader.ok_or_else(|| {
+      // This also happens when client cancels before sending any data chunk
+      warn!("No uploader was created, finishing now");
+      Status::aborted("Internal error")
+    })?;
 
-    if !self.current_chunk.is_empty()
-      && uploader.add_part(self.current_chunk).await.is_err()
-    {
-      return Err(Status::aborted("Internal error"));
+    if !self.current_chunk.is_empty() {
+      if let Err(err) = uploader.add_part(self.current_chunk).await {
+        error!("Failed to upload final part: {:?}", err);
+        return Err(Status::aborted("Internal error"));
+      }
     }
 
-    if uploader.finish_upload().await.is_err() {
+    if let Err(err) = uploader.finish_upload().await {
+      error!("Failed to finish upload session: {:?}", err);
      return Err(Status::aborted("Internal error"));
    }
 
-    if self.db.put_blob_item(blob_item).await.is_err() {
+    if let Err(err) = self.db.put_blob_item(blob_item).await {
+      error!("Failed to save BlobItem: {:?}", err);
      return Err(Status::aborted("Internal error"));
    }
 
-    assign_holder_to_blob(&self.db, holder, blob_hash).await
+    assign_holder_to_blob(&self.db, holder, blob_hash).await?;
+
+    debug!("Upload finished successfully");
+    Ok(())
   }
 }
 
@@ -417,7 +497,8 @@
 ) -> Result<(), Status> {
   let reverse_index_item = ReverseIndexItem { holder, blob_hash };
 
-  if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+  if let Err(err) = db.put_reverse_index_item(reverse_index_item).await {
+    error!("Failed to put reverse index: {:?}", err);
     return Err(Status::aborted("Internal error"));
   }
   Ok(())
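A note on the #[instrument(skip_all, fields(holder, blob_hash))] attributes above: skip_all keeps the function arguments (including the potentially large request) out of the span, while fields(...) declares the fields up front with no value, so the stream handlers can fill them in later with tracing::Span::current().record(...) once the holder and hash actually arrive. A minimal sketch of the same declare-then-record pattern, written with the span macros directly rather than the attribute; the function and span names here are illustrative, not from the patch:

use tracing::{field, info, info_span};

fn process(new_holder: &str) {
  // Declare `holder` with an empty value at span creation...
  let span = info_span!("process", holder = field::Empty);
  let _entered = span.enter();

  // ...and fill it in once the value is known; events emitted inside
  // this span from here on are annotated with it.
  span.record("holder", new_holder);
  info!("holder recorded");
}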
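The put and get handlers also stop spawning their async blocks directly and instead call tokio::spawn(worker.in_current_span()). The reason is that a future handed to tokio::spawn runs as a fresh task and does not inherit the span that was active at spawn time, so events logged inside the worker would lose the request context set up by #[instrument]. Instrument::in_current_span() attaches the current span to the future before it is spawned. A standalone sketch of the same idea, assuming tokio (with the macros and rt features) and tracing as dependencies; names are illustrative:

use tracing::{info, info_span, Instrument};

#[tokio::main]
async fn main() {
  let span = info_span!("request", holder = "some-holder");

  let worker = async {
    // This event carries holder="some-holder" only because the span is
    // explicitly attached below; a bare tokio::spawn would run this
    // future with no span.
    info!("processing in background");
  };
  // Equivalent to `worker.in_current_span()` when `span` is currently
  // entered, which is how the handlers in this diff use it.
  tokio::spawn(worker.instrument(span)).await.unwrap();
}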
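Finally, the diff only emits events; something must be installed at startup to collect and print them, and that part is not shown here. A minimal sketch using the tracing-subscriber crate with its env-filter feature (an assumption on my part; the service's actual main.rs and crate name are not part of this patch):

use tracing_subscriber::EnvFilter;

fn main() {
  tracing_subscriber::fmt()
    .with_env_filter(
      // e.g. RUST_LOG="blob=trace" would surface the trace!/debug! events
      // added in this diff (assuming the crate is named `blob`); fall back
      // to "info" when RUST_LOG is unset.
      EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info")),
    )
    .init();

  tracing::info!("logging initialized");
}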