Skip to content

Commit

Permalink
[native] Add functionality to cancel abandoned tasks.
Browse files Browse the repository at this point in the history
Request cancellation for tasks which haven't been
accessed by the coordinator for a considerable time.
  • Loading branch information
spershin committed Apr 30, 2024
1 parent b62e37f commit affd83e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
25 changes: 25 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ namespace facebook::presto {
constexpr uint32_t kMaxConcurrentLifespans{16};

namespace {
// Scans 'taskMap' and requests cancellation of every task that is still
// running but whose time since the last heartbeat has reached 'abandonedMs',
// i.e. a task the coordinator has not accessed for a considerable time.
// Entries that have no underlying task, or whose task is not running, are
// left untouched.
void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
  for (const auto& [taskId, entry] : taskMap) {
    // Nothing to cancel if the underlying task is not set.
    if (entry->task == nullptr) {
      continue;
    }
    // Only tasks that are still running are candidates for cancellation.
    if (!entry->task->isRunning()) {
      continue;
    }
    if (entry->timeSinceLastHeartbeatMs() >= abandonedMs) {
      LOG(INFO) << "Cancelling abandoned task '" << taskId << "'.";
      entry->task->requestCancel();
    }
  }
}

// If spilling is enabled and the given Task can spill, then this helper
// generates the spilling directory path for the Task, and sets the path to it
// in the Task.
Expand Down Expand Up @@ -693,6 +708,8 @@ size_t TaskManager::cleanOldTasks() {
taskIdsToClean.emplace(id);
}
}

cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_);
}

const auto elapsedMs = (getCurrentTimeMs() - startTimeMs);
Expand Down Expand Up @@ -730,6 +747,13 @@ size_t TaskManager::cleanOldTasks() {
return taskIdsToClean.size();
}

void TaskManager::cancelAbandonedTasks() {
// We copy task map locally to avoid locking task map for a potentially long
// time. We also lock for 'read'.
const TaskMap taskMap = *(taskMap_.rlock());
cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_);
}

folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
const TaskId& taskId,
bool summarize,
Expand Down Expand Up @@ -1119,6 +1143,7 @@ void TaskManager::shutdown() {
<< " seconds so far) for 'Running' tasks to complete. " << numTasks
<< " tasks left: " << PrestoTask::taskNumbersToString(taskNumbers);
std::this_thread::sleep_for(std::chrono::seconds(1));
cancelAbandonedTasks();
taskNumbers = getTaskNumbers(numTasks);
++seconds;
}
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class TaskManager {
"concurrent_lifespans_per_task"};
static constexpr folly::StringPiece kSessionTimezone{"session_timezone"};

// Requests cancellation of tasks which haven't been accessed by the
// coordinator for a considerable time.
void cancelAbandonedTasks();

std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
const protocol::TaskId& taskId,
const velox::core::PlanFragment& planFragment,
Expand Down

0 comments on commit affd83e

Please sign in to comment.