@@ -780,20 +780,21 @@ proc push*[T](
780
780
## Push successful result to queue ``sq``.
781
781
mixin updateScore, updateStats, getStats
782
782
783
+ template findPosition(sq, sr: untyped ): SyncPosition =
784
+ sq.find(sr).valueOr:
785
+ debug "Request is no more relevant",
786
+ request = sr, sync_ident = sq.ident, topics = "syncman"
787
+ # Request is not in queue anymore, probably reset happened.
788
+ return
789
+
783
790
# This is backpressure handling algorithm, this algorithm is blocking
784
791
# all pending `push` requests if `request` is not in range .
785
792
var
786
793
position =
787
794
block:
788
795
var pos: SyncPosition
789
796
while true:
790
- pos = sq.find(sr).valueOr:
791
- debug "Request is no more relevant",
792
- request = sr,
793
- sync_ident = sq.ident,
794
- topics = "syncman"
795
- # Request is not in queue anymore, probably reset happened.
796
- return
797
+ pos = sq.findPosition(sr)
797
798
798
799
if pos.qindex == 0:
799
800
# Exiting loop when request is first in queue.
@@ -816,20 +817,18 @@ proc push*[T](
816
817
817
818
await sq.lock.acquire()
818
819
try:
819
- block:
820
- position = sq.find(sr).valueOr:
821
- # Queue has advanced, the request is no longer relevant.
822
- debug "Request is no more relevant",
823
- request = sr,
824
- sync_ident = sq.ident,
825
- topics = "syncman"
826
- return
820
+ position = sq.findPosition(sr)
827
821
828
822
if not(isNil(processingCb)):
829
823
processingCb()
830
824
831
825
let pres = await sq.process(sr, data, blobs, maybeFinalized)
832
826
827
+ # We need to update position, because while we waiting for `process()` to
828
+ # complete - clearAndWakeup() could be invoked which could clean whole the
829
+ # queue (invalidating all the positions).
830
+ position = sq.findPosition(sr)
831
+
833
832
case pres.code
834
833
of SyncProcessError.Empty:
835
834
# Empty responses does not affect failures count
0 commit comments