diff --git a/include/opendht/thread_pool.h b/include/opendht/thread_pool.h index b5b2d6f1a..fe80b7f2d 100644 --- a/include/opendht/thread_pool.h +++ b/include/opendht/thread_pool.h @@ -74,12 +74,10 @@ class OPENDHT_PUBLIC ThreadPool { unsigned minThreads_; const unsigned maxThreads_; - std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(1)}; + std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)}; double threadDelayRatio_ {2}; - std::vector> endedThreads_ {}; void threadEnded(std::thread&); - void cleanupThreads(); }; class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this { diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 81cbf5882..d53c4e83a 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -49,7 +49,7 @@ ThreadPool::ThreadPool(unsigned minThreads, unsigned maxThreads) { threads_.reserve(maxThreads_); if (minThreads_ != maxThreads_) { - threadDelayRatio_ = std::pow(2, 1.0 / (maxThreads_ - minThreads_)); + threadDelayRatio_ = std::pow(3, 1.0 / (maxThreads_ - minThreads_)); } } @@ -122,14 +122,20 @@ void ThreadPool::threadEnded(std::thread& thread) { std::lock_guard l(lock_); - endedThreads_.emplace_back(thread); - tasks_.emplace([this]{ - cleanupThreads(); + tasks_.emplace([this,t=std::reference_wrapper(thread)]{ + std::lock_guard l(lock_); + for (auto it = threads_.begin(); it != threads_.end(); ++it) { + if (&*(*it) == &t.get()) { + t.get().join(); + threads_.erase(it); + break; + } + } }); // A thread expired, maybe after handling a one-time burst of tasks. // If new threads start later, increase the expiration delay. - if (threadExpirationDelay > std::chrono::hours(168)) { - // If we reach 7 days, assume the thread pool is often used at max capacity + if (threadExpirationDelay > std::chrono::hours(24 * 7)) { + // If we reach 7 days, assume the thread is regularly used at full capacity minThreads_ = std::min(minThreads_+1, maxThreads_); } else { threadExpirationDelay *= threadDelayRatio_; @@ -137,23 +143,6 @@ ThreadPool::threadEnded(std::thread& thread) cv_.notify_one(); } -void -ThreadPool::cleanupThreads() -{ - std::lock_guard l(lock_); - if (not endedThreads_.empty()) { - for (auto& thread : endedThreads_) - thread.get().join(); - endedThreads_.clear(); - for (auto it = threads_.begin(); it != threads_.end();) { - if (not (*it)->joinable()) - it = threads_.erase(it); - else - ++it; - } - } -} - void ThreadPool::stop(bool wait) {