
[Feature Request] Suggest Providing activities extraction utilities to get them from a class and a module #758

Open
spacether opened this issue Jan 31, 2025 · 6 comments · May be fixed by #759
Labels
enhancement New feature or request

Comments

@spacether

spacether commented Jan 31, 2025

Is your feature request related to a problem? Please describe.

Creating activities and decorating them with activity.defn in Python is easy.
Making sure that they are all included at worker launch is more difficult.

Describe the solution you'd like

It would be helpful if a utility were provided that allowed

  • extracting activities from a class
  • extracting activities from a class instance
  • extracting activities from a module

Below is a sample implementation for extraction from classes and class instances. It assumes async method implementations and uses ast and inspect.

It looks like one could find which methods are decorated by checking for fn.__temporal_activity_definition, but that is a private attribute name and is not exposed in the temporalio activity.py module. Code that uses __temporal_activity_definition would be simpler, and not invoking ast.parse(inspect.getsource(cls)) is preferable.

import ast
import inspect
import typing


class _MyNodeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.fn_name_to_decorators: dict[str, set[str]] = {}

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef):
        self.fn_name_to_decorators[node.name] = set()
        for decorator in node.decorator_list:
            # Unwrap call-style decorators such as @activity.defn(name="x")
            # so that both forms are recorded under the same dotted name.
            target = decorator.func if isinstance(decorator, ast.Call) else decorator
            # ast.unparse (Python 3.9+) renders e.g. "activity.defn" or "staticmethod".
            self.fn_name_to_decorators[node.name].add(ast.unparse(target))

    def get_fn_name_to_decorators(self) -> dict[str, set[str]]:
        return self.fn_name_to_decorators


class ActivitiesListProvider:
    @classmethod
    def __get_activities(
        cls,
        instance: typing.Union[
            type["ActivitiesListProvider"], "ActivitiesListProvider"
        ],
    ) -> list[typing.Callable]:
        visitor = _MyNodeVisitor()
        visitor.visit(ast.parse(inspect.getsource(cls)))
        fn_name_to_decorators: dict[str, set[str]] = visitor.get_fn_name_to_decorators()

        activities = []
        for fn_name, decorators in fn_name_to_decorators.items():
            if "activity.defn" in decorators:
                method = getattr(instance, fn_name)
                activities.append(method)
        return activities

    def get_activities_from_instance(self) -> list[typing.Callable]:
        return self.__get_activities(self)

    @classmethod
    def get_activities_from_cls(cls) -> list[typing.Callable]:
        return cls.__get_activities(cls)

And some tests:

from workflow_metrics.temporal_tools import activities_class

from temporalio import activity


class SomeActivities(activities_class.ActivitiesListProvider):
    @activity.defn
    async def instance_method_activity(self):
        pass

    @activity.defn
    async def class_method_activity(self):
        pass

    @staticmethod
    @activity.defn
    async def static_method_activity():
        pass


def test_get_activities_from_cls():
    assert SomeActivities.get_activities_from_cls() == [
        SomeActivities.instance_method_activity,
        SomeActivities.class_method_activity,
        SomeActivities.static_method_activity,
    ]


class ActivitiesClassThatNeedsInstance(activities_class.ActivitiesListProvider):
    @activity.defn
    async def instance_method_activity(self):
        pass

    @activity.defn
    async def class_method_activity(self):
        pass

    @staticmethod
    @activity.defn
    async def static_method_activity():
        pass


def test_get_activities_from_instance():
    inst = ActivitiesClassThatNeedsInstance()
    assert inst.get_activities_from_instance() == [
        inst.instance_method_activity,
        inst.class_method_activity,
        inst.static_method_activity,
    ]
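
For comparison, the attribute-based approach mentioned above could look roughly like the following. This is only a sketch: the `defn` decorator here is a stand-in that sets the marker attribute named in this issue, and `activities_from` and `MARKER` are hypothetical names, not part of temporalio. Relying on a private attribute is itself an assumption that may not hold across releases.

```python
import inspect
from typing import Any, Callable, List

# Attribute name quoted in the issue text. It is private to temporalio,
# so relying on it is an assumption that could break between releases.
MARKER = "__temporal_activity_definition"


def defn(fn: Callable) -> Callable:
    """Hypothetical stand-in for @activity.defn: it only sets the marker."""
    setattr(fn, MARKER, fn.__name__)
    return fn


def activities_from(obj: Any) -> List[Callable]:
    """Return marked methods of a class or an instance, in definition order."""
    cls = obj if inspect.isclass(obj) else type(obj)
    names: List[str] = []
    # Walk base classes first so inherited names keep their original position.
    for klass in reversed(cls.__mro__):
        for name in vars(klass):
            if name not in names:
                names.append(name)
    found = []
    for name in names:
        # getattr on an instance yields bound methods; on a class, plain functions.
        member = getattr(obj, name, None)
        if callable(member) and hasattr(member, MARKER):
            found.append(member)
    return found


class SomeActivities:
    @defn
    async def first(self) -> None: ...

    async def helper(self) -> None: ...  # undecorated, so not collected

    @staticmethod
    @defn
    async def second() -> None: ...
```

No source parsing is needed, and sync functions would be picked up the same way as async ones.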

Additional context

In the code that I am working on, activities are mainly defined in one module (when they are functions) and as class methods.

@spacether spacether added the enhancement New feature or request label Jan 31, 2025
@spacether spacether changed the title from "[Feature Request] Suggest Providing a method to get activities from a class, a fn to get activities from a module" to "[Feature Request] Suggest Providing activities extraction utilities to get them from a class and a module" Jan 31, 2025
@spacether spacether linked a pull request Feb 2, 2025 that will close this issue
@cretz
Member

cretz commented Feb 3, 2025

Sorry, just noticed this feature issue. Having discussion of viability on the PR at #759 (comment). Also circulating with team.

@dandavison
Contributor

Hi @spacether, thanks for the input here. Before getting into implementation, can you expand on the problem that you want to see solved and what you see as the requirements for possible solutions? I.e., expand on this:

Making sure that they are all included in worker launch is more difficult.

E.g. what is it that you don't like currently, and, without speculating about specific implementations, what sorts of behaviors / semantics regarding activities would you like users to be able to express when starting a worker? It might help if you sketch some Worker launch code featuring imaginary APIs / function calls that would give you the semantics that you want.

@spacether
Author

spacether commented Feb 3, 2025

So right now it is entirely up to a developer to manually build an explicit list of activities in Python. Our activities are already segregated by the kind of work that they do into Python modules and activity classes. So when we need to use them in a worker, one has to list every activity-decorated function in that module and in one or two classes, even though we already know that the activities we need come from these three sources: one module and two classes. Adding these utilities lets me vend activities from those sources easily.

I would like:

worker = Worker(
  activities=[*ActivitiesClass.get_activities(), *get_activities(activities_module)]
)

or

worker = Worker(
  activities_classes=(ActivitiesClass,),
  activities_modules=(activities_module,),
)

or

worker = Worker(
  activities=[ActivitiesClass, activities_module, etc...]
)

One could make activities accept a list whose items are callables, modules, or classes that have activity methods in them, where all of the activities from each class or module would be loaded into the worker.
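
A rough sketch of how such a mixed list could be flattened before it reaches the worker. Everything here is illustrative: `expand_activities` and the stand-in `defn` decorator are hypothetical, and the marker attribute is the private name mentioned earlier in this issue, so this is an assumption rather than how temporalio works today.

```python
import inspect
import types
from typing import Any, Callable, Iterable, List

# Private marker name taken from the issue text (an assumption, not a public API).
MARKER = "__temporal_activity_definition"


def defn(fn: Callable) -> Callable:
    """Hypothetical stand-in for @activity.defn: it only sets the marker."""
    setattr(fn, MARKER, fn.__name__)
    return fn


def _marked_members(owner: Any) -> List[Callable]:
    # inspect.getmembers returns (name, value) pairs sorted by name.
    return [m for _, m in inspect.getmembers(owner, callable) if hasattr(m, MARKER)]


def expand_activities(sources: Iterable[Any]) -> List[Callable]:
    """Flatten callables, modules, classes, and instances into one activity list."""
    activities: List[Callable] = []
    for source in sources:
        if hasattr(source, MARKER):
            activities.append(source)  # already a decorated callable
        else:
            activities.extend(_marked_members(source))  # module, class, or instance
    return activities


# Example sources: a synthetic module, an instance, and a bare function.
activities_module = types.ModuleType("activities_module")


@defn
async def send_email() -> None: ...


async def helper() -> None: ...  # undecorated, so it is ignored


activities_module.send_email = send_email
activities_module.helper = helper


class Billing:
    @defn
    async def charge(self) -> None: ...


@defn
async def standalone() -> None: ...


billing = Billing()
flat = expand_activities([billing, activities_module, standalone])
```

One caveat with this design is that a module which imports an activity from another source would yield that activity twice, so a real version would probably need de-duplication.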

@noxasaxon

I recently came to the same conclusion as @spacether and made a similar function for collecting activities as part of a utility library for enforcing best practices at test time to avoid runtime failures.

My developers use a collection function in a worker.py file so that they don't forget to add an activity method to the worker every time they write a new one.

I also have a function that is intended to run in a test: you import your project's temporal directory module, and it gets every @activity.defn method in the module and submodules so that we can validate everything automatically.

The repo is still very much a work in progress, and there is not much documentation for the function-based validation (as opposed to inheriting from a special validator class), but the collection methods are here:

https://github.com/noxasaxon/temporal_utils_python/blob/main/src/temporal_utils/collectors.py

@charlesmelby

charlesmelby commented Jun 5, 2025

I've resorted more than once to wrapping temporal's activity and workflow decorators in my own to make activity and workflow collection easy.

Typically speaking, I have two needs:

  1. be able to import activities easily from a file or folder
  2. be able to put some filters in place -- e.g. allow certain test-oriented activities to be co-located with standard activities to avoid a fragmented codebase.

Manual import is messy because

  1. it is error-prone, especially in the case of things like feature flags
  2. it makes for poor separation of concerns, since the worker needs to be explicitly aware of every piece of functionality it serves.

Personally I'd be just as happy with a method that provides for easy collection, e.g.

from temporalio import worker
import my_activities
from other_activities import ActivityClass

w = worker.Worker(
  ...,
  activities=worker.collect_activities(
    "path/to/activity/folder",
    my_activities,
    ActivityClass
  ), ...
)

together with some simple filters.
One that would be useful is a skip directive in the activity decorator, which causes collection to be skipped (but doesn't prevent it from being registered explicitly with a worker):

@activity.defn(skip=True)
async def my_activity():
    ...

This would facilitate e.g. test cases, dev/prod feature flags, and the like.

More flexible and comprehensive would be the ability to configure tags (this could easily be used in lieu of skip):

@activity.defn(tags=["dev", "e2e"])
async def prototyped_task():
    ...

together with filtering directives in the collect_activities function:

activities = worker.collect_activities(module, tags=["prod"])
activities = worker.collect_activities(module, exclude_tags=["test"])

Finally, some kind of name pattern filter might be nice (though once again this could generally be handled by tags):

activities = worker.collect_activities(module, regex=r".*_v[45]|database_.*")

My activities are usually bare functions so I don't have a real opinion on the best approach for activities that are owned by a class or class instance.
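
To make the filtering semantics above concrete, here is a small sketch. The `defn` decorator with `tags` and `skip`, and `collect_activities` itself, are the hypothetical APIs proposed in this comment, not existing temporalio functions. The sketch assumes that `tags` keeps activities sharing at least one tag, `exclude_tags` drops activities sharing any tag, and `regex` must match the whole attribute name.

```python
import inspect
import re
import types
from typing import Callable, Iterable, List, Optional


def defn(fn: Optional[Callable] = None, *, tags: Iterable[str] = (), skip: bool = False):
    """Hypothetical decorator: records tags and a skip flag on the function."""

    def decorate(f: Callable) -> Callable:
        f._activity_tags = frozenset(tags)
        f._activity_skip = skip
        return f

    return decorate(fn) if fn is not None else decorate


def collect_activities(
    module: types.ModuleType,
    *,
    tags: Optional[Iterable[str]] = None,
    exclude_tags: Optional[Iterable[str]] = None,
    regex: Optional[str] = None,
) -> List[Callable]:
    wanted = set(tags or ())
    banned = set(exclude_tags or ())
    pattern = re.compile(regex) if regex else None
    collected = []
    # getmembers yields (name, value) pairs sorted by name.
    for name, fn in inspect.getmembers(module, callable):
        fn_tags = getattr(fn, "_activity_tags", None)
        if fn_tags is None or fn._activity_skip:
            continue  # not an activity, or explicitly excluded from collection
        if wanted and not (wanted & fn_tags):
            continue
        if banned & fn_tags:
            continue
        if pattern and not pattern.fullmatch(name):
            continue
        collected.append(fn)
    return collected


# Synthetic module standing in for a real activities module.
demo = types.ModuleType("demo_activities")


@defn(tags=["dev", "e2e"])
async def prototyped_task() -> None: ...


@defn(tags=["prod"])
async def database_sync() -> None: ...


@defn(tags=["prod"])
async def report_v4() -> None: ...


@defn(tags=["prod"], skip=True)
async def scratch() -> None: ...


for _fn in (prototyped_task, database_sync, report_v4, scratch):
    setattr(demo, _fn.__name__, _fn)
```

With this shape, `skip=True` only affects collection; the function can still be passed to a worker explicitly.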

@charlesmelby

charlesmelby commented Jun 5, 2025

As a side note, one important issue to address is being able to deal effectively with mocks for tests.
With my current wrappers I actually just collect the activities at definition time (optionally grouped by worker name), but also provide facilities to override them inside tests:

@flows.activity
async def real_activity():
  ...

@flows.activity(worker="other_worker")
async def other_activity():
  ...

async def real_activity_mock():
  ...

async def other_activity_mock():
  ...

@flows.testing.case
@flows.testing.provide(activities={"real_activity": real_activity_mock})
@pytest.mark.asyncio
async def test_with_mock_real_activity(flow_runner):
  await flow_runner.execute_workflow(my_workflow, ...)

# actually mainly used to mock activities handled by remote workers,
# but can be used with other workers in the same process
@flows.testing.case
@flows.testing.provide("other_worker", activities={"other_activity": other_activity_mock})
@pytest.mark.asyncio
async def test_with_mock_other_activity(flow_runner):
  await flow_runner.execute_workflow(multi_worker_workflow, ...)

With collection by a function as above, configuring concise declarative tests might be more challenging.
Including an override directive could be helpful for this, e.g.

worker.collect_activities(module, overrides={"my_activity": my_activity_mock})
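
A minimal sketch of what such an override step might do once the activities are collected. `apply_overrides` is a hypothetical helper, and matching on the function's `__name__` is an assumption. With a real worker the mock would presumably also need to be registered under the original activity's name, which is not shown here.

```python
from typing import Callable, Dict, List


def apply_overrides(
    activities: List[Callable], overrides: Dict[str, Callable]
) -> List[Callable]:
    """Swap collected activities for mocks, keyed by the original function name."""
    known = {fn.__name__ for fn in activities}
    unknown = set(overrides) - known
    if unknown:
        # Fail loudly so a typo in a test does not silently run the real activity.
        raise KeyError(f"no collected activity named: {sorted(unknown)}")
    return [overrides.get(fn.__name__, fn) for fn in activities]


async def my_activity() -> str:
    return "real"


async def other_activity() -> str:
    return "other"


async def my_activity_mock() -> str:
    return "mock"


patched = apply_overrides(
    [my_activity, other_activity], {"my_activity": my_activity_mock}
)
```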

5 participants