
Commit

Merge pull request #148 from Raytar/master
Fix Correctable streams
meling authored Jun 29, 2021
2 parents a065aa7 + 009c0f5 commit 9811e4f
Showing 24 changed files with 784 additions and 239 deletions.
2 changes: 1 addition & 1 deletion async.go
@@ -44,7 +44,7 @@ func (c Configuration) AsyncCall(ctx context.Context, d QuorumCallData) *Async {
continue // don't send if no msg
}
}
- n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan)
+ n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, false)
}

fut := &Async{c: make(chan struct{}, 1)}
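The third argument to enqueue is new in this commit: it tells the channel whether the caller expects a stream of responses for this message ID (see routeResponse in channel.go below, which only removes the response router for non-streaming calls). A rough sketch of the two call shapes, reusing the request construction from AsyncCall above; the streaming variant is shown only for contrast and is not part of this file:

// One-shot call (async/quorum): the router is removed after the first reply.
n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, false)

// Streaming (correctable) call: the router stays registered across replies,
// so the caller would pass true and later remove the router explicitly.
n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}}, replyChan, true)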
35 changes: 7 additions & 28 deletions benchmark/benchmark_gorums.pb.go

Some generated files are not rendered by default.

66 changes: 39 additions & 27 deletions channel.go
@@ -28,33 +28,38 @@ type response struct {
err error
}

+ type responseRouter struct {
+ c chan<- response
+ streaming bool
+ }

type channel struct {
- sendQ chan request
- nodeID uint32
- mu sync.Mutex
- lastError error
- latency time.Duration
- backoffCfg backoff.Config
- rand *rand.Rand
- gorumsClient ordering.GorumsClient
- gorumsStream ordering.Gorums_NodeStreamClient
- streamMut sync.RWMutex
- streamBroken atomicFlag
- parentCtx context.Context
- streamCtx context.Context
- cancelStream context.CancelFunc
- responseRouter map[uint64]chan<- response
- responseMut sync.Mutex
+ sendQ chan request
+ nodeID uint32
+ mu sync.Mutex
+ lastError error
+ latency time.Duration
+ backoffCfg backoff.Config
+ rand *rand.Rand
+ gorumsClient ordering.GorumsClient
+ gorumsStream ordering.Gorums_NodeStreamClient
+ streamMut sync.RWMutex
+ streamBroken atomicFlag
+ parentCtx context.Context
+ streamCtx context.Context
+ cancelStream context.CancelFunc
+ responseRouters map[uint64]responseRouter
+ responseMut sync.Mutex
}

func newChannel(n *Node) *channel {
return &channel{
- sendQ: make(chan request, n.mgr.opts.sendBuffer),
- backoffCfg: n.mgr.opts.backoff,
- nodeID: n.ID(),
- latency: -1 * time.Second,
- rand: rand.New(rand.NewSource(time.Now().UnixNano())),
- responseRouter: make(map[uint64]chan<- response),
+ sendQ: make(chan request, n.mgr.opts.sendBuffer),
+ backoffCfg: n.mgr.opts.backoff,
+ nodeID: n.ID(),
+ latency: -1 * time.Second,
+ rand: rand.New(rand.NewSource(time.Now().UnixNano())),
+ responseRouters: make(map[uint64]responseRouter),
}
}

@@ -75,21 +80,28 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error {
func (c *channel) routeResponse(msgID uint64, resp response) {
c.responseMut.Lock()
defer c.responseMut.Unlock()
- if ch, ok := c.responseRouter[msgID]; ok {
- ch <- resp
- delete(c.responseRouter, msgID)
+ if router, ok := c.responseRouters[msgID]; ok {
+ router.c <- resp
+ // delete the router if we are only expecting a single message
+ if !router.streaming {
+ delete(c.responseRouters, msgID)
+ }
}
}

- func (c *channel) enqueue(req request, responseChan chan<- response) {
+ func (c *channel) enqueue(req request, responseChan chan<- response, streaming bool) {
if responseChan != nil {
c.responseMut.Lock()
- c.responseRouter[req.msg.Metadata.MessageID] = responseChan
+ c.responseRouters[req.msg.Metadata.MessageID] = responseRouter{responseChan, streaming}
c.responseMut.Unlock()
}
c.sendQ <- req
}

+ func (c *channel) deleteRouter(msgID uint64) {
+ delete(c.responseRouters, msgID)
+ }

func (c *channel) sendMsg(req request) (err error) {
// unblock the waiting caller unless noSendWaiting is enabled
defer func() {
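To see the new routing behavior in isolation, here is a self-contained Go sketch of the responseRouter pattern from channel.go. The router type and its register/route/remove methods are simplified stand-ins (not gorums API), and locking is folded into each method for brevity; routeResponse and deleteRouter above are the real counterparts of route and remove.

package main

import (
	"fmt"
	"sync"
)

// response is a trimmed-down version of the response type in channel.go.
type response struct {
	msg string
	err error
}

// responseRouter mirrors the struct introduced in this commit: a reply
// channel plus a flag saying whether more than one reply is expected.
type responseRouter struct {
	c         chan<- response
	streaming bool
}

// router is a miniature stand-in for the routing part of channel.
type router struct {
	mu      sync.Mutex
	routers map[uint64]responseRouter
}

// register mimics the registration half of enqueue: it installs a router
// for msgID; streaming routers survive multiple responses.
func (r *router) register(msgID uint64, c chan<- response, streaming bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.routers[msgID] = responseRouter{c, streaming}
}

// route mirrors routeResponse: deliver the response, then drop the router
// unless it expects a stream of replies.
func (r *router) route(msgID uint64, resp response) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if rt, ok := r.routers[msgID]; ok {
		rt.c <- resp
		if !rt.streaming {
			delete(r.routers, msgID)
		}
	}
}

// remove mirrors deleteRouter: explicit cleanup for streaming calls.
func (r *router) remove(msgID uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.routers, msgID)
}

func main() {
	r := &router{routers: make(map[uint64]responseRouter)}

	replies := make(chan response, 3)
	r.register(1, replies, true) // streaming: router survives multiple replies
	r.route(1, response{msg: "first"})
	r.route(1, response{msg: "second"}) // still routed to the same channel
	r.remove(1)                         // explicit cleanup when the call is done
	r.route(1, response{msg: "third"})  // dropped: no router registered anymore

	close(replies)
	for resp := range replies {
		fmt.Println(resp.msg) // prints "first" then "second"
	}
}

Running this prints "first" and "second": the streaming router keeps delivering to the same channel until it is explicitly removed, after which further responses for that message ID are dropped.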
