Skip to content

Conversation

@191220029
Copy link
Collaborator

Better IP management

This PR introduces several improvements to the information packet (IP) management in Dagrs:

New Features

InChannels

Added recv_any method to receive data from any available channel without specifying the sender's ID, which returns a tuple of (sender_id, content) to identify the message source.

OutChannels

Added get_receiver_ids method to query available receiver nodes.

Example

recv_any_example.rs demonstrates how to use the recv_any method of InChannels to receive data from any available channel.

This example creates a graph with two senders and one receiver:

  • A normal sender that sends messages immediately
  • A slow sender that delays 500ms before sending messages
  • A receiver that uses recv_any to receive messages from either sender
/// An action that sends a message to its output channel
#[derive(Default)]
pub struct SenderAction {
    message: String,
}

impl SenderAction {
    pub fn new(message: String) -> Self {
        Self { message }
    }
}

#[async_trait]
impl TypedAction for SenderAction {
    type I = ();
    type O = String;

    async fn run(
        &self,
        _: TypedInChannels<Self::I>,
        out: TypedOutChannels<Self::O>,
        _: Arc<EnvVar>,
    ) -> Output {
        // Send the message to all receivers
        out.broadcast(self.message.clone()).await;
        Output::Out(None)
    }
}

/// An action that sends a message to its output channel after a delay
#[derive(Default)]
pub struct SlowSenderAction {
    message: String,
}

impl SlowSenderAction {
    pub fn new(message: String) -> Self {
        Self { message }
    }
}

#[async_trait]
impl TypedAction for SlowSenderAction {
    type I = ();
    type O = String;

    async fn run(
        &self,
        _: TypedInChannels<Self::I>,
        out: TypedOutChannels<Self::O>,
        _: Arc<EnvVar>,
    ) -> Output {
        // Wait for 500ms before sending
        sleep(Duration::from_millis(500)).await;
        // Send the message to all receivers
        out.broadcast(self.message.clone()).await;
        Output::Out(None)
    }
}

So when the receiver calls the "recv_any" method, it will always receive messages from the normal sender, or to be more precise, most of the time. And the second time the receiver calls the "recv_any" method, it will receive messages from the slow sender.

Related Issue

#106 #104

@191220029 191220029 added this to the 0.5.0 milestone Jun 11, 2025
@191220029 191220029 self-assigned this Jun 11, 2025
@191220029 191220029 added the enhancement New feature or request label Jun 11, 2025
@191220029 191220029 requested a review from genedna June 11, 2025 07:17
@191220029 191220029 marked this pull request as ready for review June 11, 2025 07:18
@genedna genedna merged commit 02096e4 into dagrs-dev:main Jun 11, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants