Skip to content

Commit 77b6900

Browse files
committed
async
1 parent 7553a16 commit 77b6900

File tree

5 files changed

+98
-20
lines changed

5 files changed

+98
-20
lines changed

examples/cc253x.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn main() {
2020
let b = async {
2121
println!("version: {:?}", cc2531.version().await);
2222
};
23-
futures::join!(a, b);
23+
futures::join!(b, a);
2424
};
2525
let executor = Executor::default();
2626

src/cc253x.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ const MAXIMUM_ZIGBEE_PAYLOAD_SIZE: usize = 255;
1515

1616
pub struct CC2531X {
1717
_supports_led: Option<bool>,
18-
serial: Arc<Mutex<Box<dyn SerialPort>>>,
18+
read: Arc<Mutex<Box<dyn SerialPort>>>,
19+
write: Arc<Mutex<Box<dyn SerialPort>>>,
1920
}
2021

2122
impl CC2531X {
@@ -25,7 +26,10 @@ impl CC2531X {
2526
.open()
2627
.map_err(|_e| CoordinatorError::SerialOpen)?;
2728
Ok(Self {
28-
serial: Arc::new(Mutex::new(serial)),
29+
read: Arc::new(Mutex::new(
30+
serial.try_clone().map_err(|_e| CoordinatorError::Io)?,
31+
)),
32+
write: Arc::new(Mutex::new(serial)),
2933
_supports_led: None,
3034
})
3135
}
@@ -37,12 +41,12 @@ impl CC2531X {
3741
subsystem: Subsystem,
3842
_timeout: Option<std::time::Duration>,
3943
) -> Result<ParametersValueMap, CoordinatorError> {
44+
log!("waiting for {:?}", name);
4045
let command =
4146
get_command_by_name(&subsystem, name).ok_or(CoordinatorError::NoCommandWithName)?;
4247
let mut buffer = [0; MAX_FRAME_SIZE];
43-
let len = self
44-
.serial
45-
.lock()
48+
let lock = self.read.lock();
49+
let len = lock
4650
.await
4751
.read(&mut buffer)
4852
.map_err(|_e| CoordinatorError::Io)?;
@@ -89,14 +93,15 @@ impl Coordinator for CC2531X {
8993
let command = get_command_by_name(&Subsystem::Sys, "version")
9094
.ok_or(CoordinatorError::NoCommandWithName)?;
9195
let send = async {
92-
let mut lock = self.serial.lock().await;
96+
let mut lock = self.write.lock().await;
9397
UnpiPacket::from_command_to_serial_async(
9498
command.id,
9599
command,
96100
&[],
97101
(MessageType::SREQ, Subsystem::Sys),
98102
&mut **lock,
99-
).await
103+
)
104+
.await
100105
};
101106
let wait = self.wait_for("version", MessageType::SRESP, Subsystem::Sys, None);
102107
let r = futures::try_join!(send, wait)?;
@@ -111,7 +116,7 @@ impl Coordinator for CC2531X {
111116
ResetType::Hard => &[("type", ParameterValue::U8(0))],
112117
};
113118

114-
let mut lock = self.serial.lock().await;
119+
let mut lock = self.write.lock().await;
115120
UnpiPacket::from_command_to_serial(
116121
command.id,
117122
command,
@@ -128,7 +133,7 @@ impl Coordinator for CC2531X {
128133
.ok_or(CoordinatorError::NoCommandWithName)?;
129134
//TODO: const firmwareControlsLed = parseInt(this.version.revision) >= 20211029;
130135
let firmware_controls_led = true;
131-
let mut lock = self.serial.lock().await;
136+
let mut lock = self.write.lock().await;
132137
let parameters = match led_status {
133138
LedStatus::Disable => {
134139
if firmware_controls_led {
@@ -187,7 +192,7 @@ impl Coordinator for CC2531X {
187192

188193
let command = get_command_by_name(&Subsystem::Zdo, "management_network_update_request")
189194
.ok_or(CoordinatorError::NoCommandWithName)?;
190-
let mut lock = self.serial.lock().await;
195+
let mut lock = self.write.lock().await;
191196
UnpiPacket::from_command_to_serial(
192197
command.id,
193198
command,
@@ -207,7 +212,7 @@ impl Coordinator for CC2531X {
207212

208213
let command = get_command_by_name(&Subsystem::Zdo, "stack_tune")
209214
.ok_or(CoordinatorError::NoCommandWithName)?;
210-
let mut lock = self.serial.lock().await;
215+
let mut lock = self.write.lock().await;
211216
UnpiPacket::from_command_to_serial(
212217
command.id,
213218
command,

src/coordinator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub enum ResetType {
5858
pub enum CoordinatorError {
5959
SerialOpen,
6060
SerialWrite,
61+
SerialRead,
6162
NoCommandWithName,
6263
Io,
6364
Parameter(ParameterError),
@@ -66,6 +67,7 @@ pub enum CoordinatorError {
6667
Map(MapError),
6768
NoRequest,
6869
NoResponse,
70+
6971
}
7072

7173
impl From<std::io::Error> for CoordinatorError {

src/serial.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,79 @@ use crate::{
22
coordinator::CoordinatorError,
33
unpi::{
44
commands::{Command, ParameterValue},
5-
LenTypeInfo, MessageType, Subsystem, UnpiPacket, MAX_PAYLOAD_SIZE,
6-
}, utils::log,
5+
LenTypeInfo, MessageType, Subsystem, UnpiPacket, UnpiPacketSink, MAX_PAYLOAD_SIZE,
6+
},
7+
utils::log,
8+
};
9+
use futures::channel::oneshot;
10+
use futures::{
11+
channel::oneshot::{Receiver, Sender},
12+
lock::Mutex,
13+
AsyncReadExt, AsyncWrite, AsyncWriteExt,
714
};
8-
use futures::executor::block_on;
915
use serialport::SerialPort;
16+
use std::{future::Future, sync::Arc};
17+
use futures::task::Spawn;
1018

11-
impl<'a> UnpiPacket<'a> {
19+
pub struct SimpleSerialPort {
20+
read: Arc<Mutex<Box<dyn SerialPort>>>,
21+
write: Arc<Mutex<Box<dyn SerialPort>>>,
22+
tx: Sender<UnpiPacket<'static>>,
23+
rx: Receiver<UnpiPacket<'static>>,
24+
}
25+
26+
impl SimpleSerialPort {
27+
pub fn new(path: &str, baud_rate: u32) -> Result<Self, CoordinatorError> {
28+
let (tx, rx) = oneshot::channel();
29+
Ok(SimpleSerialPort {
30+
read: Arc::new(Mutex::new(
31+
serialport::new(path, baud_rate)
32+
.timeout(std::time::Duration::from_millis(10))
33+
.open()
34+
.map_err(|_e| CoordinatorError::SerialOpen)?,
35+
)),
36+
write: Arc::new(Mutex::new(
37+
serialport::new(path, baud_rate)
38+
.timeout(std::time::Duration::from_millis(10))
39+
.open()
40+
.map_err(|_e| CoordinatorError::SerialOpen)?,
41+
)),
42+
tx,
43+
rx,
44+
})
45+
}
1246

47+
pub async fn start(&self) -> Result<(), CoordinatorError> {
48+
let mut read = self.read.lock().await;
49+
let mut write = self.write.lock().await;
50+
let (tx, rx) = (self.tx, self.rx);
51+
let mut buffer = [0u8; 256];
52+
let receive = async {
53+
loop {
54+
let len = read.read(&mut buffer).map_err(|_e| CoordinatorError::SerialRead)?;
55+
let packet = UnpiPacket::from_payload(
56+
(&buffer[..len], LenTypeInfo::OneByte),
57+
(MessageType::SREQ, Subsystem::Sys),
58+
0,
59+
)?;
60+
log!("<<< {:?}", packet);
61+
tx.send(packet).map_err(|_e| CoordinatorError::SerialWrite)?;
62+
}
63+
};
64+
let send = async {
65+
loop {
66+
let packet = rx.await.map_err(|_e| CoordinatorError::SerialWrite)?;
67+
let mut buffer = [0u8; 256];
68+
packet.to_bytes(&mut buffer)?;
69+
write.write_all(buffer).map_err(|_e| CoordinatorError::SerialWrite)?;
70+
71+
}
72+
};
73+
todo!()
74+
}
75+
}
76+
77+
impl<'a> UnpiPacket<'a> {
1378
/// Serialized the packet to the serial port
1479
pub fn to_serial<S: SerialPort + ?Sized>(
1580
&self,
@@ -52,8 +117,6 @@ impl<'a> UnpiPacket<'a> {
52117
type_subsystem: (MessageType, Subsystem),
53118
serial: &mut S,
54119
) -> Result<(), CoordinatorError> {
55-
block_on(async move {
56-
Self::from_command_to_serial(command_id, command, parameters, type_subsystem, serial)
57-
})
120+
Self::from_command_to_serial(command_id, command, parameters, type_subsystem, serial)
58121
}
59122
}

src/unpi/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::utils::slice_reader::SliceReader;
2-
use std::io::Write;
2+
use std::{future::Future, io::Write};
33

44
pub mod commands;
55
pub mod subsystems;
@@ -24,6 +24,14 @@ pub struct UnpiPacket<'a> {
2424
pub fcs: u8,
2525
}
2626

27+
pub trait UnpiPacketSink {
28+
fn write(&mut self, packet: &UnpiPacket) -> impl Future<Output = Result<(), UnpiPacketError>>;
29+
}
30+
31+
pub trait UnpiPacketSource {
32+
fn read(&mut self) -> impl Future<Output = Result<UnpiPacket, UnpiPacketError>>;
33+
}
34+
2735
struct Wrapped<T>(T);
2836

2937
#[derive(Debug, PartialEq, Copy, Clone)]

0 commit comments

Comments
 (0)