Skip to content

Commit c8bc8d8

Browse files
committed
zephyr: sync: channel: Create unbounded channels
Create an implementation of bounded channels, in the spirit of crossbeam-channel. Currently, only the bounded channels are supported, an as we don't yet support recovery from panic, ther is no poisoning. As the underlying Zephyr queues don't support deallocation, drop is also a no-op. Signed-off-by: David Brown <[email protected]>
1 parent 8a280ba commit c8bc8d8

File tree

4 files changed

+523
-0
lines changed

4 files changed

+523
-0
lines changed

samples/philosophers/src/channel.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright (c) 2023 Linaro LTD
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Synchronizer using channels
5+
//!
6+
//! Synchronize between the philosophers using channels to communicate with a thread that handles
7+
//! the messages.
8+
9+
extern crate alloc;
10+
11+
use alloc::vec::Vec;
12+
use alloc::boxed::Box;
13+
14+
use zephyr::sync::channel::{self, Receiver, Sender};
15+
use zephyr::{
16+
kobj_define,
17+
sync::Arc,
18+
};
19+
use zephyr::object::KobjInit;
20+
21+
use crate::{NUM_PHIL, ForkSync};
22+
23+
/// An implementation of ForkSync that uses a server commnicated with channels to perform the
24+
/// synchronization.
25+
#[derive(Debug)]
26+
struct ChannelSync {
27+
command: Sender<Command>,
28+
reply_send: Sender<()>,
29+
reply_recv: Receiver<()>,
30+
}
31+
32+
#[derive(Debug)]
33+
enum Command {
34+
Acquire(usize, Sender<()>),
35+
Release(usize),
36+
}
37+
38+
/// This implements a single Fork on the server side for the ChannelSync.
39+
enum ChannelFork {
40+
/// The fork is free,
41+
Free,
42+
/// The work is in use, nobody is waiting.
43+
InUse,
44+
/// The fork is in use, and someone is waiting on it.
45+
InUseWait(Sender<()>),
46+
}
47+
48+
impl Default for ChannelFork {
49+
fn default() -> Self {
50+
ChannelFork::Free
51+
}
52+
}
53+
54+
impl ChannelFork {
55+
/// Attempt to aquire the work. If it is free, reply to the sender, otherwise, track them to
56+
/// reply to them when the fork is freed up.
57+
fn acquire(&mut self, reply: Sender<()>) {
58+
// For debugging, just stop here, and wait for a stack report.
59+
let next = match *self {
60+
ChannelFork::Free => {
61+
// Reply immediately that this fork is free.
62+
reply.send(()).unwrap();
63+
ChannelFork::InUse
64+
}
65+
ChannelFork::InUse => {
66+
// The fork is being used, become the waiter.
67+
ChannelFork::InUseWait(reply)
68+
}
69+
ChannelFork::InUseWait(_) => {
70+
// There is already a wait. Something has gone wrong as this should never happen.
71+
panic!("Mutliple waiters on fork");
72+
}
73+
};
74+
*self = next;
75+
}
76+
77+
/// Release the fork. This is presumably sent from the same sender that requested it, although
78+
/// this is not checked.
79+
fn release(&mut self) {
80+
let next = match self {
81+
ChannelFork::Free => {
82+
// An error case, the fork is not in use, it shouldn't be freed.
83+
panic!("Release of fork that is not in use");
84+
}
85+
ChannelFork::InUse => {
86+
// The fork is in use, and nobody else is waiting.
87+
ChannelFork::Free
88+
}
89+
ChannelFork::InUseWait(waiter) => {
90+
// The fork is in use by us, and someone else is waiting. Tell the other waiter
91+
// they now have the work.
92+
waiter.send(()).unwrap();
93+
ChannelFork::InUse
94+
}
95+
};
96+
*self = next;
97+
}
98+
}
99+
100+
impl ChannelSync {
101+
pub fn new(
102+
command: Sender<Command>,
103+
reply: (Sender<()>, Receiver<()>)) -> ChannelSync
104+
{
105+
ChannelSync {
106+
command,
107+
reply_send: reply.0,
108+
reply_recv: reply.1,
109+
}
110+
}
111+
}
112+
113+
/// Generate a syncer out of a ChannelSync.
114+
#[allow(dead_code)]
115+
pub fn get_channel_syncer() -> Vec<Arc<dyn ForkSync>> {
116+
COMMAND_QUEUE.init();
117+
let command_queue = COMMAND_QUEUE.get();
118+
let (cq_send, cq_recv) = channel::unbounded_from(command_queue);
119+
let reply_queues = REPLY_QUEUES.each_ref().map(|m| {
120+
m.init();
121+
channel::unbounded_from(m.get())
122+
});
123+
let syncer = reply_queues.into_iter().map(|rqueue| {
124+
let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue))
125+
as Box<dyn ForkSync>;
126+
Arc::from(item)
127+
});
128+
129+
let channel_thread = CHANNEL_THREAD.spawn(CHANNEL_STACK.token(), move || {
130+
channel_thread(cq_recv);
131+
});
132+
channel_thread.start();
133+
134+
syncer.collect()
135+
}
136+
137+
/// The thread that handles channel requests.
138+
///
139+
/// Spawned when we are using the channel syncer.
140+
fn channel_thread(cq_recv: Receiver<Command>) {
141+
let mut forks = [(); NUM_PHIL].each_ref().map(|_| ChannelFork::default());
142+
143+
loop {
144+
match cq_recv.recv().unwrap() {
145+
Command::Acquire(fork, reply) => {
146+
forks[fork].acquire(reply);
147+
}
148+
Command::Release(fork) => {
149+
forks[fork].release();
150+
}
151+
}
152+
}
153+
}
154+
155+
impl ForkSync for ChannelSync {
156+
fn take(&self, index: usize) {
157+
self.command.send(Command::Acquire(index, self.reply_send.clone())).unwrap();
158+
// When the reply comes, we know we have the resource.
159+
self.reply_recv.recv().unwrap();
160+
}
161+
162+
fn release(&self, index: usize) {
163+
self.command.send(Command::Release(index)).unwrap();
164+
// Release does not have a reply.
165+
}
166+
}
167+
168+
kobj_define! {
169+
static CHANNEL_STACK: ThreadStack<2054>;
170+
static CHANNEL_THREAD: StaticThread;
171+
172+
// For communicating using Queue, there is one to the main thread (the manager), and one back
173+
// to each philosopher.
174+
static COMMAND_QUEUE: StaticQueue;
175+
static REPLY_QUEUES: [StaticQueue; NUM_PHIL];
176+
}

zephyr/src/sync.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ use core::{
1515
use crate::time::{Forever, NoWait};
1616
use crate::sys::sync as sys;
1717

18+
// Channels are currently only available with allocation. Bounded channels later might be
19+
// available.
20+
#[cfg(CONFIG_RUST_ALLOC)]
21+
pub mod channel;
22+
1823
pub mod atomic {
1924
//! Re-export portable atomic.
2025
//!

zephyr/src/sync/channel.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
//! Close-to-Zephyr channels
2+
//!
3+
//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
4+
//! can get, directly using Zephyr primitives.
5+
//!
6+
//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types,
7+
//! these are typically statically allocated. Similar to the other close-to-zephyr primitives,
8+
//! this means that there is a constructor that can directly take one of these primitives.
9+
//!
10+
//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
11+
//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
12+
//! internal `Arc` inside them, but without the overhead of an actual Arc.
13+
14+
extern crate alloc;
15+
16+
use alloc::boxed::Box;
17+
18+
use core::ffi::c_void;
19+
use core::fmt;
20+
use core::marker::PhantomData;
21+
22+
use crate::sys::queue::Queue;
23+
24+
mod counter;
25+
26+
// The zephyr queue does not allocate or manage the data of the messages, so we need to handle
27+
// allocation as such as well. However, we don't need to manage anything, so it is sufficient to
28+
// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
29+
// it back into a Box, and give it to the recipient.
30+
31+
/// Create a multi-producer multi-consumer channel of unbounded capacity. The messages are
32+
/// allocated individually as "Box", and the queue is managed by the underlying Zephyr queue.
33+
pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
34+
let (s, r) = counter::new(queue);
35+
let s = Sender {
36+
queue: s,
37+
_phantom: PhantomData,
38+
};
39+
let r = Receiver {
40+
queue: r,
41+
_phantom: PhantomData,
42+
};
43+
(s, r)
44+
}
45+
46+
/// The underlying type for Messages through Zephyr's [`Queue`].
47+
///
48+
/// This wrapper is used internally to wrap user messages through the queue. It is not useful in
49+
/// safe code, but may be useful for implementing other types of message queues.
50+
#[repr(C)]
51+
pub struct Message<T> {
52+
/// The private data used by the kernel to enqueue messages and such.
53+
_private: usize,
54+
/// The actual data being transported.
55+
data: T,
56+
}
57+
58+
impl<T> Message<T> {
59+
fn new(data: T) -> Message<T> {
60+
Message {
61+
_private: 0,
62+
data,
63+
}
64+
}
65+
}
66+
67+
/// The sending side of a channel.
68+
pub struct Sender<T> {
69+
queue: counter::Sender<Queue>,
70+
_phantom: PhantomData<T>,
71+
}
72+
73+
unsafe impl<T: Send> Send for Sender<T> {}
74+
unsafe impl<T: Send> Sync for Sender<T> {}
75+
76+
impl<T> Sender<T> {
77+
/// Sends a message over the given channel. This will perform an alloc of the message, which
78+
/// will have an accompanied free on the recipient side.
79+
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
80+
let msg = Box::new(Message::new(msg));
81+
let msg = Box::into_raw(msg);
82+
unsafe {
83+
self.queue.send(msg as *mut c_void);
84+
}
85+
Ok(())
86+
}
87+
}
88+
89+
impl<T> Drop for Sender<T> {
90+
fn drop(&mut self) {
91+
unsafe {
92+
self.queue.release(|_| {
93+
crate::printkln!("Release");
94+
true
95+
})
96+
}
97+
}
98+
}
99+
100+
impl<T> Clone for Sender<T> {
101+
fn clone(&self) -> Self {
102+
Sender {
103+
queue: self.queue.acquire(),
104+
_phantom: PhantomData,
105+
}
106+
}
107+
}
108+
109+
impl<T: fmt::Debug> fmt::Debug for Sender<T> {
110+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111+
write!(f, "Sender {:?}", *self.queue)
112+
}
113+
}
114+
115+
/// The receiving side of a channel.
116+
pub struct Receiver<T> {
117+
queue: counter::Receiver<Queue>,
118+
_phantom: PhantomData<T>,
119+
}
120+
121+
unsafe impl<T: Send> Send for Receiver<T> {}
122+
unsafe impl<T: Send> Sync for Receiver<T> {}
123+
124+
impl<T> Receiver<T> {
125+
/// Blocks the current thread until a message is received or the channel is empty and
126+
/// disconnected.
127+
///
128+
/// If the channel is empty and not disconnected, this call will block until the receive
129+
/// operation can proceed. If the channel is empty and becomes disconnected, this call will
130+
/// wake up and return an error.
131+
pub fn recv(&self) -> Result<T, RecvError> {
132+
let msg = unsafe {
133+
self.queue.recv()
134+
};
135+
let msg = msg as *mut Message<T>;
136+
let msg = unsafe { Box::from_raw(msg) };
137+
Ok(msg.data)
138+
}
139+
}
140+
141+
impl<T> Drop for Receiver<T> {
142+
fn drop(&mut self) {
143+
unsafe {
144+
self.queue.release(|_| {
145+
crate::printkln!("Release");
146+
true
147+
})
148+
}
149+
}
150+
}
151+
152+
impl<T> Clone for Receiver<T> {
153+
fn clone(&self) -> Self {
154+
Receiver {
155+
queue: self.queue.acquire(),
156+
_phantom: PhantomData,
157+
}
158+
}
159+
}
160+
161+
impl<T> fmt::Debug for Receiver<T> {
162+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163+
write!(f, "Sender {:?}", *self.queue)
164+
}
165+
}
166+
167+
// TODO: Move to err
168+
169+
/// An error returned from the [`send`] method.
170+
///
171+
/// The message could not be sent because the channel is disconnected.
172+
///
173+
/// The error contains the message so it can be recovered.
174+
///
175+
/// [`send`]: Sender::send
176+
#[derive(PartialEq, Eq, Clone, Copy)]
177+
pub struct SendError<T>(pub T);
178+
179+
impl<T> fmt::Debug for SendError<T> {
180+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181+
"SendError(..)".fmt(f)
182+
}
183+
}
184+
185+
/// An error returned from the [`recv`] method.
186+
///
187+
/// A message could not be received because the channel is empty and disconnected.
188+
///
189+
/// [`recv`]: Receiver::recv
190+
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
191+
pub struct RecvError;

0 commit comments

Comments
 (0)