Skip to content

Commit 559f7f0

Browse files
committed
samples: philosopher: Add channel synchronizer
Add a synchronizer that works with the sync::channel abstraction. Signed-off-by: David Brown <[email protected]>
1 parent a998797 commit 559f7f0

File tree

4 files changed

+197
-0
lines changed

4 files changed

+197
-0
lines changed

samples/philosophers/Kconfig

+7
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,11 @@ choice
2020
help
2121
Use to have the dining philosophers sample use a single data structure, protected
2222
by a sync::Mutex and coordinated with a sync::Condvar, to synchronize.
23+
24+
config SYNC_CHANNEL
25+
bool "Use sync::channel to synchronize forks"
26+
help
27+
Use to have the dining philosophers sample use a worker thread, communicating via
28+
channels to synchronize.
29+
2330
endchoice

samples/philosophers/sample.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,8 @@ tests:
2222
min_ram: 32
2323
extra_configs:
2424
- CONFIG_SYNC_CONDVAR=y
25+
sample.rust.philosopher.channel:
26+
tags: introduction
27+
min_ram: 32
28+
extra_configs:
29+
- CONFIG_SYNC_CHANNEL=y

samples/philosophers/src/channel.rs

+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright (c) 2023 Linaro LTD
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Synchronizer using channels
5+
//!
6+
//! Synchronize between the philosophers using channels to communicate with a thread that handles
7+
//! the messages.
8+
9+
extern crate alloc;
10+
11+
use alloc::vec::Vec;
12+
use alloc::boxed::Box;
13+
14+
use zephyr::sync::channel::{self, Receiver, Sender};
15+
use zephyr::{
16+
kobj_define,
17+
sync::Arc,
18+
};
19+
use zephyr::object::KobjInit;
20+
21+
use crate::{NUM_PHIL, ForkSync};
22+
23+
/// An implementation of ForkSync that uses a server commnicated with channels to perform the
24+
/// synchronization.
25+
#[derive(Debug)]
26+
struct ChannelSync {
27+
command: Sender<Command>,
28+
reply_send: Sender<()>,
29+
reply_recv: Receiver<()>,
30+
}
31+
32+
#[derive(Debug)]
33+
enum Command {
34+
Acquire(usize, Sender<()>),
35+
Release(usize),
36+
}
37+
38+
/// This implements a single Fork on the server side for the ChannelSync.
39+
enum ChannelFork {
40+
/// The fork is free,
41+
Free,
42+
/// The work is in use, nobody is waiting.
43+
InUse,
44+
/// The fork is in use, and someone is waiting on it.
45+
InUseWait(Sender<()>),
46+
}
47+
48+
impl Default for ChannelFork {
49+
fn default() -> Self {
50+
ChannelFork::Free
51+
}
52+
}
53+
54+
impl ChannelFork {
55+
/// Attempt to aquire the work. If it is free, reply to the sender, otherwise, track them to
56+
/// reply to them when the fork is freed up.
57+
fn acquire(&mut self, reply: Sender<()>) {
58+
// For debugging, just stop here, and wait for a stack report.
59+
let next = match *self {
60+
ChannelFork::Free => {
61+
// Reply immediately that this fork is free.
62+
reply.send(()).unwrap();
63+
ChannelFork::InUse
64+
}
65+
ChannelFork::InUse => {
66+
// The fork is being used, become the waiter.
67+
ChannelFork::InUseWait(reply)
68+
}
69+
ChannelFork::InUseWait(_) => {
70+
// There is already a wait. Something has gone wrong as this should never happen.
71+
panic!("Mutliple waiters on fork");
72+
}
73+
};
74+
*self = next;
75+
}
76+
77+
/// Release the fork. This is presumably sent from the same sender that requested it, although
78+
/// this is not checked.
79+
fn release(&mut self) {
80+
let next = match self {
81+
ChannelFork::Free => {
82+
// An error case, the fork is not in use, it shouldn't be freed.
83+
panic!("Release of fork that is not in use");
84+
}
85+
ChannelFork::InUse => {
86+
// The fork is in use, and nobody else is waiting.
87+
ChannelFork::Free
88+
}
89+
ChannelFork::InUseWait(waiter) => {
90+
// The fork is in use by us, and someone else is waiting. Tell the other waiter
91+
// they now have the work.
92+
waiter.send(()).unwrap();
93+
ChannelFork::InUse
94+
}
95+
};
96+
*self = next;
97+
}
98+
}
99+
100+
impl ChannelSync {
101+
pub fn new(
102+
command: Sender<Command>,
103+
reply: (Sender<()>, Receiver<()>)) -> ChannelSync
104+
{
105+
ChannelSync {
106+
command,
107+
reply_send: reply.0,
108+
reply_recv: reply.1,
109+
}
110+
}
111+
}
112+
113+
/// Generate a syncer out of a ChannelSync.
114+
#[allow(dead_code)]
115+
pub fn get_channel_syncer() -> Vec<Arc<dyn ForkSync>> {
116+
COMMAND_QUEUE.init();
117+
let command_queue = COMMAND_QUEUE.get();
118+
let (cq_send, cq_recv) = channel::unbounded_from(command_queue);
119+
let reply_queues = REPLY_QUEUES.each_ref().map(|m| {
120+
m.init();
121+
channel::unbounded_from(m.get())
122+
});
123+
let syncer = reply_queues.into_iter().map(|rqueue| {
124+
let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue))
125+
as Box<dyn ForkSync>;
126+
Arc::from(item)
127+
});
128+
129+
let channel_thread = CHANNEL_THREAD.spawn(CHANNEL_STACK.token(), move || {
130+
channel_thread(cq_recv);
131+
});
132+
channel_thread.start();
133+
134+
syncer.collect()
135+
}
136+
137+
/// The thread that handles channel requests.
138+
///
139+
/// Spawned when we are using the channel syncer.
140+
fn channel_thread(cq_recv: Receiver<Command>) {
141+
let mut forks = [(); NUM_PHIL].each_ref().map(|_| ChannelFork::default());
142+
143+
loop {
144+
match cq_recv.recv().unwrap() {
145+
Command::Acquire(fork, reply) => {
146+
forks[fork].acquire(reply);
147+
}
148+
Command::Release(fork) => {
149+
forks[fork].release();
150+
}
151+
}
152+
}
153+
}
154+
155+
impl ForkSync for ChannelSync {
156+
fn take(&self, index: usize) {
157+
self.command.send(Command::Acquire(index, self.reply_send.clone())).unwrap();
158+
// When the reply comes, we know we have the resource.
159+
self.reply_recv.recv().unwrap();
160+
}
161+
162+
fn release(&self, index: usize) {
163+
self.command.send(Command::Release(index)).unwrap();
164+
// Release does not have a reply.
165+
}
166+
}
167+
168+
kobj_define! {
169+
static CHANNEL_STACK: ThreadStack<2054>;
170+
static CHANNEL_THREAD: StaticThread;
171+
172+
// For communicating using Queue, there is one to the main thread (the manager), and one back
173+
// to each philosopher.
174+
static COMMAND_QUEUE: StaticQueue;
175+
static REPLY_QUEUES: [StaticQueue; NUM_PHIL];
176+
}

samples/philosophers/src/lib.rs

+9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
extern crate alloc;
1212

13+
#[allow(unused_imports)]
1314
use alloc::boxed::Box;
1415
use alloc::vec::Vec;
1516
use zephyr::object::KobjInit;
@@ -26,9 +27,12 @@ use zephyr::{
2627
use crate::condsync::CondSync;
2728
#[allow(unused_imports)]
2829
use crate::sysmutex::SysMutexSync;
30+
#[allow(unused_imports)]
31+
use crate::channel::get_channel_syncer;
2932

3033
mod condsync;
3134
mod sysmutex;
35+
mod channel;
3236

3337
/// How many philosophers. There will be the same number of forks.
3438
const NUM_PHIL: usize = 6;
@@ -120,6 +124,11 @@ fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
120124
result
121125
}
122126

127+
#[cfg(CONFIG_SYNC_CHANNEL)]
128+
fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
129+
get_channel_syncer()
130+
}
131+
123132
fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: Arc<Mutex<Stats>>) {
124133
printkln!("Child {} started: {:?}", n, syncer);
125134

0 commit comments

Comments
 (0)