Skip to content

Commit 04ef799

Browse files
authored
[Bug](fix) fix the broadcast cause result error (#47476)
shuffle may cause the `_last_local_channel_idx` set a error result Multi BE trigger the bug
1 parent ab79476 commit 04ef799

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
102102
fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1));
103103
}
104104
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
105+
// do the shufffle make sure enough random
106+
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
107+
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
108+
std::random_device rd;
109+
std::mt19937 g(rd());
110+
shuffle(channels.begin(), channels.end(), g);
111+
}
105112

106113
size_t local_size = 0;
107114
for (int i = 0; i < channels.size(); ++i) {
@@ -200,12 +207,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
200207
_writer.reset(new Writer());
201208
auto& p = _parent->cast<ExchangeSinkOperatorX>();
202209

203-
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
204-
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
205-
std::random_device rd;
206-
std::mt19937 g(rd());
207-
shuffle(channels.begin(), channels.end(), g);
208-
}
209210
for (int i = 0; i < channels.size(); ++i) {
210211
RETURN_IF_ERROR(channels[i]->open(state));
211212
}

be/src/vec/sink/vdata_stream_sender.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,9 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t
280280
RETURN_IF_ERROR(serialize_block(dest, num_receivers));
281281
}
282282
*serialized = true;
283-
return Status::OK();
283+
} else {
284+
*serialized = false;
284285
}
285-
*serialized = false;
286286
return Status::OK();
287287
}
288288

0 commit comments

Comments
 (0)