diff --git a/executor/child/handler.go b/executor/child/handler.go index 58fea56..e2ad690 100644 --- a/executor/child/handler.go +++ b/executor/child/handler.go @@ -48,13 +48,21 @@ func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) // if has key, then process the messages if ch.host.HasKey() { - if len(ch.GetMsgQueue()) != 0 { + msgQueue := ch.GetMsgQueue() + + for i := 0; i < len(msgQueue); i += 5 { + end := i + 5 + if end > len(msgQueue) { + end = len(msgQueue) + } + ch.AppendProcessedMsgs(btypes.ProcessedMsgs{ - Msgs: ch.GetMsgQueue(), + Msgs: msgQueue[i:end], Timestamp: time.Now().UnixNano(), Save: true, }) } + msgKVs, err := ch.host.ProcessedMsgsToRawKV(ch.GetProcessedMsgs(), false) if err != nil { return err diff --git a/executor/host/handler.go b/executor/host/handler.go index e4e68b7..08f998f 100644 --- a/executor/host/handler.go +++ b/executor/host/handler.go @@ -27,9 +27,14 @@ func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) e h.Node().SyncInfoToRawKV(blockHeight), } if h.Node().HasBroadcaster() { - if len(msgQueue) != 0 { + for i := 0; i < len(msgQueue); i += 5 { + end := i + 5 + if end > len(msgQueue) { + end = len(msgQueue) + } + h.AppendProcessedMsgs(btypes.ProcessedMsgs{ - Msgs: msgQueue, + Msgs: msgQueue[i:end], Timestamp: time.Now().UnixNano(), Save: true, }) diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 114aa93..48fc6da 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -187,11 +187,18 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time } if txInfo.Save { - b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{ - Msgs: msgs, - Timestamp: time.Now().UnixNano(), - Save: txInfo.Save, - }) + for i := 0; i < len(msgs); i += 5 { + end := i + 5 + if end > len(msgs) { + end = len(msgs) + } + + b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{ + Msgs: msgs[i:end], + Timestamp: time.Now().UnixNano(), + Save: true, + }) + } } b.logger.Debug("pending tx", zap.Int("index", i), zap.String("tx", txInfo.String()))