Skip to content

Commit a69cc94

Browse files
authored
Fix/limit max msgs (#36)
* limit max msgs * add it at load pending tx too
1 parent 3e5c697 commit a69cc94

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

executor/child/handler.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,21 @@ func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs)
4848

4949
// if has key, then process the messages
5050
if ch.host.HasKey() {
51-
if len(ch.GetMsgQueue()) != 0 {
51+
msgQueue := ch.GetMsgQueue()
52+
53+
for i := 0; i < len(msgQueue); i += 5 {
54+
end := i + 5
55+
if end > len(msgQueue) {
56+
end = len(msgQueue)
57+
}
58+
5259
ch.AppendProcessedMsgs(btypes.ProcessedMsgs{
53-
Msgs: ch.GetMsgQueue(),
60+
Msgs: msgQueue[i:end],
5461
Timestamp: time.Now().UnixNano(),
5562
Save: true,
5663
})
5764
}
65+
5866
msgKVs, err := ch.host.ProcessedMsgsToRawKV(ch.GetProcessedMsgs(), false)
5967
if err != nil {
6068
return err

executor/host/handler.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@ func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) e
2727
h.Node().SyncInfoToRawKV(blockHeight),
2828
}
2929
if h.Node().HasBroadcaster() {
30-
if len(msgQueue) != 0 {
30+
for i := 0; i < len(msgQueue); i += 5 {
31+
end := i + 5
32+
if end > len(msgQueue) {
33+
end = len(msgQueue)
34+
}
35+
3136
h.AppendProcessedMsgs(btypes.ProcessedMsgs{
32-
Msgs: msgQueue,
37+
Msgs: msgQueue[i:end],
3338
Timestamp: time.Now().UnixNano(),
3439
Save: true,
3540
})

node/broadcaster/broadcaster.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,18 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time
187187
}
188188

189189
if txInfo.Save {
190-
b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{
191-
Msgs: msgs,
192-
Timestamp: time.Now().UnixNano(),
193-
Save: txInfo.Save,
194-
})
190+
for i := 0; i < len(msgs); i += 5 {
191+
end := i + 5
192+
if end > len(msgs) {
193+
end = len(msgs)
194+
}
195+
196+
b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{
197+
Msgs: msgs[i:end],
198+
Timestamp: time.Now().UnixNano(),
199+
Save: true,
200+
})
201+
}
195202
}
196203

197204
b.logger.Debug("pending tx", zap.Int("index", i), zap.String("tx", txInfo.String()))

0 commit comments

Comments
 (0)