Skip to content

Commit c698a3c

Browse files
committed
channel
1 parent dc8b5e0 commit c698a3c

File tree

6 files changed

+97
-70
lines changed

6 files changed

+97
-70
lines changed

src/cc253x.rs

+30-19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
coordinator::{AddressMode, Coordinator, CoordinatorError, LedStatus, ResetType},
3-
serial::{simple_serial_port::SimpleSerialPort, SimpleSerial},
3+
serial::{simple_serial_port::SimpleSerialPort, SubscriptionSerial},
44
subscription::{Predicate, Subscription, SubscriptionService},
55
unpi::{
66
commands::{get_command_by_name, ParameterValue},
@@ -19,30 +19,32 @@ const MAXIMUM_ZIGBEE_PAYLOAD_SIZE: usize = 255;
1919

2020
type Container = Vec<u8>;
2121

22-
pub struct CC253X<S: SimpleSerial> {
22+
pub struct CC253X<S: SubscriptionSerial> {
2323
_supports_led: Option<bool>,
2424
subscriptions: Arc<Mutex<SubscriptionService<UnpiPacket<Container>>>>,
2525
serial: Arc<Mutex<S>>,
2626
}
2727

2828
impl CC253X<SimpleSerialPort> {
2929
pub fn from_path(path: PathBuf, baud_rate: u32) -> Result<Self, CoordinatorError> {
30+
let subscriptions = Arc::new(Mutex::new(SubscriptionService::new()));
31+
3032
let mut serial = SimpleSerialPort::new(
3133
path.to_str()
3234
.ok_or(CoordinatorError::Io("not a path".to_string()))?,
3335
baud_rate,
36+
subscriptions.clone(),
3437
)?;
3538
serial.start()?;
36-
let subscriptions = SubscriptionService::new();
3739
Ok(Self {
3840
serial: Arc::new(Mutex::new(serial)),
3941
_supports_led: None,
40-
subscriptions: Arc::new(Mutex::new(subscriptions)),
42+
subscriptions: subscriptions.clone(),
4143
})
4244
}
4345
}
4446

45-
impl<S: SimpleSerial> CC253X<S> {
47+
impl<S: SubscriptionSerial> CC253X<S> {
4648
pub async fn wait_for(
4749
&self,
4850
name: &str,
@@ -54,26 +56,33 @@ impl<S: SimpleSerial> CC253X<S> {
5456
let command =
5557
get_command_by_name(&subsystem, name).ok_or(CoordinatorError::NoCommandWithName)?;
5658
let subscriptions = self.subscriptions.clone();
57-
let (tx, rx): (
59+
let (tx, mut rx): (
5860
Sender<UnpiPacket<Container>>,
5961
Receiver<UnpiPacket<Container>>,
6062
) = oneshot::channel();
63+
64+
let mut s = subscriptions.lock().await;
65+
let subscription = Subscription::SingleShot(
66+
Predicate(Box::new(move |packet: &UnpiPacket<Container>| {
67+
packet.type_subsystem == (message_type, subsystem) && packet.command == command.id
68+
})),
69+
tx,
70+
);
71+
s.subscribe(subscription);
72+
drop(s);
73+
log!("waiting for packet");
74+
//println!("{:?}", rx.try_recv());
6175
let packet = rx.await.map_err(|_| CoordinatorError::SubscriptionError)?;
62-
subscriptions
63-
.lock()
64-
.await
65-
.subscribe(Subscription::SingleShot(
66-
Predicate(Box::new(move |packet: &UnpiPacket<Container>| {
67-
packet.type_subsystem == (message_type, subsystem)
68-
&& packet.command == command.id
69-
})),
70-
tx,
71-
));
76+
// loop {
77+
// println!("{:?}", rx.try_recv());
78+
// std::thread::sleep(std::time::Duration::from_millis(1000));
79+
// }
80+
log!("returning packet");
7281
Ok(packet)
7382
}
7483
}
7584

76-
impl<S: SimpleSerial> Coordinator for CC253X<S> {
85+
impl<S: SubscriptionSerial> Coordinator for CC253X<S> {
7786
type ZclFrame = psila_data::cluster_library::ClusterLibraryHeader;
7887

7988
type ZclPayload<'a> = ([u8; MAXIMUM_ZIGBEE_PAYLOAD_SIZE], usize);
@@ -100,6 +109,7 @@ impl<S: SimpleSerial> Coordinator for CC253X<S> {
100109
let command = get_command_by_name(&Subsystem::Sys, "version")
101110
.ok_or(CoordinatorError::NoCommandWithName)?;
102111
let serial = self.serial.clone();
112+
let wait = self.wait_for("version", MessageType::SRESP, Subsystem::Sys, None);
103113
let send = async {
104114
let packet = UnpiPacket::from_command_owned(
105115
LenTypeInfo::OneByte,
@@ -110,8 +120,9 @@ impl<S: SimpleSerial> Coordinator for CC253X<S> {
110120
serial.lock().await.write(&packet).await?;
111121
Ok::<(), CoordinatorError>(())
112122
};
113-
let wait = self.wait_for("version", MessageType::SRESP, Subsystem::Sys, None);
114-
let (_, packet) = futures::try_join!(send, wait)?;
123+
println!("waiting for future!!!!");
124+
let (_s, packet) = futures::try_join!(send, wait)?;
125+
println!("returned future!!!!");
115126
let r = command.read_and_fill(packet.payload.as_slice())?;
116127
Ok(r.get(&"majorrel").cloned())
117128
}

src/serial/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@ use std::future::Future;
1111

1212
pub mod simple_serial_port;
1313

14-
pub trait SimpleSerial {
14+
pub trait SubscriptionSerial {
1515
type Sender;
1616
type Receiver;
1717

18-
fn read(&mut self) -> impl Future<Output = Result<UnpiPacket<Vec<u8>>, CoordinatorError>>;
1918
fn write(
2019
&mut self,
2120
packet: &UnpiPacket<Vec<u8>>,
@@ -80,4 +79,6 @@ pub enum SerialThreadError {
8079
SerialRead,
8180
SerialWrite,
8281
MalformedPacket,
82+
SubscriptionWrite,
83+
PacketParse
8384
}

src/serial/simple_serial_port.rs

+29-42
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,56 @@
1-
use super::{SerialThreadError, SimpleSerial};
1+
use super::{SerialThreadError, SubscriptionSerial};
22
use crate::{
33
coordinator::CoordinatorError,
4-
unpi::{LenTypeInfo, MessageType, Subsystem, UnpiPacket},
4+
subscription::SubscriptionService,
5+
unpi::{LenTypeInfo, UnpiPacket},
56
utils::log,
67
};
7-
use futures::channel::mpsc::{Receiver, Sender};
88
use futures::StreamExt;
99
use futures::{channel::mpsc, executor::block_on, SinkExt};
10-
use std::thread::JoinHandle;
10+
use futures::{
11+
channel::mpsc::{Receiver, Sender},
12+
lock::Mutex,
13+
};
14+
use std::{sync::Arc, thread::JoinHandle};
1115

1216
type Container = Vec<u8>;
1317

1418
const DEFAULT_READ_TIMEOUT_MS: u64 = 10;
15-
const DEFAULT_WRITE_TIMEOUT_MS: u64 = 10;
1619

1720
// Simplest possible serial port implementation
1821
pub struct SimpleSerialPort {
1922
path: String,
2023
baud_rate: u32,
21-
// from the serial port to the coordinator
22-
serial_to: (
23-
Option<Sender<UnpiPacket<Container>>>,
24-
Option<Receiver<UnpiPacket<Container>>>,
25-
),
2624
// from the coordinator to the serial port
2725
to_serial: (
2826
Option<Sender<UnpiPacket<Container>>>,
2927
Option<Receiver<UnpiPacket<Container>>>,
3028
),
3129
read_thread: Option<JoinHandle<Result<(), SerialThreadError>>>,
3230
write_thread: Option<JoinHandle<Result<(), SerialThreadError>>>,
31+
subscription_service: Arc<Mutex<SubscriptionService<UnpiPacket<Container>>>>,
3332
}
3433

3534
impl SimpleSerialPort {
36-
pub fn new(path: &str, baud_rate: u32) -> Result<Self, CoordinatorError> {
37-
let serial_to = mpsc::channel(10);
38-
let serial_to = (Some(serial_to.0), Some(serial_to.1));
35+
pub fn new(
36+
path: &str,
37+
baud_rate: u32,
38+
subscription_service: Arc<Mutex<SubscriptionService<UnpiPacket<Container>>>>,
39+
) -> Result<Self, CoordinatorError> {
3940
let to_serial = mpsc::channel(10);
4041
let to_serial = (Some(to_serial.0), Some(to_serial.1));
4142
Ok(SimpleSerialPort {
4243
path: path.to_string(),
4344
baud_rate,
44-
serial_to,
4545
to_serial,
4646
read_thread: None,
4747
write_thread: None,
48+
subscription_service,
4849
})
4950
}
5051
}
5152

52-
impl SimpleSerial for SimpleSerialPort {
53+
impl SubscriptionSerial for SimpleSerialPort {
5354
type Sender = Sender<UnpiPacket<Container>>;
5455
type Receiver = Receiver<UnpiPacket<Container>>;
5556

@@ -61,25 +62,24 @@ impl SimpleSerial for SimpleSerialPort {
6162
let mut write = read
6263
.try_clone()
6364
.map_err(|e| CoordinatorError::SerialOpen(e.to_string()))?;
64-
let mut tx = self
65-
.serial_to
66-
.0
67-
.take()
68-
.ok_or(CoordinatorError::SerialChannelMissing)?;
65+
66+
let subscription_service = self.subscription_service.clone();
6967
let receive_from_serial_send_to_channel = move || -> Result<(), SerialThreadError> {
7068
loop {
7169
let mut buffer = [0u8; 256];
7270
let len = read
7371
.read(&mut buffer)
7472
.map_err(|_e| SerialThreadError::SerialRead)?;
75-
let packet: UnpiPacket<Vec<u8>> = UnpiPacket::from_payload_owned(
76-
(&buffer[..len].to_vec(), LenTypeInfo::OneByte),
77-
(MessageType::SREQ, Subsystem::Sys),
78-
0,
79-
)
80-
.map_err(|_| SerialThreadError::MalformedPacket)?;
81-
log!("<<< {:?}", packet);
82-
block_on(tx.send(packet)).map_err(|_e| SerialThreadError::SerialWrite)?;
73+
if let Some(start_of_frame_position) = buffer.iter().position(|&x| x == 0xfe) {
74+
let packet: UnpiPacket<Vec<u8>> = UnpiPacket::try_from((
75+
&buffer[start_of_frame_position..len],
76+
LenTypeInfo::OneByte,
77+
))
78+
.map_err(|_e| SerialThreadError::PacketParse)?
79+
.to_owned();
80+
let send = async { subscription_service.lock().await.notify(packet) };
81+
block_on(send).map_err(|_| SerialThreadError::SubscriptionWrite)?;
82+
}
8383
}
8484
};
8585
let mut rx = self
@@ -93,8 +93,7 @@ impl SimpleSerial for SimpleSerialPort {
9393
log!(">>> {:?}", packet);
9494
packet
9595
.to_serial(&mut *write)
96-
.map_err(|_e| SerialThreadError::SerialWrite)
97-
.unwrap();
96+
.map_err(|_e| SerialThreadError::SerialWrite)?;
9897
}
9998
Ok::<(), SerialThreadError>(())
10099
})?;
@@ -107,18 +106,6 @@ impl SimpleSerial for SimpleSerialPort {
107106
Ok(())
108107
}
109108

110-
async fn read(&mut self) -> Result<UnpiPacket<Vec<u8>>, CoordinatorError> {
111-
let rx = self
112-
.serial_to
113-
.1
114-
.as_mut()
115-
.ok_or(CoordinatorError::SerialChannelMissing)?;
116-
rx.next()
117-
.await
118-
.ok_or(CoordinatorError::NoResponse)
119-
.map_err(|_e| CoordinatorError::SerialRead)
120-
}
121-
122109
async fn write(&mut self, packet: &UnpiPacket<Vec<u8>>) -> Result<(), CoordinatorError> {
123110
let tx = self
124111
.to_serial

src/subscription.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::utils::{err, log};
12
use std::collections::VecDeque;
23

34
pub struct Predicate<T>(pub Box<dyn Fn(&T) -> bool + Send + Sync>);
@@ -31,14 +32,15 @@ pub struct SubscriptionService<T> {
3132
subscriptions: VecDeque<Subscription<T>>,
3233
}
3334

34-
impl<T: Clone + PartialEq> SubscriptionService<T> {
35+
impl<T: Clone + PartialEq + std::fmt::Debug> SubscriptionService<T> {
3536
pub fn new() -> Self {
3637
Self {
3738
subscriptions: VecDeque::new(),
3839
}
3940
}
4041

4142
pub fn subscribe(&mut self, subscription: Subscription<T>) {
43+
log!("adding subscription {:?}", subscription);
4244
self.subscriptions.push_front(subscription);
4345
}
4446

@@ -54,17 +56,22 @@ impl<T: Clone + PartialEq> SubscriptionService<T> {
5456
.map(|x| (x.0, x.1.is_single_shot()))
5557
{
5658
if is_single_shot {
59+
println!("found single shot subscription");
5760
let subscription = self
5861
.subscriptions
5962
.remove(position)
6063
.ok_or(SubscriptionError::MissingSubscription)?;
61-
subscription
64+
println!("subscription: {:?}", subscription);
65+
println!("subscriptions: {:?}", self.subscriptions);
66+
let tx = subscription
6267
.to_single_shot()
6368
.ok_or(SubscriptionError::NotSingleShot)?
64-
.1
65-
.send(value.clone())
69+
.1;
70+
tx.send(value.clone())
6671
.map_err(|_| SubscriptionError::Send)?;
72+
log!("sent packet");
6773
} else {
74+
println!("found multiple shot subscription");
6875
let subscription = self.subscriptions.get_mut(position).unwrap();
6976
match subscription {
7077
Subscription::SingleShot(_, _) => return Err(SubscriptionError::Unreachable),
@@ -74,6 +81,8 @@ impl<T: Clone + PartialEq> SubscriptionService<T> {
7481
}
7582
}
7683
}
84+
} else {
85+
err!("No subscription found for {:?}", value);
7786
}
7887
Ok(())
7988
}

src/unpi/mod.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use commands::{Command, ParameterValue};
2-
31
use crate::{coordinator::CoordinatorError, utils::slice_reader::SliceReader};
2+
use commands::{Command, ParameterValue};
43
use std::{future::Future, io::Write};
54

65
pub mod commands;
@@ -363,6 +362,16 @@ impl<'a> UnpiPacket<&'a [u8]> {
363362
let fcs = h.checksum()?;
364363
Ok(UnpiPacket { fcs, ..h })
365364
}
365+
366+
pub fn to_owned(&self) -> UnpiPacket<Vec<u8>> {
367+
UnpiPacket {
368+
len: self.len.clone(),
369+
type_subsystem: self.type_subsystem.clone(),
370+
command: self.command,
371+
payload: self.payload.to_vec(),
372+
fcs: self.fcs,
373+
}
374+
}
366375
}
367376

368377
impl<T> UnpiPacket<T>

src/utils/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,19 @@ macro_rules! log {
88
}
99
pub(crate) use log;
1010

11+
#[allow(unused_macros)]
1112
macro_rules! warnn {
1213
($($arg:tt)*) => {
1314
eprintln!($($arg)*);
1415
}
1516
}
16-
pub(crate) use warnn;
17+
#[allow(unused_imports)]
18+
pub(crate) use warnn;
19+
20+
21+
macro_rules! err {
22+
($($arg:tt)*) => {
23+
eprintln!($($arg)*);
24+
}
25+
}
26+
pub(crate) use err;

0 commit comments

Comments
 (0)