Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in pubsub #966

Open
wants to merge 5 commits into
base: series/1.x
Choose a base branch
from

Conversation

arturaz
Copy link
Collaborator

@arturaz arturaz commented Feb 11, 2025

So... remember this: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s) ?

After building lettuce with a patch to include command name in the timeout message it turned out that the timeouting command is SUBSCRIBE, which was weird. Until I looked at the code that does the subscription.

These seems to be buggy:

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel))
.evalTap(_ =>
FutureLift[F]
.lift(subConnection.async().subscribe(channel.underlying))
)
.flatMap(_.subscribe(500).unNone)

override def psubscribe(
pattern: RedisPattern[K]
): Stream[F, RedisPatternEvent[K, V]] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals.pattern[F, K, V](state, subConnection).apply(pattern))
.evalTap(_ =>
FutureLift[F]
.lift(subConnection.async().psubscribe(pattern.underlying))
)
.flatMap(_.subscribe(500).unNone)

  • You can't do state.get from a Ref and expect that state to hold while you go on to perform side-effects.
  • Usage of .unNone is highly questionable, it just filters out None's, I guess this was intended to be unNoneTerminate.
  • The logic was generally hard to follow. There is one Topic shared between multiple streams, but each subscribe gets it's own redis listener and Dispatcher? Why?

So I rewrote the logic to:

  • Maintain separate AtomicCell state maps for channels and patterns.
  • Create a new subscription, topic and Dispatcher first time someone subscribes to channel or pattern.
  • In case someone subscribes to the same thing again the subscriber count is increased.
  • Subscriber count is decreased when the Stream is terminated.
  • unsubscribe finishes all Streams.
  • Cleanup is performed when the last stream terminates.

I also changed the return type of publish to match the type from Lettuce.

@yisraelU I would appreciate you double-checking the logic here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant