Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3505113
D10602.id35671.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Referenced Files
None
Subscribers
None
D10602.id35671.diff
View Options
diff --git a/native/native_rust_library/src/backup.rs b/native/native_rust_library/src/backup.rs
--- a/native/native_rust_library/src/backup.rs
+++ b/native/native_rust_library/src/backup.rs
@@ -6,8 +6,8 @@
use crate::RUNTIME;
use backup_client::{
BackupClient, BackupData, BackupDescriptor, DownloadLogsRequest,
- LatestBackupIDResponse, LogWSResponse, RequestedData, SinkExt, StreamExt,
- UploadLogRequest, UserIdentity,
+ LatestBackupIDResponse, LogUploadConfirmation, LogWSResponse, RequestedData,
+ SinkExt, StreamExt, UploadLogRequest, UserIdentity,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -85,21 +85,27 @@
.upload_backup(&user_identity, backup_data)
.await?;
- let (tx, rx) = backup_client
- .upload_logs(&user_identity, &backup_id)
- .await?;
+ let (tx, rx) = backup_client.upload_logs(&user_identity).await?;
tokio::pin!(tx);
tokio::pin!(rx);
let log_data = UploadLogRequest {
+ backup_id: backup_id.clone(),
log_id: 1,
content: (1..100).collect(),
attachments: None,
};
tx.send(log_data.clone()).await?;
match rx.next().await {
- Some(Ok(1)) => (),
+ Some(Ok(LogUploadConfirmation {
+ backup_id: response_backup_id,
+ log_id: 1,
+ }))
+ if backup_id == response_backup_id =>
+ {
+ // Correctly uploaded
+ }
response => {
return Err(Box::new(InvalidWSLogResponse(format!("{response:?}"))))
}
@@ -149,14 +155,16 @@
let user_data: serde_json::Value = serde_json::from_slice(&user_data)?;
- let (tx, rx) = backup_client
- .download_logs(&user_identity, &backup_id)
- .await?;
+ let (tx, rx) = backup_client.download_logs(&user_identity).await?;
tokio::pin!(tx);
tokio::pin!(rx);
- tx.send(DownloadLogsRequest { from_id: None }).await?;
+ tx.send(DownloadLogsRequest {
+ backup_id: backup_id.clone(),
+ from_id: None,
+ })
+ .await?;
match rx.next().await {
Some(Ok(LogWSResponse::LogDownload {
diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs
--- a/services/backup/src/http/handlers/log.rs
+++ b/services/backup/src/http/handlers/log.rs
@@ -17,23 +17,17 @@
},
database::{self, blob::BlobOrDBContent},
};
-use std::{
- sync::Arc,
- time::{Duration, Instant},
-};
+use std::time::{Duration, Instant};
use tracing::{error, info, instrument, warn};
pub async fn handle_ws(
- path: web::Path<String>,
req: HttpRequest,
stream: web::Payload,
blob_client: web::Data<BlobServiceClient>,
db_client: web::Data<DatabaseClient>,
) -> Result<HttpResponse, Error> {
- let backup_id = path.into_inner();
ws::WsResponseBuilder::new(
LogWSActor {
- info: Arc::new(ConnectionInfo { backup_id }),
blob_client: blob_client.as_ref().clone(),
db_client: db_client.as_ref().clone(),
last_msg_time: Instant::now(),
@@ -46,10 +40,6 @@
.start()
}
-struct ConnectionInfo {
- backup_id: String,
-}
-
#[derive(
Debug, derive_more::From, derive_more::Display, derive_more::Error,
)]
@@ -60,7 +50,6 @@
}
struct LogWSActor {
- info: Arc<ConnectionInfo>,
blob_client: BlobServiceClient,
db_client: DatabaseClient,
@@ -77,12 +66,8 @@
ctx: &mut WebsocketContext<LogWSActor>,
bytes: Bytes,
) {
- let fut = Self::handle_msg(
- self.info.clone(),
- self.blob_client.clone(),
- self.db_client.clone(),
- bytes,
- );
+ let fut =
+ Self::handle_msg(self.blob_client.clone(), self.db_client.clone(), bytes);
let fut = actix::fut::wrap_future(fut).map(
|responses,
@@ -113,7 +98,6 @@
}
async fn handle_msg(
- info: Arc<ConnectionInfo>,
blob_client: BlobServiceClient,
db_client: DatabaseClient,
bytes: Bytes,
@@ -122,6 +106,7 @@
match request {
LogWSRequest::UploadLog(UploadLogRequest {
+ backup_id,
log_id,
content,
attachments,
@@ -136,7 +121,7 @@
}
let mut log_item = LogItem {
- backup_id: info.backup_id.clone(),
+ backup_id: backup_id.clone(),
log_id,
content: BlobOrDBContent::new(content),
attachments: attachment_blob_infos,
@@ -145,11 +130,14 @@
log_item.ensure_size_constraints(&blob_client).await?;
db_client.put_log_item(log_item).await?;
- Ok(vec![LogWSResponse::LogUploaded { log_id }])
+ Ok(vec![LogWSResponse::LogUploaded { backup_id, log_id }])
}
- LogWSRequest::DownloadLogs(DownloadLogsRequest { from_id }) => {
+ LogWSRequest::DownloadLogs(DownloadLogsRequest {
+ backup_id,
+ from_id,
+ }) => {
let (log_items, last_id) =
- db_client.fetch_log_items(&info.backup_id, from_id).await?;
+ db_client.fetch_log_items(&backup_id, from_id).await?;
let mut messages = vec![];
@@ -210,7 +198,7 @@
impl Actor for LogWSActor {
type Context = ws::WebsocketContext<Self>;
- #[instrument(skip_all, fields(backup_id = self.info.backup_id))]
+ #[instrument(skip_all)]
fn started(&mut self, ctx: &mut Self::Context) {
info!("Socket opened");
ctx.run_interval(Self::HEARTBEAT_INTERVAL, |actor, ctx| {
@@ -226,14 +214,14 @@
});
}
- #[instrument(skip_all, fields(backup_id = self.info.backup_id))]
+ #[instrument(skip_all)]
fn stopped(&mut self, _: &mut Self::Context) {
info!("Socket closed");
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for LogWSActor {
- #[instrument(skip_all, fields(backup_id = self.info.backup_id))]
+ #[instrument(skip_all)]
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs
--- a/services/backup/src/http/mod.rs
+++ b/services/backup/src/http/mod.rs
@@ -64,7 +64,7 @@
.service(
web::scope("/logs")
.wrap(get_comm_authentication_middleware())
- .service(web::resource("{backup_d}").route(web::get().to(handle_ws))),
+ .service(web::resource("").route(web::get().to(handle_ws))),
)
})
.bind(("0.0.0.0", CONFIG.http_port))?
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -2,7 +2,7 @@
use backup_client::{
BackupClient, BackupData, BackupDescriptor, Error as BackupClientError,
- RequestedData, SinkExt, StreamExt, TryStreamExt,
+ LogUploadConfirmation, RequestedData, SinkExt, StreamExt, TryStreamExt,
};
use bytesize::ByteSize;
use comm_lib::{
@@ -37,10 +37,7 @@
.upload_backup(&user_identity, backup_data.clone())
.await?;
- let (tx, rx) = backup_client
- .upload_logs(&user_identity, &backup_data.backup_id)
- .await
- .unwrap();
+ let (tx, rx) = backup_client.upload_logs(&user_identity).await.unwrap();
tokio::pin!(tx);
tokio::pin!(rx);
@@ -49,9 +46,16 @@
tx.send(log_data.clone()).await.unwrap();
}
- let result: HashSet<usize> =
+ let result: HashSet<LogUploadConfirmation> =
rx.take(log_datas.len()).try_collect().await.unwrap();
- let expected = log_datas.iter().map(|data| data.log_id).collect();
+ let expected = log_datas
+ .iter()
+ .map(|data| LogUploadConfirmation {
+ backup_id: data.backup_id.clone(),
+ log_id: data.log_id,
+ })
+ .collect();
+
assert_eq!(result, expected);
}
@@ -92,17 +96,17 @@
assert_eq!(user_keys, backup_data.user_keys);
// Test log download
- let (tx, rx) = backup_client
- .download_logs(&user_identity, &backup_data.backup_id)
- .await
- .unwrap();
+ let (tx, rx) = backup_client.download_logs(&user_identity).await.unwrap();
tokio::pin!(tx);
tokio::pin!(rx);
- tx.send(DownloadLogsRequest { from_id: None })
- .await
- .unwrap();
+ tx.send(DownloadLogsRequest {
+ backup_id: backup_data.backup_id.clone(),
+ from_id: None,
+ })
+ .await
+ .unwrap();
let mut downloaded_logs = HashMap::new();
'download: loop {
@@ -118,6 +122,7 @@
LogWSResponse::LogDownloadFinished { last_log_id } => {
if let Some(last_log_id) = last_log_id {
tx.send(DownloadLogsRequest {
+ backup_id: backup_data.backup_id.clone(),
from_id: Some(last_log_id),
})
.await
@@ -159,17 +164,17 @@
);
// Test log cleanup
- let (tx, rx) = backup_client
- .download_logs(&user_identity, &removed_backup.backup_id)
- .await
- .unwrap();
+ let (tx, rx) = backup_client.download_logs(&user_identity).await.unwrap();
tokio::pin!(tx);
tokio::pin!(rx);
- tx.send(DownloadLogsRequest { from_id: None })
- .await
- .unwrap();
+ tx.send(DownloadLogsRequest {
+ backup_id: removed_backup.backup_id.clone(),
+ from_id: None,
+ })
+ .await
+ .unwrap();
match rx.next().await.unwrap().unwrap() {
LogWSResponse::LogDownloadFinished { last_log_id: None } => (),
@@ -199,7 +204,7 @@
),
attachments: vec![],
},
- generate_log_data(b'a'),
+ generate_log_data("b1", b'a'),
),
(
BackupData {
@@ -214,12 +219,12 @@
),
attachments: vec![],
},
- generate_log_data(b'b'),
+ generate_log_data("b2", b'b'),
),
]
}
-fn generate_log_data(value: u8) -> Vec<UploadLogRequest> {
+fn generate_log_data(backup_id: &str, value: u8) -> Vec<UploadLogRequest> {
const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize;
const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize;
@@ -240,6 +245,7 @@
content.extend(unique_suffix.as_bytes());
UploadLogRequest {
+ backup_id: backup_id.to_string(),
log_id,
content,
attachments,
diff --git a/shared/backup_client/src/lib.rs b/shared/backup_client/src/lib.rs
--- a/shared/backup_client/src/lib.rs
+++ b/shared/backup_client/src/lib.rs
@@ -119,15 +119,14 @@
pub async fn upload_logs(
&self,
user_identity: &UserIdentity,
- backup_id: &str,
) -> Result<
(
impl Sink<UploadLogRequest, Error = WSError>,
- impl Stream<Item = Result<usize, WSError>>,
+ impl Stream<Item = Result<LogUploadConfirmation, WSError>>,
),
Error,
> {
- let request = self.create_ws_request(user_identity, backup_id)?;
+ let request = self.create_ws_request(user_identity)?;
let (stream, response) = connect_async(request).await?;
if response.status().is_client_error() {
@@ -150,7 +149,9 @@
};
match response {
- LogWSResponse::LogUploaded { log_id } => Some(Ok(log_id)),
+ LogWSResponse::LogUploaded { backup_id, log_id } => {
+ Some(Ok(LogUploadConfirmation { backup_id, log_id }))
+ }
LogWSResponse::LogDownload { .. }
| LogWSResponse::LogDownloadFinished { .. } => {
Some(Err(WSError::InvalidBackupMessage))
@@ -165,7 +166,6 @@
pub async fn download_logs(
&self,
user_identity: &UserIdentity,
- backup_id: &str,
) -> Result<
(
impl Sink<DownloadLogsRequest, Error = WSError>,
@@ -173,7 +173,7 @@
),
Error,
> {
- let request = self.create_ws_request(user_identity, backup_id)?;
+ let request = self.create_ws_request(user_identity)?;
let (stream, response) = connect_async(request).await?;
if response.status().is_client_error() {
@@ -211,7 +211,6 @@
fn create_ws_request(
&self,
user_identity: &UserIdentity,
- backup_id: &str,
) -> Result<Request<()>, Error> {
let mut url = self.url.clone();
@@ -220,7 +219,7 @@
"https" => url.set_scheme("wss")?,
_ => (),
};
- let url = url.join("logs/")?.join(backup_id)?;
+ let url = url.join("logs")?;
let mut request = url.into_client_request().unwrap();
@@ -259,6 +258,12 @@
UserData,
}
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct LogUploadConfirmation {
+ pub backup_id: String,
+ pub log_id: usize,
+}
+
#[derive(
Debug, derive_more::Display, derive_more::Error, derive_more::From,
)]
diff --git a/shared/comm-lib/src/backup/mod.rs b/shared/comm-lib/src/backup/mod.rs
--- a/shared/comm-lib/src/backup/mod.rs
+++ b/shared/comm-lib/src/backup/mod.rs
@@ -8,6 +8,7 @@
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UploadLogRequest {
+ pub backup_id: String,
pub log_id: usize,
pub content: Vec<u8>,
pub attachments: Option<Vec<String>>,
@@ -15,6 +16,7 @@
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadLogsRequest {
+ pub backup_id: String,
pub from_id: Option<usize>,
}
@@ -27,6 +29,7 @@
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogWSResponse {
LogUploaded {
+ backup_id: String,
log_id: usize,
},
LogDownload {
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 12:15 PM (19 h, 51 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2687644
Default Alt Text
D10602.id35671.diff (12 KB)
Attached To
Mode
D10602: [backup] Allow one websocket connection to handle multiple backups
Attached
Detach File
Event Timeline
Log In to Comment