Skip to content

Commit be457ae

Browse files
committed
zephyr: Add sync::channel support
Provide an implementation of channels, inspired by crossbeam-channel. Currently, unbounded channels that use allocation are implemented. Signed-off-by: David Brown <[email protected]>
1 parent c71ed4b commit be457ae

File tree

4 files changed

+336
-0
lines changed

4 files changed

+336
-0
lines changed

zephyr/src/sync.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use core::{
1515
use crate::time::Forever;
1616
use crate::sys::sync as sys;
1717

18+
pub mod channel;
19+
1820
pub mod atomic {
1921
//! Re-export portable atomic.
2022
//!

zephyr/src/sync/channel.rs

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
//! Close-to-Zephyr channels
2+
//!
3+
//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
4+
//! can get, directly using Zephyr primitives.
5+
//!
6+
//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types,
7+
//! these are typically statically allocated. Similar to the other close-to-zephyr primitives,
8+
//! this means that there is a constructor that can directly take one of these primitives.
9+
//!
10+
//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
11+
//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
12+
//! internal `Arc` inside them, but without the overhead of an actual Arc.
13+
14+
extern crate alloc;
15+
16+
use alloc::boxed::Box;
17+
18+
use core::ffi::c_void;
19+
use core::fmt;
20+
use core::marker::PhantomData;
21+
22+
use crate::sys::queue::Queue;
23+
24+
mod counter;
25+
26+
// The zephyr queue does not allocate or manage the data of the messages, so we need to handle
27+
// allocation as such as well. However, we don't need to manage anything, so it is sufficient to
28+
// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
29+
// it back into a Box, and give it to the recipient.
30+
31+
/// Create a multi-producer multi-consumer channel of unbounded capacity. The messages are
32+
/// allocated individually as "Box", and the queue is managed by the underlying Zephyr queue.
33+
pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
34+
let (s, r) = counter::new(queue);
35+
let s = Sender {
36+
queue: s,
37+
_phantom: PhantomData,
38+
};
39+
let r = Receiver {
40+
queue: r,
41+
_phantom: PhantomData,
42+
};
43+
(s, r)
44+
}
45+
46+
#[repr(C)]
47+
struct Message<T> {
48+
/// The private data used by the kernel to enqueue messages and such.
49+
_private: usize,
50+
/// The actual data being transported.
51+
data: T,
52+
}
53+
54+
impl<T> Message<T> {
55+
fn new(data: T) -> Message<T> {
56+
Message {
57+
_private: 0,
58+
data,
59+
}
60+
}
61+
}
62+
63+
/// The sending side of a channel.
64+
pub struct Sender<T> {
65+
queue: counter::Sender<Queue>,
66+
_phantom: PhantomData<T>,
67+
}
68+
69+
unsafe impl<T: Send> Send for Sender<T> {}
70+
unsafe impl<T: Send> Sync for Sender<T> {}
71+
72+
impl<T> Sender<T> {
73+
/// Sends a message over the given channel. This will perform an alloc of the message, which
74+
/// will have an accompanied free on the recipient side.
75+
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
76+
let msg = Box::new(Message::new(msg));
77+
let msg = Box::into_raw(msg);
78+
unsafe {
79+
self.queue.send(msg as *mut c_void);
80+
}
81+
Ok(())
82+
}
83+
}
84+
85+
impl<T> Drop for Sender<T> {
86+
fn drop(&mut self) {
87+
unsafe {
88+
self.queue.release(|_| {
89+
crate::printkln!("Release");
90+
true
91+
})
92+
}
93+
}
94+
}
95+
96+
impl<T> Clone for Sender<T> {
97+
fn clone(&self) -> Self {
98+
Sender {
99+
queue: self.queue.acquire(),
100+
_phantom: PhantomData,
101+
}
102+
}
103+
}
104+
105+
impl<T: fmt::Debug> fmt::Debug for Sender<T> {
106+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107+
write!(f, "Sender {:?}", *self.queue)
108+
}
109+
}
110+
111+
/// The receiving side of a channel.
112+
pub struct Receiver<T> {
113+
queue: counter::Receiver<Queue>,
114+
_phantom: PhantomData<T>,
115+
}
116+
117+
unsafe impl<T: Send> Send for Receiver<T> {}
118+
unsafe impl<T: Send> Sync for Receiver<T> {}
119+
120+
impl<T> Receiver<T> {
121+
/// Blocks the current thread until a message is received or the channel is empty and
122+
/// disconnected.
123+
///
124+
/// If the channel is empty and not disconnected, this call will block until the receive
125+
/// operation can proceed. If the channel is empty and becomes disconnected, this call will
126+
/// wake up and return an error.
127+
pub fn recv(&self) -> Result<T, RecvError> {
128+
let msg = unsafe {
129+
self.queue.recv()
130+
};
131+
let msg = msg as *mut Message<T>;
132+
let msg = unsafe { Box::from_raw(msg) };
133+
Ok(msg.data)
134+
}
135+
}
136+
137+
impl<T> Drop for Receiver<T> {
138+
fn drop(&mut self) {
139+
unsafe {
140+
self.queue.release(|_| {
141+
crate::printkln!("Release");
142+
true
143+
})
144+
}
145+
}
146+
}
147+
148+
impl<T> Clone for Receiver<T> {
149+
fn clone(&self) -> Self {
150+
Receiver {
151+
queue: self.queue.acquire(),
152+
_phantom: PhantomData,
153+
}
154+
}
155+
}
156+
157+
impl<T> fmt::Debug for Receiver<T> {
158+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159+
write!(f, "Sender {:?}", *self.queue)
160+
}
161+
}
162+
163+
// TODO: Move to err
164+
165+
/// An error returned from the [`send`] method.
166+
///
167+
/// The message could not be sent because the channel is disconnected.
168+
///
169+
/// The error contains the message so it can be recovered.
170+
#[derive(PartialEq, Eq, Clone, Copy)]
171+
pub struct SendError<T>(pub T);
172+
173+
impl<T> fmt::Debug for SendError<T> {
174+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175+
"SendError(..)".fmt(f)
176+
}
177+
}
178+
179+
/// An error returned from the [`recv`] method.
180+
///
181+
/// A message could not be received because the channel is empty and disconnected.
182+
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
183+
pub struct RecvError;

zephyr/src/sync/channel/counter.rs

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
//! Reference counter for channels.
3+
4+
// This file is taken from crossbeam-channels, with modifications to be nostd.
5+
6+
7+
extern crate alloc;
8+
9+
use alloc::boxed::Box;
10+
use core::ops;
11+
use core::ptr::NonNull;
12+
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13+
14+
/// Reference counter internals.
15+
struct Counter<C> {
16+
/// The number of senders associated with the channel.
17+
senders: AtomicUsize,
18+
19+
/// The number of receivers associated with the channel.
20+
receivers: AtomicUsize,
21+
22+
/// Set to `true` if the last sender or the last receiver reference deallocates the channel.
23+
destroy: AtomicBool,
24+
25+
/// The internal channel.
26+
chan: C,
27+
}
28+
29+
/// Wraps a channel into the reference counter.
30+
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
31+
let counter = NonNull::from(Box::leak(Box::new(Counter {
32+
senders: AtomicUsize::new(1),
33+
receivers: AtomicUsize::new(1),
34+
destroy: AtomicBool::new(false),
35+
chan,
36+
})));
37+
let s = Sender { counter };
38+
let r = Receiver { counter };
39+
(s, r)
40+
}
41+
42+
/// The sending side.
43+
pub(crate) struct Sender<C> {
44+
counter: NonNull<Counter<C>>,
45+
}
46+
47+
impl<C> Sender<C> {
48+
/// Returns the internal `Counter`.
49+
fn counter(&self) -> &Counter<C> {
50+
unsafe { self.counter.as_ref() }
51+
}
52+
53+
/// Acquires another sender reference.
54+
pub(crate) fn acquire(&self) -> Self {
55+
let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
56+
57+
// Cloning senders and calling `mem::forget` on the clones could potentially overflow the
58+
// counter. It's very difficult to recover sensibly from such degenerate scenarios so we
59+
// just abort when the count becomes very large.
60+
if count > isize::MAX as usize {
61+
// TODO: We need some kind of equivalent here.
62+
unimplemented!();
63+
}
64+
65+
Self {
66+
counter: self.counter,
67+
}
68+
}
69+
70+
/// Releases the sender reference.
71+
///
72+
/// Function `disconnect` will be called if this is the last sender reference.
73+
pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
74+
if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
75+
disconnect(&self.counter().chan);
76+
77+
if self.counter().destroy.swap(true, Ordering::AcqRel) {
78+
drop(unsafe { Box::from_raw(self.counter.as_ptr()) });
79+
}
80+
}
81+
}
82+
}
83+
84+
impl<C> ops::Deref for Sender<C> {
85+
type Target = C;
86+
87+
fn deref(&self) -> &C {
88+
&self.counter().chan
89+
}
90+
}
91+
92+
impl<C> PartialEq for Sender<C> {
93+
fn eq(&self, other: &Self) -> bool {
94+
self.counter == other.counter
95+
}
96+
}
97+
98+
/// The receiving side.
99+
pub(crate) struct Receiver<C> {
100+
counter: NonNull<Counter<C>>,
101+
}
102+
103+
impl<C> Receiver<C> {
104+
/// Returns the internal `Counter`.
105+
fn counter(&self) -> &Counter<C> {
106+
unsafe { self.counter.as_ref() }
107+
}
108+
109+
/// Acquires another receiver reference.
110+
pub(crate) fn acquire(&self) -> Self {
111+
let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
112+
113+
// Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
114+
// counter. It's very difficult to recover sensibly from such degenerate scenarios so we
115+
// just abort when the count becomes very large.
116+
if count > isize::MAX as usize {
117+
unimplemented!();
118+
}
119+
120+
Self {
121+
counter: self.counter,
122+
}
123+
}
124+
125+
/// Releases the receiver reference.
126+
///
127+
/// Function `disconnect` will be called if this is the last receiver reference.
128+
pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
129+
if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
130+
disconnect(&self.counter().chan);
131+
132+
if self.counter().destroy.swap(true, Ordering::AcqRel) {
133+
drop(unsafe { Box::from_raw(self.counter.as_ptr()) });
134+
}
135+
}
136+
}
137+
}
138+
139+
impl<C> ops::Deref for Receiver<C> {
140+
type Target = C;
141+
142+
fn deref(&self) -> &C {
143+
&self.counter().chan
144+
}
145+
}
146+
147+
impl<C> PartialEq for Receiver<C> {
148+
fn eq(&self, other: &Self) -> bool {
149+
self.counter == other.counter
150+
}
151+
}

zephyr/src/sys/counter.rs

Whitespace-only changes.

0 commit comments

Comments
 (0)