Skip to content

Commit

Permalink
thread pool: simplify cleanup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Aug 21, 2023
1 parent 9e0ad9d commit 285105c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 26 deletions.
4 changes: 1 addition & 3 deletions include/opendht/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ class OPENDHT_PUBLIC ThreadPool {

unsigned minThreads_;
const unsigned maxThreads_;
std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(1)};
std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)};
double threadDelayRatio_ {2};

std::vector<std::reference_wrapper<std::thread>> endedThreads_ {};
void threadEnded(std::thread&);
void cleanupThreads();
};

class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this<Executor> {
Expand Down
35 changes: 12 additions & 23 deletions src/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ThreadPool::ThreadPool(unsigned minThreads, unsigned maxThreads)
{
threads_.reserve(maxThreads_);
if (minThreads_ != maxThreads_) {
threadDelayRatio_ = std::pow(2, 1.0 / (maxThreads_ - minThreads_));
threadDelayRatio_ = std::pow(3, 1.0 / (maxThreads_ - minThreads_));
}
}

Expand Down Expand Up @@ -122,38 +122,27 @@ void
ThreadPool::threadEnded(std::thread& thread)
{
std::lock_guard<std::mutex> l(lock_);
endedThreads_.emplace_back(thread);
tasks_.emplace([this]{
cleanupThreads();
tasks_.emplace([this,t=std::reference_wrapper<std::thread>(thread)]{
std::lock_guard<std::mutex> l(lock_);
for (auto it = threads_.begin(); it != threads_.end(); ++it) {
if (&*(*it) == &t.get()) {
t.get().join();
threads_.erase(it);
break;
}
}
});
// A thread expired, maybe after handling a one-time burst of tasks.
// If new threads start later, increase the expiration delay.
if (threadExpirationDelay > std::chrono::hours(168)) {
// If we reach 7 days, assume the thread pool is often used at max capacity
if (threadExpirationDelay > std::chrono::hours(24 * 7)) {
// If we reach 7 days, assume the thread is regularly used at full capacity
minThreads_ = std::min(minThreads_+1, maxThreads_);
} else {
threadExpirationDelay *= threadDelayRatio_;
}
cv_.notify_one();
}

void
ThreadPool::cleanupThreads()
{
std::lock_guard<std::mutex> l(lock_);
if (not endedThreads_.empty()) {
for (auto& thread : endedThreads_)
thread.get().join();
endedThreads_.clear();
for (auto it = threads_.begin(); it != threads_.end();) {
if (not (*it)->joinable())
it = threads_.erase(it);
else
++it;
}
}
}

void
ThreadPool::stop(bool wait)
{
Expand Down

0 comments on commit 285105c

Please sign in to comment.