Skip to content

Commit

Permalink
add pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
JosuaKrause committed Apr 22, 2024
1 parent 5449843 commit 9b9550e
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 34 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [0.7.0] - 2024-04-21

### Added

- Partial support for pubsub channels ([#18])

## [0.6.0] - 2024-03-27

### Breaking
Expand Down Expand Up @@ -72,3 +78,4 @@
[#7]: https://github.com/JosuaKrause/redipy/pull/7
[#15]: https://github.com/JosuaKrause/redipy/pull/15
[#17]: https://github.com/JosuaKrause/redipy/pull/17
[#18]: https://github.com/JosuaKrause/redipy/pull/18
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@ that implement the same functionality, such as:

This [medium article][medium] explores some of the rationale behind the library.

### Warning

This library is still early in development and [not all Redis functions are
available yet][implemented]!
If you need certain functionality or found a bug, have a look at the
[contributing](#contributing) section.
It is easy to add Redis functions to the API.

## Installation<a id="installation"></a>
You can install `redipy` using pip:
Expand Down Expand Up @@ -401,8 +396,9 @@ class RStack:
## Limitations<a id="limitations"></a>
The current limitations of `redipy` are:

- Not all Redis commands are supported yet: This will eventually be resolved.
See [this issue to see the progress][implemented].
- Some Redis commands are not supported yet: This is likely due to redundant
functionality. For all other cases it will eventually be resolved.
Check [this issue to see the status of redis functions][implemented].
- The API differs slightly: Most notably stored values are always strings
(i.e., the bytes returned by Redis are decoded as utf-8).
- The semantic of Redis functions inside scripts has been altered to feel more
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "redipy"
description = "redipy is a uniform interface to Redis-like storage systems. It allows you to use the same Redis API with different backends that implement the same functionality."
readme = "README.md"
version = "0.6.0"
version = "0.7.0"
authors = [
{name = "Josua Krause", email = "[email protected]"},
]
Expand Down
46 changes: 44 additions & 2 deletions src/redipy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
RedisClientAPI."""
import contextlib
import datetime
from collections.abc import Iterable, Iterator
from typing import cast, get_args, Literal, overload
from collections.abc import Callable, Iterable, Iterator
from typing import cast, get_args, Literal, overload, TypeVar

from redipy.backend.backend import ExecFunction
from redipy.symbolic.seq import FnContext


T = TypeVar('T')


RSetMode = Literal[
"always",
"if_missing", # NX
Expand Down Expand Up @@ -1527,6 +1530,45 @@ def smembers(self, key: str) -> set[str]:
"""
raise NotImplementedError()

def publish(self, key: str, msg: str) -> None:
"""
Publishes a message on a pubsub channel.
See also the redis documentation: https://redis.io/commands/publish/
Args:
key (str): The pubsub key.
msg (str): The message.
"""
raise NotImplementedError()

def wait_for(
self,
key: str,
predicate: Callable[[], T],
timeout: float | None) -> T | None:
"""
Waits on a pubsub channel until a certain condition is met. This
ignores messages sent on the channel but instead checks the provided
condition once a message has been received. If the condition is
satisfied, its result is returned.
Args:
key (str): The pubsub key.
predicate (Callable[[], T]): If the result of this condition can
be converted to `True` via `bool` the condition is considered
satisfied and its result is returned.
timeout (float | None): If the predicate has not been fullfilled
within the time set by the timeout in seconds the function
returns None. If timeout is set to None the function will wait
indefinitely.
Returns:
T | None: The result of the condition is returned or None if the
function timed out.
"""
raise NotImplementedError()


class RedisClientAPI(RedisAPI):
"""This class enriches the redis API with pipeline and script
Expand Down
15 changes: 14 additions & 1 deletion src/redipy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import contextlib
import datetime
from collections.abc import Callable, Iterator
from typing import Literal, overload
from typing import Literal, overload, TypeVar

from redipy.api import (
KeyType,
Expand All @@ -35,6 +35,9 @@
from redipy.symbolic.seq import FnContext


T = TypeVar('T')


class Redis(RedisClientAPI):
"""
This class is a wrapper around different runtime backends. Use this class
Expand Down Expand Up @@ -455,3 +458,13 @@ def scard(self, key: str) -> int:

def smembers(self, key: str) -> set[str]:
return self._rt.smembers(key)

def publish(self, key: str, msg: str) -> None:
return self._rt.publish(key, msg)

def wait_for(
self,
key: str,
predicate: Callable[[], T],
timeout: float | None) -> T | None:
return self._rt.wait_for(key, predicate, timeout)
12 changes: 12 additions & 0 deletions src/redipy/memory/rt.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,18 @@ def smembers(self, key: str) -> set[str]:
with self.lock():
return self._sm.smembers(key)

def publish(self, key: str, msg: str) -> None:
with self.lock():
return self._sm.publish(key, msg)

def wait_for(
self,
key: str,
predicate: Callable[[], T],
timeout: float | None) -> T | None:
with self.lock():
return self._sm.wait_for(key, predicate, timeout)

def __str__(self) -> str:
return f"{self.__class__.__name__}[{self._sm.get_state()}]"

Expand Down
34 changes: 33 additions & 1 deletion src/redipy/memory/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import bisect
import collections
import itertools
import threading
import time
from collections.abc import Callable, Iterable
from datetime import datetime
Expand All @@ -34,7 +35,13 @@
RSM_EXISTS,
RSM_MISSING,
)
from redipy.util import convert_pattern, now, time_diff, to_number_str
from redipy.util import (
convert_pattern,
now,
reject_patterns,
time_diff,
to_number_str,
)


T = TypeVar('T')
Expand Down Expand Up @@ -1087,6 +1094,8 @@ def __init__(self, state: State) -> None:
super().__init__()
self._state = state
self._now_mono: tuple[float, datetime] | None = None
# FIXME: for now we implement pubsub in the machine
self._pubsub: dict[str, threading.Condition] = {}

def set_mono(self, now_mono: tuple[float, datetime] | None) -> None:
"""
Expand Down Expand Up @@ -1579,6 +1588,29 @@ def smembers(self, key: str) -> set[str]:
return set()
return set(obj)

def publish(self, key: str, msg: str) -> None:
# FIXME: we do not support patterns or messages yet
pubsub_key = reject_patterns(key)
listen = self._pubsub.get(pubsub_key)
if listen is None:
return
with listen:
listen.notify_all()

def wait_for(
self,
key: str,
predicate: Callable[[], T],
timeout: float | None) -> T | None:
# FIXME: we do not support patterns or messages yet
pubsub_key = reject_patterns(key)
listen = self._pubsub.get(pubsub_key)
if listen is None:
listen = threading.Condition()
self._pubsub[pubsub_key] = listen
with listen:
return listen.wait_for(predicate, timeout)

def __str__(self) -> str:
return (
f"{self.__class__.__name__}[state={self._state}]")
Expand Down
35 changes: 13 additions & 22 deletions src/redipy/redis/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,24 +729,7 @@ def wait_for(
self,
key: str,
predicate: Callable[[], T],
timeout: float = 30.0) -> T | None:
"""
Waits until the condition is met.
Args:
key (str): The key used for the pubsub channel.
predicate (Callable[[], T]): The condition. If the result can be
interpreted as True (via `bool`) the wait terminates and the
result will be returned.
timeout (float, optional): Maximum wait before giving up and
returning None. Defaults to 30.0.
Returns:
T | None: The result of the condition is returned if the wait was
successful. Otherwise None is returned.
"""
timeout: float | None) -> T | None:
res = predicate()
if bool(res):
return res
Expand All @@ -759,25 +742,33 @@ def wait_for(
res = predicate()
if bool(res):
return res
if time.monotonic() - start_time > timeout:
so_far = time.monotonic() - start_time
if timeout is not None and so_far > timeout:
return None
if timeout is None:
wait_time = None
else:
wait_time = max(0, timeout - so_far)
psub.get_message(
ignore_subscribe_messages=True,
timeout=timeout)
timeout=cast(float, wait_time))
while psub.get_message() is not None: # flushing queue
pass
finally:
psub.unsubscribe()

def publish(self, key: str, msg: str) -> None:
with self.get_connection() as conn:
conn.publish(self.get_pubsub_key(key), msg)

def notify_all(self, key: str) -> None:
"""
Notifies a pubsub channel.
Args:
key (str): The key used for the pubsub channel.
"""
with self.get_connection() as conn:
conn.publish(self.get_pubsub_key(key), "notify")
self.publish(key, "notify")

def ping(self) -> None:
"""
Expand Down
17 changes: 17 additions & 0 deletions src/redipy/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,23 @@ def normalize_values(res: Any) -> Any:
return res


def reject_patterns(key: str) -> str:
"""
Rejects a key if it is a pattern. This method should only be used for
partially implemented functions that would break if a user attempted to
use a pattern as input.
Args:
key (str): The key.
Returns:
str: The key.
"""
if "*" in key or "?" in key or "[" in key:
raise ValueError(f"{key=} must not be a pattern")
return key


def convert_pattern(pattern: str) -> tuple[str, re.Pattern]:
"""
Convert a redis pattern into a prefix and a regular expression.
Expand Down

0 comments on commit 9b9550e

Please sign in to comment.