diff --git a/Cargo.lock b/Cargo.lock index 8f2cc968..eb1616f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -174,6 +174,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-channel" version = "1.9.0" @@ -666,7 +672,7 @@ dependencies = [ "http-body 0.4.6", "md-5", "pin-project-lite", - "sha1 0.10.6", + "sha1", "sha2", "tracing", ] @@ -776,7 +782,7 @@ dependencies = [ "futures-core", "http 0.2.11", "http-body 0.4.6", - "itoa 1.0.10", + "itoa", "num-integer", "pin-project-lite", "pin-utils", @@ -825,7 +831,7 @@ dependencies = [ "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", - "itoa 1.0.10", + "itoa", "matchit", "memchr", "mime", @@ -1913,6 +1919,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -1925,12 +1942,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dtoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" - [[package]] name = "dyn-clone" version = "1.0.17" @@ -2921,7 +2932,7 @@ checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", - "itoa 1.0.10", + "itoa", ] [[package]] @@ -2932,7 +2943,7 @@ checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", "fnv", - "itoa 1.0.10", + "itoa", ] [[package]] @@ -3002,7 +3013,7 @@ dependencies = [ "http-body 0.4.6", "httparse", "httpdate", - "itoa 1.0.10", + "itoa", "pin-project-lite", "socket2 0.5.6", "tokio", @@ -3024,7 +3035,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "httparse", - "itoa 1.0.10", + "itoa", "pin-project-lite", "smallvec", "tokio", @@ -3163,6 +3174,124 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "id-arena" version = "2.2.1" @@ -3177,12 +3306,23 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.4.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] @@ -3331,12 +3471,6 @@ dependencies = [ "either", ] -[[package]] -name = "itoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" - [[package]] name = "itoa" version = "1.0.10" @@ -3508,6 +3642,12 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +[[package]] +name = "litemap" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" + [[package]] name = "lock_api" version = "0.4.11" @@ -3918,11 +4058,10 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ - "autocfg", "num-integer", "num-traits", ] @@ -4134,7 +4273,7 @@ dependencies = [ "net2", "openssl", "pallas", - "r2d2_redis", + "redis", "regex", "reqwest 0.11.24", "serde", @@ -4181,7 +4320,7 @@ dependencies = [ "hmac", "lazy_static", "rc2", - "sha1 0.10.6", + "sha1", "yasna", ] @@ -4606,9 +4745,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" @@ -5022,27 +5161,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - -[[package]] -name = "r2d2_redis" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "182473b876b0b93e353682ec58e207dd1cb4a62278bbe0045fe52b86b74363bb" -dependencies = [ - "r2d2", - "redis", -] - [[package]] name = "rand" version = "0.8.5" @@ -5121,16 +5239,19 @@ dependencies = [ [[package]] name = "redis" -version = "0.20.2" +version = "0.27.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4f0ceb2ec0dd769483ecd283f6615aa83dcd0be556d5294c6e659caefe7cc54" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" dependencies = [ - "async-trait", + "arc-swap", "combine", - "dtoa", - "itoa 0.4.8", + "itertools 0.13.0", + "itoa", + "num-bigint", "percent-encoding", - "sha1 0.6.1", + "ryu", + "sha1_smol", + "socket2 0.5.6", "url", ] @@ -5458,7 +5579,7 @@ checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", - "itoa 1.0.10", + "itoa", "libc", "linux-raw-sys 0.4.13", "once_cell", @@ -5639,15 +5760,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -5757,7 +5869,7 @@ version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ - "itoa 1.0.10", + "itoa", "ryu", "serde", ] @@ -5778,7 +5890,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.10", + "itoa", "ryu", "serde", ] @@ -5870,21 +5982,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ "indexmap 2.2.5", - "itoa 1.0.10", + "itoa", "ryu", "serde", "unsafe-libyaml", ] -[[package]] -name = "sha1" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" -dependencies = [ - "sha1_smol", -] - [[package]] name = "sha1" version = "0.10.6" @@ -6220,7 +6323,7 @@ dependencies = [ "hex", "hkdf", "hmac", - "itoa 1.0.10", + "itoa", "log", "md-5", "memchr", @@ -6229,7 +6332,7 @@ dependencies = [ "rand", "rsa", "serde", - "sha1 0.10.6", + "sha1", "sha2", "smallvec", "sqlx-core", @@ -6260,7 +6363,7 @@ dependencies = [ "hkdf", "hmac", "home", - "itoa 1.0.10", + "itoa", "log", "md-5", "memchr", @@ -6268,7 +6371,7 @@ dependencies = [ "rand", "serde", "serde_json", - "sha1 0.10.6", + "sha1", "sha2", "smallvec", "sqlx-core", @@ -6437,6 +6540,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -6597,7 +6711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", - "itoa 1.0.10", + "itoa", "num-conv", "powerfmt", "serde", @@ -6621,6 +6735,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -7142,9 +7266,9 @@ dependencies = [ [[package]] name = "url" -version = "2.4.1" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -7157,6 +7281,18 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.1" @@ -8155,6 +8291,18 @@ dependencies = [ "wast 35.0.2", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "xattr" version = "1.3.1" @@ -8187,6 +8335,30 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.32" @@ -8207,6 +8379,27 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "zerofrom" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", + "synstructure", +] + [[package]] name = "zeroize" version = "1.7.0" @@ -8227,6 +8420,28 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index d8387ac3..22f6251d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ aws = ["aws-config", "aws-types", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3"] sql = ["sqlx"] gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default", "jsonwebtoken"] rabbitmq = ["lapin"] -redis = ["r2d2_redis"] u5c = ["tonic"] mithril = ["mithril-client"] # elasticsearch = auto feature flag @@ -56,7 +55,6 @@ file-rotate = { version = "0.7.5" } reqwest = { version = "0.11", features = ["json", "multipart"] } tokio = { version = "1", features = ["rt", "rt-multi-thread"] } async-trait = "0.1.68" - elasticsearch = { version = "8.5.0-alpha.1", optional = true } murmur3 = { version = "0.5.2", optional = true } openssl = { version = "0.10", optional = true, features = ["vendored"] } @@ -65,7 +63,6 @@ kafka = { version = "0.10.0", optional = true } google-cloud-pubsub = { version = "0.16.0", optional = true } google-cloud-googleapis = { version = "0.10.0", optional = true } google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] } -r2d2_redis = { version = "0.14.0", optional = true } jsonwebtoken = { version = "8.3.0", optional = true } tonic = { version = "0.11", features = ["tls", "tls-roots"], optional = true } futures = { version = "0.3.28", optional = true } @@ -79,3 +76,4 @@ extism = { version = "1.2.0", optional = true } mithril-client = { version = "^0.8", optional = true, features = ["fs"] } miette = { version = "7.2.0", features = ["fancy"] } itertools = "0.12.1" +redis = { version = "0.27.6", optional = true } diff --git a/src/cursor/redis.rs b/src/cursor/redis.rs index de69f762..2c15b73a 100644 --- a/src/cursor/redis.rs +++ b/src/cursor/redis.rs @@ -1,10 +1,6 @@ use gasket::framework::*; use pallas::network::miniprotocols::Point; -use r2d2_redis::{ - r2d2::{self, Pool}, - redis::{self, Commands}, - RedisConnectionManager, -}; +use redis::Commands; use serde::Deserialize; use tokio::select; use tracing::debug; @@ -40,18 +36,17 @@ pub enum Unit { } pub struct Worker { - pool: Pool, + client: redis::Client, key: String, } #[async_trait::async_trait(?Send)] impl gasket::framework::Worker for Worker { async fn bootstrap(stage: &Stage) -> Result { - let manager = RedisConnectionManager::new(stage.url.clone()).or_panic()?; - let pool = r2d2::Pool::builder().build(manager).or_panic()?; + let client = redis::Client::open(stage.url.as_str()).or_retry()?; Ok(Self { - pool, + client, key: stage.key.clone(), }) } @@ -74,10 +69,12 @@ impl gasket::framework::Worker for Worker { Unit::Track(x) => stage.breadcrumbs.track(x.clone()), Unit::Flush => { let data = breadcrumbs_to_data(&stage.breadcrumbs); - let mut conn = self.pool.get().or_restart()?; + let mut conn = self.client.get_connection().or_restart()?; let data_to_write = serde_json::to_string(&data).or_panic()?; - conn.set(&self.key, &data_to_write) + + let _: () = conn + .set(&self.key, &data_to_write) .map_err(Error::custom) .or_panic()?; } diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs index b4a9e77f..d78a9966 100644 --- a/src/sinks/redis.rs +++ b/src/sinks/redis.rs @@ -1,16 +1,10 @@ -use std::ops::DerefMut; - use gasket::framework::*; -use r2d2_redis::{ - r2d2::{self, Pool}, - redis, RedisConnectionManager, -}; use serde::Deserialize; use crate::framework::*; pub struct Worker { - pool: Pool, + client: redis::Client, stream: String, maxlen: Option, } @@ -18,8 +12,7 @@ pub struct Worker { #[async_trait::async_trait(?Send)] impl gasket::framework::Worker for Worker { async fn bootstrap(stage: &Stage) -> Result { - let manager = RedisConnectionManager::new(stage.config.url.clone()).or_panic()?; - let pool = r2d2::Pool::builder().build(manager).or_panic()?; + let client = redis::Client::open(stage.config.url.as_str()).or_retry()?; let stream = stage .config @@ -30,7 +23,7 @@ impl gasket::framework::Worker for Worker { let maxlen = stage.config.stream_max_length; Ok(Self { - pool, + client, stream, maxlen, }) @@ -54,7 +47,7 @@ impl gasket::framework::Worker for Worker { let payload = serde_json::Value::from(record.unwrap()).to_string(); - let mut conn = self.pool.get().or_restart()?; + let mut conn = self.client.get_connection().or_restart()?; let mut command = redis::cmd("XADD"); command.arg(self.stream.clone()); @@ -64,10 +57,10 @@ impl gasket::framework::Worker for Worker { command.arg(maxlen); } - command + let _: () = command .arg("*") .arg(&[point.slot_or_default().to_string(), payload]) - .query(conn.deref_mut()) + .query(&mut conn) .or_retry()?; stage.ops_count.inc(1);