diff --git a/server/Cargo.lock b/server/Cargo.lock index 68faaecc0b..dafe6808a2 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -89,6 +89,54 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "amq-protocol" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acc7cad07d1b4533fcb46f0819a6126fa201fd0385469aba75e405424f3fe009" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8b20aba8c35a0b885e1e978eff456ced925730a4e012e63e4ff89a1deb602b" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e245e0e9083b6a6db5f8c10013074cb382266eb9e2a37204d19c651b8d3b8114" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56987108bf48d2eb500cae8896cd9291564eedd8744776ecc5c3338a8b2ca5f8" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -104,6 +152,98 @@ version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg 1.1.0", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.7", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-stream" version = "0.3.4" @@ -126,6 +266,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + [[package]] name = "async-trait" version = "0.1.67" @@ -155,6 +301,12 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "autocfg" version = "0.1.8" @@ -336,6 +488,30 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.12.0" @@ -360,6 +536,15 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher 0.4.4", +] + [[package]] name = "cc" version = "1.0.79" @@ -379,7 +564,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6" dependencies = [ "cfg-if", - "cipher", + "cipher 0.3.0", "cpufeatures", "zeroize", ] @@ -392,7 +577,7 @@ checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5" dependencies = [ "aead", "chacha20", - "cipher", + "cipher 0.3.0", "poly1305", "zeroize", ] @@ -422,6 +607,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.1.13" @@ -494,6 +689,15 @@ dependencies = [ "tokio-util 0.7.7", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.6.2" @@ -506,6 +710,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -735,6 +945,15 @@ dependencies = [ "pem-rfc7468 0.3.1", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher 0.4.4", +] + [[package]] name = "digest" version = "0.9.0" @@ -775,6 +994,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenv" version = "0.15.0" @@ -884,6 +1109,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "errno-dragonfly" version = "0.1.2" @@ -900,6 +1136,15 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + [[package]] name = "fancy-regex" version = "0.10.0" @@ -951,6 +1196,18 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1050,6 +1307,21 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.27" @@ -1467,6 +1739,16 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1516,7 +1798,7 @@ checksum = "8687c819457e979cc940d09cb16e42a1bf70aa6b60a549de6d3a62a0ee90c69e" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", - "rustix", + "rustix 0.36.11", "windows-sys 0.45.0", ] @@ -1618,13 +1900,35 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "lapin" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd03ea5831b44775e296239a64851e2fd14a80a363d202ba147009ffc994ff0f" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot 0.12.1", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" dependencies = [ - "spin", + "spin 0.5.2", ] [[package]] @@ -1669,6 +1973,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "linux-raw-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" + [[package]] name = "lock_api" version = "0.4.9" @@ -2092,6 +2402,23 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher 0.4.4", + "des", + "getrandom", + "hmac 0.12.1", + "lazy_static", + "rc2", + "sha1 0.10.5", + "yasna", +] + [[package]] name = "p256" version = "0.10.1" @@ -2104,6 +2431,12 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.11.2" @@ -2247,6 +2580,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot 0.12.1", + "tracing", +] + [[package]] name = "pkcs1" version = "0.2.4" @@ -2288,6 +2633,22 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +[[package]] +name = "polling" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be1c66a6add46bff50935c313dae30a5030cf8385c5206e8a95e9e9def974aa" +dependencies = [ + "autocfg 1.1.0", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "poly1305" version = "0.7.2" @@ -2459,6 +2820,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher 0.4.4", +] + +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redis" version = "0.21.7" @@ -2636,7 +3017,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2669,10 +3050,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db4165c9963ab29e422d6c26fbc1d37f15bace6b2810221f9d925023480fcf0e" dependencies = [ "bitflags", - "errno", + "errno 0.2.8", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + +[[package]] +name = "rustix" +version = "0.37.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aae838e49b3d63e9274e1c01833cc8139d3fec468c3b84688c628f44b1ae11d" +dependencies = [ + "bitflags", + "errno 0.3.1", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.1", "windows-sys 0.45.0", ] @@ -2688,6 +3083,30 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-connector" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c6a18f8d10f71bce9bca6eaeb80429460e652f3bcf0381f0c5f8954abf7b3b8" +dependencies = [ + "log", + "rustls", + "rustls-native-certs", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.2" @@ -3072,6 +3491,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.4.1" @@ -3300,6 +3728,7 @@ dependencies = [ "ipnet", "jsonschema", "jwt-simple", + "lapin", "lazy_static", "num_enum", "openssl", @@ -3387,6 +3816,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tcp-stream" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a4b0a70bac0a58ca6a7659d1328e34ee462339c70b0fa49f72bad1f278910a" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", +] + [[package]] name = "tempfile" version = "3.4.0" @@ -3396,7 +3837,7 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall", - "rustix", + "rustix 0.36.11", "windows-sys 0.42.0", ] @@ -4068,6 +4509,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.0" @@ -4249,7 +4696,7 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdacb41e6a96a052c6cb63a144f24900236121c6f63f4f8219fef5977ecb0c25" dependencies = [ - "windows-targets", + "windows-targets 0.42.2", ] [[package]] @@ -4258,13 +4705,13 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -4273,7 +4720,16 @@ version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" dependencies = [ - "windows-targets", + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", ] [[package]] @@ -4282,13 +4738,28 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] [[package]] @@ -4297,42 +4768,84 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + [[package]] name = "windows_i686_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + [[package]] name = "windows_i686_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + [[package]] name = "winnow" version = "0.4.0" @@ -4357,6 +4870,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zeroize" version = "1.4.3" diff --git a/server/rabbit/enabled_plugins b/server/rabbit/enabled_plugins new file mode 100644 index 0000000000..56c2f19c62 --- /dev/null +++ b/server/rabbit/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management, rabbitmq_delayed_message_exchange]. diff --git a/server/rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez b/server/rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez new file mode 100644 index 0000000000..fb7530f361 Binary files /dev/null and b/server/rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez differ diff --git a/server/run-tests.sh b/server/run-tests.sh index 699fd7f1bd..fc2dcb7ca4 100755 --- a/server/run-tests.sh +++ b/server/run-tests.sh @@ -40,4 +40,9 @@ SVIX_CACHE_TYPE="none" \ SVIX_REDIS_DSN="redis://localhost:6379" \ ${TEST_COMMAND} - +echo "*********** RUN 6 ***********" +SVIX_QUEUE_TYPE="rabbitmq" \ +SVIX_CACHE_TYPE="redis" \ +SVIX_REDIS_DSN="redis://localhost:6379" \ +SVIX_RABBIT_DSN="amqp://xivs:xivs@localhost:5672/%2f" \ +${TEST_COMMAND} diff --git a/server/svix-server/Cargo.toml b/server/svix-server/Cargo.toml index 826851e920..0ebc134658 100644 --- a/server/svix-server/Cargo.toml +++ b/server/svix-server/Cargo.toml @@ -74,6 +74,7 @@ urlencoding = "2.1.2" strum_macros = "0.24" strum = { version = "0.24", features = ["derive"] } form_urlencoded = "1.1.0" +lapin = "2.1.1" [dev-dependencies] anyhow = "1.0.56" diff --git a/server/svix-server/development.env b/server/svix-server/development.env index a9c49d6d96..a53de4e76a 100644 --- a/server/svix-server/development.env +++ b/server/svix-server/development.env @@ -1,7 +1,8 @@ # Example .env file for development -DATABASE_URL=postgresql://postgres:postgres@localhost:8079/postgres # For sqlx +DATABASE_URL="postgresql://postgres:postgres@localhost:8079/postgres" # For sqlx SVIX_CACHE_TYPE=memory SVIX_JWT_SECRET=x SVIX_LOG_LEVEL=trace SVIX_QUEUE_TYPE=redis -SVIX_REDIS_DSN=redis://localhost:8078 +SVIX_RABBIT_DSN="amqp://xivs:xivs@127.0.0.1:5672/%2f" +SVIX_REDIS_DSN="redis://localhost:8078" diff --git a/server/svix-server/src/cfg.rs b/server/svix-server/src/cfg.rs index d1ebae7a53..3090c0506b 100644 --- a/server/svix-server/src/cfg.rs +++ b/server/svix-server/src/cfg.rs @@ -181,6 +181,9 @@ pub struct ConfigurationInner { /// Maximum number of concurrent worker tasks to spawn (0 is unlimited) pub worker_max_tasks: u16, + /// The address of the rabbitmq exchange + pub rabbit_dsn: Option>, + #[serde(flatten)] pub internal: InternalConfig, } @@ -216,6 +219,17 @@ fn validate_config_complete( }); } } + QueueType::RabbitMQ => { + if config.rabbit_dsn.is_none() { + return Err(ValidationError { + code: Cow::from("missing field"), + message: Some(Cow::from( + "The rabbit_dsn field must be set if the queue_type is `rabbitmq`", + )), + params: HashMap::new(), + }); + } + } } Ok(()) @@ -239,6 +253,7 @@ impl ConfigurationInner { QueueType::Memory => QueueBackend::Memory, QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)), QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)), + QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)), } } @@ -280,6 +295,7 @@ pub enum QueueBackend<'a> { Memory, Redis(&'a str), RedisCluster(&'a str), + RabbitMq(&'a str), } #[derive(Debug, Eq, PartialEq)] @@ -311,6 +327,7 @@ pub enum QueueType { Memory, Redis, RedisCluster, + RabbitMQ, } #[derive(Clone, Debug, Deserialize)] diff --git a/server/svix-server/src/error.rs b/server/svix-server/src/error.rs index a56ad2551a..01f388d0a8 100644 --- a/server/svix-server/src/error.rs +++ b/server/svix-server/src/error.rs @@ -238,6 +238,12 @@ impl Traceable for std::result::Result> { } } +impl Traceable for std::result::Result { + fn trace(self, location: &'static str) -> Result { + self.map_err(|e| Error::queue(format!("{e:?}"), location)) + } +} + #[derive(Debug, Clone)] pub enum ErrorType { /// A generic error diff --git a/server/svix-server/src/queue/memory.rs b/server/svix-server/src/queue/memory.rs index 3ffd1e6d52..2b0aff4d88 100644 --- a/server/svix-server/src/queue/memory.rs +++ b/server/svix-server/src/queue/memory.rs @@ -19,7 +19,7 @@ pub async fn new_pair() -> (TaskQueueProducer, TaskQueueConsumer) { ) } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct MemoryQueueProducer { tx: mpsc::UnboundedSender, } diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs index 91fdb3ddee..141d4ca174 100644 --- a/server/svix-server/src/queue/mod.rs +++ b/server/svix-server/src/queue/mod.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use axum::async_trait; use chrono::{DateTime, Utc}; +use lapin::options::{BasicAckOptions, BasicNackOptions}; use serde::{Deserialize, Serialize}; use strum::Display; use svix_ksuid::*; @@ -22,6 +23,7 @@ use self::{ }; pub mod memory; +pub mod rabbitmq; pub mod redis; const RETRY_SCHEDULE: &[Duration] = &[ @@ -48,6 +50,12 @@ pub async fn new_pair( redis::new_pair(pool, prefix).await } QueueBackend::Memory => memory::new_pair().await, + QueueBackend::RabbitMq(dsn) => { + let prefix = prefix.unwrap_or(""); + rabbitmq::new_pair(dsn, format!("{prefix}-message-queue")) + .await + .expect("can't connect to rabbit") + } } } @@ -113,6 +121,7 @@ pub enum QueueTask { pub enum TaskQueueProducer { Memory(MemoryQueueProducer), Redis(RedisQueueProducer), + RabbitMq(rabbitmq::Producer), } impl TaskQueueProducer { @@ -123,6 +132,7 @@ impl TaskQueueProducer { match self { TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await, TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await, + TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await, } }, should_retry, @@ -135,6 +145,7 @@ impl TaskQueueProducer { pub enum TaskQueueConsumer { Redis(RedisQueueConsumer), Memory(MemoryQueueConsumer), + RabbitMq(rabbitmq::Consumer), } impl TaskQueueConsumer { @@ -142,16 +153,20 @@ impl TaskQueueConsumer { match self { TaskQueueConsumer::Redis(q) => ctx!(q.receive_all().await), TaskQueueConsumer::Memory(q) => ctx!(q.receive_all().await), + TaskQueueConsumer::RabbitMq(q) => ctx!(q.receive_all().await), } } } /// Used by TaskQueueDeliveries to Ack/Nack itself +#[derive(Debug)] enum Acker { Memory(MemoryQueueProducer), Redis(Arc), + RabbitMQ(lapin::message::Delivery), } +#[derive(Debug)] pub struct TaskQueueDelivery { pub id: String, pub task: Arc, @@ -178,6 +193,15 @@ impl TaskQueueDelivery { Acker::Redis(q) => { ctx!(q.ack(&self).await) } + Acker::RabbitMQ(delivery) => { + ctx!( + delivery + .ack(BasicAckOptions { + multiple: false // Only ack this message, not others + }) + .await + ) + } } }, should_retry, @@ -198,6 +222,17 @@ impl TaskQueueDelivery { Acker::Redis(q) => { ctx!(q.nack(&self).await) } + Acker::RabbitMQ(delivery) => { + // See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue + ctx!( + delivery + .nack(BasicNackOptions { + requeue: true, + multiple: false // Only nack this message, not others + }) + .await + ) + } } }, should_retry, diff --git a/server/svix-server/src/queue/rabbitmq.rs b/server/svix-server/src/queue/rabbitmq.rs new file mode 100644 index 0000000000..b2e1c957ee --- /dev/null +++ b/server/svix-server/src/queue/rabbitmq.rs @@ -0,0 +1,269 @@ +use chrono::Utc; +use futures::StreamExt; +use lapin::{ + options::{ + BasicConsumeOptions, BasicPublishOptions, BasicQosOptions, ExchangeDeclareOptions, + QueueBindOptions, QueueDeclareOptions, + }, + types::{AMQPValue, FieldTable}, + BasicProperties, ConnectionProperties, +}; +use svix_ksuid::{KsuidLike, KsuidMs}; + +use crate::{ctx, err_generic, err_queue, error::Result}; +use std::{sync::Arc, time::Duration}; + +use super::{ + Acker, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer, TaskQueueReceive, + TaskQueueSend, +}; + +#[derive(Clone)] +pub struct Producer(Arc); + +struct ProducerInner { + exchange_name: String, + queue_name: String, + channel: lapin::Channel, +} + +pub struct Consumer { + consumer: lapin::Consumer, +} + +pub async fn new_pair( + dsn: &str, + queue_name: String, +) -> Result<(TaskQueueProducer, TaskQueueConsumer)> { + let conn = ctx!(lapin::Connection::connect(dsn, ConnectionProperties::default()).await)?; + let producer_chan = ctx!(conn.create_channel().await)?; + let consumer_chan = ctx!(conn.create_channel().await)?; + + let exchange_name = ctx!(declare_delayed_message_exchange(&producer_chan).await)?; + ctx!(declare_bound_queue(&queue_name, &exchange_name, &producer_chan).await)?; + + // With prefetch_size, there's a tradeoff. + // More prefetched messages drains RabbitMQ Queues faster, but also consumes more worker memory. + // Additionally, too many prefetched messages can starve other workers, and hurt total throughput. + // Leaving as 1 for now, to prevent starvation. + let prefetch_size = 1; + let consumer = ctx!(start_queue_consumer(&queue_name, &consumer_chan, prefetch_size).await)?; + let consumer = Consumer { consumer }; + + let producer = Producer(Arc::new(ProducerInner { + exchange_name, + channel: producer_chan, + queue_name, + })); + + let producer = TaskQueueProducer::RabbitMq(producer); + let consumer = TaskQueueConsumer::RabbitMq(consumer); + + Ok((producer, consumer)) +} + +async fn declare_delayed_message_exchange(channel: &lapin::Channel) -> Result { + let exchange_name = "first-message"; + + // See https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare + let opts = ExchangeDeclareOptions { + // Want the server to create the exchange if it doesn't already exist + passive: false, + nowait: false, + // Want the exchange to survive restarts, and not be deleted even when the underlying queues are + // deleted + durable: true, + auto_delete: false, + internal: false, + }; + + // See https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage + let mut args = FieldTable::default(); + args.insert( + "x-delayed-type".into(), + AMQPValue::LongString("direct".into()), + ); + + ctx!( + channel + .exchange_declare( + exchange_name, + lapin::ExchangeKind::Custom("x-delayed-message".into()), + opts, + args + ) + .await + )?; + + Ok(exchange_name.to_owned()) +} + +async fn declare_bound_queue( + queue_name: &str, + exchange_name: &str, + channel: &lapin::Channel, +) -> Result<()> { + // Ref https://www.rabbitmq.com/amqp-0-9-1-quickref.html#queue.declare + let opts = QueueDeclareOptions { + // If the queue already exists with the same configuration, we want the server to respond w/OK + // Alternatively, if the queue exists with a _different_ configuration, we want this to fail + passive: false, + nowait: false, + + // We want to support multiple consumers per queue + exclusive: false, + + // We want the queue to survive rust-primary restarts + durable: true, + auto_delete: false, + }; + + // Refs https://www.rabbitmq.com/maxlength.html#definition-using-x-args and https://www.rabbitmq.com/dlx.html#using-optional-queue-arguments + // We may want to figure out what the queue length enforcement looks like and dead letter queueing at a later point in time + let args = FieldTable::default(); + ctx!(channel.queue_declare(queue_name, opts, args).await)?; + + let routing_key = queue_name; + ctx!( + channel + .queue_bind( + queue_name, + exchange_name, + routing_key, + QueueBindOptions { nowait: false }, + FieldTable::default() + ) + .await + )?; + + Ok(()) +} + +async fn start_queue_consumer( + queue_name: &str, + channel: &lapin::Channel, + prefetch_size: u16, +) -> Result { + // Ref https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume.consumer-tag + // NOTE from @svix-gabriel: it's not really clear why this needs to be set to anything, + // so letting rabbit choose a default for now + let consumer_tag = ""; + + let opts = BasicConsumeOptions { + // https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-local + // false because I don't care if the same connection reads and publishes to the same queue + no_local: false, + + // https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack + // Obviously want message ACKs to ensure message are handled + no_ack: false, + + // https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume.exclusive + // More than one worker should be able to read from the same queue + exclusive: false, + + // https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-wait + // want the server to respond if there's a failure + nowait: false, + }; + + // NOTE from @svix-gabriel: it's not clear what, if any args we can even + // set here. So keeping blank for now + let args = FieldTable::default(); + + // Ref https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size + // + // prefetch_size tells the consumer how many messages to load in batches from the queue. + // Higher values generally means better queue performance (fewer messages stuck in the queue), + // at the cost of consumer memory. + // Additionally, if the prefetch_size is *too* large, a single worker can "starve" other workers, + // potentially hurting total message throughput. + // + // "global" enforces the same limit for other consumers on the channel, which isn't necessarily + // what we want + ctx!( + channel + .basic_qos(prefetch_size, BasicQosOptions { global: false }) + .await + )?; + + ctx!( + channel + .basic_consume(queue_name, consumer_tag, opts, args) + .await + ) +} + +#[axum::async_trait] +impl TaskQueueSend for Producer { + async fn send(&self, task: Arc, delay: Option) -> Result<()> { + let payload = serde_json::to_vec(&task) + .map_err(|e| err_generic!("unable to serialize queue task wtf: {:?}", e))?; + + let mut headers = FieldTable::default(); + if let Some(delay) = delay { + let delay_ms: u32 = delay + .as_millis() + .try_into() + .map_err(|_| err_queue!("message delay is too large"))?; + headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms)) + } + + let routing_key = &self.0.queue_name; + + // Ref https://www.rabbitmq.com/publishers.html#unroutable + let options = BasicPublishOptions { + mandatory: true, // so we're alerted if the message is unroutable + immediate: false, + }; + + let id = KsuidMs::new(Some(Utc::now()), None).to_string(); + + let properties = BasicProperties::default() + .with_message_id(id.into()) + .with_headers(headers); + + let confirm = ctx!( + self.0 + .channel + .basic_publish( + &self.0.exchange_name, + routing_key, + options, + &payload, + properties + ) + .await + )?; + + ctx!(confirm.await)?; + + Ok(()) + } +} + +#[axum::async_trait] +impl TaskQueueReceive for Consumer { + async fn receive_all(&mut self) -> Result> { + let delivery = ctx!(self.consumer.next().await.ok_or(err_generic!( + "rabbitmq consumer unexpectedly returned nothing!" + ))?)?; + + let id = delivery + .properties + .message_id() + .as_ref() + .ok_or(err_generic!("task is missing message_id!"))? + .to_string(); + + let task: QueueTask = serde_json::from_slice(&delivery.data).map_err(|_e| { + err_generic!("rabbitmq task deserialization unexpectedly failed?!: {e:?}") + })?; + + Ok(vec![TaskQueueDelivery { + id, + task: Arc::new(task), + acker: Acker::RabbitMQ(delivery), + }]) + } +} diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 165e957f1f..654cf40181 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -439,6 +439,7 @@ impl ToRedisArgs for Direction { } } +#[derive(Debug)] pub(super) struct RedisQueueInner { pool: RedisPool, main_queue_name: String, diff --git a/server/svix-server/tests/queue.rs b/server/svix-server/tests/queue.rs index 7a4a04a0ee..4c04e2d249 100644 --- a/server/svix-server/tests/queue.rs +++ b/server/svix-server/tests/queue.rs @@ -90,9 +90,12 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option) loop { tokio::select! { recv = c.receive_all() => { - let mut recv = recv.unwrap(); + let recv = recv.unwrap(); read += recv.len(); - out.append(&mut recv); + for r in recv { + out.push(task_queue_delivery_to_u16(&r)); + r.ack().await.unwrap(); + } } _ = tokio::time::sleep(Duration::from_millis(1000)) => { break; @@ -108,7 +111,7 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option) // Create a Vec with all the threads' outputs let mut out = Vec::new(); for jh in join_handles { - let (mut jh_out, read): (Vec, usize) = jh.join().unwrap(); + let (mut jh_out, read): (Vec, usize) = jh.join().unwrap(); out.append(&mut jh_out); if read < 5 { @@ -117,11 +120,7 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option) } // Sort it by the message ID - out.sort_by(|a: &TaskQueueDelivery, b: &TaskQueueDelivery| { - let a = task_queue_delivery_to_u16(a); - let b = task_queue_delivery_to_u16(b); - a.cmp(&b) - }); + out.sort(); // Then assert that all the messages are there @@ -130,7 +129,7 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option) // Genreally, however, this lint is actually good practice. #[allow(clippy::needless_range_loop)] for index in 0..1000 { - assert_eq!(task_queue_delivery_to_u16(&out[index]) as usize, index + 1); + assert_eq!(out[index] as usize, index + 1); } } diff --git a/server/testing-docker-compose.yml b/server/testing-docker-compose.yml index a3151d8c0b..d7d52d5853 100644 --- a/server/testing-docker-compose.yml +++ b/server/testing-docker-compose.yml @@ -71,6 +71,20 @@ services: ALLOW_EMPTY_PASSWORD: "yes" REDIS_NODES: "redis-cluster redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2 redis-cluster-node-3 redis-cluster-node-4" + rabbitmq: + image: rabbitmq:3.11.13-management-alpine + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: "xivs" + RABBITMQ_DEFAULT_PASS: "xivs" + RABBITMQ_PLUGINS_DIR: "/opt/rabbitmq/plugins:/usr/lib/rabbitmq/plugins" + volumes: + - rabbitmq_data:/var/lib/rabbitmq + - ./rabbit/enabled_plugins:/etc/rabbitmq/enabled_plugins + - ./rabbit/plugins:/usr/lib/rabbitmq/plugins + volumes: postgres-data: - + rabbitmq_data: \ No newline at end of file