Changeset View
Changeset View
Standalone View
Standalone View
services/identity/src/service.rs
Show First 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | impl IdentityService for MyIdentityService { | ||||
#[instrument(skip(self))] | #[instrument(skip(self))] | ||||
async fn register_user( | async fn register_user( | ||||
&self, | &self, | ||||
request: Request<tonic::Streaming<RegistrationRequest>>, | request: Request<tonic::Streaming<RegistrationRequest>>, | ||||
) -> Result<Response<Self::RegisterUserStream>, Status> { | ) -> Result<Response<Self::RegisterUserStream>, Status> { | ||||
let mut in_stream = request.into_inner(); | let mut in_stream = request.into_inner(); | ||||
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); | let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); | ||||
let client = self.client.clone(); | |||||
tokio::spawn(async move { | |||||
let first_message = in_stream.next().await; | let first_message = in_stream.next().await; | ||||
let mut registration_state = registration::handle_registration_request( | let mut registration_state = registration::handle_registration_request( | ||||
first_message, | first_message, | ||||
self.client.clone(), | &client, | ||||
tx.clone(), | tx.clone(), | ||||
) | ) | ||||
.await?; | .await?; | ||||
// ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we | // ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we | ||||
// have to take the value out of registration_state, replacing it with None | // have to take the value out of registration_state, replacing it with None | ||||
let pake_state = | let pake_state = | ||||
if let Some(pake_state) = registration_state.pake_state.take() { | if let Some(pake_state) = registration_state.pake_state.take() { | ||||
pake_state | pake_state | ||||
} else { | } else { | ||||
error!("registration_state is missing opaque-ke ServerRegistration"); | error!("registration_state is missing opaque-ke ServerRegistration"); | ||||
return Err(Status::failed_precondition("internal error")); | return Err(Status::failed_precondition("internal error")); | ||||
}; | }; | ||||
let second_message = in_stream.next().await; | let second_message = in_stream.next().await; | ||||
let server_login = | let server_login = | ||||
registration::handle_registration_upload_and_credential_request( | registration::handle_registration_upload_and_credential_request( | ||||
second_message, | second_message, | ||||
tx.clone(), | tx.clone(), | ||||
&self.client, | &client, | ||||
®istration_state, | ®istration_state, | ||||
pake_state, | pake_state, | ||||
) | ) | ||||
.await?; | .await?; | ||||
let third_message = in_stream.next().await; | let third_message = in_stream.next().await; | ||||
registration::handle_credential_finalization( | registration::handle_credential_finalization( | ||||
third_message, | third_message, | ||||
tx, | tx, | ||||
&self.client, | &client, | ||||
®istration_state, | ®istration_state, | ||||
server_login, | server_login, | ||||
) | ) | ||||
.await?; | .await?; | ||||
Ok(()) | |||||
}); | |||||
let out_stream = ReceiverStream::new(rx); | let out_stream = ReceiverStream::new(rx); | ||||
Ok(Response::new( | Ok(Response::new( | ||||
Box::pin(out_stream) as Self::RegisterUserStream | Box::pin(out_stream) as Self::RegisterUserStream | ||||
)) | )) | ||||
} | } | ||||
type LoginUserStream = | type LoginUserStream = | ||||
Pin<Box<dyn Stream<Item = Result<LoginResponse, Status>> + Send + 'static>>; | Pin<Box<dyn Stream<Item = Result<LoginResponse, Status>> + Send + 'static>>; | ||||
#[instrument(skip(self))] | #[instrument(skip(self))] | ||||
async fn login_user( | async fn login_user( | ||||
&self, | &self, | ||||
request: Request<tonic::Streaming<LoginRequest>>, | request: Request<tonic::Streaming<LoginRequest>>, | ||||
) -> Result<Response<Self::LoginUserStream>, Status> { | ) -> Result<Response<Self::LoginUserStream>, Status> { | ||||
let mut in_stream = request.into_inner(); | let mut in_stream = request.into_inner(); | ||||
let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); | let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); | ||||
let client = self.client.clone(); | |||||
tokio::spawn(async move { | |||||
let first_message = in_stream.next().await; | let first_message = in_stream.next().await; | ||||
let login_state = | let login_state = | ||||
login::handle_login_request(first_message, tx.clone(), &self.client) | login::handle_login_request(first_message, tx.clone(), &client).await?; | ||||
.await?; | |||||
// login_state will be None if user is logging in with a wallet | // login_state will be None if user is logging in with a wallet | ||||
if let Some(state) = login_state { | if let Some(state) = login_state { | ||||
let second_message = in_stream.next().await; | let second_message = in_stream.next().await; | ||||
login::handle_credential_finalization( | login::handle_credential_finalization( | ||||
second_message, | second_message, | ||||
tx, | tx, | ||||
self.client.clone(), | &client, | ||||
state, | state, | ||||
) | ) | ||||
.await?; | .await?; | ||||
} | } | ||||
Ok::<(), Status>(()) | |||||
}); | |||||
let out_stream = ReceiverStream::new(rx); | let out_stream = ReceiverStream::new(rx); | ||||
Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) | Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) | ||||
} | } | ||||
#[instrument(skip(self))] | #[instrument(skip(self))] | ||||
async fn verify_user_token( | async fn verify_user_token( | ||||
&self, | &self, | ||||
▲ Show 20 Lines • Show All 348 Lines • ▼ Show 20 Lines | |||||
} | } | ||||
async fn pake_registration_start( | async fn pake_registration_start( | ||||
rng: &mut (impl Rng + CryptoRng), | rng: &mut (impl Rng + CryptoRng), | ||||
registration_request_bytes: &[u8], | registration_request_bytes: &[u8], | ||||
) -> Result<RegistrationResponseAndPakeState, Status> { | ) -> Result<RegistrationResponseAndPakeState, Status> { | ||||
match ServerRegistration::<Cipher>::start( | match ServerRegistration::<Cipher>::start( | ||||
rng, | rng, | ||||
PakeRegistrationRequest::deserialize(registration_request_bytes).unwrap(), | PakeRegistrationRequest::deserialize(registration_request_bytes).map_err( | ||||
|e| { | |||||
error!("Failed to deserialize registration request bytes: {}", e); | |||||
Status::aborted("registration failed") | |||||
}, | |||||
)?, | |||||
CONFIG.server_keypair.public(), | CONFIG.server_keypair.public(), | ||||
) { | ) { | ||||
Ok(server_registration_start_result) => { | Ok(server_registration_start_result) => { | ||||
Ok(RegistrationResponseAndPakeState { | Ok(RegistrationResponseAndPakeState { | ||||
response: RegistrationResponse { | response: RegistrationResponse { | ||||
data: Some(PakeRegistrationResponse( | data: Some(PakeRegistrationResponse( | ||||
server_registration_start_result.message.serialize(), | server_registration_start_result.message.serialize(), | ||||
)), | )), | ||||
▲ Show 20 Lines • Show All 85 Lines • Show Last 20 Lines |