Skip to content

Commit

Permalink
feat: Configurable loading of event bus producer (#85)
Browse files Browse the repository at this point in the history
This defines an API for IDAs to use when producing events that allows the
choice of Event Bus implementation to be configured rather than written in
code. Implementations can document the appropriate setting for deployers
to use.

Caching the producer instance means that we automatically get a
long-lived instance without the implementation itself having to
perform caching itself.

This is part of #87
  • Loading branch information
timmc-edx authored Nov 22, 2022
1 parent e875dd8 commit 2959be7
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 4 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
----------

[3.1.0] - 2022-11-22
--------------------
Added
~~~~~
* Configurable loader for producer side of Event Bus in ``openedx_events.event_bus``.

[3.0.1] - 2022-10-31
--------------------
Fixed
Expand Down Expand Up @@ -65,7 +71,7 @@ Fixed
[0.11.0] - 2022-07-21
---------------------
Added
~~~~~~~
~~~~~
* Added new content_authoring module with new COURSE_CATALOG_INFO_CHANGED signal

[0.10.0] - 2022-05-20
Expand Down
2 changes: 1 addition & 1 deletion openedx_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
more information about the project.
"""

__version__ = "3.0.1"
__version__ = "3.1.0"
121 changes: 120 additions & 1 deletion openedx_events/event_bus/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,120 @@
"""Classes and utility functions for the event bus."""
"""
Classes and utility functions for the event bus.
This module includes the entry point for the producer.
API:
- ``get_producer`` returns an ``EventBusProducer`` singleton that should be used for sending all events
to the Event Bus. The backing implementation is chosen via the Django setting ``EVENT_BUS_PRODUCER``.
"""

import warnings
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import NoReturn, Optional

from django.conf import settings
from django.dispatch import receiver
from django.test.signals import setting_changed
from django.utils.module_loading import import_string

from openedx_events.tooling import OpenEdxPublicSignal


def _try_load(*, setting_name: str, expected_class: type, default):
"""
Load an instance of ``expected_class`` as indicated by ``setting_name``.
The setting points to a callable (function or class) that will fetch or create an
instance of the expected class. If the configuration is missing or invalid,
or the callable throws an exception or returns the wrong type, the default is
returned instead.
Arguments:
setting_name: Name of a Django setting containing a dotted module path, indicating a callable
expected_class: The callable must produce an instance of this class object (or a subclass)
default: Object to return if any part of the lookup or loading fails
"""
constructor_path = getattr(settings, setting_name, None)
if constructor_path is None:
warnings.warn(f"Event Bus setting {setting_name} is missing; component will be inactive")
return default

try:
constructor = import_string(constructor_path)
instance = constructor()
if isinstance(instance, expected_class):
return instance
else:
warnings.warn(
f"{constructor_path} from {setting_name} returned unexpected type {type(instance)}; "
"component will be inactive"
)
return default
except BaseException as e:
warnings.warn(
f"Failed to load {expected_class} from setting {setting_name}: {e!r}; "
"component will be inactive"
)
return default


class EventBusProducer(ABC):
"""
Parent class for event bus producer implementations.
"""

@abstractmethod
def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
"""
Send a signal event to the event bus under the specified topic.
Arguments:
signal: The original OpenEdxPublicSignal the event was sent to
topic: The event bus topic for the event (without any environmental prefix)
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
"""


class NoEventBusProducer(EventBusProducer):
"""
Stub implementation to "load" when no implementation is properly configured.
"""

def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
"""Do nothing."""


# .. setting_name: EVENT_BUS_PRODUCER
# .. setting_default: None
# .. setting_description: String naming a callable (function or class) that can be called to create
# or retrieve an instance of EventBusProducer when ``openedx_events.event_bus.get_producer`` is
# called. The format of the string is a dotted path to an attribute in a module, e.g.
# ``some.module.path.EventBusImplementation``. This producer will be managed as a singleton
# by openedx_events. If setting is not supplied or the callable raises an exception or does not return
# an instance of EventBusProducer, calls to the producer will be ignored with a warning at startup.

@lru_cache # will just be one cache entry, in practice
def get_producer() -> EventBusProducer:
"""
Create or retrieve the producer implementation, as configured.
If misconfigured, returns a fake implementation that can be called but does nothing.
"""
return _try_load(
setting_name='EVENT_BUS_PRODUCER',
expected_class=EventBusProducer, default=NoEventBusProducer(),
)


@receiver(setting_changed)
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
get_producer.cache_clear()
112 changes: 112 additions & 0 deletions openedx_events/event_bus/tests/test_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Tests for event bus implementation loader.
"""

import warnings
from contextlib import contextmanager
from unittest import TestCase

from django.test import override_settings

from openedx_events.event_bus import _try_load, get_producer
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED


@contextmanager
def assert_warnings(warning_messages: list):
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always')
yield
assert [str(w.message) for w in caught_warnings] == warning_messages


class TestLoader(TestCase):

# No, the "constructors" here don't make much sense, but I didn't
# want to create a bunch of test classes/factory functions, so I'm
# using built-in functions instead.

def test_unconfigured(self):
with assert_warnings(["Event Bus setting DOES_NOT_EXIST is missing; component will be inactive"]):
loaded = _try_load(
setting_name="DOES_NOT_EXIST",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {'def': 'ault'}

@override_settings(EB_LOAD_PATH='builtins.dict')
def test_success(self):
with assert_warnings([]):
loaded = _try_load(
setting_name="EB_LOAD_PATH",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {}

@override_settings(EB_LOAD_PATH='builtins.list')
def test_wrong_type(self):
with assert_warnings([
"builtins.list from EB_LOAD_PATH returned unexpected type <class 'list'>; "
"component will be inactive"
]):
loaded = _try_load(
setting_name="EB_LOAD_PATH",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {'def': 'ault'}

@override_settings(EB_LOAD_PATH='no_module_here.foo.nope')
def test_missing_module(self):
with assert_warnings([
"Failed to load <class 'dict'> from setting EB_LOAD_PATH: "
"ModuleNotFoundError(\"No module named 'no_module_here'\"); "
"component will be inactive"
]):
loaded = _try_load(
setting_name="EB_LOAD_PATH",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {'def': 'ault'}

@override_settings(EB_LOAD_PATH='builtins.does_not_exist')
def test_missing_attribute(self):
with assert_warnings([
"Failed to load <class 'dict'> from setting EB_LOAD_PATH: "
"ImportError('Module \"builtins\" does not define a \"does_not_exist\" attribute/class'); "
"component will be inactive"
]):
loaded = _try_load(
setting_name="EB_LOAD_PATH",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {'def': 'ault'}

@override_settings(EB_LOAD_PATH='builtins.len')
def test_bad_args_for_callable(self):
with assert_warnings([
"Failed to load <class 'dict'> from setting EB_LOAD_PATH: "
"TypeError('len() takes exactly one argument (0 given)'); "
"component will be inactive"
]):
loaded = _try_load(
setting_name="EB_LOAD_PATH",
expected_class=dict, default={'def': 'ault'},
)
assert loaded == {'def': 'ault'}


class TestProducer(TestCase):

@override_settings(EVENT_BUS_PRODUCER=None)
def test_default_does_nothing(self):
"""
Test that the default is of the right class but does nothing.
"""
producer = get_producer()

with assert_warnings([]):
# Nothing thrown, no warnings.
assert producer.send(
signal=SESSION_LOGIN_COMPLETED, topic='user-logins',
event_key_field='user.id', event_data={},
) is None
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.0.0
current_version = 3.1.0
commit = True
tag = True

Expand Down

0 comments on commit 2959be7

Please sign in to comment.