@@ -634,7 +634,8 @@ cdef class ReadBuffer:
634
634
self ._finish_message()
635
635
return mem
636
636
637
- cdef redirect_messages(self , WriteBuffer buf, char mtype):
637
+ cdef redirect_messages(self , WriteBuffer buf, char mtype,
638
+ int stop_at = 0 ):
638
639
if not self ._current_message_ready:
639
640
raise exceptions.BufferError(
640
641
' consume_full_messages called on a buffer without a '
@@ -669,7 +670,10 @@ cdef class ReadBuffer:
669
670
else :
670
671
return
671
672
672
- # Fast path: exhaust buf0 as efficiently as possible.
673
+ if stop_at and buf._length >= stop_at:
674
+ return
675
+
676
+ # Fast path: exhaust buf0 as efficiently as possible.
673
677
if self ._pos0 + 5 <= self ._len0:
674
678
cbuf = cpython.PyBytes_AS_STRING(self ._buf0)
675
679
new_pos0 = self ._pos0
@@ -682,14 +686,17 @@ cdef class ReadBuffer:
682
686
if (cbuf + new_pos0)[0 ] != mtype:
683
687
done = 1
684
688
break
689
+ if (stop_at and
690
+ (buf._length + new_pos0 - self ._pos0) > stop_at):
691
+ done = 1
692
+ break
685
693
msg_len = hton.unpack_int32(cbuf + new_pos0 + 1 ) + 1
686
694
if new_pos0 + msg_len > cbuf_len:
687
695
break
688
696
new_pos0 += msg_len
689
697
690
698
if new_pos0 != self ._pos0:
691
- if PG_DEBUG:
692
- assert self ._pos0 < new_pos0 <= self ._len0
699
+ assert self ._pos0 < new_pos0 <= self ._len0
693
700
694
701
pos_delta = new_pos0 - self ._pos0
695
702
buf.write_cstr(
@@ -699,8 +706,7 @@ cdef class ReadBuffer:
699
706
self ._pos0 = new_pos0
700
707
self ._length -= pos_delta
701
708
702
- if PG_DEBUG:
703
- assert self ._length >= 0
709
+ assert self ._length >= 0
704
710
705
711
if done:
706
712
# The next message is of a different type.
0 commit comments