Skip to content

Support for dynamic dictionaries of buffers #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
36d604b
Introducing the concept of a buffer map
mxgrey Jan 23, 2025
d9b9be1
Refactoring Buffered trait
mxgrey Jan 24, 2025
763244b
Figuring out how to dynamically access buffers
mxgrey Jan 24, 2025
96953c5
Prototype of AnyBufferMut
mxgrey Jan 27, 2025
f3ddbf5
Any buffer access is working
mxgrey Jan 29, 2025
9935817
Add AnyBuffer drain and restructure any_buffer module
mxgrey Jan 29, 2025
442c676
Clean up any_buffer implementation
mxgrey Jan 29, 2025
d44e383
Introducing JsonBuffer type
mxgrey Jan 29, 2025
70cec2c
Refactoring the use of AnyBufferStorageAccessInterface
mxgrey Jan 30, 2025
8f79d53
Fix global retention of AnyBufferAccessInterface
mxgrey Jan 30, 2025
b21c218
Fleshing out implementation of JsonBuffer
mxgrey Jan 30, 2025
792e0b6
Almost done with JsonBuffer
mxgrey Jan 31, 2025
ae65f24
Implemented Joined and Accessed for AnyBuffer and JsonBuffer
mxgrey Feb 2, 2025
7c9d490
Add tests for JsonBuffer
mxgrey Feb 2, 2025
0d1224f
Implement JsonBufferView
mxgrey Feb 2, 2025
5dcae45
Implement AnyBufferView
mxgrey Feb 2, 2025
ca7a226
Draft an example implementation of JoinedValue
mxgrey Feb 2, 2025
096ccf4
Fix formatting
mxgrey Feb 2, 2025
800a112
Remove use of 1.75 unstable function
mxgrey Feb 2, 2025
de6596d
Implement more general buffer downcasting
mxgrey Feb 3, 2025
bd32a47
fix doc comments
mxgrey Feb 3, 2025
c419ab3
Refactor buffer keys
mxgrey Feb 3, 2025
a6bf08d
Implement more general buffer key downcasting
mxgrey Feb 3, 2025
a40b8d3
Invert the implementation of JoinedValue
mxgrey Feb 6, 2025
617e22f
Refactor Bufferable so we no longer need join_into
mxgrey Feb 6, 2025
4c087ea
Fix style
mxgrey Feb 6, 2025
858c7e6
Update docs
mxgrey Feb 6, 2025
aa22f87
Fix style
mxgrey Feb 6, 2025
5dd9a30
Merge branch 'main' into buffer_map
mxgrey Feb 14, 2025
f64871e
add derive `JoinedValue` to implement trait automatically (#53)
koonpeng Feb 17, 2025
e94c170
Generalize buffer map keys
mxgrey Feb 17, 2025
6e19af7
Support for joining Vec and json values
mxgrey Feb 17, 2025
4e35ec8
Working towards BufferKeyMap implementation
mxgrey Feb 18, 2025
87387e3
BufferKeyMap macro (#56)
mxgrey Feb 20, 2025
47bd4d0
Merge branch 'main' into buffer_map
mxgrey Feb 20, 2025
c096209
Rename traits to improve semantics (#57)
mxgrey Feb 20, 2025
911a5e1
Tighten up leaks
mxgrey Feb 21, 2025
8a7603c
Fix style
mxgrey Feb 21, 2025
530c92f
Iterate on require buffer functions
mxgrey Feb 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ jobs:
run: cargo test --features single_threaded_async

- name: Build docs
run: cargo doc
run: cargo doc --all-features

203 changes: 152 additions & 51 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@ use bevy_ecs::{

use std::{ops::RangeBounds, sync::Arc};

use thiserror::Error as ThisError;

use crate::{
Builder, Chain, Gate, GateState, InputSlot, NotifyBufferUpdate, OnNewBufferValue, UnusedTarget,
};

mod any_buffer;
pub use any_buffer::*;

mod buffer_access_lifecycle;
pub(crate) use buffer_access_lifecycle::*;

mod buffer_key_builder;
pub(crate) use buffer_key_builder::*;

mod buffer_map;
pub use buffer_map::*;

mod buffer_storage;
pub(crate) use buffer_storage::*;

Expand All @@ -46,12 +54,16 @@ pub use bufferable::*;
mod manage_buffer;
pub use manage_buffer::*;

#[cfg(feature = "diagram")]
mod json_buffer;
#[cfg(feature = "diagram")]
pub use json_buffer::*;

/// A buffer is a special type of node within a workflow that is able to store
/// and release data. When a session is finished, the buffered data from the
/// session will be automatically cleared.
pub struct Buffer<T> {
pub(crate) scope: Entity,
pub(crate) source: Entity,
pub(crate) location: BufferLocation,
pub(crate) _ignore: std::marker::PhantomData<fn(T)>,
}

Expand All @@ -61,11 +73,11 @@ impl<T> Buffer<T> {
&self,
builder: &'b mut Builder<'w, 's, 'a>,
) -> Chain<'w, 's, 'a, 'b, ()> {
assert_eq!(self.scope, builder.scope);
assert_eq!(self.scope(), builder.scope);
let target = builder.commands.spawn(UnusedTarget).id();
builder
.commands
.add(OnNewBufferValue::new(self.source, target));
.add(OnNewBufferValue::new(self.id(), target));
Chain::new(target, builder)
}

Expand All @@ -77,24 +89,82 @@ impl<T> Buffer<T> {
T: Clone,
{
CloneFromBuffer {
scope: self.scope,
source: self.source,
location: self.location,
_ignore: Default::default(),
}
}

/// Get an input slot for this buffer.
pub fn input_slot(self) -> InputSlot<T> {
InputSlot::new(self.scope, self.source)
InputSlot::new(self.scope(), self.id())
}

/// Get the entity ID of the buffer.
pub fn id(&self) -> Entity {
self.location.source
}

/// Get the ID of the workflow that the buffer is associated with.
pub fn scope(&self) -> Entity {
self.location.scope
}

/// Get general information about the buffer.
pub fn location(&self) -> BufferLocation {
self.location
}

/// Cast this into an [`AnyBuffer`].
pub fn as_any_buffer(&self) -> AnyBuffer
where
T: 'static + Send + Sync,
{
self.clone().into()
}
}

impl<T> Clone for Buffer<T> {
fn clone(&self) -> Self {
*self
}
}

impl<T> Copy for Buffer<T> {}

/// The general identifying information for a buffer to locate it within the
/// world. This does not indicate anything about the type of messages that the
/// buffer can contain.
#[derive(Clone, Copy, Debug)]
pub struct BufferLocation {
/// The entity ID of the buffer.
pub scope: Entity,
/// The ID of the workflow that the buffer is associated with.
pub source: Entity,
}

#[derive(Clone, Copy)]
pub struct CloneFromBuffer<T: Clone> {
pub(crate) scope: Entity,
pub(crate) source: Entity,
pub(crate) location: BufferLocation,
pub(crate) _ignore: std::marker::PhantomData<fn(T)>,
}

impl<T: Clone> CloneFromBuffer<T> {
/// Get the entity ID of the buffer.
pub fn id(&self) -> Entity {
self.location.source
}

/// Get the ID of the workflow that the buffer is associated with.
pub fn scope(&self) -> Entity {
self.location.scope
}

/// Get general information about the buffer.
pub fn location(&self) -> BufferLocation {
self.location
}
}

/// Settings to describe the behavior of a buffer.
#[derive(Default, Clone, Copy)]
pub struct BufferSettings {
Expand Down Expand Up @@ -157,62 +227,44 @@ impl Default for RetentionPolicy {
}
}

impl<T> Clone for Buffer<T> {
fn clone(&self) -> Self {
*self
}
}

impl<T> Copy for Buffer<T> {}

impl<T: Clone> Clone for CloneFromBuffer<T> {
fn clone(&self) -> Self {
*self
}
}

impl<T: Clone> Copy for CloneFromBuffer<T> {}

/// This key can unlock access to the contents of a buffer by passing it into
/// [`BufferAccess`] or [`BufferAccessMut`].
///
/// To obtain a `BufferKey`, use [`Chain::with_access`][1], or [`listen`][2].
///
/// [1]: crate::Chain::with_access
/// [2]: crate::Bufferable::listen
/// [2]: crate::Accessible::listen
pub struct BufferKey<T> {
buffer: Entity,
session: Entity,
accessor: Entity,
lifecycle: Option<Arc<BufferAccessLifecycle>>,
tag: BufferKeyTag,
_ignore: std::marker::PhantomData<fn(T)>,
}

impl<T> Clone for BufferKey<T> {
fn clone(&self) -> Self {
Self {
buffer: self.buffer,
session: self.session,
accessor: self.accessor,
lifecycle: self.lifecycle.as_ref().map(Arc::clone),
tag: self.tag.clone(),
_ignore: Default::default(),
}
}
}

impl<T> BufferKey<T> {
/// The buffer ID of this key.
pub fn id(&self) -> Entity {
self.buffer
pub fn buffer(&self) -> Entity {
self.tag.buffer
}

/// The session that this key belongs to.
pub fn session(&self) -> Entity {
self.session
self.tag.session
}

pub fn tag(&self) -> &BufferKeyTag {
&self.tag
}

pub(crate) fn is_in_use(&self) -> bool {
self.lifecycle.as_ref().is_some_and(|l| l.is_in_use())
self.tag.is_in_use()
}

// We do a deep clone of the key when distributing it to decouple the
Expand All @@ -224,6 +276,38 @@ impl<T> BufferKey<T> {
// need to have their own independent lifecycles or else we won't detect
// when the workflow has dropped them.
pub(crate) fn deep_clone(&self) -> Self {
Self {
tag: self.tag.deep_clone(),
_ignore: Default::default(),
}
}
}

impl<T> std::fmt::Debug for BufferKey<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufferKey")
.field("message_type_name", &std::any::type_name::<T>())
.field("tag", &self.tag)
.finish()
}
}

/// The identifying information for a buffer key. This does not indicate
/// anything about the type of messages that the buffer can contain.
#[derive(Clone)]
pub struct BufferKeyTag {
pub buffer: Entity,
pub session: Entity,
pub accessor: Entity,
pub lifecycle: Option<Arc<BufferAccessLifecycle>>,
}

impl BufferKeyTag {
pub fn is_in_use(&self) -> bool {
self.lifecycle.as_ref().is_some_and(|l| l.is_in_use())
}

pub fn deep_clone(&self) -> Self {
let mut deep = self.clone();
deep.lifecycle = self
.lifecycle
Expand All @@ -233,6 +317,17 @@ impl<T> BufferKey<T> {
}
}

impl std::fmt::Debug for BufferKeyTag {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufferKeyTag")
.field("buffer", &self.buffer)
.field("session", &self.session)
.field("accessor", &self.accessor)
.field("in_use", &self.is_in_use())
.finish()
}
}

/// This system parameter lets you get read-only access to a buffer that exists
/// within a workflow. Use a [`BufferKey`] to unlock the access.
///
Expand All @@ -247,9 +342,9 @@ where

impl<'w, 's, T: 'static + Send + Sync> BufferAccess<'w, 's, T> {
pub fn get<'a>(&'a self, key: &BufferKey<T>) -> Result<BufferView<'a, T>, QueryEntityError> {
let session = key.session;
let session = key.session();
self.query
.get(key.buffer)
.get(key.buffer())
.map(|(storage, gate)| BufferView {
storage,
gate,
Expand All @@ -276,9 +371,9 @@ where
T: 'static + Send + Sync,
{
pub fn get<'a>(&'a self, key: &BufferKey<T>) -> Result<BufferView<'a, T>, QueryEntityError> {
let session = key.session;
let session = key.session();
self.query
.get(key.buffer)
.get(key.buffer())
.map(|(storage, gate)| BufferView {
storage,
gate,
Expand All @@ -290,10 +385,10 @@ where
&'a mut self,
key: &BufferKey<T>,
) -> Result<BufferMut<'w, 's, 'a, T>, QueryEntityError> {
let buffer = key.buffer;
let session = key.session;
let accessor = key.accessor;
self.query.get_mut(key.buffer).map(|(storage, gate)| {
let buffer = key.buffer();
let session = key.session();
let accessor = key.tag.accessor;
self.query.get_mut(key.buffer()).map(|(storage, gate)| {
BufferMut::new(storage, gate, buffer, session, accessor, &mut self.commands)
})
}
Expand Down Expand Up @@ -424,7 +519,7 @@ where
self.len() == 0
}

/// Check whether the gate of this buffer is open or closed
/// Check whether the gate of this buffer is open or closed.
pub fn gate(&self) -> Gate {
self.gate
.map
Expand Down Expand Up @@ -467,7 +562,7 @@ where
self.storage.drain(self.session, range)
}

/// Pull the oldest item from the buffer
/// Pull the oldest item from the buffer.
pub fn pull(&mut self) -> Option<T> {
self.modified = true;
self.storage.pull(self.session)
Expand Down Expand Up @@ -500,7 +595,7 @@ where
// continuous systems with BufferAccessMut from running at the same time no
// matter what the buffer type is.

/// Tell the buffer [`Gate`] to open
/// Tell the buffer [`Gate`] to open.
pub fn open_gate(&mut self) {
if let Some(gate) = self.gate.map.get_mut(&self.session) {
if *gate != Gate::Open {
Expand All @@ -510,7 +605,7 @@ where
}
}

/// Tell the buffer [`Gate`] to close
/// Tell the buffer [`Gate`] to close.
pub fn close_gate(&mut self) {
if let Some(gate) = self.gate.map.get_mut(&self.session) {
*gate = Gate::Closed;
Expand All @@ -519,7 +614,7 @@ where
}
}

/// Perform an action on the gate of the buffer
/// Perform an action on the gate of the buffer.
pub fn gate_action(&mut self, action: Gate) {
match action {
Gate::Open => self.open_gate(),
Expand Down Expand Up @@ -569,6 +664,12 @@ where
}
}

#[derive(ThisError, Debug, Clone)]
pub enum BufferError {
#[error("The key was unable to identify a buffer")]
BufferMissing,
}

#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*, Gate};
Expand Down
Loading
Loading