Skip to content

Commit

Permalink
[presto_cpp] Transfer idle HTTPSession between IO threads in exchange…
Browse files Browse the repository at this point in the history
… source (#22903)

Summary:

Currently each endpoint (server) has only one `SessionPool`, which can only be attached to one `EventBase`.  If there is some skewness among the distribution (i.e. too many endpoints being attached to one event base), that event base would become bottleneck and cause regression in query wall time.

Fix this by creating one `SessionPool` for each endpoint and event base pair.  Allow transfer of idle session between different event bases by using `ServerIdleSessionController`.  This way we eliminate the bottleneck in event base while still keep the `HttpSession`s reusable.

After the fix, enabling connection pool should no longer cause any regression to wall time.  The SSL handshake cost (`EVP_DigestSignFinal`) is not visible in shuffle heavy queries when connection pool is enabled.

Reviewed By: xiaoxmeng, arhimondr

Differential Revision: D57842433
  • Loading branch information
Yuhta authored and xiaoxmeng committed Jun 5, 2024
1 parent 65224e8 commit ab3bda7
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ PeriodicServiceInventoryManager::PeriodicServiceInventoryManager(

void PeriodicServiceInventoryManager::start() {
eventBaseThread_.start(id_);
sessionPool_ = std::make_unique<proxygen::SessionPool>(nullptr, 10);
stopped_ = false;
auto* eventBase = eventBaseThread_.getEventBase();
eventBase->runOnDestruction([this] { sessionPool_.reset(); });
eventBase->schedule([this]() { return sendRequest(); });
}

void PeriodicServiceInventoryManager::stop() {
stopped_ = true;
client_.reset();
eventBaseThread_.stop();
}

Expand Down Expand Up @@ -75,7 +74,11 @@ void PeriodicServiceInventoryManager::sendRequest() {
std::swap(serviceAddress_, newAddress);
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
sessionPool_.get(),
nullptr,
proxygen::Endpoint(
serviceAddress_.getAddressStr(),
serviceAddress_.getPort(),
sslContext_ != nullptr),
serviceAddress_,
std::chrono::milliseconds(10'000),
std::chrono::milliseconds(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class PeriodicServiceInventoryManager {
const double backOffjitterParam_{0.1};

folly::EventBaseThread eventBaseThread_;
std::unique_ptr<proxygen::SessionPool> sessionPool_;
folly::SocketAddress serviceAddress_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
Expand Down
35 changes: 27 additions & 8 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/http/HttpClient.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
#include "velox/common/base/PeriodicStatsReporter.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -84,8 +85,9 @@ static constexpr size_t kTaskPeriodCleanOldTasks{60'000'000}; // 60 seconds.
static constexpr size_t kConnectorPeriodGlobalCounters{
60'000'000}; // 60 seconds.
static constexpr size_t kOsPeriodGlobalCounters{2'000'000}; // 2 seconds
// Every 1 minute we print endpoint latency counters.
static constexpr size_t kHttpEndpointLatencyPeriodGlobalCounters{
static constexpr size_t kHttpServerPeriodGlobalCounters{
60'000'000}; // 60 seconds.
static constexpr size_t kHttpClientPeriodGlobalCounters{
60'000'000}; // 60 seconds.

PeriodicTaskManager::PeriodicTaskManager(
Expand Down Expand Up @@ -137,9 +139,11 @@ void PeriodicTaskManager::start() {
addOperatingSystemStatsUpdateTask();

if (SystemConfig::instance()->enableHttpEndpointLatencyFilter()) {
addHttpEndpointLatencyStatsTask();
addHttpServerStatsTask();
}

addHttpClientStatsTask();

if (server_ && server_->hasCoordinatorDiscoverer()) {
numDriverThreads_ = server_->numDriverThreads();
addWatchdogTask();
Expand Down Expand Up @@ -406,7 +410,7 @@ void PeriodicTaskManager::addOperatingSystemStatsUpdateTask() {
"os_counters");
}

void PeriodicTaskManager::printHttpEndpointLatencyStats() {
void PeriodicTaskManager::printHttpServerStats() {
const auto latencyMetrics =
http::filters::HttpEndpointLatencyFilter::retrieveLatencies();
std::ostringstream oss;
Expand All @@ -418,11 +422,26 @@ void PeriodicTaskManager::printHttpEndpointLatencyStats() {
LOG(INFO) << oss.str();
}

void PeriodicTaskManager::addHttpEndpointLatencyStatsTask() {
void PeriodicTaskManager::addHttpServerStatsTask() {
addTask(
[this]() { printHttpServerStats(); },
kHttpServerPeriodGlobalCounters,
"http_server_stats");
}

void PeriodicTaskManager::updateHttpClientStats() {
const auto numConnectionsCreated = http::HttpClient::numConnectionsCreated();
RECORD_METRIC_VALUE(
kCounterHttpClientNumConnectionsCreated,
numConnectionsCreated - lastHttpClientNumConnectionsCreated_);
lastHttpClientNumConnectionsCreated_ = numConnectionsCreated;
}

void PeriodicTaskManager::addHttpClientStatsTask() {
addTask(
[this]() { printHttpEndpointLatencyStats(); },
kHttpEndpointLatencyPeriodGlobalCounters,
"http_endpoint_counters");
[this] { updateHttpClientStats(); },
kHttpClientPeriodGlobalCounters,
"http_client_stats");
}

void PeriodicTaskManager::addWatchdogTask() {
Expand Down
10 changes: 7 additions & 3 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ class PeriodicTaskManager {
void addOperatingSystemStatsUpdateTask();
void updateOperatingSystemStats();

// Adds task that periodically prints http endpoint latency metrics.
void addHttpEndpointLatencyStatsTask();
void printHttpEndpointLatencyStats();
void addHttpServerStatsTask();
void printHttpServerStats();

void addHttpClientStatsTask();
void updateHttpClientStats();

void addWatchdogTask();

Expand All @@ -141,6 +143,8 @@ class PeriodicTaskManager {
int64_t lastVoluntaryContextSwitches_{0};
int64_t lastForcedContextSwitches_{0};

int64_t lastHttpClientNumConnectionsCreated_{0};

// NOTE: declare last since the threads access other members of `this`.
folly::FunctionScheduler oneTimeRunner_;
folly::ThreadedRepeatingFunctionRunner repeatedRunner_;
Expand Down
66 changes: 10 additions & 56 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ PrestoExchangeSource::PrestoExchangeSource(
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
http::HttpClientConnectionPool* connPool,
const proxygen::Endpoint& endpoint,
folly::SSLContextPtr sslContext)
: ExchangeSource(extractTaskId(baseUri.path()), destination, queue, pool),
basePath_(baseUri.path()),
Expand All @@ -102,7 +103,8 @@ PrestoExchangeSource::PrestoExchangeSource(
VELOX_CHECK_NOT_NULL(pool_);
httpClient_ = std::make_shared<http::HttpClient>(
ioEventBase,
sessionPool,
connPool,
endpoint,
address,
requestTimeoutMs,
connectTimeoutMs,
Expand Down Expand Up @@ -516,53 +518,6 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::getSelfPtr() {
return std::dynamic_pointer_cast<PrestoExchangeSource>(shared_from_this());
}

const ConnectionPool& ConnectionPools::get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor) {
return *pools_.withULockPtr([&](auto ulock) -> const ConnectionPool* {
auto it = ulock->find(endpoint);
if (it != ulock->end()) {
return it->second.get();
}
auto wlock = ulock.moveFromUpgradeToWrite();
auto& pool = (*wlock)[endpoint];
if (!pool) {
pool = std::make_unique<ConnectionPool>();
pool->eventBase = ioExecutor->getEventBase();
pool->sessionPool = std::make_unique<proxygen::SessionPool>(nullptr, 10);
// Creation of the timer is not thread safe, so we do it here instead of
// in the constructor of HttpClient.
pool->eventBase->timer();
}
return pool.get();
});
}

void ConnectionPools::destroy() {
pools_.withWLock([](auto& pools) {
for (auto& [_, pool] : pools) {
pool->eventBase->runInEventBaseThread(
[sessionPool = std::move(pool->sessionPool)] {});
}
pools.clear();
});
}

namespace {

std::pair<folly::EventBase*, proxygen::SessionPool*> getSessionPool(
ConnectionPools* connectionPools,
folly::IOThreadPoolExecutor* ioExecutor,
const proxygen::Endpoint& ep) {
if (!connectionPools) {
return {ioExecutor->getEventBase(), nullptr};
}
auto& connPool = connectionPools->get(ep, ioExecutor);
return {connPool.eventBase, connPool.sessionPool.get()};
}

} // namespace

// static
std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
const std::string& url,
Expand All @@ -571,37 +526,36 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools* connectionPools,
http::HttpClientConnectionPool* connPool,
folly::SSLContextPtr sslContext) {
folly::Uri uri(url);
auto* eventBase = ioExecutor->getEventBase();
if (uri.scheme() == "http") {
VELOX_CHECK_NULL(sslContext);
proxygen::Endpoint ep(uri.host(), uri.port(), false);
auto [eventBase, sessionPool] =
getSessionPool(connectionPools, ioExecutor, ep);
return std::make_shared<PrestoExchangeSource>(
uri,
destination,
queue,
memoryPool,
cpuExecutor,
eventBase,
sessionPool,
connPool,
ep,
sslContext);
}
if (uri.scheme() == "https") {
VELOX_CHECK_NOT_NULL(sslContext);
proxygen::Endpoint ep(uri.host(), uri.port(), true);
auto [eventBase, sessionPool] =
getSessionPool(connectionPools, ioExecutor, ep);
return std::make_shared<PrestoExchangeSource>(
uri,
destination,
queue,
memoryPool,
cpuExecutor,
eventBase,
sessionPool,
connPool,
ep,
std::move(sslContext));
}
return nullptr;
Expand Down
37 changes: 3 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,6 @@

namespace facebook::presto {

// HTTP connection pool for a specific endpoint with its associated event base.
// All the operations on the SessionPool must be performed on the corresponding
// EventBase.
struct ConnectionPool {
folly::EventBase* eventBase;
std::unique_ptr<proxygen::SessionPool> sessionPool;
};

// Connection pools used by HTTP client in PrestoExchangeSource. It should be
// held living longer than all the PrestoExchangeSources and will be passed when
// we creating the exchange sources.
class ConnectionPools {
public:
~ConnectionPools() {
destroy();
}

const ConnectionPool& get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor);

void destroy();

private:
folly::Synchronized<folly::F14FastMap<
proxygen::Endpoint,
std::unique_ptr<ConnectionPool>,
proxygen::EndpointHash,
proxygen::EndpointEqual>>
pools_;
};

class PrestoExchangeSource : public velox::exec::ExchangeSource {
public:
class RetryState {
Expand Down Expand Up @@ -107,7 +75,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
velox::memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
http::HttpClientConnectionPool* connPool,
const proxygen::Endpoint& endpoint,
folly::SSLContextPtr sslContext);

/// Returns 'true' is there is no request in progress, this source is not at
Expand Down Expand Up @@ -145,7 +114,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools* connectionPools,
http::HttpClientConnectionPool* connPool,
folly::SSLContextPtr sslContext);

/// Completes the future returned by 'request()' if it hasn't completed
Expand Down
13 changes: 8 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ void PrestoServer::run() {

if (systemConfig->exchangeEnableConnectionPool()) {
PRESTO_STARTUP_LOG(INFO) << "Enable exchange Http Client connection pool.";
exchangeSourceConnectionPools_ = std::make_unique<ConnectionPools>();
exchangeSourceConnectionPool_ =
std::make_unique<http::HttpClientConnectionPool>();
}

facebook::velox::exec::ExchangeSource::registerFactory(
Expand All @@ -401,7 +402,7 @@ void PrestoServer::run() {
pool,
driverExecutor_.get(),
exchangeHttpExecutor_.get(),
exchangeSourceConnectionPools_.get(),
exchangeSourceConnectionPool_.get(),
sslContext_);
});

Expand Down Expand Up @@ -540,7 +541,9 @@ void PrestoServer::run() {
<< "': threads: " << driverExecutor_->numActiveThreads() << "/"
<< driverExecutor_->numThreads()
<< ", task queue: " << driverExecutor_->getTaskQueueSize();
driverExecutor_->join();
// Schedule release of SessionPools held by HttpClients before the exchange
// HTTP executor threads are joined.
driverExecutor_.reset();

if (connectorIoExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
Expand All @@ -550,9 +553,9 @@ void PrestoServer::run() {
connectorIoExecutor_->join();
}

if (exchangeSourceConnectionPools_) {
if (exchangeSourceConnectionPool_) {
PRESTO_SHUTDOWN_LOG(INFO) << "Releasing exchange HTTP connection pools";
exchangeSourceConnectionPools_->destroy();
exchangeSourceConnectionPool_->destroy();
}

if (httpSrvCpuExecutor_ != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class PrestoServer {
// Executor for spilling.
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

std::unique_ptr<ConnectionPools> exchangeSourceConnectionPools_;
std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;

// If not null, the instance of AsyncDataCache used for in-memory file cache.
std::shared_ptr<velox::cache::AsyncDataCache> cache_;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ SystemConfig::SystemConfig() {
STR_PROP(kExchangeMaxErrorDuration, "3m"),
STR_PROP(kExchangeRequestTimeout, "10s"),
STR_PROP(kExchangeConnectTimeout, "20s"),
BOOL_PROP(kExchangeEnableConnectionPool, false),
BOOL_PROP(kExchangeEnableConnectionPool, true),
BOOL_PROP(kExchangeImmediateBufferTransfer, true),
NUM_PROP(kTaskRunTimeSliceMicros, 50'000),
BOOL_PROP(kIncludeNodeInSpillPath, false),
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void registerPrestoMetrics() {
95,
99,
100);
DEFINE_METRIC(
kCounterHttpClientNumConnectionsCreated, facebook::velox::StatType::SUM);
DEFINE_HISTOGRAM_METRIC(
kCounterPrestoExchangeSerializedPageSize,
10000,
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ constexpr folly::StringPiece kCounterHttpClientPrestoExchangeNumOnBody{
/// PrestoExchangeSource.
constexpr folly::StringPiece kCounterHttpClientPrestoExchangeOnBodyBytes{
"presto_cpp.http.client.presto_exchange_source.on_body_bytes"};
constexpr folly::StringPiece kCounterHttpClientNumConnectionsCreated{
"presto_cpp.http.client.num_connections_created"};
/// SerializedPage size in bytes from PrestoExchangeSource.
constexpr folly::StringPiece kCounterPrestoExchangeSerializedPageSize{
"presto_cpp.presto_exchange_source.serialized_page_size"};
Expand Down
Loading

0 comments on commit ab3bda7

Please sign in to comment.