diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index b7056f84..2efb4841 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -26,7 +26,7 @@ jobs:
       - name: Checkout
        uses: actions/checkout@v4
        with:
-          path: zephyr-rust-lang
+          path: apptest

      - name: Set up Python
        uses: actions/setup-python@v5
@@ -36,7 +36,7 @@ jobs:
       - name: Setup Zephyr project
        uses: zephyrproject-rtos/action-zephyr-setup@v1
        with:
-          app-path: zephyr-rust-lang
+          app-path: apptest
          manifest-file-name: ci-manifest.yml
          toolchains: arm-zephyr-eabi:riscv64-zephyr-elf
@@ -51,9 +51,13 @@ jobs:
           rustup target add thumbv8m.main-none-eabi

      - name: Build firmware
-        working-directory: zephyr-rust-lang
+        working-directory: apptest
        shell: bash
        run: |
          cargo --version
-          west twister -T samples -T tests -v --inline-logs --integration
+          lscpu
+          df -h
+
+          west twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \
+            $(cat etc/platforms.txt)
diff --git a/etc/platforms.txt b/etc/platforms.txt
new file mode 100644
index 00000000..32579f8e
--- /dev/null
+++ b/etc/platforms.txt
@@ -0,0 +1,8 @@
+-p mps2/an385
+-p mps2/an521/cpu0
+-p qemu_cortex_m0
+-p qemu_cortex_m3
+-p qemu_riscv32
+-p qemu_riscv32/qemu_virt_riscv32/smp
+-p qemu_riscv64
+-p qemu_riscv64/qemu_virt_riscv64/smp
diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig
index 2e481703..3b0ca539 100644
--- a/samples/philosophers/Kconfig
+++ b/samples/philosophers/Kconfig
@@ -7,12 +7,36 @@ source "Kconfig.zephyr"

 choice
 	prompt "Select Synchronization implementation"
-	default SYNC_SYS_SEMAPHORE
+	default SYNC_CHANNEL

 config SYNC_SYS_SEMAPHORE
 	bool "Use sys::Semaphore to synchronize forks"
 	help
-	  Use to have the dining philosophers sample use sys::Semaphore, with one per form,
-	  to synchronize.
+	  Use to have the dining philosophers sample use sys::Semaphore, with one per fork, to
+	  synchronize.
+
+config SYNC_SYS_DYNAMIC_SEMAPHORE
+	bool "Use a dynamic sys::Semaphore to synchronize forks"
+	help
+	  Use to have the dining philosophers sample use sys::Semaphore, with one per fork, to
+	  synchronize.  The Semaphores will be dynamically allocated.
+
+config SYNC_SYS_MUTEX
+	bool "Use sys::Mutex to synchronize forks"
+	help
+	  Use to have the dining philosophers sample use sys::Mutex, with one per fork, to
+	  synchronize.
+
+config SYNC_CONDVAR
+	bool "Use sync::Condvar and sync::Mutex to synchronize forks"
+	help
+	  Use to have the dining philosophers sample use a single data structure, protected
+	  by a sync::Mutex and coordinated with a sync::Condvar, to synchronize.
+
+config SYNC_CHANNEL
+	bool "Use sync::channel to synchronize forks"
+	help
+	  Use to have the dining philosophers sample use a worker thread, communicating via
+	  channels to synchronize.

 endchoice
diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml
index cfd1eb47..8e938bd2 100644
--- a/samples/philosophers/sample.yaml
+++ b/samples/philosophers/sample.yaml
@@ -8,10 +8,7 @@ common:
     regex:
       # Match the statistics, and make sure that each philosopher has at least 10 (two digits)
      # meals.
- # - "^\\[\\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}\\]" - # - # Until the stastics have been implemented, just match on one of the children thinking - - "^Child 5 thinking \\(\\d+ ticks.*" + - "^\\[\\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}\\]" tags: rust filter: CONFIG_RUST_SUPPORTED tests: @@ -20,3 +17,23 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_SYS_SEMAPHORE=y + sample.rust.philosopher.dynsemaphore: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_SYS_DYNAMIC_SEMAPHORE=y + sample.rust.philosopher.sysmutex: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_SYS_MUTEX=y + sample.rust.philosopher.condvar: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CONDVAR=y + sample.rust.philosopher.channel: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CHANNEL=y diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs new file mode 100644 index 00000000..399bb2b3 --- /dev/null +++ b/samples/philosophers/src/channel.rs @@ -0,0 +1,167 @@ +// Copyright (c) 2023 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! Synchronizer using channels +//! +//! Synchronize between the philosophers using channels to communicate with a thread that handles +//! the messages. + +extern crate alloc; + +use alloc::vec::Vec; +use alloc::boxed::Box; + +use zephyr::sync::channel::{self, Receiver, Sender}; +use zephyr::{ + kobj_define, + sync::Arc, +}; + +use crate::{NUM_PHIL, ForkSync}; + +/// An implementation of ForkSync that uses a server commnicated with channels to perform the +/// synchronization. +#[derive(Debug)] +struct ChannelSync { + command: Sender, + reply_send: Sender<()>, + reply_recv: Receiver<()>, +} + +#[derive(Debug)] +enum Command { + Acquire(usize, Sender<()>), + Release(usize), +} + +/// This implements a single Fork on the server side for the ChannelSync. +enum ChannelFork { + /// The fork is free, + Free, + /// The work is in use, nobody is waiting. + InUse, + /// The fork is in use, and someone is waiting on it. + InUseWait(Sender<()>), +} + +impl Default for ChannelFork { + fn default() -> Self { + ChannelFork::Free + } +} + +impl ChannelFork { + /// Attempt to aquire the work. If it is free, reply to the sender, otherwise, track them to + /// reply to them when the fork is freed up. + fn acquire(&mut self, reply: Sender<()>) { + // For debugging, just stop here, and wait for a stack report. + let next = match *self { + ChannelFork::Free => { + // Reply immediately that this fork is free. + reply.send(()).unwrap(); + ChannelFork::InUse + } + ChannelFork::InUse => { + // The fork is being used, become the waiter. + ChannelFork::InUseWait(reply) + } + ChannelFork::InUseWait(_) => { + // There is already a wait. Something has gone wrong as this should never happen. + panic!("Mutliple waiters on fork"); + } + }; + *self = next; + } + + /// Release the fork. This is presumably sent from the same sender that requested it, although + /// this is not checked. + fn release(&mut self) { + let next = match self { + ChannelFork::Free => { + // An error case, the fork is not in use, it shouldn't be freed. + panic!("Release of fork that is not in use"); + } + ChannelFork::InUse => { + // The fork is in use, and nobody else is waiting. + ChannelFork::Free + } + ChannelFork::InUseWait(waiter) => { + // The fork is in use by us, and someone else is waiting. Tell the other waiter + // they now have the work. 
+                waiter.send(()).unwrap();
+                ChannelFork::InUse
+            }
+        };
+        *self = next;
+    }
+}
+
+impl ChannelSync {
+    pub fn new(
+        command: Sender<Command>,
+        reply: (Sender<()>, Receiver<()>)) -> ChannelSync
+    {
+        ChannelSync {
+            command,
+            reply_send: reply.0,
+            reply_recv: reply.1,
+        }
+    }
+}
+
+/// Generate a syncer out of a ChannelSync.
+#[allow(dead_code)]
+pub fn get_channel_syncer() -> Vec<Arc<dyn ForkSync>> {
+    let (cq_send, cq_recv) = channel::unbounded();
+    let reply_queues = [(); NUM_PHIL].each_ref().map(|()| {
+        channel::unbounded()
+    });
+    let syncer = reply_queues.into_iter().map(|rqueue| {
+        let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue))
+            as Box<dyn ForkSync>;
+        Arc::from(item)
+    });
+
+    let channel_child = CHANNEL_THREAD.init_once(CHANNEL_STACK.init_once(()).unwrap()).unwrap();
+    channel_child.spawn(move || {
+        channel_thread(cq_recv);
+    });
+
+    syncer.collect()
+}
+
+/// The thread that handles channel requests.
+///
+/// Spawned when we are using the channel syncer.
+fn channel_thread(cq_recv: Receiver<Command>) {
+    let mut forks = [(); NUM_PHIL].each_ref().map(|_| ChannelFork::default());
+
+    loop {
+        match cq_recv.recv().unwrap() {
+            Command::Acquire(fork, reply) => {
+                forks[fork].acquire(reply);
+            }
+            Command::Release(fork) => {
+                forks[fork].release();
+            }
+        }
+    }
+}
+
+impl ForkSync for ChannelSync {
+    fn take(&self, index: usize) {
+        self.command.send(Command::Acquire(index, self.reply_send.clone())).unwrap();
+        // When the reply comes, we know we have the resource.
+        self.reply_recv.recv().unwrap();
+    }
+
+    fn release(&self, index: usize) {
+        self.command.send(Command::Release(index)).unwrap();
+        // Release does not have a reply.
+    }
+}
+
+kobj_define! {
+    static CHANNEL_STACK: ThreadStack<2054>;
+    static CHANNEL_THREAD: StaticThread;
+}
diff --git a/samples/philosophers/src/condsync.rs b/samples/philosophers/src/condsync.rs
new file mode 100644
index 00000000..f8266e90
--- /dev/null
+++ b/samples/philosophers/src/condsync.rs
@@ -0,0 +1,50 @@
+// Copyright (c) 2024 Linaro LTD
+// SPDX-License-Identifier: Apache-2.0
+
+//! # sync::Mutex/sync::Condvar implementation of ForkSync
+//!
+//! This implementation of the Fork synchronizer uses a single data object, protected by a
+//! `sync::Mutex`, and coordinated by a `sync::Condvar`.
+
+use crate::{
+    ForkSync,
+    NUM_PHIL,
+};
+use zephyr::sync::Mutex;
+use zephyr::sync::Condvar;
+// use zephyr::time::Forever;
+
+#[derive(Debug)]
+pub struct CondSync {
+    /// The lock that holds the flag for each philosopher.
+    lock: Mutex<[bool; NUM_PHIL]>,
+    /// Condition variable to wake other threads.
+    cond: Condvar,
+}
+
+impl CondSync {
+    #[allow(dead_code)]
+    pub fn new() -> CondSync {
+        CondSync {
+            lock: Mutex::new([false; NUM_PHIL]),
+            cond: Condvar::new(),
+        }
+    }
+}
+
+impl ForkSync for CondSync {
+    fn take(&self, index: usize) {
+        let mut lock = self.lock.lock().unwrap();
+        while lock[index] {
+            lock = self.cond.wait(lock).unwrap();
+        }
+        lock[index] = true;
+    }
+
+    fn release(&self, index: usize) {
+        let mut lock = self.lock.lock().unwrap();
+        lock[index] = false;
+        // No predictable waiter, so must wake everyone.
+        self.cond.notify_all();
+    }
+}
diff --git a/samples/philosophers/src/dynsemsync.rs b/samples/philosophers/src/dynsemsync.rs
new file mode 100644
index 00000000..8015ffe9
--- /dev/null
+++ b/samples/philosophers/src/dynsemsync.rs
@@ -0,0 +1,51 @@
+// Copyright (c) 2023 Linaro LTD
+// SPDX-License-Identifier: Apache-2.0
+
+//! Semaphore based sync.
+//!
+//! This is the simplest type of sync, which uses a single dynamically allocated semaphore per
+//! fork.
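+//!
+//! As a rough sketch of the pattern (illustrative only), each fork is a binary semaphore that a
+//! philosopher takes before eating and gives back afterwards:
+//!
+//! ```no_run
+//! use zephyr::{sync::Arc, sys::sync::Semaphore, time::Forever};
+//!
+//! // One semaphore per fork, created available (count 1, limit 1).
+//! let fork = Arc::new(Semaphore::new(1, 1).unwrap());
+//!
+//! fork.take(Forever).unwrap();   // pick the fork up
+//! fork.give();                   // put it back down
+//! ```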
+
+extern crate alloc;
+
+use alloc::vec::Vec;
+use alloc::boxed::Box;
+
+use zephyr::{
+    sync::Arc, sys::sync::Semaphore, time::Forever
+};
+
+use crate::{ForkSync, NUM_PHIL};
+
+#[derive(Debug)]
+pub struct SemSync {
+    /// The forks for this philosopher. This is a bit excessive, as we really don't need all of
+    /// them, but the ForkSync code uses the index here.
+    forks: [Arc<Semaphore>; NUM_PHIL],
+}
+
+impl ForkSync for SemSync {
+    fn take(&self, index: usize) {
+        self.forks[index].take(Forever).unwrap();
+    }
+
+    fn release(&self, index: usize) {
+        self.forks[index].give();
+    }
+}
+
+#[allow(dead_code)]
+pub fn dyn_semaphore_sync() -> Vec<Arc<dyn ForkSync>> {
+    let forks = [(); NUM_PHIL].each_ref().map(|()| {
+        Arc::new(Semaphore::new(1, 1).unwrap())
+    });
+
+    let syncers = (0..NUM_PHIL).map(|_| {
+        let syncer = SemSync {
+            forks: forks.clone(),
+        };
+        let item = Box::new(syncer) as Box<dyn ForkSync>;
+        Arc::from(item)
+    }).collect();
+
+    syncers
+}
diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs
index 7e514305..35be8f25 100644
--- a/samples/philosophers/src/lib.rs
+++ b/samples/philosophers/src/lib.rs
@@ -18,13 +18,25 @@
 use zephyr::{
     printkln,
     kobj_define,
     sys::uptime_get,
-    sync::Arc,
+    sync::{Arc, Mutex},
 };

 // These are optional, based on Kconfig, so allow them to be unused.
 #[allow(unused_imports)]
+use crate::condsync::CondSync;
+#[allow(unused_imports)]
+use crate::sysmutex::SysMutexSync;
+#[allow(unused_imports)]
+use crate::channel::get_channel_syncer;
+#[allow(unused_imports)]
 use crate::semsync::semaphore_sync;
+#[allow(unused_imports)]
+use crate::dynsemsync::dyn_semaphore_sync;

+mod channel;
+mod condsync;
+mod dynsemsync;
+mod sysmutex;
 mod semsync;

 /// How many philosophers. There will be the same number of forks.
@@ -59,14 +71,17 @@ extern "C" fn rust_main() {
              zephyr::kconfig::CONFIG_BOARD);
     printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY);

+    let stats = Arc::new(Mutex::new_from(Stats::default(), STAT_MUTEX.init_once(()).unwrap()));
+
     let syncers = get_syncer();

     printkln!("Pre fork");

     for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) {
+        let child_stat = stats.clone();
         let thread = PHIL_THREADS[i].init_once(PHIL_STACKS[i].init_once(()).unwrap()).unwrap();
         thread.spawn(move || {
-            phil_thread(i, syncer);
+            phil_thread(i, syncer, child_stat);
         });
     }

@@ -74,6 +89,7 @@ extern "C" fn rust_main() {
     loop {
         // Periodically, printout the stats.
         zephyr::time::sleep(delay);
+        stats.lock().unwrap().show();
     }
 }

@@ -82,7 +98,42 @@ fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
     semaphore_sync()
 }

-fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>) {
+#[cfg(CONFIG_SYNC_SYS_DYNAMIC_SEMAPHORE)]
+fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
+    dyn_semaphore_sync()
+}
+
+#[cfg(CONFIG_SYNC_SYS_MUTEX)]
+fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
+    let syncer = Box::new(SysMutexSync::new())
+        as Box<dyn ForkSync>;
+    let syncer: Arc<dyn ForkSync> = Arc::from(syncer);
+    let mut result = Vec::new();
+    for _ in 0..NUM_PHIL {
+        result.push(syncer.clone());
+    }
+    result
+}
+
+#[cfg(CONFIG_SYNC_CONDVAR)]
+fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
+    // Condvar version
+    let syncer = Box::new(CondSync::new())
+        as Box<dyn ForkSync>;
+    let syncer: Arc<dyn ForkSync> = Arc::from(syncer);
+    let mut result = Vec::new();
+    for _ in 0..NUM_PHIL {
+        result.push(syncer.clone());
+    }
+    result
+}
+
+#[cfg(CONFIG_SYNC_CHANNEL)]
+fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
+    get_channel_syncer()
+}
+
+fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: Arc<Mutex<Stats>>) {
     printkln!("Child {} started: {:?}", n, syncer);

     // Determine our two forks.
@@ -95,26 +146,26 @@ fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>) {
     loop {
         {
-            printkln!("Child {} hungry", n);
-            printkln!("Child {} take left fork", n);
+            // printkln!("Child {} hungry", n);
+            // printkln!("Child {} take left fork", n);
             syncer.take(forks.0);
-            printkln!("Child {} take right fork", n);
+            // printkln!("Child {} take right fork", n);
             syncer.take(forks.1);

             let delay = get_random_delay(n, 25);
-            printkln!("Child {} eating ({} ms)", n, delay);
+            // printkln!("Child {} eating ({} ms)", n, delay);
             sleep(delay);
-            // stats.lock().unwrap().record_eat(n, delay);
+            stats.lock().unwrap().record_eat(n, delay);

             // Release the forks.
-            printkln!("Child {} giving up forks", n);
+            // printkln!("Child {} giving up forks", n);
             syncer.release(forks.1);
             syncer.release(forks.0);

             let delay = get_random_delay(n, 25);
-            printkln!("Child {} thinking ({} ms)", n, delay);
+            // printkln!("Child {} thinking ({} ms)", n, delay);
             sleep(delay);
-            // stats.lock().unwrap().record_think(n, delay);
+            stats.lock().unwrap().record_think(n, delay);
         }
     }
 }
@@ -128,7 +179,36 @@ fn get_random_delay(id: usize, period: usize) -> Duration {
     Duration::millis_at_least(((delay + 1) * period) as Tick)
 }

+/// Instead of just printing out so much information that the data just scrolls by, gather
+/// statistics.
+#[derive(Default)]
+struct Stats {
+    /// How many times each philosopher has gone through the loop.
+    count: [u64; NUM_PHIL],
+    /// How much time each philosopher has spent eating.
+    eating: [u64; NUM_PHIL],
+    /// How much time each philosopher has spent thinking.
+    thinking: [u64; NUM_PHIL],
+}
+
+impl Stats {
+    fn record_eat(&mut self, index: usize, time: Duration) {
+        self.eating[index] += time.to_millis();
+    }
+
+    fn record_think(&mut self, index: usize, time: Duration) {
+        self.thinking[index] += time.to_millis();
+        self.count[index] += 1;
+    }
+
+    fn show(&self) {
+        printkln!("{:?}, e:{:?}, t:{:?}", self.count, self.eating, self.thinking);
+    }
+}
+
 kobj_define! {
     static PHIL_THREADS: [StaticThread; NUM_PHIL];
     static PHIL_STACKS: [ThreadStack<PHIL_STACK_SIZE>; NUM_PHIL];
+
+    static STAT_MUTEX: StaticMutex;
 }
diff --git a/samples/philosophers/src/semsync.rs b/samples/philosophers/src/semsync.rs
index 03bf8ce9..889ab412 100644
--- a/samples/philosophers/src/semsync.rs
+++ b/samples/philosophers/src/semsync.rs
@@ -11,9 +11,7 @@ use alloc::vec::Vec;
 use alloc::boxed::Box;

 use zephyr::{
-    kobj_define,
-    sync::Arc,
-    time::Forever,
+    kobj_define, sync::Arc, sys::sync::Semaphore, time::Forever
 };

 use crate::{ForkSync, NUM_PHIL};
@@ -22,7 +20,7 @@ use crate::{ForkSync, NUM_PHIL};
 pub struct SemSync {
     /// The forks for this philosopher. This is a bit excessive, as we really don't need all of
     /// them, but the ForkSync code uses the index here.
-    forks: [zephyr::sys::sync::Semaphore; NUM_PHIL],
+    forks: [Arc<Semaphore>; NUM_PHIL],
 }

 impl ForkSync for SemSync {
@@ -39,7 +37,7 @@ pub fn semaphore_sync() -> Vec<Arc<dyn ForkSync>> {
     let forks = SEMS.each_ref().map(|m| {
         // Each fork starts as taken.
-        m.init_once((1, 1)).unwrap()
+        Arc::new(m.init_once((1, 1)).unwrap())
     });

     let syncers = (0..NUM_PHIL).map(|_| {
diff --git a/samples/philosophers/src/sysmutex.rs b/samples/philosophers/src/sysmutex.rs
new file mode 100644
index 00000000..969efb5f
--- /dev/null
+++ b/samples/philosophers/src/sysmutex.rs
@@ -0,0 +1,44 @@
+// Copyright (c) 2024 Linaro LTD
+// SPDX-License-Identifier: Apache-2.0
+
+//! # sys::Mutex implementation of ForkSync
+//!
+//! This is a simple implementation of the Fork synchronizer that uses underlying Zephyr `k_mutex`
+//! wrapped in `sys::Mutex`. The ForkSync semantics map simply to these.
+
+use crate::{
+    ForkSync,
+    NUM_PHIL,
+};
+use zephyr::sys::sync::Mutex;
+use zephyr::time::Forever;
+
+type SysMutexes = [Mutex; NUM_PHIL];
+
+/// A simple implementation of ForkSync based on underlying Zephyr sys::Mutex, which uses explicit
+/// lock and release semantics.
+
+#[derive(Debug)]
+pub struct SysMutexSync {
+    locks: SysMutexes,
+}
+
+impl SysMutexSync {
+    #[allow(dead_code)]
+    pub fn new() -> SysMutexSync {
+        let locks = [(); NUM_PHIL].each_ref().map(|()| {
+            Mutex::new().unwrap()
+        });
+        SysMutexSync { locks }
+    }
+}
+
+impl ForkSync for SysMutexSync {
+    fn take(&self, index: usize) {
+        self.locks[index].lock(Forever).unwrap();
+    }
+
+    fn release(&self, index: usize) {
+        self.locks[index].unlock().unwrap();
+    }
+}
diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs
index 417c1bd3..a6d136ff 100644
--- a/zephyr/src/object.rs
+++ b/zephyr/src/object.rs
@@ -79,8 +79,17 @@
 //! [`kobj_define!`]: crate::kobj_define
 //! [`init_once`]: StaticKernelObject::init_once

+#[cfg(CONFIG_RUST_ALLOC)]
+extern crate alloc;
+
 use core::{cell::UnsafeCell, mem};

+#[cfg(CONFIG_RUST_ALLOC)]
+use core::pin::Pin;
+
+#[cfg(CONFIG_RUST_ALLOC)]
+use alloc::boxed::Box;
+
 use crate::sync::atomic::{AtomicUsize, Ordering};

 // The kernel object itself must be wrapped in `UnsafeCell` in Rust. This does several things, but
@@ -91,28 +100,6 @@ use crate::sync::atomic::{AtomicUsize, Ordering};
 // the mutations happen from C code, so this is less important than the data being placed in the
 // proper section. Many will have the link section overridden by the `kobj_define` macro.

-/// A kernel object represented statically in Rust code.
-///
-/// These should not be declared directly by the user, as they generally need linker decorations to
-/// be properly registered in Zephyr as kernel objects. The object has the underlying Zephyr type
-/// T, and the wrapper type W.
-///
-/// Kernel objects will have their `StaticThing` implemented as `StaticKernelObject<kobj>` where
-/// `kobj` is the type of the underlying Zephyr object. `Thing` will usually be a struct with a
-/// single field, which is a `*mut kobj`.
-///
-/// TODO: Can we avoid the public fields with a const new method?
-///
-/// TODO: Handling const-defined alignment for these.
-pub struct StaticKernelObject<T> {
-    #[allow(dead_code)]
-    /// The underlying zephyr kernel object.
-    pub value: UnsafeCell<T>,
-    /// Initialization status of this object. Most objects will start uninitialized and be
-    /// initialized manually.
-    pub init: AtomicUsize,
-}
-
 /// Define the Wrapping of a kernel object.
 ///
 /// This trait defines the association between a static kernel object and the two associated Rust
@@ -154,6 +141,28 @@ pub const KOBJ_INITING: usize = 1;
 /// take has been called. And shouldn't be allowed additional times.
 pub const KOBJ_INITIALIZED: usize = 2;

+/// A kernel object represented statically in Rust code.
+///
+/// These should not be declared directly by the user, as they generally need linker decorations to
+/// be properly registered in Zephyr as kernel objects. The object has the underlying Zephyr type
+/// T, and the wrapper type W.
+///
+/// Kernel objects will have their `StaticThing` implemented as `StaticKernelObject<kobj>` where
+/// `kobj` is the type of the underlying Zephyr object. `Thing` will usually be a struct with a
+/// single field, which is a `*mut kobj`.
+///
+/// TODO: Can we avoid the public fields with a const new method?
+///
+/// TODO: Handling const-defined alignment for these.
+pub struct StaticKernelObject<T> {
+    #[allow(dead_code)]
+    /// The underlying zephyr kernel object.
+    pub value: UnsafeCell<T>,
+    /// Initialization status of this object. Most objects will start uninitialized and be
+    /// initialized manually.
+    pub init: AtomicUsize,
+}
+
 impl<T> StaticKernelObject<T>
 where
     StaticKernelObject<T>: Wrapped,
@@ -161,7 +170,7 @@ where
     /// Construct an empty one of these objects, with the zephyr data zero-filled. This is safe in
     /// the sense that we track the initialization: they start in the uninitialized state, and
     /// the zero value of the initialize atomic indicates that it is uninitialized.
-    pub const fn new() -> StaticKernelObject<T> {
+    pub const unsafe fn new() -> StaticKernelObject<T> {
         StaticKernelObject {
             value: unsafe { mem::zeroed() },
             init: AtomicUsize::new(KOBJ_UNINITIALIZED),
@@ -189,6 +198,41 @@ where
     }
 }

+/// Objects that can be fixed or allocated.
+///
+/// When using Rust threads from userspace, the `kobj_define` declarations and the complexity
+/// behind them are required. If all Rust use of kernel objects is from system threads, and
+/// dynamic memory is available, kernel objects can be freely allocated, as long as the
+/// allocations themselves are pinned. This `Fixed` encapsulates both of these.
+pub enum Fixed<T> {
+    /// Objects that have been statically declared and just pointed to.
+    Static(*mut T),
+    /// Objects that are owned by the wrapper, and contained here.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    Owned(Pin<Box<UnsafeCell<T>>>),
+}
+
+impl<T> Fixed<T> {
+    /// Get the raw pointer out of the fixed object.
+    ///
+    /// Returns the `*mut T` pointer held by this object. It is either just the static pointer, or
+    /// the pointer outside of the unsafe cell holding the dynamic kernel object.
+    pub fn get(&self) -> *mut T {
+        match self {
+            Fixed::Static(ptr) => *ptr,
+            #[cfg(CONFIG_RUST_ALLOC)]
+            Fixed::Owned(item) => item.get(),
+        }
+    }
+
+    /// Construct a new fixed from an allocation. Note that the object will not be fixed in memory
+    /// until _after_ this returns, and it should not be initialized until then.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new(item: T) -> Fixed<T> {
+        Fixed::Owned(Box::pin(UnsafeCell::new(item)))
+    }
+}
+
 /// Declare a static kernel object. This helps declaring static values of Zephyr objects.
 ///
 /// This can typically be used as:
@@ -236,6 +280,40 @@ macro_rules! _kobj_rule {
             unsafe { ::core::mem::zeroed() };
     };

+    // static NAME: StaticMutex
+    ($v:vis, $name:ident, StaticMutex) => {
+        #[link_section = concat!("._k_mutex.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: $crate::sys::sync::StaticMutex =
+            unsafe { $crate::sys::sync::StaticMutex::new() };
+    };
+
+    // static NAMES: [StaticMutex; COUNT];
+    ($v:vis, $name:ident, [StaticMutex; $size:expr]) => {
+        #[link_section = concat!("._k_mutex.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: [$crate::sys::sync::StaticMutex; $size] =
+            // This isn't Copy, intentionally, so initialize the whole thing with zeroed memory.
+            // Relying on the atomic to be 0 for the uninitialized state.
+            // [$crate::sys::sync::StaticMutex::new(); $size];
+            unsafe { ::core::mem::zeroed() };
+    };
+
+    // static NAME: StaticCondvar;
+    ($v:vis, $name:ident, StaticCondvar) => {
+        #[link_section = concat!("._k_condvar.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: $crate::sys::sync::StaticCondvar =
+            unsafe { $crate::sys::sync::StaticCondvar::new() };
+    };
+
+    // static NAMES: [StaticCondvar; COUNT];
+    ($v:vis, $name:ident, [StaticCondvar; $size:expr]) => {
+        #[link_section = concat!("._k_condvar.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: [$crate::sys::sync::StaticCondvar; $size] =
+            // This isn't Copy, intentionally, so initialize the whole thing with zeroed memory.
+            // Relying on the atomic to be 0 for the uninitialized state.
+            // [$crate::sys::sync::StaticCondvar::new(); $size];
+            unsafe { ::core::mem::zeroed() };
+    };
+
     // static THREAD: StaticThread;
     ($v:vis, $name:ident, StaticThread) => {
         // Since the static object has an atomic that we assume is initialized, we cannot use the
@@ -275,6 +353,19 @@ macro_rules! _kobj_rule {
     ($v:vis, $name:ident, [ThreadStack<{$size:expr}>; $asize:expr]) => {
         $crate::_kobj_stack!($v, $name, $size, $asize);
     };
+
+    // Queues.
+    ($v:vis, $name: ident, StaticQueue) => {
+        #[link_section = concat!("._k_queue.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: $crate::sys::queue::StaticQueue =
+            unsafe { ::core::mem::zeroed() };
+    };
+
+    ($v:vis, $name: ident, [StaticQueue; $size:expr]) => {
+        #[link_section = concat!("._k_queue.static.", stringify!($name), ".", file!(), line!())]
+        $v static $name: [$crate::sys::queue::StaticQueue; $size] =
+            unsafe { ::core::mem::zeroed() };
+    };
 }

 #[doc(hidden)]
diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs
index bdce4e20..8003d9e2 100644
--- a/zephyr/src/sync.rs
+++ b/zephyr/src/sync.rs
@@ -5,6 +5,21 @@
 //! [`crossbeam-channel`](https://docs.rs/crossbeam-channel/latest/crossbeam_channel/), in as much
 //! as it makes sense.

+use core::{
+    cell::UnsafeCell,
+    fmt,
+    marker::PhantomData,
+    ops::{Deref, DerefMut},
+};
+
+use crate::time::{Forever, NoWait};
+use crate::sys::sync as sys;
+
+// Channels are currently only available with allocation. Bounded channels later might be
+// available.
+#[cfg(CONFIG_RUST_ALLOC)]
+pub mod channel;
+
 pub mod atomic {
     //! Re-export portable atomic.
     //!
@@ -22,3 +37,240 @@ pub mod atomic {

 #[cfg(CONFIG_RUST_ALLOC)]
 pub use portable_atomic_util::Arc;
+
+// Channels are currently only available with allocation. Bounded channels later might be
+// available.
+
+/// Until poisoning is implemented, mutexes never return an error, and we just get back the guard.
+pub type LockResult<Guard> = Result<Guard, ()>;
+
+/// The return type from [`Mutex::try_lock`].
+///
+/// The error indicates the reason for the failure. Until poisoning is
+/// implemented, there is only a single type of failure.
+pub type TryLockResult<Guard> = Result<Guard, TryLockError>;
+
+/// An enumeration of possible errors associated with a [`TryLockResult`].
+///
+/// Note that until Poisoning is implemented, there is only one value of this.
+pub enum TryLockError {
+    /// The lock could not be acquired at this time because the operation would otherwise block.
+    WouldBlock,
+}
+
+/// A mutual exclusion primitive useful for protecting shared data.
+///
+/// This mutex will block threads waiting for the lock to become available. This is modeled after
+/// [`std::sync::Mutex`](https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html), and attempts
+/// to implement that API as closely as makes sense on Zephyr. Currently, it has the following
+/// differences:
+/// - Poisoning: This does not yet implement poisoning, as there is no way to recover from panic at
+///   this time on Zephyr.
+/// - Allocation: `new` is only available with `CONFIG_RUST_ALLOC`; otherwise use `new_from`, which
+///   takes a statically allocated `sys::Mutex`.
+pub struct Mutex<T: ?Sized> {
+    inner: sys::Mutex,
+    // poison: ...
+    data: UnsafeCell<T>,
+}
+
+// At least if correctly done, the Mutex provides for Send and Sync as long as the inner data
+// supports Send.
+unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
+unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
+
+impl<T: ?Sized> fmt::Debug for Mutex<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Mutex {:?}", self.inner)
+    }
+}
+
+/// An RAII implementation of a "scoped lock" of a mutex. When this structure is dropped (falls
+/// out of scope), the lock will be unlocked.
+///
+/// The data protected by the mutex can be accessed through this guard via its [`Deref`] and
+/// [`DerefMut`] implementations.
+///
+/// This structure is created by the [`lock`] and [`try_lock`] methods on [`Mutex`].
+///
+/// [`lock`]: Mutex::lock
+/// [`try_lock`]: Mutex::try_lock
+///
+/// Taken directly from
+/// [`std::sync::MutexGuard`](https://doc.rust-lang.org/stable/std/sync/struct.MutexGuard.html).
+pub struct MutexGuard<'a, T: ?Sized + 'a> {
+    lock: &'a Mutex<T>,
+    // until negative trait bounds (`impl !Send`) are implemented, we have to mark unsend
+    // explicitly. This can be done by holding Phantom data with an unsafe cell in it.
+    _nosend: PhantomData<UnsafeCell<()>>,
+}
+
+// Make sure the guard doesn't get sent.
+// Negative trait bounds are unstable, see marker above.
+// impl<T: ?Sized> !Send for MutexGuard<'_, T> {}
+unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
+
+impl<T> Mutex<T> {
+    /// Construct a new wrapped Mutex, using the given underlying sys mutex. This is different
+    /// from `std::sync::Mutex` in that in Zephyr, objects are frequently allocated statically,
+    /// and the sys Mutex will be taken by this structure. It is safe to share the underlying
+    /// Mutex between different items, but without careful use, it is easy to deadlock, so it is
+    /// not recommended.
+    pub const fn new_from(t: T, raw_mutex: sys::Mutex) -> Mutex<T> {
+        Mutex { inner: raw_mutex, data: UnsafeCell::new(t) }
+    }
+
+    /// Construct a new Mutex, dynamically allocating the underlying sys Mutex.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new(t: T) -> Mutex<T> {
+        Mutex::new_from(t, sys::Mutex::new().unwrap())
+    }
+}
+
+impl<T: ?Sized> Mutex<T> {
+    /// Acquires a mutex, blocking the current thread until it is able to do so.
+    ///
+    /// This function will block the local thread until it is available to acquire the mutex. Upon
+    /// returning, the thread is the only thread with the lock held. An RAII guard is returned to
+    /// allow scoped unlock of the lock. When the guard goes out of scope, the mutex will be
+    /// unlocked.
+    ///
+    /// In `std`, an attempt to lock a mutex by a thread that already holds the mutex is
+    /// unspecified. Zephyr explicitly supports this behavior, by simply incrementing a lock
+    /// count.
+    pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
+        // With `Forever`, should never return an error.
+        self.inner.lock(Forever).unwrap();
+        unsafe {
+            Ok(MutexGuard::new(self))
+        }
+    }
+
+    /// Attempts to acquire this lock.
+    ///
+    /// If the lock could not be acquired at this time, then [`Err`] is returned. Otherwise, an
+    /// RAII guard is returned. The lock will be unlocked when the guard is dropped.
+    ///
+    /// This function does not block.
+    pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
+        match self.inner.lock(NoWait) {
+            Ok(()) => {
+                unsafe {
+                    Ok(MutexGuard::new(self))
+                }
+            }
+            // TODO: It might be better to distinguish these errors, and only return the WouldBlock
+            // if that is the corresponding error. But, the lock shouldn't fail in Zephyr.
+            Err(_) => {
+                Err(TryLockError::WouldBlock)
+            }
+        }
+    }
+}
+
+impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> {
+    unsafe fn new(lock: &'mutex Mutex<T>) -> MutexGuard<'mutex, T> {
+        // poison todo
+        MutexGuard { lock, _nosend: PhantomData }
+    }
+}
+
+impl<T: ?Sized> Deref for MutexGuard<'_, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        unsafe {
+            &*self.lock.data.get()
+        }
+    }
+}
+
+impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        unsafe { &mut *self.lock.data.get() }
+    }
+}
+
+impl<T: ?Sized> Drop for MutexGuard<'_, T> {
+    #[inline]
+    fn drop(&mut self) {
+        self.lock.inner.unlock().unwrap();
+    }
+}
+
+/// Inspired by
+/// [`std::sync::Condvar`](https://doc.rust-lang.org/stable/std/sync/struct.Condvar.html),
+/// implemented directly using `k_condvar` in Zephyr.
+///
+/// Condition variables represent the ability to block a thread such that it consumes no CPU time
+/// while waiting for an event to occur. Condition variables are typically associated with a
+/// boolean predicate (a condition) and a mutex. The predicate is always verified inside of the
+/// mutex before determining that a thread must block.
+///
+/// Functions in this module will block the current **thread** of execution. Note that any attempt
+/// to use multiple mutexes on the same condition variable may result in a runtime panic.
+pub struct Condvar {
+    inner: sys::Condvar,
+}
+
+impl Condvar {
+    /// Construct a new wrapped Condvar, using the given underlying `k_condvar`.
+    ///
+    /// This is different from `std::sync::Condvar` in that in Zephyr, objects are frequently
+    /// allocated statically, and the sys Condvar will be taken by this structure.
+    pub const fn new_from(raw_condvar: sys::Condvar) -> Condvar {
+        Condvar { inner: raw_condvar }
+    }
+
+    /// Construct a new Condvar, dynamically allocating the underlying Zephyr `k_condvar`.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new() -> Condvar {
+        Condvar::new_from(sys::Condvar::new().unwrap())
+    }
+
+    /// Blocks the current thread until this conditional variable receives a notification.
+    ///
+    /// This function will automatically unlock the mutex specified (represented by `guard`) and
+    /// block the current thread. This means that any calls to `notify_one` or `notify_all` which
+    /// happen logically after the mutex is unlocked are candidates to wake this thread up. When
+    /// this function call returns, the lock specified will have been re-acquired.
+    ///
+    /// Note that this function is susceptible to spurious wakeups. Condition variables normally
+    /// have a boolean predicate associated with them, and the predicate must always be checked
+    /// each time this function returns to protect against spurious wakeups.
+    pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult<MutexGuard<'a, T>> {
+        self.inner.wait(&guard.lock.inner);
+        Ok(guard)
+    }
+
+    // TODO: wait_while
+    // TODO: wait_timeout_ms
+    // TODO: wait_timeout
+    // TODO: wait_timeout_while
+
+    /// Wakes up one blocked thread on this condvar.
+    ///
+    /// If there is a blocked thread on this condition variable, then it will be woken up from its
+    /// call to `wait` or `wait_timeout`. Calls to `notify_one` are not buffered in any way.
+    ///
+    /// To wake up all threads, see `notify_all`.
+    pub fn notify_one(&self) {
+        self.inner.notify_one();
+    }
+
+    /// Wakes up all blocked threads on this condvar.
+    ///
+    /// This method will ensure that any current waiters on the condition variable are awoken.
+    /// Calls to `notify_all()` are not buffered in any way.
+    ///
+    /// To wake up only one thread, see `notify_one`.
+    pub fn notify_all(&self) {
+        self.inner.notify_all();
+    }
+}
+
+impl fmt::Debug for Condvar {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Condvar {:?}", self.inner)
+    }
+}
diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs
new file mode 100644
index 00000000..c541b75d
--- /dev/null
+++ b/zephyr/src/sync/channel.rs
@@ -0,0 +1,206 @@
+//! Close-to-Zephyr channels
+//!
+//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
+//! can get, directly using Zephyr primitives.
+//!
+//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types,
+//! these are typically statically allocated. Similar to the other close-to-zephyr primitives,
+//! this means that there is a constructor that can directly take one of these primitives.
+//!
+//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
+//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
+//! internal `Arc` inside them, but without the overhead of an actual Arc.
+
+extern crate alloc;
+
+use alloc::boxed::Box;
+
+use core::ffi::c_void;
+use core::fmt;
+use core::marker::PhantomData;
+
+use crate::sys::queue::Queue;
+
+mod counter;
+
+// The zephyr queue does not allocate or manage the data of the messages, so we need to handle
+// allocation as such as well. However, we don't need to manage anything, so it is sufficient to
+// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
+// it back into a Box, and give it to the recipient.
+
+/// Create a multi-producer multi-consumer channel of unbounded capacity, using an existing Queue
+/// object.
+///
+/// The messages are allocated individually as "Box", and the queue is managed by the underlying
+/// Zephyr queue.
+pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
+    let (s, r) = counter::new(queue);
+    let s = Sender {
+        queue: s,
+        _phantom: PhantomData,
+    };
+    let r = Receiver {
+        queue: r,
+        _phantom: PhantomData,
+    };
+    (s, r)
+}
+
+/// Create a multi-producer multi-consumer channel of unbounded capacity.
+///
+/// The messages are allocated individually as "Box". The underlying Zephyr queue will be
+/// dynamically allocated.
+///
+/// **Note**: Currently Drop is not properly supported on Zephyr. If all senders are dropped, any
+/// receivers will likely be blocked forever. Any data that has been queued and not received will
+/// be leaked when all receivers have been dropped.
+pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
+    unbounded_from(Queue::new().unwrap())
+}
+
+/// The underlying type for Messages through Zephyr's [`Queue`].
+///
+/// This wrapper is used internally to wrap user messages through the queue. It is not useful in
+/// safe code, but may be useful for implementing other types of message queues.
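+///
+/// Conceptually (an illustrative sketch; beyond `repr(C)` and the reserved first word, the exact
+/// layout is an implementation detail), a message on the queue looks like:
+///
+/// ```text
+/// +----------+---------+
+/// | _private | data: T |     `_private` is the word `k_queue` uses to link messages
+/// +----------+---------+
+/// ```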
+#[repr(C)]
+pub struct Message<T> {
+    /// The private data used by the kernel to enqueue messages and such.
+    _private: usize,
+    /// The actual data being transported.
+    data: T,
+}
+
+impl<T> Message<T> {
+    fn new(data: T) -> Message<T> {
+        Message {
+            _private: 0,
+            data,
+        }
+    }
+}
+
+/// The sending side of a channel.
+pub struct Sender<T> {
+    queue: counter::Sender<Queue>,
+    _phantom: PhantomData<T>,
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Sync for Sender<T> {}
+
+impl<T> Sender<T> {
+    /// Sends a message over the given channel. This will perform an alloc of the message, which
+    /// will have an accompanied free on the recipient side.
+    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+        let msg = Box::new(Message::new(msg));
+        let msg = Box::into_raw(msg);
+        unsafe {
+            self.queue.send(msg as *mut c_void);
+        }
+        Ok(())
+    }
+}
+
+impl<T> Drop for Sender<T> {
+    fn drop(&mut self) {
+        unsafe {
+            self.queue.release(|_| {
+                crate::printkln!("Release");
+                true
+            })
+        }
+    }
+}
+
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Self {
+        Sender {
+            queue: self.queue.acquire(),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Sender {:?}", *self.queue)
+    }
+}
+
+/// The receiving side of a channel.
+pub struct Receiver<T> {
+    queue: counter::Receiver<Queue>,
+    _phantom: PhantomData<T>,
+}
+
+unsafe impl<T: Send> Send for Receiver<T> {}
+unsafe impl<T: Send> Sync for Receiver<T> {}
+
+impl<T> Receiver<T> {
+    /// Blocks the current thread until a message is received or the channel is empty and
+    /// disconnected.
+    ///
+    /// If the channel is empty and not disconnected, this call will block until the receive
+    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
+    /// wake up and return an error.
+    pub fn recv(&self) -> Result<T, RecvError> {
+        let msg = unsafe {
+            self.queue.recv()
+        };
+        let msg = msg as *mut Message<T>;
+        let msg = unsafe { Box::from_raw(msg) };
+        Ok(msg.data)
+    }
+}
+
+impl<T> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        unsafe {
+            self.queue.release(|_| {
+                crate::printkln!("Release");
+                true
+            })
+        }
+    }
+}
+
+impl<T> Clone for Receiver<T> {
+    fn clone(&self) -> Self {
+        Receiver {
+            queue: self.queue.acquire(),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Receiver {:?}", *self.queue)
+    }
+}
+
+// TODO: Move to err
+
+/// An error returned from the [`send`] method.
+///
+/// The message could not be sent because the channel is disconnected.
+///
+/// The error contains the message so it can be recovered.
+///
+/// [`send`]: Sender::send
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub struct SendError<T>(pub T);
+
+impl<T> fmt::Debug for SendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        "SendError(..)".fmt(f)
+    }
+}
+
+/// An error returned from the [`recv`] method.
+///
+/// A message could not be received because the channel is empty and disconnected.
+///
+/// [`recv`]: Receiver::recv
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+pub struct RecvError;
diff --git a/zephyr/src/sync/channel/counter.rs b/zephyr/src/sync/channel/counter.rs
new file mode 100644
index 00000000..4b28ae84
--- /dev/null
+++ b/zephyr/src/sync/channel/counter.rs
@@ -0,0 +1,151 @@
+// SPDX-License-Identifier: Apache-2.0
+//! Reference counter for channels.
+
+// This file is taken from crossbeam-channel, with modifications to be `no_std`.
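+//
+// The scheme, in brief (an illustrative sketch): a single heap-allocated `Counter<C>` holds the
+// channel plus independent sender/receiver reference counts:
+//
+//     let (s, r) = counter::new(chan);   // senders == 1, receivers == 1
+//     let s2 = s.acquire();              // senders == 2
+//     unsafe { s2.release(|_| true) };   // senders == 1 again
+//
+// When the last reference on a side goes away, its `disconnect` hook runs; the second side to
+// release sees `destroy` already set and frees the allocation.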
+
+extern crate alloc;
+
+use alloc::boxed::Box;
+use core::ops;
+use core::ptr::NonNull;
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+/// Reference counter internals.
+struct Counter<C> {
+    /// The number of senders associated with the channel.
+    senders: AtomicUsize,
+
+    /// The number of receivers associated with the channel.
+    receivers: AtomicUsize,
+
+    /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
+    destroy: AtomicBool,
+
+    /// The internal channel.
+    chan: C,
+}
+
+/// Wraps a channel into the reference counter.
+pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
+    let counter = NonNull::from(Box::leak(Box::new(Counter {
+        senders: AtomicUsize::new(1),
+        receivers: AtomicUsize::new(1),
+        destroy: AtomicBool::new(false),
+        chan,
+    })));
+    let s = Sender { counter };
+    let r = Receiver { counter };
+    (s, r)
+}
+
+/// The sending side.
+pub(crate) struct Sender<C> {
+    counter: NonNull<Counter<C>>,
+}
+
+impl<C> Sender<C> {
+    /// Returns the internal `Counter`.
+    fn counter(&self) -> &Counter<C> {
+        unsafe { self.counter.as_ref() }
+    }
+
+    /// Acquires another sender reference.
+    pub(crate) fn acquire(&self) -> Self {
+        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
+
+        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
+        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+        // just abort when the count becomes very large.
+        if count > isize::MAX as usize {
+            // TODO: We need some kind of equivalent here.
+            unimplemented!();
+        }
+
+        Self {
+            counter: self.counter,
+        }
+    }
+
+    /// Releases the sender reference.
+    ///
+    /// Function `disconnect` will be called if this is the last sender reference.
+    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
+            disconnect(&self.counter().chan);
+
+            if self.counter().destroy.swap(true, Ordering::AcqRel) {
+                drop(unsafe { Box::from_raw(self.counter.as_ptr()) });
+            }
+        }
+    }
+}
+
+impl<C> ops::Deref for Sender<C> {
+    type Target = C;
+
+    fn deref(&self) -> &C {
+        &self.counter().chan
+    }
+}
+
+impl<C> PartialEq for Sender<C> {
+    fn eq(&self, other: &Self) -> bool {
+        self.counter == other.counter
+    }
+}
+
+/// The receiving side.
+pub(crate) struct Receiver<C> {
+    counter: NonNull<Counter<C>>,
+}
+
+impl<C> Receiver<C> {
+    /// Returns the internal `Counter`.
+    fn counter(&self) -> &Counter<C> {
+        unsafe { self.counter.as_ref() }
+    }
+
+    /// Acquires another receiver reference.
+    pub(crate) fn acquire(&self) -> Self {
+        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
+
+        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
+        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+        // just abort when the count becomes very large.
+        if count > isize::MAX as usize {
+            unimplemented!();
+        }
+
+        Self {
+            counter: self.counter,
+        }
+    }
+
+    /// Releases the receiver reference.
+    ///
+    /// Function `disconnect` will be called if this is the last receiver reference.
+    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
+            disconnect(&self.counter().chan);
+
+            if self.counter().destroy.swap(true, Ordering::AcqRel) {
+                drop(unsafe { Box::from_raw(self.counter.as_ptr()) });
+            }
+        }
+    }
+}
+
+impl<C> ops::Deref for Receiver<C> {
+    type Target = C;
+
+    fn deref(&self) -> &C {
+        &self.counter().chan
+    }
+}
+
+impl<C> PartialEq for Receiver<C> {
+    fn eq(&self, other: &Self) -> bool {
+        self.counter == other.counter
+    }
+}
diff --git a/zephyr/src/sys.rs b/zephyr/src/sys.rs
index dfb16e4d..0783d3af 100644
--- a/zephyr/src/sys.rs
+++ b/zephyr/src/sys.rs
@@ -11,6 +11,7 @@

 use zephyr_sys::k_timeout_t;

+pub mod queue;
 pub mod sync;
 pub mod thread;
diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs
new file mode 100644
index 00000000..f111c613
--- /dev/null
+++ b/zephyr/src/sys/queue.rs
@@ -0,0 +1,102 @@
+//! Lightweight wrapper around Zephyr's `k_queue`.
+//!
+//! The underlying operations on the `k_queue` are all unsafe, as the model does not match the
+//! borrowing model that Rust expects. This module is mainly intended to be used by the
+//! implementation of `zephyr::sync::channel`, which can be used without needing unsafe.
+
+use core::ffi::c_void;
+use core::fmt;
+use core::mem;
+
+use zephyr_sys::{
+    k_queue,
+    k_queue_init,
+    k_queue_append,
+    k_queue_get,
+};
+
+use crate::error::Result;
+use crate::sys::K_FOREVER;
+use crate::object::{Fixed, StaticKernelObject, Wrapped};
+
+/// A wrapper around a Zephyr `k_queue` object.
+pub struct Queue {
+    item: Fixed<k_queue>,
+}
+
+unsafe impl Sync for StaticKernelObject<k_queue> { }
+
+unsafe impl Sync for Queue { }
+unsafe impl Send for Queue { }
+
+impl Queue {
+    /// Create a new Queue, dynamically allocated.
+    ///
+    /// This Queue can only be used from system threads.
+    ///
+    /// **Note**: When a Queue is dropped, any messages that have been added to the queue will be
+    /// leaked.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new() -> Result<Queue> {
+        let item: Fixed<k_queue> = Fixed::new(unsafe { mem::zeroed() });
+        unsafe {
+            k_queue_init(item.get());
+        }
+        Ok(Queue { item })
+    }
+
+    /// Append an element to the end of a queue.
+    ///
+    /// This adds an element to the given [`Queue`]. Zephyr requires the
+    /// first word of this message to be available for the OS to enqueue
+    /// the message. See [`Message`] for details on how this can be used
+    /// safely.
+    ///
+    /// [`Message`]: crate::sync::channel::Message
+    pub unsafe fn send(&self, data: *mut c_void) {
+        k_queue_append(self.item.get(), data)
+    }
+
+    /// Get an element from a queue.
+    ///
+    /// This routine removes the first data item from the [`Queue`].
+    pub unsafe fn recv(&self) -> *mut c_void {
+        k_queue_get(self.item.get(), K_FOREVER)
+    }
+}
+
+impl Wrapped for StaticKernelObject<k_queue> {
+    type T = Queue;
+
+    type I = ();
+
+    fn get_wrapped(&self, _arg: Self::I) -> Queue {
+        let ptr = self.value.get();
+        unsafe {
+            k_queue_init(ptr);
+        }
+        Queue {
+            item: Fixed::Static(ptr),
+        }
+    }
+}
+
+/// A statically defined Zephyr `k_queue`.
+///
+/// This should be declared as follows:
+/// ```
+/// kobj_define! {
+///     static MY_QUEUE: StaticQueue;
+/// }
+///
+/// let my_queue = MY_QUEUE.init_once(());
+///
+/// my_queue.send(...);
+/// ```
+pub type StaticQueue = StaticKernelObject<k_queue>;
+
+impl fmt::Debug for Queue {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "sys::Queue {:?}", self.item.get())
+    }
+}
diff --git a/zephyr/src/sys/sync.rs b/zephyr/src/sys/sync.rs
index b19b593f..19d459e6 100644
--- a/zephyr/src/sys/sync.rs
+++ b/zephyr/src/sys/sync.rs
@@ -30,117 +30,16 @@
 //! Later, there will be a pool mechanism to allow these kernel objects to be allocated and freed
 //! from a pool, although the objects will still be statically allocated.

-use core::ffi::c_uint;
-use core::fmt;
-
-use crate::{
-    error::{Result, to_result_void},
-    object::{StaticKernelObject, Wrapped},
-    raw::{
-        k_sem,
-        k_sem_init,
-        k_sem_take,
-        k_sem_give,
-        k_sem_reset,
-        k_sem_count_get,
-    },
-    time::Timeout,
+pub mod mutex;
+pub mod semaphore;
+
+pub use mutex::{
+    Condvar,
+    StaticCondvar,
+    Mutex,
+    StaticMutex,
+};
+pub use semaphore::{
+    Semaphore,
+    StaticSemaphore,
 };
-
-pub use crate::raw::K_SEM_MAX_LIMIT;
-
-/// A zephyr `k_sem` usable from safe Rust code.
-#[derive(Clone)]
-pub struct Semaphore {
-    /// The raw Zephyr `k_sem`.
-    item: *mut k_sem,
-}
-
-/// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr
-/// implementation (which is why Clone is also implemented).
-unsafe impl Sync for Semaphore {}
-unsafe impl Send for Semaphore {}
-
-impl Semaphore {
-    /// Take a semaphore.
-    ///
-    /// Can be called from ISR if called with [`NoWait`].
-    ///
-    /// [`NoWait`]: crate::time::NoWait
-    pub fn take<T>(&self, timeout: T) -> Result<()>
-        where T: Into<Timeout>,
-    {
-        let timeout: Timeout = timeout.into();
-        let ret = unsafe {
-            k_sem_take(self.item, timeout.0)
-        };
-        to_result_void(ret)
-    }
-
-    /// Give a semaphore.
-    ///
-    /// This routine gives to the semaphore, unless the semaphore is already at its maximum
-    /// permitted count.
-    pub fn give(&self) {
-        unsafe {
-            k_sem_give(self.item)
-        }
-    }
-
-    /// Resets a semaphor's count to zero.
-    ///
-    /// This resets the count to zero. Any outstanding [`take`] calls will be aborted with
-    /// `Error(EAGAIN)`.
-    ///
-    /// [`take`]: Self::take
-    pub fn reset(&mut self) {
-        unsafe {
-            k_sem_reset(self.item)
-        }
-    }
-
-    /// Get a semaphore's count.
-    ///
-    /// Returns the current count.
-    pub fn count_get(&mut self) -> usize {
-        unsafe {
-            k_sem_count_get(self.item) as usize
-        }
-    }
-}
-
-/// A static Zephyr `k_sem`.
-///
-/// This is intended to be used from within the `kobj_define!` macro. It declares a static ksem
-/// that will be properly registered with the Zephyr kernel object system. Call [`init_once`] to
-/// get the [`Semaphore`] that is represents.
-///
-/// [`init_once`]: StaticKernelObject::init_once
-pub type StaticSemaphore = StaticKernelObject<k_sem>;
-
-unsafe impl Sync for StaticSemaphore {}
-
-impl Wrapped for StaticKernelObject<k_sem> {
-    type T = Semaphore;
-
-    /// The initializer for Semaphores is the initial count, and the count limit (which can be
-    /// K_SEM_MAX_LIMIT, re-exported here.
-    type I = (c_uint, c_uint);
-
-    // TODO: Thoughts about how to give parameters to the initialzation.
-    fn get_wrapped(&self, arg: Self::I) -> Semaphore {
-        let ptr = self.value.get();
-        unsafe {
-            k_sem_init(ptr, arg.0, arg.1);
-        }
-        Semaphore {
-            item: ptr,
-        }
-    }
-}
-
-impl fmt::Debug for Semaphore {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "sys::Semaphore")
-    }
-}
diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs
new file mode 100644
index 00000000..66bf2a86
--- /dev/null
+++ b/zephyr/src/sys/sync/mutex.rs
@@ -0,0 +1,212 @@
+// Copyright (c) 2024 Linaro LTD
+// SPDX-License-Identifier: Apache-2.0
+
+//! Zephyr `k_mutex` wrapper.
+//!
+//! This module implements a thin wrapper around the `k_mutex` type in Zephyr. It works with the
+//! kernel [`object`] system, to allow the mutexes to be defined statically.
+//!
+//! [`object`]: crate::object
+
+use core::fmt;
+use core::mem;
+use crate::{
+    error::{Result, to_result_void},
+    raw::{
+        k_condvar,
+        k_condvar_init,
+        k_condvar_broadcast,
+        k_condvar_signal,
+        k_condvar_wait,
+        k_mutex,
+        k_mutex_init,
+        k_mutex_lock,
+        k_mutex_unlock,
+    },
+    time::Timeout,
+};
+use crate::object::{
+    Fixed,
+    StaticKernelObject,
+    Wrapped,
+};
+use crate::sys::K_FOREVER;
+
+/// A Zephyr `k_mutex` usable from safe Rust code.
+///
+/// This merely wraps a pointer to the kernel object. It implements clone, send and sync as it is
+/// safe to have multiple instances of these, as well as use them across multiple threads.
+///
+/// Note that these are Safe in the sense that memory safety is guaranteed. Attempts to
+/// recursively lock, or incorrect nesting, can easily result in deadlock.
+///
+/// Safety: Typically, the Mutex type in Rust does not implement Clone, and must be shared between
+/// threads using Arc. However, these sys Mutexes are wrappers around static kernel objects, and
+/// Drop doesn't make sense for them. In addition, Arc requires alloc, and one possible place to
+/// make use of the sys Mutex is to be able to do so in an environment without alloc.
+///
+/// This mutex type is only of limited use to application programs. It can be used as a simple
+/// binary semaphore, although it has strict semantics, requiring the release to be called by the
+/// same thread that called lock. It can be used to protect data that Rust itself is either not
+/// managing, or is managing in an unsafe way.
+///
+/// For a Mutex type that is useful in the usual Rust manner, please see the regular
+/// [`sync::Mutex`] type.
+///
+/// [`sync::Mutex`]: http://example.com/TODO
+pub struct Mutex {
+    /// The raw Zephyr mutex.
+    item: Fixed<k_mutex>,
+}
+
+impl Mutex {
+    /// Create a new Mutex in an unlocked state.
+    ///
+    /// Create a new dynamically allocated Mutex. The Mutex can only be used from system threads.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new() -> Result<Mutex> {
+        let item: Fixed<k_mutex> = Fixed::new(unsafe { mem::zeroed() });
+        unsafe {
+            to_result_void(k_mutex_init(item.get()))?;
+        }
+        Ok(Mutex { item })
+    }
+
+    /// Lock a Zephyr Mutex.
+    ///
+    /// Will wait for the lock, returning status, with `Ok(())` indicating the lock has been
+    /// acquired, and an error indicating a timeout (Zephyr returns different errors depending on
+    /// the reason).
+    pub fn lock<T>(&self, timeout: T) -> Result<()>
+        where T: Into<Timeout>,
+    {
+        let timeout: Timeout = timeout.into();
+        to_result_void(unsafe { k_mutex_lock(self.item.get(), timeout.0) })
+    }
+
+    /// Unlock a Zephyr Mutex.
+    ///
+    /// The mutex must already be locked by the calling thread. Mutexes may not be unlocked in
+    /// ISRs.
+    pub fn unlock(&self) -> Result<()> {
+        to_result_void(unsafe { k_mutex_unlock(self.item.get()) })
+    }
+}
+
+
+/// A static Zephyr `k_mutex`
+///
+/// This is intended to be used from within the `kobj_define!` macro. It declares a static
+/// `k_mutex` that will be properly registered with the Zephyr object system. Call [`init_once`] to
+/// get the [`Mutex`] that it represents.
+///
+/// [`init_once`]: StaticMutex::init_once
+pub type StaticMutex = StaticKernelObject<k_mutex>;
+
+unsafe impl Sync for Mutex {}
+unsafe impl Send for Mutex {}
+
+// Sync and Send are meaningful, as the underlying Zephyr API can use these values from any thread.
+// Care must be taken to use these in a safe manner.
+unsafe impl Sync for StaticMutex {}
+unsafe impl Send for StaticMutex {}
+
+impl Wrapped for StaticKernelObject<k_mutex> {
+    type T = Mutex;
+
+    /// Mutex initializers take no argument.
+    type I = ();
+
+    fn get_wrapped(&self, _arg: Self::I) -> Mutex {
+        let ptr = self.value.get();
+        unsafe {
+            k_mutex_init(ptr);
+        }
+        Mutex {
+            item: Fixed::Static(ptr),
+        }
+    }
+}
+
+impl fmt::Debug for Mutex {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "sys::Mutex {:?}", self.item.get())
+    }
+}
+
+/// A Condition Variable
+///
+/// Lightweight wrappers for Zephyr's `k_condvar`.
+pub struct Condvar {
+    /// The underlying `k_condvar`.
+    item: Fixed<k_condvar>,
+}
+
+#[doc(hidden)]
+pub type StaticCondvar = StaticKernelObject<k_condvar>;
+
+unsafe impl Sync for StaticKernelObject<k_condvar> { }
+
+unsafe impl Sync for Condvar {}
+unsafe impl Send for Condvar {}
+
+impl Condvar {
+    /// Create a new Condvar.
+    ///
+    /// Create a new dynamically allocated Condvar. The Condvar can only be used from system
+    /// threads.
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new() -> Result<Condvar> {
+        let item: Fixed<k_condvar> = Fixed::new(unsafe { mem::zeroed() });
+        unsafe {
+            to_result_void(k_condvar_init(item.get()))?;
+        }
+        Ok(Condvar { item })
+    }
+
+    /// Wait for someone else using this mutex/condvar pair to notify.
+    ///
+    /// Note that this requires the lock to be held by us, but as this is a low-level binding to
+    /// Zephyr's interfaces, this is not enforced. See [`sync::Condvar`] for a safer and easier to
+    /// use interface.
+    ///
+    /// [`sync::Condvar`]: http://www.example.com/TODO
+    // /// [`sync::Condvar`]: crate::sync::Condvar
+    pub fn wait(&self, lock: &Mutex) {
+        unsafe { k_condvar_wait(self.item.get(), lock.item.get(), K_FOREVER); }
+    }
+
+    // TODO: timeout.
+
+    /// Wake a single thread waiting on this condition variable.
+    pub fn notify_one(&self) {
+        unsafe { k_condvar_signal(self.item.get()); }
+    }
+
+    /// Wake all threads waiting on this condition variable.
+    pub fn notify_all(&self) {
+        unsafe { k_condvar_broadcast(self.item.get()); }
+    }
+}
+
+impl fmt::Debug for Condvar {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "sys::Condvar {:?}", self.item.get())
+    }
+}
+
+impl Wrapped for StaticCondvar {
+    type T = Condvar;
+
+    /// Condvar initializers take no argument.
+    type I = ();
+
+    fn get_wrapped(&self, _arg: Self::I) -> Condvar {
+        let ptr = self.value.get();
+        unsafe {
+            k_condvar_init(ptr);
+        }
+        Condvar {
+            item: Fixed::Static(ptr),
+        }
+    }
+}
diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs
new file mode 100644
index 00000000..ddb54d04
--- /dev/null
+++ b/zephyr/src/sys/sync/semaphore.rs
@@ -0,0 +1,136 @@
+// Copyright (c) 2024 Linaro LTD
+// SPDX-License-Identifier: Apache-2.0
+
+//! Zephyr Semaphore support
+//!
+//! This is a thin wrapper around Zephyr's `k_sem`. This is one of the few of the `sys` primitives
+//! in Zephyr that is actually perfectly usable on its own, without needing additional wrappers.
+//!
+//! Zephyr implements counting semaphores, with both an upper and lower bound on the count. Note
+//! that calling 'give' on a semaphore that is at the maximum count will discard the 'give'
+//! operation, which in situations where counting is actually desired, will result in the count
+//! being incorrect.
+
+use core::ffi::c_uint;
+use core::fmt;
+use core::mem;
+
+use crate::{
+    error::{to_result_void, Result},
+    object::{Fixed, StaticKernelObject, Wrapped},
+    raw::{
+        k_sem, k_sem_count_get, k_sem_give, k_sem_init, k_sem_reset, k_sem_take
+    },
+    time::Timeout,
+};
+
+pub use crate::raw::K_SEM_MAX_LIMIT;
+
+/// A zephyr `k_sem` usable from safe Rust code.
+pub struct Semaphore {
+    /// The raw Zephyr `k_sem`.
+    item: Fixed<k_sem>,
+}
+
+/// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr
+/// implementation (which is why Clone is also implemented).
+unsafe impl Sync for Semaphore {}
+unsafe impl Send for Semaphore {}
+
+impl Semaphore {
+    /// Create a new semaphore.
+    ///
+    /// Create a new dynamically allocated Semaphore. This semaphore can only be used from system
+    /// threads. The arguments are as described in [the
+    /// docs](https://docs.zephyrproject.org/latest/kernel/services/synchronization/semaphores.html).
+    #[cfg(CONFIG_RUST_ALLOC)]
+    pub fn new(initial_count: c_uint, limit: c_uint) -> Result<Semaphore> {
+        let item: Fixed<k_sem> = Fixed::new(unsafe { mem::zeroed() });
+        unsafe {
+            to_result_void(k_sem_init(item.get(), initial_count, limit))?;
+        }
+        Ok(Semaphore { item })
+    }
+
+    /// Take a semaphore.
+    ///
+    /// Can be called from ISR if called with [`NoWait`].
+    ///
+    /// [`NoWait`]: crate::time::NoWait
+    pub fn take<T>(&self, timeout: T) -> Result<()>
+        where T: Into<Timeout>,
+    {
+        let timeout: Timeout = timeout.into();
+        let ret = unsafe {
+            k_sem_take(self.item.get(), timeout.0)
+        };
+        to_result_void(ret)
+    }
+
+    /// Give a semaphore.
+    ///
+    /// This routine gives to the semaphore, unless the semaphore is already at its maximum
+    /// permitted count.
+    pub fn give(&self) {
+        unsafe {
+            k_sem_give(self.item.get())
+        }
+    }
+
+    /// Resets a semaphore's count to zero.
+    ///
+    /// This resets the count to zero. Any outstanding [`take`] calls will be aborted with
+    /// `Error(EAGAIN)`.
+    ///
+    /// [`take`]: Self::take
+    pub fn reset(&mut self) {
+        unsafe {
+            k_sem_reset(self.item.get())
+        }
+    }
+
+    /// Get a semaphore's count.
+    ///
+    /// Returns the current count.
+    pub fn count_get(&mut self) -> usize {
+        unsafe {
+            k_sem_count_get(self.item.get()) as usize
+        }
+    }
+}
+
+/// A static Zephyr `k_sem`.
+///
+/// This is intended to be used from within the `kobj_define!` macro. It declares a static ksem
+/// that will be properly registered with the Zephyr kernel object system. Call [`init_once`] to
+/// get the [`Semaphore`] that it represents.
+///
+/// [`init_once`]: StaticKernelObject::init_once
+pub type StaticSemaphore = StaticKernelObject<k_sem>;
+
+unsafe impl Sync for StaticSemaphore {}
+
+impl Wrapped for StaticKernelObject<k_sem> {
+    type T = Semaphore;
+
+    /// The initializer for Semaphores is the initial count, and the count limit (which can be
+    /// K_SEM_MAX_LIMIT, re-exported here).
+    type I = (c_uint, c_uint);
+
+    // TODO: Thoughts about how to give parameters to the initialization.
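+    //
+    // For illustration (names here are hypothetical), a statically declared semaphore starting
+    // at count 0 with a limit of 1 would be initialized as:
+    //
+    //     kobj_define! { static MY_SEM: StaticSemaphore; }
+    //     let sem = MY_SEM.init_once((0, 1)).unwrap();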
+    fn get_wrapped(&self, arg: Self::I) -> Semaphore {
+        let ptr = self.value.get();
+        unsafe {
+            k_sem_init(ptr, arg.0, arg.1);
+        }
+        Semaphore {
+            item: Fixed::Static(ptr),
+        }
+    }
+}
+
+impl fmt::Debug for Semaphore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "sys::Semaphore")
+    }
+}
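
Taken together, the new `sync` layer gives application code a `std`-like shape on top of these
primitives. A minimal sketch of the Mutex/Condvar pairing introduced by this diff (assuming
`CONFIG_RUST_ALLOC`; the function names here are hypothetical, not part of the patch):

```rust
use zephyr::sync::{Condvar, Mutex};

// Block until another thread flips the flag and notifies, re-checking the
// predicate to guard against spurious wakeups.
fn wait_ready(pair: &(Mutex<bool>, Condvar)) {
    let (lock, cond) = pair;
    let mut ready = lock.lock().unwrap();
    while !*ready {
        // `wait` releases the lock while blocked and re-acquires it on wakeup.
        ready = cond.wait(ready).unwrap();
    }
}

fn set_ready(pair: &(Mutex<bool>, Condvar)) {
    let (lock, cond) = pair;
    *lock.lock().unwrap() = true;
    cond.notify_all();
}
```

This is the same shape used by the `CondSync` fork synchronizer in the philosophers sample above.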