Skip to content

Commit 3687a60

Browse files
committed
Merge branch 'main' into chore/update-pallas-git-edge
2 parents 21210ce + 7453ae0 commit 3687a60

File tree

9 files changed

+2105
-1646
lines changed

9 files changed

+2105
-1646
lines changed

Cargo.lock

+2,055-1,607
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ aws = ["aws-config", "aws-types", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3"]
1616
sql = ["sqlx"]
1717
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default", "jsonwebtoken"]
1818
rabbitmq = ["lapin"]
19-
redis = ["r2d2_redis"]
2019
u5c = ["tonic"]
2120
mithril = ["mithril-client"]
2221
# elasticsearch = auto feature flag
@@ -56,7 +55,6 @@ file-rotate = { version = "0.7.5" }
5655
reqwest = { version = "0.11", features = ["json", "multipart"] }
5756
tokio = { version = "1", features = ["rt", "rt-multi-thread"] }
5857
async-trait = "0.1.68"
59-
6058
elasticsearch = { version = "8.5.0-alpha.1", optional = true }
6159
murmur3 = { version = "0.5.2", optional = true }
6260
openssl = { version = "0.10", optional = true, features = ["vendored"] }
@@ -65,7 +63,6 @@ kafka = { version = "0.10.0", optional = true }
6563
google-cloud-pubsub = { version = "0.16.0", optional = true }
6664
google-cloud-googleapis = { version = "0.10.0", optional = true }
6765
google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] }
68-
r2d2_redis = { version = "0.14.0", optional = true }
6966
jsonwebtoken = { version = "8.3.0", optional = true }
7067
tonic = { version = "0.12.3", features = ["tls", "tls-roots"], optional = true }
7168
futures = { version = "0.3.28", optional = true }
@@ -79,3 +76,4 @@ extism = { version = "1.2.0", optional = true }
7976
mithril-client = { version = "^0.8", optional = true, features = ["fs"] }
8077
miette = { version = "7.2.0", features = ["fancy"] }
8178
itertools = "0.12.1"
79+
redis = { version = "0.27.6", optional = true }

docs/pages/v2/filters/parse_cbor.mdx

+21-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# Parse CBOR filter
22

3-
The `parse_cbor` filter aims to map cbor transactions to a structured transaction.
3+
The `parse_cbor` filter aims to map CBOR blocks to structured blocks and CBOR transactions to structured transactions.
44

5-
However, the filter will only work when the record received in the stage is CborTx in other words a transaction in Cbor format that was previously extracted from a block by another stage, otherwise, parse_cbor will ignore and pass the record to the next stage. When the record is CborTx, parse_cbor will decode and map the Cbor to a structure, so the next stage will receive the ParsedTx record. If no filter is enabled, the stages will receive the record in CborBlock format, and if only the parse_cbor filter is enabled in `daemon.toml`, it will be necessary to enable the [split_cbor](split_block) filter for the stage to receive the CborTx format.
5+
- When the record received in the stage is CborBlock, the filter will decode and map it to ParsedBlock, which is passed to the next stage.
6+
- When the record received in the stage is CborTx, the filter will decode and map it to ParsedTx, which is passed to the next stage. This requires enabling the [split_cbor](split_block) filter beforehand so that the stage receives records in the CborTx format.
7+
- Otherwise, parse_cbor will ignore the record and pass it unchanged to the next stage.
68

79
## Configuration
810

@@ -15,7 +17,23 @@ type = "ParseCbor"
1517

1618
## Examples
1719

18-
Below is an example of the data that will be sent to the sink. A block can contain many transactions, so the sink will receive an event for each transaction in json format.
20+
Below is an example of the data that will be sent to the sink when the filter receives a CborBlock record.
21+
22+
```json
23+
{
24+
"event": "apply",
25+
"point": {
26+
"slot": 0,
27+
"hash": ""
28+
},
29+
"record": {
30+
"header": {},
31+
"body": {}
32+
}
33+
}
34+
```
35+
36+
Below is an example of the data that will be sent to the sink when the filter receives a CborTx record. A block can contain many transactions, so the sink will receive an event for each transaction.
1937

2038
```json
2139
{

src/cursor/redis.rs

+8-11
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
use gasket::framework::*;
22
use pallas::network::miniprotocols::Point;
3-
use r2d2_redis::{
4-
r2d2::{self, Pool},
5-
redis::{self, Commands},
6-
RedisConnectionManager,
7-
};
3+
use redis::Commands;
84
use serde::Deserialize;
95
use tokio::select;
106
use tracing::debug;
@@ -40,18 +36,17 @@ pub enum Unit {
4036
}
4137

4238
pub struct Worker {
43-
pool: Pool<RedisConnectionManager>,
39+
client: redis::Client,
4440
key: String,
4541
}
4642

4743
#[async_trait::async_trait(?Send)]
4844
impl gasket::framework::Worker<Stage> for Worker {
4945
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
50-
let manager = RedisConnectionManager::new(stage.url.clone()).or_panic()?;
51-
let pool = r2d2::Pool::builder().build(manager).or_panic()?;
46+
let client = redis::Client::open(stage.url.as_str()).or_retry()?;
5247

5348
Ok(Self {
54-
pool,
49+
client,
5550
key: stage.key.clone(),
5651
})
5752
}
@@ -74,10 +69,12 @@ impl gasket::framework::Worker<Stage> for Worker {
7469
Unit::Track(x) => stage.breadcrumbs.track(x.clone()),
7570
Unit::Flush => {
7671
let data = breadcrumbs_to_data(&stage.breadcrumbs);
77-
let mut conn = self.pool.get().or_restart()?;
72+
let mut conn = self.client.get_connection().or_restart()?;
7873

7974
let data_to_write = serde_json::to_string(&data).or_panic()?;
80-
conn.set(&self.key, &data_to_write)
75+
76+
let _: () = conn
77+
.set(&self.key, &data_to_write)
8178
.map_err(Error::custom)
8279
.or_panic()?;
8380
}

src/filters/parse_cbor.rs

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ impl From<&Stage> for Worker {
4040

4141
gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
4242
let output = unit.clone().try_map_record(|r| match r {
43+
Record::CborBlock(cbor) => {
44+
let block = trv::MultiEraBlock::decode(&cbor).or_panic()?;
45+
let block = stage.mapper.map_block(&block);
46+
Ok(Record::ParsedBlock(block))
47+
}
4348
Record::CborTx(cbor) => {
4449
let tx = trv::MultiEraTx::decode(&cbor).or_panic()?;
4550
let tx = stage.mapper.map_tx(&tx);

src/sinks/mod.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use serde::Deserialize;
33

44
use crate::framework::*;
55

6-
mod assert;
7-
mod common;
8-
mod file_rotate;
9-
mod noop;
10-
mod stdout;
11-
mod terminal;
12-
mod webhook;
6+
pub mod assert;
7+
pub mod common;
8+
pub mod file_rotate;
9+
pub mod noop;
10+
pub mod stdout;
11+
pub mod terminal;
12+
pub mod webhook;
1313

1414
#[cfg(feature = "rabbitmq")]
1515
mod rabbitmq;

src/sinks/redis.rs

+6-13
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,18 @@
1-
use std::ops::DerefMut;
2-
31
use gasket::framework::*;
4-
use r2d2_redis::{
5-
r2d2::{self, Pool},
6-
redis, RedisConnectionManager,
7-
};
82
use serde::Deserialize;
93

104
use crate::framework::*;
115

126
pub struct Worker {
13-
pool: Pool<RedisConnectionManager>,
7+
client: redis::Client,
148
stream: String,
159
maxlen: Option<usize>,
1610
}
1711

1812
#[async_trait::async_trait(?Send)]
1913
impl gasket::framework::Worker<Stage> for Worker {
2014
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
21-
let manager = RedisConnectionManager::new(stage.config.url.clone()).or_panic()?;
22-
let pool = r2d2::Pool::builder().build(manager).or_panic()?;
15+
let client = redis::Client::open(stage.config.url.as_str()).or_retry()?;
2316

2417
let stream = stage
2518
.config
@@ -30,7 +23,7 @@ impl gasket::framework::Worker<Stage> for Worker {
3023
let maxlen = stage.config.stream_max_length;
3124

3225
Ok(Self {
33-
pool,
26+
client,
3427
stream,
3528
maxlen,
3629
})
@@ -54,7 +47,7 @@ impl gasket::framework::Worker<Stage> for Worker {
5447

5548
let payload = serde_json::Value::from(record.unwrap()).to_string();
5649

57-
let mut conn = self.pool.get().or_restart()?;
50+
let mut conn = self.client.get_connection().or_restart()?;
5851

5952
let mut command = redis::cmd("XADD");
6053
command.arg(self.stream.clone());
@@ -64,10 +57,10 @@ impl gasket::framework::Worker<Stage> for Worker {
6457
command.arg(maxlen);
6558
}
6659

67-
command
60+
let _: () = command
6861
.arg("*")
6962
.arg(&[point.slot_or_default().to_string(), payload])
70-
.query(conn.deref_mut())
63+
.query(&mut conn)
7164
.or_retry()?;
7265

7366
stage.ops_count.inc(1);

src/sources/n2c.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl gasket::framework::Worker<Stage> for Worker {
195195

196196
#[derive(Deserialize)]
197197
pub struct Config {
198-
socket_path: PathBuf,
198+
pub socket_path: PathBuf,
199199
}
200200

201201
impl Config {

src/sources/n2n.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl gasket::framework::Worker<Stage> for Worker {
220220

221221
#[derive(Deserialize)]
222222
pub struct Config {
223-
peers: Vec<String>,
223+
pub peers: Vec<String>,
224224
}
225225

226226
impl Config {

0 commit comments

Comments
 (0)