Skip to content

Commit

Permalink
[native] Switch from old Task::create() API to the new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Apr 30, 2024
1 parent 2611ae9 commit 16fe817
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
6 changes: 5 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,11 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
// task which hasn't been destroyed yet, such as the task pool in query's
// root memory pool.
auto newExecTask = exec::Task::create(
taskId, planFragment, prestoTask->id.id(), std::move(queryCtx));
taskId,
planFragment,
prestoTask->id.id(),
std::move(queryCtx),
exec::Task::ExecutionMode::kParallel);
// TODO: move spill directory creation inside velox task execution
// whenever spilling is triggered. It will reduce the unnecessary file
// operations on remote storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ class BroadcastTest : public exec::test::OperatorTestBase {
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
core::PlanFragment planFragment{planNode};
return exec::Task::create(
taskId, std::move(planFragment), destination, std::move(queryCtx));
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
exec::Task::ExecutionMode::kParallel);
}

std::pair<RowTypePtr, std::vector<std::string>> executeBroadcastWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,11 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
executor_.get(), core::QueryConfig({}));
core::PlanFragment planFragment{planNode};
return exec::Task::create(
taskId, std::move(planFragment), destination, std::move(queryCtx));
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
exec::Task::ExecutionMode::kParallel);
}

RowVectorPtr deserialize(
Expand Down
15 changes: 12 additions & 3 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,12 @@ class TaskManagerTest : public testing::Test {
auto queryCtx =
taskManager_->getQueryContextManager()->findOrCreateQueryCtx(
taskId, {});
return exec::Task::create(taskId, planFragment, 0, std::move(queryCtx));
return exec::Task::create(
taskId,
planFragment,
0,
std::move(queryCtx),
exec::Task::ExecutionMode::kParallel);
}

std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
Expand Down Expand Up @@ -1340,8 +1345,12 @@ TEST_F(TaskManagerTest, testCumulativeMemory) {
.planFragment();
auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
const protocol::TaskId taskId = "scan.0.0.1.0";
auto veloxTask =
Task::create(taskId, std::move(planFragment), 0, std::move(queryCtx));
auto veloxTask = Task::create(
taskId,
std::move(planFragment),
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);

const uint64_t startTimeMs = velox::getCurrentTimeMs();
auto prestoTask = std::make_unique<PrestoTask>(taskId, "fakeId");
Expand Down

0 comments on commit 16fe817

Please sign in to comment.