Skip to content

Commit dc8b5e0

Browse files
committed
led works again
1 parent 817c3dd commit dc8b5e0

File tree

10 files changed

+483
-240
lines changed

10 files changed

+483
-240
lines changed

Diff for: examples/cc253x.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use pasts::Executor;
22
use rusty_zigbee_dongle::{
3-
cc253x::CC2531X,
3+
cc253x::CC253X,
44
coordinator::{Coordinator, LedStatus},
55
};
66
use std::path::PathBuf;
77

88
fn main() {
99
let looping = async {
10-
let cc2531 = CC2531X::from_path(PathBuf::from("/dev/ttyACM0"), 115_200).unwrap();
10+
let cc2531 = CC253X::from_path(PathBuf::from("/dev/ttyACM0"), 115_200).unwrap();
11+
1112
// Not all firmware versions support LED write as far as I understood
1213
let a = async {
1314
loop {

Diff for: src/cc253x.rs

+84-75
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,79 @@
11
use crate::{
22
coordinator::{AddressMode, Coordinator, CoordinatorError, LedStatus, ResetType},
3+
serial::{simple_serial_port::SimpleSerialPort, SimpleSerial},
4+
subscription::{Predicate, Subscription, SubscriptionService},
35
unpi::{
4-
commands::{get_command_by_name, ParameterValue, ParametersValueMap},
5-
LenTypeInfo, MessageType, Subsystem, UnpiPacket, MAX_FRAME_SIZE,
6+
commands::{get_command_by_name, ParameterValue},
7+
LenTypeInfo, MessageType, Subsystem, UnpiPacket,
68
},
7-
utils::{log, warnn},
9+
utils::log,
810
};
9-
use futures::lock::Mutex;
10-
use serialport::SerialPort;
11-
use std::{path::PathBuf, sync::Arc, time::Duration};
11+
use futures::{
12+
channel::oneshot::{self, Receiver, Sender},
13+
lock::Mutex,
14+
};
15+
use std::{path::PathBuf, sync::Arc};
1216

1317
//TODO: fix this
1418
const MAXIMUM_ZIGBEE_PAYLOAD_SIZE: usize = 255;
1519

16-
pub struct CC2531X {
20+
type Container = Vec<u8>;
21+
22+
pub struct CC253X<S: SimpleSerial> {
1723
_supports_led: Option<bool>,
18-
read: Arc<Mutex<Box<dyn SerialPort>>>,
19-
write: Arc<Mutex<Box<dyn SerialPort>>>,
24+
subscriptions: Arc<Mutex<SubscriptionService<UnpiPacket<Container>>>>,
25+
serial: Arc<Mutex<S>>,
2026
}
2127

22-
impl CC2531X {
28+
impl CC253X<SimpleSerialPort> {
2329
pub fn from_path(path: PathBuf, baud_rate: u32) -> Result<Self, CoordinatorError> {
24-
let serial = serialport::new(path.to_str().unwrap(), baud_rate)
25-
.timeout(Duration::from_millis(10))
26-
.open()
27-
.map_err(|_e| CoordinatorError::SerialOpen)?;
30+
let mut serial = SimpleSerialPort::new(
31+
path.to_str()
32+
.ok_or(CoordinatorError::Io("not a path".to_string()))?,
33+
baud_rate,
34+
)?;
35+
serial.start()?;
36+
let subscriptions = SubscriptionService::new();
2837
Ok(Self {
29-
read: Arc::new(Mutex::new(
30-
serial.try_clone().map_err(|_e| CoordinatorError::Io)?,
31-
)),
32-
write: Arc::new(Mutex::new(serial)),
38+
serial: Arc::new(Mutex::new(serial)),
3339
_supports_led: None,
40+
subscriptions: Arc::new(Mutex::new(subscriptions)),
3441
})
3542
}
43+
}
3644

45+
impl<S: SimpleSerial> CC253X<S> {
3746
pub async fn wait_for(
3847
&self,
3948
name: &str,
4049
message_type: MessageType,
4150
subsystem: Subsystem,
4251
_timeout: Option<std::time::Duration>,
43-
) -> Result<ParametersValueMap, CoordinatorError> {
52+
) -> Result<UnpiPacket<Container>, CoordinatorError> {
4453
log!("waiting for {:?}", name);
4554
let command =
4655
get_command_by_name(&subsystem, name).ok_or(CoordinatorError::NoCommandWithName)?;
47-
let mut buffer = [0; MAX_FRAME_SIZE];
48-
let lock = self.read.lock();
49-
let len = lock
56+
let subscriptions = self.subscriptions.clone();
57+
let (tx, rx): (
58+
Sender<UnpiPacket<Container>>,
59+
Receiver<UnpiPacket<Container>>,
60+
) = oneshot::channel();
61+
let packet = rx.await.map_err(|_| CoordinatorError::SubscriptionError)?;
62+
subscriptions
63+
.lock()
5064
.await
51-
.read(&mut buffer)
52-
.map_err(|_e| CoordinatorError::Io)?;
53-
let packet = UnpiPacket::from_payload(
54-
(&buffer[..len], LenTypeInfo::OneByte),
55-
(message_type, subsystem),
56-
command.id,
57-
)?;
58-
log!("<<< {:?}", packet);
59-
if packet.type_subsystem == (message_type, subsystem) && packet.command == command.id {
60-
let response = command.read_and_fill(packet.payload)?;
61-
Ok(response)
62-
} else {
63-
warnn!("rejecting packet: {:?}", packet);
64-
Err(CoordinatorError::Io)
65-
}
65+
.subscribe(Subscription::SingleShot(
66+
Predicate(Box::new(move |packet: &UnpiPacket<Container>| {
67+
packet.type_subsystem == (message_type, subsystem)
68+
&& packet.command == command.id
69+
})),
70+
tx,
71+
));
72+
Ok(packet)
6673
}
6774
}
6875

69-
impl Coordinator for CC2531X {
76+
impl<S: SimpleSerial> Coordinator for CC253X<S> {
7077
type ZclFrame = psila_data::cluster_library::ClusterLibraryHeader;
7178

7279
type ZclPayload<'a> = ([u8; MAXIMUM_ZIGBEE_PAYLOAD_SIZE], usize);
@@ -92,20 +99,21 @@ impl Coordinator for CC2531X {
9299
async fn version(&self) -> Result<Option<ParameterValue>, CoordinatorError> {
93100
let command = get_command_by_name(&Subsystem::Sys, "version")
94101
.ok_or(CoordinatorError::NoCommandWithName)?;
102+
let serial = self.serial.clone();
95103
let send = async {
96-
let mut lock = self.write.lock().await;
97-
UnpiPacket::from_command_to_serial_async(
98-
command.id,
99-
command,
100-
&[],
104+
let packet = UnpiPacket::from_command_owned(
105+
LenTypeInfo::OneByte,
101106
(MessageType::SREQ, Subsystem::Sys),
102-
&mut **lock,
103-
)
104-
.await
107+
&[],
108+
command,
109+
)?;
110+
serial.lock().await.write(&packet).await?;
111+
Ok::<(), CoordinatorError>(())
105112
};
106113
let wait = self.wait_for("version", MessageType::SRESP, Subsystem::Sys, None);
107-
let r = futures::try_join!(send, wait)?;
108-
Ok(r.1.get(&"majorrel").cloned())
114+
let (_, packet) = futures::try_join!(send, wait)?;
115+
let r = command.read_and_fill(packet.payload.as_slice())?;
116+
Ok(r.get(&"majorrel").cloned())
109117
}
110118

111119
async fn reset(&self, reset_type: ResetType) -> Result<(), CoordinatorError> {
@@ -116,24 +124,23 @@ impl Coordinator for CC2531X {
116124
ResetType::Hard => &[("type", ParameterValue::U8(0))],
117125
};
118126

119-
let mut lock = self.write.lock().await;
120-
UnpiPacket::from_command_to_serial(
121-
command.id,
122-
command,
127+
let serial = self.serial.clone();
128+
let packet = UnpiPacket::from_command_owned(
129+
LenTypeInfo::OneByte,
130+
(MessageType::SREQ, Subsystem::Sys),
123131
parameters,
124-
(MessageType::SREQ, Subsystem::Util),
125-
&mut **lock,
132+
command,
126133
)?;
127-
128-
Ok(())
134+
serial.lock().await.write(&packet).await?;
135+
Ok::<(), CoordinatorError>(())
129136
}
130137

131138
async fn set_led(&self, led_status: LedStatus) -> Result<(), CoordinatorError> {
132139
let command = get_command_by_name(&Subsystem::Util, "led_control")
133140
.ok_or(CoordinatorError::NoCommandWithName)?;
134141
//TODO: const firmwareControlsLed = parseInt(this.version.revision) >= 20211029;
135142
let firmware_controls_led = true;
136-
let mut lock = self.write.lock().await;
143+
137144
let parameters = match led_status {
138145
LedStatus::Disable => {
139146
if firmware_controls_led {
@@ -158,14 +165,14 @@ impl Coordinator for CC2531X {
158165
],
159166
};
160167

161-
UnpiPacket::from_command_to_serial(
162-
command.id,
163-
command,
164-
parameters,
168+
let serial = self.serial.clone();
169+
let packet = UnpiPacket::from_command_owned(
170+
LenTypeInfo::OneByte,
165171
(MessageType::SREQ, Subsystem::Util),
166-
&mut **lock,
172+
parameters,
173+
command,
167174
)?;
168-
175+
serial.lock().await.write(&packet).await?;
169176
Ok(())
170177
}
171178

@@ -192,14 +199,15 @@ impl Coordinator for CC2531X {
192199

193200
let command = get_command_by_name(&Subsystem::Zdo, "management_network_update_request")
194201
.ok_or(CoordinatorError::NoCommandWithName)?;
195-
let mut lock = self.write.lock().await;
196-
UnpiPacket::from_command_to_serial(
197-
command.id,
198-
command,
199-
parameters,
202+
203+
let serial = self.serial.clone();
204+
let packet = UnpiPacket::from_command_owned(
205+
LenTypeInfo::OneByte,
200206
(MessageType::SREQ, Subsystem::Zdo),
201-
&mut **lock,
207+
parameters,
208+
command,
202209
)?;
210+
serial.lock().await.write(&packet).await?;
203211

204212
Ok(())
205213
}
@@ -212,14 +220,15 @@ impl Coordinator for CC2531X {
212220

213221
let command = get_command_by_name(&Subsystem::Zdo, "stack_tune")
214222
.ok_or(CoordinatorError::NoCommandWithName)?;
215-
let mut lock = self.write.lock().await;
216-
UnpiPacket::from_command_to_serial(
217-
command.id,
218-
command,
219-
parameters,
223+
224+
let serial = self.serial.clone();
225+
let packet = UnpiPacket::from_command_owned(
226+
LenTypeInfo::OneByte,
220227
(MessageType::SREQ, Subsystem::Zdo),
221-
&mut **lock,
228+
parameters,
229+
command,
222230
)?;
231+
serial.lock().await.write(&packet).await?;
223232
Ok(())
224233
}
225234

Diff for: src/coordinator.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,24 @@ pub enum ResetType {
5656

5757
#[derive(Debug)]
5858
pub enum CoordinatorError {
59-
SerialOpen,
59+
SerialOpen(String),
6060
SerialWrite,
6161
SerialRead,
6262
NoCommandWithName,
63-
Io,
63+
Io(String),
6464
Parameter(ParameterError),
6565
InvalidChannel,
6666
ResponseMismatch,
6767
Map(MapError),
6868
NoRequest,
6969
NoResponse,
70-
70+
SerialChannelMissing,
71+
SubscriptionError
7172
}
7273

7374
impl From<std::io::Error> for CoordinatorError {
74-
fn from(_e: std::io::Error) -> Self {
75-
CoordinatorError::Io
75+
fn from(e: std::io::Error) -> Self {
76+
CoordinatorError::Io(e.to_string())
7677
}
7778
}
7879

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod utils;
44
#[cfg(feature="cc2531x")]
55
pub mod cc253x;
66
pub mod serial;
7+
pub mod subscription;
78

89

910
#[cfg(test)]

0 commit comments

Comments
 (0)