diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cc9897..dc2a131 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [0.7.0] - 2024-04-21 + +### Added + +- Partial support for pubsub channels ([#18]) + ## [0.6.0] - 2024-03-27 ### Breaking @@ -72,3 +78,4 @@ [#7]: https://github.com/JosuaKrause/redipy/pull/7 [#15]: https://github.com/JosuaKrause/redipy/pull/15 [#17]: https://github.com/JosuaKrause/redipy/pull/17 +[#18]: https://github.com/JosuaKrause/redipy/pull/18 diff --git a/README.md b/README.md index 0365394..6942fbf 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,8 @@ that implement the same functionality, such as: This [medium article][medium] explores some of the rationale behind the library. -### Warning - -This library is still early in development and [not all Redis functions are -available yet][implemented]! If you need certain functionality or found a bug, have a look at the [contributing](#contributing) section. -It is easy to add Redis functions to the API. ## Installation You can install `redipy` using pip: @@ -401,8 +396,9 @@ class RStack: ## Limitations The current limitations of `redipy` are: -- Not all Redis commands are supported yet: This will eventually be resolved. - See [this issue to see the progress][implemented]. +- Some Redis commands are not supported yet: This is likely due to redundant + functionality. For all other cases it will eventually be resolved. + Check [this issue to see the status of redis functions][implemented]. - The API differs slightly: Most notably stored values are always strings (i.e., the bytes returned by Redis are decoded as utf-8). - The semantic of Redis functions inside scripts has been altered to feel more diff --git a/pyproject.toml b/pyproject.toml index 4201e42..21f61b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "redipy" description = "redipy is a uniform interface to Redis-like storage systems. It allows you to use the same Redis API with different backends that implement the same functionality." readme = "README.md" - version = "0.6.0" + version = "0.7.0" authors = [ {name = "Josua Krause", email = "josua.krause@gmail.com"}, ] diff --git a/src/redipy/api.py b/src/redipy/api.py index 1b056d2..dc4a7b9 100644 --- a/src/redipy/api.py +++ b/src/redipy/api.py @@ -16,13 +16,16 @@ RedisClientAPI.""" import contextlib import datetime -from collections.abc import Iterable, Iterator -from typing import cast, get_args, Literal, overload +from collections.abc import Callable, Iterable, Iterator +from typing import cast, get_args, Literal, overload, TypeVar from redipy.backend.backend import ExecFunction from redipy.symbolic.seq import FnContext +T = TypeVar('T') + + RSetMode = Literal[ "always", "if_missing", # NX @@ -1527,6 +1530,45 @@ def smembers(self, key: str) -> set[str]: """ raise NotImplementedError() + def publish(self, key: str, msg: str) -> None: + """ + Publishes a message on a pubsub channel. + + See also the redis documentation: https://redis.io/commands/publish/ + + Args: + key (str): The pubsub key. + msg (str): The message. + """ + raise NotImplementedError() + + def wait_for( + self, + key: str, + predicate: Callable[[], T], + timeout: float | None) -> T | None: + """ + Waits on a pubsub channel until a certain condition is met. This + ignores messages sent on the channel but instead checks the provided + condition once a message has been received. If the condition is + satisfied, its result is returned. + + Args: + key (str): The pubsub key. + predicate (Callable[[], T]): If the result of this condition can + be converted to `True` via `bool` the condition is considered + satisfied and its result is returned. + timeout (float | None): If the predicate has not been fullfilled + within the time set by the timeout in seconds the function + returns None. If timeout is set to None the function will wait + indefinitely. + + Returns: + T | None: The result of the condition is returned or None if the + function timed out. + """ + raise NotImplementedError() + class RedisClientAPI(RedisAPI): """This class enriches the redis API with pipeline and script diff --git a/src/redipy/main.py b/src/redipy/main.py index 66e0298..541a4a1 100644 --- a/src/redipy/main.py +++ b/src/redipy/main.py @@ -16,7 +16,7 @@ import contextlib import datetime from collections.abc import Callable, Iterator -from typing import Literal, overload +from typing import Literal, overload, TypeVar from redipy.api import ( KeyType, @@ -35,6 +35,9 @@ from redipy.symbolic.seq import FnContext +T = TypeVar('T') + + class Redis(RedisClientAPI): """ This class is a wrapper around different runtime backends. Use this class @@ -455,3 +458,13 @@ def scard(self, key: str) -> int: def smembers(self, key: str) -> set[str]: return self._rt.smembers(key) + + def publish(self, key: str, msg: str) -> None: + return self._rt.publish(key, msg) + + def wait_for( + self, + key: str, + predicate: Callable[[], T], + timeout: float | None) -> T | None: + return self._rt.wait_for(key, predicate, timeout) diff --git a/src/redipy/memory/rt.py b/src/redipy/memory/rt.py index 53ecc25..a205111 100644 --- a/src/redipy/memory/rt.py +++ b/src/redipy/memory/rt.py @@ -488,6 +488,18 @@ def smembers(self, key: str) -> set[str]: with self.lock(): return self._sm.smembers(key) + def publish(self, key: str, msg: str) -> None: + with self.lock(): + return self._sm.publish(key, msg) + + def wait_for( + self, + key: str, + predicate: Callable[[], T], + timeout: float | None) -> T | None: + with self.lock(): + return self._sm.wait_for(key, predicate, timeout) + def __str__(self) -> str: return f"{self.__class__.__name__}[{self._sm.get_state()}]" diff --git a/src/redipy/memory/state.py b/src/redipy/memory/state.py index b010dd8..ed0f35c 100644 --- a/src/redipy/memory/state.py +++ b/src/redipy/memory/state.py @@ -15,6 +15,7 @@ import bisect import collections import itertools +import threading import time from collections.abc import Callable, Iterable from datetime import datetime @@ -34,7 +35,13 @@ RSM_EXISTS, RSM_MISSING, ) -from redipy.util import convert_pattern, now, time_diff, to_number_str +from redipy.util import ( + convert_pattern, + now, + reject_patterns, + time_diff, + to_number_str, +) T = TypeVar('T') @@ -1087,6 +1094,8 @@ def __init__(self, state: State) -> None: super().__init__() self._state = state self._now_mono: tuple[float, datetime] | None = None + # FIXME: for now we implement pubsub in the machine + self._pubsub: dict[str, threading.Condition] = {} def set_mono(self, now_mono: tuple[float, datetime] | None) -> None: """ @@ -1579,6 +1588,29 @@ def smembers(self, key: str) -> set[str]: return set() return set(obj) + def publish(self, key: str, msg: str) -> None: + # FIXME: we do not support patterns or messages yet + pubsub_key = reject_patterns(key) + listen = self._pubsub.get(pubsub_key) + if listen is None: + return + with listen: + listen.notify_all() + + def wait_for( + self, + key: str, + predicate: Callable[[], T], + timeout: float | None) -> T | None: + # FIXME: we do not support patterns or messages yet + pubsub_key = reject_patterns(key) + listen = self._pubsub.get(pubsub_key) + if listen is None: + listen = threading.Condition() + self._pubsub[pubsub_key] = listen + with listen: + return listen.wait_for(predicate, timeout) + def __str__(self) -> str: return ( f"{self.__class__.__name__}[state={self._state}]") diff --git a/src/redipy/redis/conn.py b/src/redipy/redis/conn.py index 6fb326c..f80fffd 100644 --- a/src/redipy/redis/conn.py +++ b/src/redipy/redis/conn.py @@ -729,24 +729,7 @@ def wait_for( self, key: str, predicate: Callable[[], T], - timeout: float = 30.0) -> T | None: - """ - Waits until the condition is met. - - Args: - key (str): The key used for the pubsub channel. - - predicate (Callable[[], T]): The condition. If the result can be - interpreted as True (via `bool`) the wait terminates and the - result will be returned. - - timeout (float, optional): Maximum wait before giving up and - returning None. Defaults to 30.0. - - Returns: - T | None: The result of the condition is returned if the wait was - successful. Otherwise None is returned. - """ + timeout: float | None) -> T | None: res = predicate() if bool(res): return res @@ -759,16 +742,25 @@ def wait_for( res = predicate() if bool(res): return res - if time.monotonic() - start_time > timeout: + so_far = time.monotonic() - start_time + if timeout is not None and so_far > timeout: return None + if timeout is None: + wait_time = None + else: + wait_time = max(0, timeout - so_far) psub.get_message( ignore_subscribe_messages=True, - timeout=timeout) + timeout=cast(float, wait_time)) while psub.get_message() is not None: # flushing queue pass finally: psub.unsubscribe() + def publish(self, key: str, msg: str) -> None: + with self.get_connection() as conn: + conn.publish(self.get_pubsub_key(key), msg) + def notify_all(self, key: str) -> None: """ Notifies a pubsub channel. @@ -776,8 +768,7 @@ def notify_all(self, key: str) -> None: Args: key (str): The key used for the pubsub channel. """ - with self.get_connection() as conn: - conn.publish(self.get_pubsub_key(key), "notify") + self.publish(key, "notify") def ping(self) -> None: """ diff --git a/src/redipy/util.py b/src/redipy/util.py index d0c3dbf..d2bb8b5 100644 --- a/src/redipy/util.py +++ b/src/redipy/util.py @@ -1076,6 +1076,23 @@ def normalize_values(res: Any) -> Any: return res +def reject_patterns(key: str) -> str: + """ + Rejects a key if it is a pattern. This method should only be used for + partially implemented functions that would break if a user attempted to + use a pattern as input. + + Args: + key (str): The key. + + Returns: + str: The key. + """ + if "*" in key or "?" in key or "[" in key: + raise ValueError(f"{key=} must not be a pattern") + return key + + def convert_pattern(pattern: str) -> tuple[str, re.Pattern]: """ Convert a redis pattern into a prefix and a regular expression.