Publish and subscribe system optimizations #13

Open
eloylp opened this issue Sep 26, 2022 · 1 comment
Labels
enhancement New feature or request

Comments

@eloylp
Owner

eloylp commented Sep 26, 2022

Context

This library provides basic and experimental pub/sub features. In order to send messages to the different subscribers of each topic, the code maintains a map of maps, guarded by a global lock:

type pubSubEngine struct {
	csMap map[string]map[*conn.Slot]message.Sender
	L     *sync.RWMutex
}
//...

The first-level map holds the topic name as its key, and as its value the second-level map, which in turn uses the pointer address of the connection slot as the key and the related sender as the value.

In reality, each sender contains the underlying connection slot, but having the connection slot pointer address as the key is very handy, as it allows quick removal of connections from each topic. We could not use the message.Sender pointer as the key, as that address could change over time.
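
As an illustrative sketch of why that key choice is handy (the method names and shapes below are assumptions, not the library's actual API, and it assumes senders expose their slot via ConnSlot(), as in the benchmarks further down), subscription and removal on top of this structure could look like:

    // Sketch only: subscribe/unsubscribe names and shapes are assumed.
    func (e *pubSubEngine) subscribe(topic string, s message.Sender) {
      e.L.Lock()
      defer e.L.Unlock()
      if e.csMap[topic] == nil {
        e.csMap[topic] = map[*conn.Slot]message.Sender{}
      }
      e.csMap[topic][s.ConnSlot()] = s
    }

    func (e *pubSubEngine) unsubscribe(topic string, slot *conn.Slot) {
      e.L.Lock()
      defer e.L.Unlock()
      // O(1) removal: the slot pointer addresses the entry directly,
      // with no need to scan the topic's senders.
      delete(e.csMap[topic], slot)
    }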

Problem

We must take into account that the publish method is a hot path of the program; we can expect high concurrency there (see the sketch after this list):

  • The current global lock locks the whole structure each time a message is published, for any of the topics.
  • The current "map inside map" strategy was very convenient for the POC, as we can remove entries very easily whenever a client disconnects. But publishing iterates a map, which means visiting the hashmap's buckets spread across memory instead of scanning a contiguous region.

Goals

  • Evaluate whether this optimisation makes sense, taking into account the status of the project and its uses. Given that the publish method will be called far more often than connection removal, we should probably optimise it, even at the cost of some complexity.
  • Solve the problem of the global lock.
  • Solve the problem of the iteration over a map.
@eloylp eloylp added the enhancement New feature or request label Sep 26, 2022
@eloylp
Owner Author

eloylp commented Sep 26, 2022

We discussed this subject with @mmontes11. The first question in the air was ... is it necessary? Well, maybe for a reduced connection set the current approach is fast enough. It might be sensible to put the development of this issue on hold until we actually need to improve the performance numbers.

However, we ran some benchmarks in order to quantify the possible improvements and decide better. If we iterate over a slice instead of a map, it looks like we could improve the iteration speed by roughly 8x, as we would be iterating over the contiguous memory of the slice's backing array. Here are the results of the tests:

    cpu: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
    BenchmarkIter/map
    BenchmarkIter/map-8                477     2567071 ns/op         0 B/op         0 allocs/op
    BenchmarkIter/slice
    BenchmarkIter/slice-8             3307      333725 ns/op         0 B/op         0 allocs/op
    PASS

Here are the benchmarks, for the record:

    // Iterates a second-level map of 250k senders, mirroring the current
    // pubSubEngine layout.
    func BenchmarkIterateMap(b *testing.B) {
      senders := map[string]map[*conn.Slot]message.Sender{}
      senders["topic.a"] = map[*conn.Slot]message.Sender{}
    
      for i := 0; i < 250000; i++ {
        ds := newDumbSender()
        senders["topic.a"][ds.ConnSlot()] = ds
      }
      msg := message.New().SetPayload(&protos.MessageV1{})
    
      b.ReportAllocs()
      b.ResetTimer()
    
      for i := 0; i < b.N; i++ {
        for _, sender := range senders["topic.a"] {
          _, err := sender.Send(msg)
          if err != nil {
            panic(err)
          }
        }
      }
    }
    
    // Same workload, but the 250k senders live in a contiguous slice.
    func BenchmarkIterateSlice(b *testing.B) {
      senders := make([]message.Sender, 250000)
      for i := range senders {
        senders[i] = newDumbSender()
      }
      msg := message.New().SetPayload(&protos.MessageV1{})
    
      b.ReportAllocs()
      b.ResetTimer()
    
      for i := 0; i < b.N; i++ {
        for j := 0; j < len(senders); j++ {
          _, err := senders[j].Send(msg)
          if err != nil {
            panic(err)
          }
        }
      }
    }
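
For reference, these benchmarks run with the standard Go tooling, e.g.:

    go test -run='^$' -bench=BenchmarkIterate -benchmem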

One possible solution would be to replace the second-level map with a pre-allocated slice. Such a slice should be completely managed by the library (subscribe, removal, compaction).
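
A rough sketch of that direction is shown below. It also adds a per-topic lock to address the global-lock problem. All names here are hypothetical, the *message.Message type is assumed, swap-remove is just one possible compaction strategy, and it again assumes senders expose ConnSlot():

    // Hypothetical sketch, not the library's API: one of these per topic.
    type topicSenders struct {
      mu      sync.RWMutex        // per-topic lock instead of a global one
      senders []message.Sender    // contiguous, iterated on publish
      index   map[*conn.Slot]int  // slot -> position in senders, for O(1) removal
    }
    
    func (t *topicSenders) add(s message.Sender) {
      t.mu.Lock()
      defer t.mu.Unlock()
      t.index[s.ConnSlot()] = len(t.senders)
      t.senders = append(t.senders, s)
    }
    
    // remove swaps the last sender into the freed position ("compaction"),
    // keeping the slice dense without shifting the backing array.
    func (t *topicSenders) remove(slot *conn.Slot) {
      t.mu.Lock()
      defer t.mu.Unlock()
      i, ok := t.index[slot]
      if !ok {
        return
      }
      last := len(t.senders) - 1
      t.senders[i] = t.senders[last]
      t.index[t.senders[i].ConnSlot()] = i
      t.senders = t.senders[:last]
      delete(t.index, slot)
    }
    
    func (t *topicSenders) publish(msg *message.Message) {
      t.mu.RLock()
      defer t.mu.RUnlock()
      for i := 0; i < len(t.senders); i++ { // walks a contiguous backing array
        _, _ = t.senders[i].Send(msg)
      }
    }

The slot-to-index map keeps removal O(1), while the publish loop walks a contiguous backing array, which is where the ~8x gain measured above would come from.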
