Skip to content

Commit 689e2cf

Browse files
author
i076326
committed
cloudevents#9 Encoders for MQTT
1 parent a3ad7a9 commit 689e2cf

File tree

5 files changed

+231
-258
lines changed

5 files changed

+231
-258
lines changed

Diff for: cloudevents-sdk-paho-mqtt/README.md

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# CloudEvents SDK Rust - paho-mqtt [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
2+
3+
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [paho-mqtt](https://www.eclipse.org/paho/).
4+
5+
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
6+
7+
## Development & Contributing
8+
9+
If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)
10+
11+
## Community
12+
13+
## Sample usage
14+
15+
- Check the example [paho-mqtt-example](../example-projects/paho-mqtt-example)
16+
17+
### MQTT V3
18+
- Start the MQTT V3 Consumer
19+
20+
```
21+
run --package <package-name> --bin <binary-name> -- --mode consumerV3 --broker tcp://localhost:1883 --topic test
22+
```
23+
24+
- Start the MQTT V3 Producer
25+
26+
```
27+
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV3
28+
```
29+
30+
### MQTT V5
31+
- Start the MQTT V5 Consumer
32+
33+
```
34+
run --package <package-name> --bin <binary-name> -- --mode consumerV5 --broker tcp://localhost:1883 --topic test
35+
```
36+
37+
- Start the MQTT V5 Producer
38+
39+
```
40+
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV5
41+
```
42+

Diff for: cloudevents-sdk-paho-mqtt/src/headers.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudeven
2929
pub(crate) static CONTENT_TYPE: &'static str = "content-type";
3030

3131
pub enum MqttVersion {
32-
V3_1,
33-
V3_1_1,
34-
V5,
32+
MQTT_3,
33+
MQTT_5,
3534
}

Diff for: cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs

+28-78
Original file line numberDiff line numberDiff line change
@@ -5,90 +5,59 @@ use cloudevents::message::{
55
Result, StructuredDeserializer, StructuredSerializer,
66
};
77
use cloudevents::{message, Event};
8-
use paho_mqtt::{Message, PropertyCode};
9-
use std::collections::HashMap;
8+
use paho_mqtt::{Message, Properties, PropertyCode};
109
use std::convert::TryFrom;
11-
use std::str;
1210

13-
pub struct ConsumerMessageDeserializer {
14-
pub(crate) headers: HashMap<String, Vec<u8>>,
11+
pub struct ConsumerMessageDeserializer<'a> {
12+
pub(crate) headers: &'a Properties,
1513
pub(crate) payload: Option<Vec<u8>>,
1614
}
1715

18-
impl ConsumerMessageDeserializer {
19-
fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> {
20-
let mut hm = HashMap::new();
21-
let prop_iterator = message.properties().iter(PropertyCode::UserProperty);
22-
23-
for property in prop_iterator {
24-
let header = property.get_string_pair().unwrap();
25-
hm.insert(header.0.to_string(), Vec::from(header.1));
26-
}
27-
28-
Ok(hm)
16+
impl<'a> ConsumerMessageDeserializer<'a> {
17+
fn get_mqtt_headers(message: &Message) -> &Properties {
18+
message.properties()
2919
}
3020

3121
pub fn new(message: &Message) -> Result<ConsumerMessageDeserializer> {
3222
Ok(ConsumerMessageDeserializer {
33-
headers: Self::get_mqtt_headers(message)?,
23+
headers: Self::get_mqtt_headers(message),
3424
payload: Some(message.payload()).map(|s| Vec::from(s)),
3525
})
3626
}
3727
}
3828

39-
impl BinaryDeserializer for ConsumerMessageDeserializer {
40-
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
29+
impl<'a> BinaryDeserializer for ConsumerMessageDeserializer<'a> {
30+
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
4131
if self.encoding() != Encoding::BINARY {
4232
return Err(message::Error::WrongEncoding {});
4333
}
4434

4535
let spec_version = SpecVersion::try_from(
46-
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
47-
.map_err(|e| cloudevents::message::Error::Other {
48-
source: Box::new(e),
49-
})?,
36+
self.headers
37+
.find_user_property(headers::SPEC_VERSION_HEADER)
38+
.unwrap()
39+
.as_str(),
5040
)?;
5141

5242
visitor = visitor.set_spec_version(spec_version.clone())?;
5343

5444
let attributes = spec_version.attribute_names();
5545

56-
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
57-
visitor = visitor.set_attribute(
58-
"datacontenttype",
59-
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
60-
cloudevents::message::Error::Other {
61-
source: Box::new(e),
62-
}
63-
})?),
64-
)?
46+
if let Some(hv) = self.headers.find_user_property(headers::CONTENT_TYPE) {
47+
visitor = visitor.set_attribute("datacontenttype", MessageAttributeValue::String(hv))?
6548
}
6649

6750
for (hn, hv) in self
6851
.headers
69-
.into_iter()
52+
.user_iter()
7053
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
7154
{
7255
let name = &hn["ce_".len()..];
7356

7457
if attributes.contains(&name) {
75-
visitor = visitor.set_attribute(
76-
name,
77-
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
78-
cloudevents::message::Error::Other {
79-
source: Box::new(e),
80-
}
81-
})?),
82-
)?
58+
visitor = visitor.set_attribute(name, MessageAttributeValue::String(hv))?
8359
} else {
84-
visitor = visitor.set_extension(
85-
name,
86-
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
87-
cloudevents::message::Error::Other {
88-
source: Box::new(e),
89-
}
90-
})?),
91-
)?
60+
visitor = visitor.set_extension(name, MessageAttributeValue::String(hv))?
9261
}
9362
}
9463

@@ -100,51 +69,32 @@ impl BinaryDeserializer for ConsumerMessageDeserializer {
10069
}
10170
}
10271

103-
impl StructuredDeserializer for ConsumerMessageDeserializer {
72+
impl<'a> StructuredDeserializer for ConsumerMessageDeserializer<'a> {
10473
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
10574
visitor.set_structured_event(self.payload.unwrap())
10675
}
10776
}
10877

109-
impl MessageDeserializer for ConsumerMessageDeserializer {
78+
impl<'a> MessageDeserializer for ConsumerMessageDeserializer<'a> {
11079
fn encoding(&self) -> Encoding {
111-
match (
112-
self.headers
113-
.get("content-type")
114-
.map(|s| String::from_utf8(s.to_vec()).ok())
115-
.flatten()
116-
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
117-
.unwrap_or(false),
118-
self.headers.get(headers::SPEC_VERSION_HEADER),
119-
) {
120-
(true, _) => Encoding::STRUCTURED,
121-
(_, Some(_)) => Encoding::BINARY,
122-
_ => Encoding::UNKNOWN,
80+
match self.headers.iter(PropertyCode::UserProperty).count() == 0 {
81+
true => Encoding::STRUCTURED,
82+
false => Encoding::BINARY,
12383
}
12484
}
12585
}
12686

127-
pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<Event> {
128-
match version {
129-
headers::MqttVersion::V5 => {
130-
BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
131-
}
132-
headers::MqttVersion::V3_1 => {
133-
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
134-
}
135-
headers::MqttVersion::V3_1_1 => {
136-
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
137-
}
138-
}
87+
pub fn record_to_event(msg: &Message) -> Result<Event> {
88+
MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
13989
}
14090

14191
pub trait MessageExt {
142-
fn to_event(&self, version: headers::MqttVersion) -> Result<Event>;
92+
fn to_event(&self) -> Result<Event>;
14393
}
14494

14595
impl MessageExt for Message {
146-
fn to_event(&self, version: headers::MqttVersion) -> Result<Event> {
147-
record_to_event(self, version)
96+
fn to_event(&self) -> Result<Event> {
97+
record_to_event(self)
14898
}
14999
}
150100

Diff for: cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@ impl MessageRecord {
2424

2525
pub fn from_event(event: Event, version: headers::MqttVersion) -> Result<Self> {
2626
match version {
27-
headers::MqttVersion::V5 => {
27+
headers::MqttVersion::MQTT_5 => {
2828
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
2929
}
30-
headers::MqttVersion::V3_1 => {
31-
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
32-
}
33-
headers::MqttVersion::V3_1_1 => {
30+
headers::MqttVersion::MQTT_3 => {
3431
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
3532
}
3633
}
@@ -127,11 +124,14 @@ impl StructuredSerializer<MessageRecord> for MessageRecord {
127124
}
128125

129126
pub trait MessageBuilderExt {
130-
fn message_record(self, message_record: &MessageRecord) -> MessageBuilder;
127+
fn event(self, event: Event, version: headers::MqttVersion) -> MessageBuilder;
131128
}
132129

133130
impl MessageBuilderExt for MessageBuilder {
134-
fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder {
131+
fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder {
132+
let message_record =
133+
MessageRecord::from_event(event, version).expect("error while serializing the event");
134+
135135
self = self.properties(message_record.headers.clone());
136136

137137
if let Some(s) = message_record.payload.as_ref() {

0 commit comments

Comments
 (0)