Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Expand Core API documentation on RPC and Pub/Sub messaging #5333

Open
ekzhu opened this issue Feb 3, 2025 · 1 comment
Open

[Python] Expand Core API documentation on RPC and Pub/Sub messaging #5333

ekzhu opened this issue Feb 3, 2025 · 1 comment
Assignees
Labels
documentation Improvements or additions to documentation needs-triage proj-core
Milestone

Comments

@ekzhu
Copy link
Collaborator

ekzhu commented Feb 3, 2025

Currently the only documentation for RPC and Pub/Sub messaging is in the two sub sections: https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html#message-and-communication

Several recent issues show a gap of understanding in using this. e.g., #5317

Let's expand the documentation on messaging into two new sections:

  • Direct (RPC) Messages
  • Broadcast (Pub-Sub) Messages

In the Broadcast Messages section, explain the following topics:

  1. How to publish a message: from the runtime directly, and from an agent to a TopicId
  2. How to subscribe to a topic type: using typic_subscription decorator, and using runtime.add_subscription method.
  3. Show an simple example of the Modifier-Checker using add_subscription and publish to TopicId specifically.
  4. A more advanced example of multiple subscriptions for Checker and two Modifier agents.
  5. Show how to use default_subscription and DefaultTopicId to simplify the code -- and a note on a global publishing scope.

Other tasks:

  • Link to the topic and subscription core concept and cookbook.
@ekzhu ekzhu added documentation Improvements or additions to documentation proj-core labels Feb 3, 2025
@ekzhu ekzhu added this to the python-v0.4.6 milestone Feb 3, 2025
@ekzhu ekzhu self-assigned this Feb 3, 2025
@ekzhu
Copy link
Collaborator Author

ekzhu commented Feb 3, 2025

Example of 1 Modifier and 1 Checker

import asyncio
from dataclasses import dataclass
from typing import Callable

from autogen_core import (
    AgentId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TypeSubscription,
    message_handler,
    TopicId,
)


@dataclass
class Message:
    content: int


class Modifier(RoutedAgent):
    def __init__(self, topic_type: str, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._topic_type = topic_type
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nModifier ({self.id.type}):\nModified {message.content} to {val}")
        # Publish the modified value to its own topic.
        await self.publish_message(Message(content=val), TopicId(type=self._topic_type, source=self.id.key))


class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\nChecker on topic {ctx.topic_id.type}:\n{message.content} passed the check, continue.")  # type: ignore
            # Publish the message to the same topic it received from.
            await self.publish_message(Message(content=message.content), ctx.topic_id)  # type: ignore
        else:
            print(f"{'-'*80}\nChecker on topic {ctx.topic_id.type}:\n{message.content} failed the check, stopping.")  # type: ignore


async def main() -> None:
    # Create an local embedded runtime.
    runtime = SingleThreadedAgentRuntime()

    topic_type = "modifier_checker"

    # Register the modifier and checker agents by providing
    # their agent types, the factory functions for creating instance and subscriptions.
    await Modifier.register(
        runtime,
        "modifier",
        lambda: Modifier(topic_type=topic_type, modify_val=lambda x: x - 1),
    )
    await runtime.add_subscription(TypeSubscription(topic_type, "modifier"))

    await Checker.register(
        runtime,
        "checker",
        # Run until the value is less than or equal to 1
        lambda: Checker(run_until=lambda x: x <= 1),
    )
    await runtime.add_subscription(TypeSubscription(topic_type, "checker"))

    # Start the runtime and send a direct message to both modifiers.
    runtime.start()
    await runtime.send_message(Message(content=10), AgentId("modifier", "default"))
    await runtime.stop_when_idle()


asyncio.run(main())

Example of 2 Modifiers and 1 Checker with multiple subscriptions.

import asyncio
from dataclasses import dataclass
from typing import Callable

from autogen_core import (
    AgentId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TypeSubscription,
    message_handler,
    TopicId,
)


@dataclass
class Message:
    content: int


class Modifier(RoutedAgent):
    def __init__(self, topic_type: str, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._topic_type = topic_type
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nModifier ({self.id.type}):\nModified {message.content} to {val}")
        # Publish the modified value to its own topic.
        await self.publish_message(Message(content=val), TopicId(self._topic_type, self.id.key))


class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\nChecker on topic {ctx.topic_id.type}:\n{message.content} passed the check, continue.")  # type: ignore
            # Publish the message to the same topic it received from.
            await self.publish_message(Message(content=message.content), ctx.topic_id)  # type: ignore
        else:
            print(f"{'-'*80}\nChecker on topic {ctx.topic_id.type}:\n{message.content} failed the check, stopping.")  # type: ignore


async def main() -> None:
    # Create an local embedded runtime.
    runtime = SingleThreadedAgentRuntime()

    topic_type_1 = "modifier1_checker"
    topic_type_2 = "modifier2_checker"

    # Register the modifier and checker agents by providing
    # their agent types, the factory functions for creating instance and subscriptions.
    await Modifier.register(
        runtime,
        "modifier1",
        lambda: Modifier(topic_type=topic_type_1, modify_val=lambda x: x - 1),
    )
    # First modifer is subscribed to its own topic.
    await runtime.add_subscription(TypeSubscription(topic_type_1, "modifier1"))

    await Modifier.register(
        runtime,
        "modifier2",
        lambda: Modifier(topic_type=topic_type_2, modify_val=lambda x: x - 1),
    )
    # Second modifer also subscribed to its own topic.
    await runtime.add_subscription(TypeSubscription(topic_type_2, "modifier2"))

    await Checker.register(
        runtime,
        "checker",
        # Run until the value is less than or equal to 1
        lambda: Checker(run_until=lambda x: x <= 1),
    )
    # Checker is subscribed to the topic of both modifiers.
    await runtime.add_subscription(TypeSubscription(topic_type_1, "checker"))
    await runtime.add_subscription(TypeSubscription(topic_type_2, "checker"))

    # Start the runtime and send a direct message to both modifiers.
    runtime.start()
    await runtime.send_message(Message(content=10), AgentId("modifier1", "default"))
    await runtime.send_message(Message(content=10), AgentId("modifier2", "default"))
    await runtime.stop_when_idle()


asyncio.run(main())

@ekzhu ekzhu changed the title Expand Core API documentation on RPC and Pub/Sub messaging [Python] Expand Core API documentation on RPC and Pub/Sub messaging Feb 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation needs-triage proj-core
Projects
None yet
Development

No branches or pull requests

1 participant