Skip to content

Commit 903ee1f

Browse files
committed
feat(rust): use a better encoding for sending messages
1 parent 1186bf5 commit 903ee1f

File tree

4 files changed

+75
-47
lines changed

4 files changed

+75
-47
lines changed

Diff for: implementations/rust/ockam/ockam_api/src/nodes/service/messages.rs

+44-17
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::error::ApiError;
22
use crate::nodes::{BackgroundNodeClient, NodeManager, NodeManagerWorker};
33
use miette::IntoDiagnostic;
4-
use minicbor::{CborLen, Decode, Encode};
4+
use minicbor::encode::Write;
5+
use minicbor::{encode, CborLen, Decode, Decoder, Encode, Encoder};
56
use ockam_core::api::{Error, Request, Response};
67
use ockam_core::{self, async_trait, Decodable, Encodable, Encoded, Message, Result};
78
use ockam_multiaddr::MultiAddr;
@@ -13,23 +14,23 @@ const TARGET: &str = "ockam_api::message";
1314

1415
#[async_trait]
1516
pub trait Messages {
16-
async fn send_message<R: Message>(
17+
async fn send_message<T: Message, R: Message>(
1718
&self,
1819
ctx: &Context,
1920
to: &MultiAddr,
20-
message: Vec<u8>,
21+
message: T,
2122
timeout: Option<Duration>,
2223
) -> miette::Result<R>;
2324
}
2425

2526
#[async_trait]
2627
impl Messages for NodeManager {
2728
#[instrument(skip_all)]
28-
async fn send_message<R: Message>(
29+
async fn send_message<T: Message, R: Message>(
2930
&self,
3031
ctx: &Context,
3132
to: &MultiAddr,
32-
message: Vec<u8>,
33+
message: T,
3334
timeout: Option<Duration>,
3435
) -> miette::Result<R> {
3536
let connection = self
@@ -56,11 +57,11 @@ impl Messages for NodeManager {
5657
#[async_trait]
5758
impl Messages for BackgroundNodeClient {
5859
#[instrument(skip_all)]
59-
async fn send_message<R: Message>(
60+
async fn send_message<T: Message, R: Message>(
6061
&self,
6162
ctx: &Context,
6263
to: &MultiAddr,
63-
message: Vec<u8>,
64+
message: T,
6465
timeout: Option<Duration>,
6566
) -> miette::Result<R> {
6667
let request = Request::post("v0/message").body(SendMessage::new(to, message));
@@ -69,10 +70,10 @@ impl Messages for BackgroundNodeClient {
6970
}
7071

7172
impl NodeManagerWorker {
72-
pub(crate) async fn send_message<R: Message>(
73+
pub(crate) async fn send_message<T: Message, R: Message>(
7374
&self,
7475
ctx: &Context,
75-
send_message: SendMessage,
76+
send_message: SendMessage<T>,
7677
) -> Result<Response<R>, Response<Error>> {
7778
let multiaddr = send_message.multiaddr()?;
7879
let msg = send_message.message;
@@ -96,25 +97,51 @@ impl NodeManagerWorker {
9697
#[derive(Debug, Clone, Encode, Decode, CborLen, Message)]
9798
#[rustfmt::skip]
9899
#[cbor(map)]
99-
pub struct SendMessage {
100+
pub struct SendMessage<T: Message> {
100101
#[n(1)] pub route: String,
101-
#[n(2)] pub message: Vec<u8>,
102+
#[n(2)] pub message: T,
102103
}
103104

104-
impl Encodable for SendMessage {
105+
impl<T: Message> SendMessage<T> {
106+
fn encode_send_message<W>(self, buf: W) -> Result<(), encode::Error<W::Error>>
107+
where
108+
W: Write,
109+
{
110+
let mut e = Encoder::new(buf);
111+
e.encode(&self.route)?;
112+
e.writer_mut()
113+
.write_all(&<T as Encodable>::encode(self.message).map_err(encode::Error::message)?)
114+
.map_err(|_| encode::Error::message("encoding error"))?;
115+
Ok(())
116+
}
117+
118+
fn into_vec(self) -> Result<Vec<u8>, encode::Error<<Vec<u8> as Write>::Error>> {
119+
let mut buf = Vec::new();
120+
self.encode_send_message(&mut buf)?;
121+
Ok(buf)
122+
}
123+
}
124+
125+
impl<T: Message> Encodable for SendMessage<T> {
105126
fn encode(self) -> Result<Encoded> {
106-
Ok(minicbor::to_vec(self)?)
127+
Ok(self.into_vec()?)
107128
}
108129
}
109130

110-
impl Decodable for SendMessage {
131+
impl<T: Message> Decodable for SendMessage<T> {
111132
fn decode(e: &[u8]) -> Result<Self> {
112-
Ok(minicbor::decode(e)?)
133+
let mut dec = Decoder::new(e);
134+
let route: String = dec.decode()?;
135+
let message = dec.input().get(dec.position()..e.len()).unwrap();
136+
Ok(SendMessage {
137+
route,
138+
message: <T as Decodable>::decode(message)?,
139+
})
113140
}
114141
}
115142

116-
impl SendMessage {
117-
pub fn new(route: &MultiAddr, message: Vec<u8>) -> Self {
143+
impl<T: Message> SendMessage<T> {
144+
pub fn new(route: &MultiAddr, message: T) -> Self {
118145
Self {
119146
route: route.to_string(),
120147
message,

Diff for: implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,11 @@ impl NodeManagerWorker {
266266

267267
// ==*== Messages ==*==
268268
(Post, ["v0", "message"]) => {
269-
let send_message: SendMessage = Decodable::decode(&body)?;
269+
let send_message: SendMessage<Vec<u8>> = Decodable::decode(&body)?;
270270
encode_response(
271271
&header,
272-
self.send_message::<Vec<u8>>(ctx, send_message).await,
272+
self.send_message::<Vec<u8>, Vec<u8>>(ctx, send_message)
273+
.await,
273274
)
274275
}
275276

Diff for: implementations/rust/ockam/ockam_api/src/uppercase.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ impl Worker for Uppercase {
99

1010
#[instrument(skip_all, name = "Uppercase::handle_message")]
1111
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
12-
ctx.send(
13-
msg.return_route().clone(),
14-
msg.into_body()?.to_uppercase().as_bytes().to_vec(),
15-
)
16-
.await
12+
let return_route = msg.return_route().clone();
13+
ctx.send(return_route.clone(), msg.into_body()?.to_uppercase())
14+
.await
1715
}
1816
}

Diff for: implementations/rust/ockam/ockam_command/src/message/send.rs

+25-23
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use tracing::info;
66
use ockam::Context;
77
use ockam_api::address::extract_address_value;
88
use ockam_api::nodes::service::messages::Messages;
9-
use ockam_api::nodes::BackgroundNodeClient;
109
use ockam_api::nodes::InMemoryNode;
11-
use ockam_core::Encodable;
10+
use ockam_api::nodes::{BackgroundNodeClient, NodeManager};
1211
use ockam_multiaddr::MultiAddr;
1312

1413
use crate::project::util::{
@@ -74,11 +73,9 @@ impl Command for SendCommand {
7473

7574
// Setup environment depending on whether we are sending the message from a background node
7675
// or an in-memory node
77-
let response: Vec<u8> = if let Some(node) = &self.from {
78-
BackgroundNodeClient::create_to_node(ctx, &opts.state, node.as_str())?
79-
.send_message(ctx, &to, self.get_message()?, Some(self.timeout.timeout))
80-
.await
81-
.map_err(Error::Retry)?
76+
let result = if let Some(node) = &self.from {
77+
let client = BackgroundNodeClient::create_to_node(ctx, &opts.state, node.as_str())?;
78+
self.send_message(&client, ctx, &to).await?
8279
} else {
8380
let identity_name = opts
8481
.state
@@ -113,18 +110,8 @@ impl Command for SendCommand {
113110
.map_err(Error::Retry)?;
114111
let to = clean_projects_multiaddr(to, projects_sc)?;
115112
info!("sending to {to}");
116-
node_manager
117-
.send_message(ctx, &to, self.get_message()?, Some(self.timeout.timeout))
118-
.await
119-
.map_err(Error::Retry)?
120-
};
121-
122-
let result = if self.hex {
123-
hex::encode(response)
124-
} else {
125-
String::from_utf8(response)
126-
.into_diagnostic()
127-
.context("Received content is not a valid utf8 string")?
113+
let n: &NodeManager = &node_manager;
114+
self.send_message(n, ctx, &to).await?
128115
};
129116

130117
opts.terminal.stdout().plain(result).write_line()?;
@@ -133,13 +120,28 @@ impl Command for SendCommand {
133120
}
134121

135122
impl SendCommand {
136-
fn get_message(&self) -> crate::Result<Vec<u8>> {
123+
async fn send_message(
124+
self,
125+
client: &impl Messages,
126+
ctx: &Context,
127+
to: &MultiAddr,
128+
) -> crate::Result<String> {
137129
if self.hex {
138-
Ok(hex::decode(self.message.clone())
130+
let to_send = hex::decode(self.message.clone())
139131
.into_diagnostic()
140-
.context("The message is not a valid hex string")?)
132+
.context("The message is not a valid hex string")?;
133+
let response: Vec<u8> = client
134+
.send_message(ctx, &to, to_send, Some(self.timeout.timeout))
135+
.await
136+
.map_err(Error::Retry)?;
137+
Ok(hex::encode(response))
141138
} else {
142-
Ok(Encodable::encode(self.message.clone())?)
139+
client
140+
.send_message(ctx, &to, self.message, Some(self.timeout.timeout))
141+
.await
142+
.map_err(Error::Retry)
143+
.into_diagnostic()
144+
.context("Received content is not a valid utf8 string")
143145
}
144146
}
145147
}

0 commit comments

Comments
 (0)