diff --git a/services/blob/src/service.rs b/services/blob/src/grpc.rs rename from services/blob/src/service.rs rename to services/blob/src/grpc.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/grpc.rs @@ -2,13 +2,14 @@ use aws_sdk_dynamodb::Error as DynamoDBError; use blob::blob_service_server::BlobService; use chrono::Utc; -use std::pin::Pin; +use std::{net::SocketAddr, pin::Pin}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tonic::{Request, Response, Status}; +use tonic::{transport::Server, Request, Response, Status}; use tracing::{debug, error, info, instrument, trace, warn, Instrument}; use crate::{ + config::CONFIG, constants::{ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, @@ -18,11 +19,28 @@ tools::MemOps, }; -pub mod blob { +mod blob { tonic::include_proto!("blob"); } +use blob::blob_service_server::BlobServiceServer; + +pub async fn run_grpc_server( + db_client: DatabaseClient, + s3_client: S3Client, +) -> Result<()> { + let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?; + let blob_service = MyBlobService::new(db_client, s3_client); + + info!("Starting gRPC server listening at {}", CONFIG.grpc_port); + Server::builder() + .add_service(BlobServiceServer::new(blob_service)) + .serve(addr) + .await?; + + Ok(()) +} -pub struct MyBlobService { +struct MyBlobService { db: DatabaseClient, s3: S3Client, } diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs --- a/services/blob/src/main.rs +++ b/services/blob/src/main.rs @@ -1,18 +1,11 @@ pub mod config; pub mod constants; pub mod database; +pub mod grpc; pub mod s3; -pub mod service; pub mod tools; use anyhow::Result; -use config::CONFIG; -use database::DatabaseClient; -use s3::S3Client; -use service::{blob::blob_service_server::BlobServiceServer, MyBlobService}; -use std::net::SocketAddr; -use tonic::transport::Server; -use tracing::info; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; fn configure_logging() -> Result<()> { @@ -26,22 +19,6 @@ Ok(()) } -async fn run_grpc_server( - db_client: DatabaseClient, - s3_client: S3Client, -) -> Result<()> { - let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?; - let blob_service = MyBlobService::new(db_client, s3_client); - - info!("Starting gRPC server listening at {}", CONFIG.grpc_port); - Server::builder() - .add_service(BlobServiceServer::new(blob_service)) - .serve(addr) - .await?; - - Ok(()) -} - #[tokio::main] async fn main() -> Result<()> { configure_logging()?; @@ -51,5 +28,5 @@ let db = database::DatabaseClient::new(&aws_config); let s3 = s3::S3Client::new(&aws_config); - run_grpc_server(db, s3).await + crate::grpc::run_grpc_server(db, s3).await }