Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs_src/redis/stream/claiming_manual_ack.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"critical-tasks",
group="task-workers",
consumer="worker-failover",
min_idle_time=30000, # 30 seconds
min_idle_time=20000, # 20 seconds
),
ack_policy=AckPolicy.MANUAL,
)
Expand Down
35 changes: 22 additions & 13 deletions faststream/redis/testing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
from collections.abc import Iterator, Sequence
from contextlib import ExitStack, contextmanager
Expand Down Expand Up @@ -56,7 +57,6 @@ def _patch_producer(self, broker: RedisBroker) -> Iterator[None]:
broker.config.broker_config, FakeProducer(broker, broker.config)
),
)

for publisher in cast("list[LogicPublisher]", broker.publishers):
es.enter_context(
change_producer(publisher, FakeProducer(broker, publisher.config)),
Expand Down Expand Up @@ -148,12 +148,17 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
handler = cast("LogicSubscriber", handler)
for visitor in visitors:
if visited_ch := visitor.visit(**destination, sub=handler):
msg = visitor.get_message(
visited_ch,
body,
handler, # type: ignore[arg-type]
)

try:
msg = await asyncio.wait_for(
visitor.get_message(
visited_ch,
body,
handler, # type: ignore[arg-type]
),
timeout=30,
) # 30 secs is Pytest maximum
except TimeoutError:
continue
await self._execute_handler(msg, handler)

return 0
Expand All @@ -175,7 +180,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "PubSubMessage":
handler = cast("LogicSubscriber", handler)
for visitor in visitors:
if visited_ch := visitor.visit(**destination, sub=handler):
msg = visitor.get_message(
msg = await visitor.get_message(
visited_ch,
body,
handler, # type: ignore[arg-type]
Expand Down Expand Up @@ -206,7 +211,7 @@ async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
casted_handler = cast("_ListHandlerMixin", handler)

if casted_handler.list_sub.batch:
msg = visitor.get_message(
msg = await visitor.get_message(
channel=cmd.destination,
body=data_to_send,
sub=casted_handler,
Expand Down Expand Up @@ -265,7 +270,9 @@ def visit(
sub: "LogicSubscriber",
) -> str | None: ...

def get_message(self, channel: str, body: Any, sub: "LogicSubscriber") -> Any: ...
async def get_message(
self, channel: str, body: Any, sub: "LogicSubscriber"
) -> Any: ...


class ChannelVisitor(Visitor):
Expand Down Expand Up @@ -295,7 +302,7 @@ def visit(

return None

def get_message( # type: ignore[override]
async def get_message( # type: ignore[override]
self,
channel: str,
body: Any,
Expand Down Expand Up @@ -326,7 +333,7 @@ def visit(

return None

def get_message( # type: ignore[override]
async def get_message( # type: ignore[override]
self,
channel: str,
body: Any,
Expand Down Expand Up @@ -363,12 +370,14 @@ def visit(

return None

def get_message( # type: ignore[override]
async def get_message( # type: ignore[override]
self,
channel: str,
body: Any,
sub: "_StreamHandlerMixin",
) -> Any:
if sub.stream_sub.min_idle_time:
await asyncio.sleep(sub.stream_sub.min_idle_time / 1000)
if sub.stream_sub.batch:
return BatchStreamMessage(
type="bstream",
Expand Down
Loading