diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock --- a/services/tunnelbroker/Cargo.lock +++ b/services/tunnelbroker/Cargo.lock @@ -31,6 +31,54 @@ "memchr", ] +[[package]] +name = "amq-protocol" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d40d8b2465c7959dd40cee32ba6ac334b5de57e9fca0cc756759894a4152a5d" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cb2100adae7da61953a2c3a01935d86caae13329fadce3333f524d6d6ce12e2" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "156ff13c8a3ced600b4e54ed826a2ae6242b6069d00dd98466827cef07d3daff" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751bbd7d440576066233e740576f1b31fdc6ab86cfabfbd48c548de77eca73e4" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -95,6 +143,98 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -117,6 +257,12 @@ "syn 2.0.15", ] +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + [[package]] name = "async-trait" version = "0.1.68" @@ -128,6 +274,12 @@ "syn 2.0.15", ] +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "atty" version = "0.2.14" @@ -551,6 +703,30 @@ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.12.1" @@ -579,6 +755,15 @@ "either", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.79" @@ -606,6 +791,16 @@ "winapi", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.2.7" @@ -664,12 +859,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "convert_case" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -695,6 +905,15 @@ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -775,6 +994,15 @@ "syn 1.0.109", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "digest" version = "0.10.6" @@ -786,6 +1014,12 @@ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "either" version = "1.8.1" @@ -844,6 +1078,21 @@ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -873,6 +1122,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -951,6 +1212,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -1247,6 +1523,16 @@ "hashbrown", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1309,6 +1595,28 @@ "wasm-bindgen", ] +[[package]] +name = "lapin" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acc13beaa09eed710f406201f46b961345b4d061dd90ec3d3ccc70721e70342a" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1382,6 +1690,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.8.6" @@ -1418,6 +1732,16 @@ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1529,6 +1853,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher", + "des", + "getrandom", + "hmac", + "lazy_static", + "rc2", + "sha1", + "yasna", +] + +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.7" @@ -1590,12 +1947,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing", +] + [[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1714,6 +2099,26 @@ "getrandom", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1810,7 +2215,7 @@ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -1864,6 +2269,30 @@ "webpki 0.22.0", ] +[[package]] +name = "rustls" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct 0.7.0", +] + +[[package]] +name = "rustls-connector" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67c8d6cf0e464eff7cee6ba0419f56a65d29999fc164dd719c8633fbb401365f" +dependencies = [ + "log", + "rustls 0.21.1", + "rustls-native-certs", + "rustls-webpki", +] + [[package]] name = "rustls-native-certs" version = "0.6.2" @@ -1885,6 +2314,16 @@ "base64 0.21.0", ] +[[package]] +name = "rustls-webpki" +version = "0.100.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.12" @@ -2072,6 +2511,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "strsim" version = "0.10.0" @@ -2112,6 +2560,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tcp-stream" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1322b18a9e329ba45e4430b19543045b85cd1dcb2892e77d27ab471ba2039bd1" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", +] + [[package]] name = "tempfile" version = "3.5.0" @@ -2515,6 +2975,7 @@ "fcm", "futures", "futures-util", + "lapin", "lazy_static", "log", "once_cell", @@ -2632,6 +3093,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.0" @@ -2972,6 +3439,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zeroize" version = "1.6.0" diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -35,6 +35,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tunnelbroker_messages = { path = "../../shared/tunnelbroker_messages" } derive_more = "0.99.17" +lapin = "2.2.1" [build-dependencies] tonic-build = "0.8" diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/amqp.rs @@ -0,0 +1,14 @@ +use crate::CONFIG; +use lapin::{Connection, ConnectionProperties}; +use tracing::info; + +pub async fn connect() -> Connection { + let amqp_uri = format!("amqp://{}:{}", CONFIG.amqp_address, CONFIG.amqp_port); + let conn = Connection::connect(&amqp_uri, ConnectionProperties::default()) + .await + .expect("Unable to connect to amqp endpoint"); + + info!("Connected to amqp endpoint: {}", &amqp_uri); + + return conn; +} diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs --- a/services/tunnelbroker/src/config.rs +++ b/services/tunnelbroker/src/config.rs @@ -13,6 +13,12 @@ /// HTTP server listening port #[arg(long, default_value_t = 51001)] pub http_port: u16, + /// AMQP server listening port + #[arg(long, default_value_t = 5672)] + pub amqp_port: u16, + /// AMQP server listening port + #[arg(long, default_value_t = String::from("localhost"))] + pub amqp_address: String, /// AWS Localstack service URL #[arg(env = "LOCALSTACK_ENDPOINT")] #[arg(long)] diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -1,3 +1,4 @@ +pub mod amqp; pub mod config; pub mod constants; pub mod database; @@ -29,6 +30,7 @@ config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; let db_client = database::DatabaseClient::new(&aws_config); + let amqp_connection = amqp::connect().await; let grpc_server = grpc::run_server(db_client.clone()); let websocket_server = websockets::run_server(db_client.clone());