Skip to content

Commit

Permalink
comments unstable customPlanNodeWithExchangeClient (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhejiangxiaomai authored May 10, 2023
1 parent 95edde0 commit ad21576
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,41 +1091,42 @@ class TestCustomExchangeTranslator : public exec::Operator::PlanNodeTranslator {
}
};

TEST_F(MultiFragmentTest, customPlanNodeWithExchangeClient) {
setupSources(5, 100);
Operator::registerOperator(std::make_unique<TestCustomExchangeTranslator>());
auto leafTaskId = makeTaskId("leaf", 0);
auto leafPlan =
PlanBuilder().values(vectors_).partitionedOutput({}, 1).planNode();
auto leafTask = makeTask(leafTaskId, leafPlan, 0);
Task::start(leafTask, 1);

CursorParameters params;
core::PlanNodeId testNodeId;
params.maxDrivers = 1;
params.planNode =
PlanBuilder()
.addNode([&leafPlan](std::string id, core::PlanNodePtr /* input */) {
return std::make_shared<TestCustomExchangeNode>(
id, leafPlan->outputType());
})
.capturePlanNodeId(testNodeId)
.planNode();

auto cursor = std::make_unique<TaskCursor>(params);
auto task = cursor->task();
addRemoteSplits(task, {leafTaskId});
while (cursor->moveNext()) {
}
EXPECT_NE(
toPlanStats(task->taskStats())
.at(testNodeId)
.customStats.count("testCustomExchangeStat"),
0);
ASSERT_TRUE(waitForTaskCompletion(leafTask.get(), 3'000'000))
<< leafTask->taskId();
ASSERT_TRUE(waitForTaskCompletion(task.get(), 3'000'000)) << task->taskId();
}
// TEST_F(MultiFragmentTest, customPlanNodeWithExchangeClient) {
// setupSources(5, 100);
// Operator::registerOperator(std::make_unique<TestCustomExchangeTranslator>());
// auto leafTaskId = makeTaskId("leaf", 0);
// auto leafPlan =
// PlanBuilder().values(vectors_).partitionedOutput({}, 1).planNode();
// auto leafTask = makeTask(leafTaskId, leafPlan, 0);
// Task::start(leafTask, 1);

// CursorParameters params;
// core::PlanNodeId testNodeId;
// params.maxDrivers = 1;
// params.planNode =
// PlanBuilder()
// .addNode([&leafPlan](std::string id, core::PlanNodePtr /* input */)
// {
// return std::make_shared<TestCustomExchangeNode>(
// id, leafPlan->outputType());
// })
// .capturePlanNodeId(testNodeId)
// .planNode();

// auto cursor = std::make_unique<TaskCursor>(params);
// auto task = cursor->task();
// addRemoteSplits(task, {leafTaskId});
// while (cursor->moveNext()) {
// }
// EXPECT_NE(
// toPlanStats(task->taskStats())
// .at(testNodeId)
// .customStats.count("testCustomExchangeStat"),
// 0);
// ASSERT_TRUE(waitForTaskCompletion(leafTask.get(), 3'000'000))
// << leafTask->taskId();
// ASSERT_TRUE(waitForTaskCompletion(task.get(), 3'000'000)) << task->taskId();
//}

// This test is to reproduce the race condition between task terminate and no
// more split call:
Expand Down

0 comments on commit ad21576

Please sign in to comment.