1
+ use crate :: headers;
2
+ use cloudevents:: event:: SpecVersion ;
3
+ use cloudevents:: message:: { Result , BinarySerializer , BinaryDeserializer , MessageAttributeValue ,
4
+ MessageDeserializer , Encoding , StructuredSerializer , StructuredDeserializer } ;
5
+ use cloudevents:: { message, Event } ;
6
+ use paho_mqtt:: { Message , PropertyCode } ;
7
+ use std:: collections:: HashMap ;
8
+ use std:: convert:: TryFrom ;
9
+ use std:: str;
10
+
11
+ pub struct ConsumerMessageDeserializer {
12
+ pub ( crate ) headers : HashMap < String , Vec < u8 > > ,
13
+ pub ( crate ) payload : Option < Vec < u8 > > ,
14
+ }
15
+
16
+ impl ConsumerMessageDeserializer {
17
+ fn get_mqtt_headers ( message : & Message ) -> Result < HashMap < String , Vec < u8 > > > {
18
+ let mut hm = HashMap :: new ( ) ;
19
+ let prop_iterator = message. properties ( ) . iter ( PropertyCode :: UserProperty ) ;
20
+
21
+ for property in prop_iterator {
22
+ let header = property. get_string_pair ( ) . unwrap ( ) ;
23
+ hm. insert ( header. 0 . to_string ( ) , Vec :: from ( header. 1 ) ) ;
24
+ }
25
+
26
+ Ok ( hm)
27
+ }
28
+
29
+ pub fn new ( message : & Message ) -> Result < ConsumerMessageDeserializer > {
30
+ Ok ( ConsumerMessageDeserializer {
31
+ headers : Self :: get_mqtt_headers ( message) ?,
32
+ payload : Some ( message. payload ( ) ) . map ( |s| Vec :: from ( s) ) ,
33
+ } )
34
+ }
35
+ }
36
+
37
+ impl BinaryDeserializer for ConsumerMessageDeserializer {
38
+ fn deserialize_binary < R : Sized , V : BinarySerializer < R > > ( mut self , mut visitor : V ) -> Result < R > {
39
+ if self . encoding ( ) != Encoding :: BINARY {
40
+ return Err ( message:: Error :: WrongEncoding { } )
41
+ }
42
+
43
+ let spec_version = SpecVersion :: try_from (
44
+ str:: from_utf8 ( & self . headers . remove ( headers:: SPEC_VERSION_HEADER ) . unwrap ( ) [ ..] )
45
+ . map_err ( |e| cloudevents:: message:: Error :: Other {
46
+ source : Box :: new ( e) ,
47
+ } ) ?,
48
+ ) ?;
49
+
50
+ visitor = visitor. set_spec_version ( spec_version. clone ( ) ) ?;
51
+
52
+ let attributes = spec_version. attribute_names ( ) ;
53
+
54
+ if let Some ( hv) = self . headers . remove ( headers:: CONTENT_TYPE ) {
55
+ visitor = visitor. set_attribute (
56
+ "datacontenttype" ,
57
+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
58
+ cloudevents:: message:: Error :: Other {
59
+ source : Box :: new ( e) ,
60
+ }
61
+ } ) ?) ,
62
+ ) ?
63
+ }
64
+
65
+ for ( hn, hv) in self
66
+ . headers
67
+ . into_iter ( )
68
+ . filter ( |( hn, _) | headers:: SPEC_VERSION_HEADER != * hn && hn. starts_with ( "ce_" ) )
69
+ {
70
+ let name = & hn[ "ce_" . len ( ) ..] ;
71
+
72
+ if attributes. contains ( & name) {
73
+ visitor = visitor. set_attribute (
74
+ name,
75
+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
76
+ cloudevents:: message:: Error :: Other {
77
+ source : Box :: new ( e) ,
78
+ }
79
+ } ) ?) ,
80
+ ) ?
81
+ } else {
82
+ visitor = visitor. set_extension (
83
+ name,
84
+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
85
+ cloudevents:: message:: Error :: Other {
86
+ source : Box :: new ( e) ,
87
+ }
88
+ } ) ?) ,
89
+ ) ?
90
+ }
91
+ }
92
+
93
+ if self . payload != None {
94
+ visitor. end_with_data ( self . payload . unwrap ( ) )
95
+ } else {
96
+ visitor. end ( )
97
+ }
98
+ }
99
+ }
100
+
101
+ impl StructuredDeserializer for ConsumerMessageDeserializer {
102
+ fn deserialize_structured < R : Sized , V : StructuredSerializer < R > > ( self , visitor : V ) -> Result < R > {
103
+ visitor. set_structured_event ( self . payload . unwrap ( ) )
104
+ }
105
+ }
106
+
107
+ impl MessageDeserializer for ConsumerMessageDeserializer {
108
+ fn encoding ( & self ) -> Encoding {
109
+ match (
110
+ self . headers
111
+ . get ( "content-type" )
112
+ . map ( |s| String :: from_utf8 ( s. to_vec ( ) ) . ok ( ) )
113
+ . flatten ( )
114
+ . map ( |s| s. starts_with ( headers:: CLOUDEVENTS_JSON_HEADER ) )
115
+ . unwrap_or ( false ) ,
116
+ self . headers . get ( headers:: SPEC_VERSION_HEADER ) ,
117
+ ) {
118
+ ( true , _) => Encoding :: STRUCTURED ,
119
+ ( _, Some ( _) ) => Encoding :: BINARY ,
120
+ _ => Encoding :: UNKNOWN ,
121
+ }
122
+ }
123
+ }
124
+
125
+ pub fn record_to_event ( msg : & Message , version : headers:: MqttVersion ) -> Result < Event > {
126
+ match version {
127
+ headers:: MqttVersion :: V5 => BinaryDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
128
+ headers:: MqttVersion :: V3_1 => StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
129
+ headers:: MqttVersion :: V3_1_1 => StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
130
+ }
131
+ }
132
+
133
+ pub trait MessageExt {
134
+ fn to_event ( & self , version : headers:: MqttVersion ) -> Result < Event > ;
135
+ }
136
+
137
+ impl MessageExt for Message {
138
+ fn to_event ( & self , version : headers:: MqttVersion ) -> Result < Event > {
139
+ record_to_event ( self , version)
140
+ }
141
+ }
142
+
143
+ #[ cfg( test) ]
144
+ mod tests {
145
+ use super :: * ;
146
+ use crate :: mqtt_producer_record:: MessageRecord ;
147
+
148
+ use chrono:: Utc ;
149
+ use cloudevents:: { EventBuilder , EventBuilderV10 } ;
150
+ use crate :: MessageBuilderExt ;
151
+ use serde_json:: json;
152
+ use cloudevents:: event:: Data ;
153
+
154
+ #[ test]
155
+ fn test_binary_record ( ) {
156
+ let time = Utc :: now ( ) ;
157
+
158
+ let expected = EventBuilderV10 :: new ( )
159
+ . id ( "0001" )
160
+ . ty ( "example.test" )
161
+ . time ( time)
162
+ . source ( "http://localhost" )
163
+ . data ( "application/json" ,
164
+ Data :: Binary ( String :: from ( "{\" hello\" :\" world\" }" ) . into_bytes ( ) ) )
165
+ . extension ( "someint" , "10" )
166
+ . build ( )
167
+ . unwrap ( ) ;
168
+
169
+ let message_record = MessageRecord :: from_event (
170
+ EventBuilderV10 :: new ( )
171
+ . id ( "0001" )
172
+ . ty ( "example.test" )
173
+ . time ( time)
174
+ . source ( "http://localhost" )
175
+ . extension ( "someint" , "10" )
176
+ . data ( "application/json" , json ! ( { "hello" : "world" } ) )
177
+ . build ( )
178
+ . unwrap ( ) ,
179
+ headers:: MqttVersion :: V5 ,
180
+ )
181
+ . unwrap ( ) ;
182
+
183
+ let msg = MessageBuilder :: new ( )
184
+ . topic ( "test" )
185
+ . message_record ( & message_record)
186
+ . qos ( 1 )
187
+ . finalize ( ) ;
188
+
189
+ assert_eq ! ( msg. to_event( headers:: MqttVersion :: V5 ) . unwrap( ) , expected)
190
+ }
191
+
192
+ #[ test]
193
+ fn test_structured_record ( ) {
194
+ let j = json ! ( { "hello" : "world" } ) ;
195
+
196
+ let expected = EventBuilderV10 :: new ( )
197
+ . id ( "0001" )
198
+ . ty ( "example.test" )
199
+ . source ( "http://localhost" )
200
+ . data ( "application/cloudevents+json" , j. clone ( ) )
201
+ . extension ( "someint" , "10" )
202
+ . build ( )
203
+ . unwrap ( ) ;
204
+
205
+ let input = EventBuilderV10 :: new ( )
206
+ . id ( "0001" )
207
+ . ty ( "example.test" )
208
+ . source ( "http://localhost" )
209
+ . data ( "application/cloudevents+json" , j. clone ( ) )
210
+ . extension ( "someint" , "10" )
211
+ . build ( )
212
+ . unwrap ( ) ;
213
+
214
+ let serialized_event =
215
+ StructuredDeserializer :: deserialize_structured ( input, MessageRecord :: new ( ) ) . unwrap ( ) ;
216
+
217
+ let msg = MessageBuilder :: new ( )
218
+ . topic ( "test" )
219
+ . message_record ( & serialized_event)
220
+ . qos ( 1 )
221
+ . finalize ( ) ;
222
+
223
+ assert_eq ! ( msg. to_event( headers:: MqttVersion :: V3_1_1 ) . unwrap( ) , expected)
224
+ }
225
+ }
0 commit comments