diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs --- a/services/blob/src/s3.rs +++ b/services/blob/src/s3.rs @@ -1,6 +1,10 @@ use anyhow::{anyhow, Result}; -use aws_sdk_s3::model::CompletedPart; use aws_sdk_s3::Client as S3Client; +use aws_sdk_s3::{ + model::{CompletedMultipartUpload, CompletedPart}, + output::CreateMultipartUploadOutput, + types::ByteStream, +}; use std::sync::Arc; /// A helper structure representing an S3 object path @@ -49,17 +53,66 @@ impl MultiPartUploadSession { /// Starts a new upload session and returns its instance pub async fn start(client: &Arc, s3_path: &S3Path) -> Result { - unimplemented!() + let multipart_upload_res: CreateMultipartUploadOutput = client + .create_multipart_upload() + .bucket(&s3_path.bucket_name) + .key(&s3_path.object_name) + .send() + .await?; + + let upload_id = multipart_upload_res + .upload_id() + .ok_or(anyhow!("Upload ID expected to be present"))?; + + Ok(MultiPartUploadSession { + client: client.clone(), + bucket_name: String::from(&s3_path.bucket_name), + object_name: String::from(&s3_path.object_name), + upload_id: String::from(upload_id), + upload_parts: Vec::new(), + }) } /// adds data part to the multipart upload pub async fn add_part(&mut self, part: Vec) -> Result<()> { - unimplemented!() + let stream = ByteStream::from(part); + let part_number: i32 = self.upload_parts.len() as i32 + 1; + let upload_result = self + .client + .upload_part() + .key(&self.object_name) + .bucket(&self.bucket_name) + .upload_id(&self.upload_id) + .part_number(part_number) + .body(stream) + .send() + .await?; + + let completed_part = CompletedPart::builder() + .e_tag(upload_result.e_tag.unwrap_or_default()) + .part_number(part_number) + .build(); + self.upload_parts.push(completed_part); + Ok(()) } /// finishes the upload pub async fn finish_upload(&self) -> Result<()> { - unimplemented!() + let completed_multipart_upload = CompletedMultipartUpload::builder() + .set_parts(Some(self.upload_parts.clone())) + .build(); + + self + .client + .complete_multipart_upload() + .bucket(&self.bucket_name) + .key(&self.object_name) + .multipart_upload(completed_multipart_upload) + .upload_id(&self.upload_id) + .send() + .await?; + + Ok(()) } }