@@ -3,7 +3,7 @@ use pallas::interop::utxorpc::spec::sync::BlockRef;
3
3
use pallas:: network:: miniprotocols:: Point ;
4
4
use serde:: Deserialize ;
5
5
use tracing:: debug;
6
- use utxorpc:: { CardanoSyncClient , ClientBuilder , TipEvent } ;
6
+ use utxorpc:: { CardanoSyncClient , ChainBlock , ClientBuilder , TipEvent } ;
7
7
8
8
use crate :: framework:: * ;
9
9
@@ -22,45 +22,49 @@ pub struct Worker {
22
22
}
23
23
24
24
impl Worker {
25
+ fn block_to_record (
26
+ & self ,
27
+ stage : & Stage ,
28
+ block : & ChainBlock < utxorpc:: spec:: cardano:: Block > ,
29
+ ) -> Result < ( Point , Record ) , WorkerError > {
30
+ let parsed = block. parsed . as_ref ( ) . ok_or ( WorkerError :: Panic ) ?;
31
+
32
+ let record = if stage. config . use_parsed_blocks {
33
+ Record :: ParsedBlock ( parsed. clone ( ) )
34
+ } else {
35
+ Record :: CborBlock ( block. native . to_vec ( ) )
36
+ } ;
37
+
38
+ let point = parsed
39
+ . header
40
+ . as_ref ( )
41
+ . map ( |h| Point :: Specific ( h. slot , h. hash . to_vec ( ) ) )
42
+ . ok_or ( WorkerError :: Panic ) ?;
43
+
44
+ Ok ( ( point, record) )
45
+ }
46
+
25
47
async fn process_next (
26
48
& self ,
27
49
stage : & mut Stage ,
28
50
unit : & TipEvent < utxorpc:: Cardano > ,
29
51
) -> Result < ( ) , WorkerError > {
30
52
match unit {
31
53
TipEvent :: Apply ( block) => {
32
- if let Some ( block) = & block. parsed {
33
- let header = block. header . as_ref ( ) . unwrap ( ) ;
54
+ let ( point, record) = self . block_to_record ( stage, block) ?;
34
55
35
- let block = block . body . as_ref ( ) . unwrap ( ) ;
56
+ let evt = ChainEvent :: Apply ( point . clone ( ) , record ) ;
36
57
37
- for tx in block. tx . clone ( ) {
38
- let evt = ChainEvent :: Apply (
39
- Point :: Specific ( header. slot , header. hash . to_vec ( ) ) ,
40
- Record :: ParsedTx ( tx) ,
41
- ) ;
42
-
43
- stage. output . send ( evt. into ( ) ) . await . or_panic ( ) ?;
44
- stage. chain_tip . set ( header. slot as i64 ) ;
45
- }
46
- }
58
+ stage. output . send ( evt. into ( ) ) . await . or_panic ( ) ?;
59
+ stage. chain_tip . set ( point. slot_or_default ( ) as i64 ) ;
47
60
}
48
61
TipEvent :: Undo ( block) => {
49
- if let Some ( block) = & block. parsed {
50
- let header = block. header . as_ref ( ) . unwrap ( ) ;
51
-
52
- let block = block. body . as_ref ( ) . unwrap ( ) ;
62
+ let ( point, record) = self . block_to_record ( stage, block) ?;
53
63
54
- for tx in block. tx . clone ( ) {
55
- let evt = ChainEvent :: Undo (
56
- Point :: Specific ( header. slot , header. hash . to_vec ( ) ) ,
57
- Record :: ParsedTx ( tx) ,
58
- ) ;
64
+ let evt = ChainEvent :: Undo ( point. clone ( ) , record) ;
59
65
60
- stage. output . send ( evt. into ( ) ) . await . or_panic ( ) ?;
61
- stage. chain_tip . set ( header. slot as i64 ) ;
62
- }
63
- }
66
+ stage. output . send ( evt. into ( ) ) . await . or_panic ( ) ?;
67
+ stage. chain_tip . set ( point. slot_or_default ( ) as i64 ) ;
64
68
}
65
69
TipEvent :: Reset ( block) => {
66
70
stage
@@ -154,6 +158,7 @@ pub struct Stage {
154
158
#[ derive( Deserialize ) ]
155
159
pub struct Config {
156
160
url : String ,
161
+ use_parsed_blocks : bool ,
157
162
}
158
163
159
164
impl Config {
0 commit comments