Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Thread Pooling as a Threading Policy #100

Merged
merged 13 commits into from
May 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
cmake_minimum_required(VERSION 3.5)
project(amazon_kinesis_producer_internal)
project(amazon_kinesis_producer)

set(THIRD_PARTY_LIB_DIR "${amazon_kinesis_producer_internal_SOURCE_DIR}/third_party/lib")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
if(CMAKE_COMPILER_IS_GNUCXX)

if (CMAKE_COMPILER_IS_GNUCXX)
set(ADDL_LINK_CONFIG "-static-libstdc++")
endif(CMAKE_COMPILER_IS_GNUCXX)
add_compile_options("-fpermissive")
endif ()

IF(CMAKE_BUILD_TYPE MATCHES DEBUG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
if (CMAKE_COMPILER_IS_GNUCXX)
set(ADDL_LINK_CONFIG "${ADDL_LINK_CONFIG} -static-libasan")
endif (CMAKE_COMPILER_IS_GNUCXX)
ENDIF(CMAKE_BUILD_TYPE MATCHES DEBUG)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
set(CMAKE_EXE_LINKER_FLAGS "-L${THIRD_PARTY_LIB_DIR} ${ADDL_LINK_CONFIG}")

set(SOURCE_FILES
Expand Down
46 changes: 45 additions & 1 deletion aws/kinesis/core/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,19 @@ class Configuration : private boost::noncopyable {
return verify_certificate_;
}

/// Indicates whether the SDK clients should use a thread pool or not
/// \return true if the client should use a thread pool, false otherwise
bool use_thread_pool() const noexcept {
return use_thread_pool_;
}

/// The maximum number of threads that a thread pool should be limited to.
/// Threads are created eagerly. This is only relevant if \see use_thread_pool() is true
/// \return the mamximum number of threads that the thread pool should consume
uint32_t thread_pool_size() const noexcept {
return thread_pool_size_;
}

// Enable aggregation. With aggregation, multiple user records are packed
// into a single KinesisRecord. If disabled, each user record is sent in its
// own KinesisRecord.
Expand Down Expand Up @@ -924,6 +937,27 @@ class Configuration : private boost::noncopyable {
return *this;
}

/// Enables or disable the use of a thread pool for the SDK Client.
/// Default: false
/// \param val whether or not to use a thread pool
/// \return This configuration
Configuration& use_thread_pool(bool val) {
use_thread_pool_ = val;
return *this;
}

/// The maximum number of threads the thread pool will be allowed to use.
/// This is only useful if \see use_thread_pool is set to true
/// The threads for the thread pool are allocated eagerly.
/// \param val the maximum number of threads that the thread pool will use
/// \return This configuration
Configuration& thread_pool_size(uint32_t val) {
if (val > 0) {
thread_pool_size_ = val;
}
return *this;
}


const std::vector<std::tuple<std::string, std::string, std::string>>&
additional_metrics_dims() {
Expand Down Expand Up @@ -968,13 +1002,19 @@ class Configuration : private boost::noncopyable {
region(c.region());
request_timeout(c.request_timeout());
verify_certificate(c.verify_certificate());
if (c.thread_config() == ::aws::kinesis::protobuf::Configuration_ThreadConfig::Configuration_ThreadConfig_POOLED) {
use_thread_pool(true);
thread_pool_size(c.thread_pool_size());
} else if (c.thread_config() == ::aws::kinesis::protobuf::Configuration_ThreadConfig_PER_REQUEST) {
use_thread_pool(false);
}

for (auto i = 0; i < c.additional_metric_dims_size(); i++) {
auto ad = c.additional_metric_dims(i);
additional_metrics_dims_.push_back(
std::make_tuple(ad.key(), ad.value(), ad.granularity()));
}

}

private:
Expand Down Expand Up @@ -1004,6 +1044,10 @@ class Configuration : private boost::noncopyable {
uint64_t request_timeout_ = 6000;
bool verify_certificate_ = true;

bool use_thread_pool_ = true;
uint32_t thread_pool_size_ = 64;


std::vector<std::tuple<std::string, std::string, std::string>>
additional_metrics_dims_;
};
Expand Down
37 changes: 34 additions & 3 deletions aws/kinesis/core/kinesis_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include <aws/core/http/Scheme.h>
#include <aws/kinesis/core/kinesis_producer.h>

#include <system_error>
#include <aws/core/utils/threading/Executor.h>

namespace {

struct EndpointConfiguration {
Expand All @@ -37,6 +40,7 @@ const constexpr char* kVersion = "0.12.0";
const std::unordered_map< std::string, EndpointConfiguration > kRegionEndpointOverride = {
{ "cn-north-1", { "kinesis.cn-north-1.amazonaws.com.cn", "monitoring.cn-north-1.amazonaws.com.cn" } }
};
const constexpr uint32_t kDefaultThreadPoolSize = 64;

void set_override_if_present(std::string& region, Aws::Client::ClientConfiguration& cfg, std::string service, std::function<std::string(EndpointConfiguration)> extractor) {
auto region_override = kRegionEndpointOverride.find(region);
Expand Down Expand Up @@ -78,6 +82,16 @@ std::string user_agent() {
return ua;
}

template<typename T>
T cast_size_t(std::size_t value) {
if (value > std::numeric_limits<T>::max()) {
throw std::system_error(std::make_error_code(std::errc::result_out_of_range));
}
return static_cast<T>(value);
}

std::shared_ptr<Aws::Utils::Threading::Executor> sdk_client_executor;

Aws::Client::ClientConfiguration
make_sdk_client_cfg(const aws::kinesis::core::Configuration& kpl_cfg,
const std::string& region,
Expand All @@ -86,10 +100,27 @@ make_sdk_client_cfg(const aws::kinesis::core::Configuration& kpl_cfg,
cfg.userAgent = user_agent();
LOG(info) << "Using Region: " << region;
cfg.region = region;
cfg.maxConnections = kpl_cfg.max_connections();
cfg.requestTimeoutMs = kpl_cfg.request_timeout();
cfg.connectTimeoutMs = kpl_cfg.connect_timeout();
cfg.maxConnections = cast_size_t<unsigned>(kpl_cfg.max_connections());
cfg.requestTimeoutMs = cast_size_t<long>(kpl_cfg.request_timeout());
cfg.connectTimeoutMs = cast_size_t<long>(kpl_cfg.connect_timeout());
cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(0, 0);
if (kpl_cfg.use_thread_pool()) {
if (sdk_client_executor == nullptr) {
uint32_t thread_pool_size = kpl_cfg.thread_pool_size();
//
// TODO: Add rlimit check to see if the configured thread pool size is greater than RLIMIT_NPROC, and report a warning.
//
if (thread_pool_size == 0) {
thread_pool_size = kDefaultThreadPoolSize;
}
LOG(info) << "Using pooled threading model with " << thread_pool_size << " threads.";
sdk_client_executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(thread_pool_size);
}
} else {
LOG(info) << "Using per request threading model.";
sdk_client_executor = std::make_shared<Aws::Utils::Threading::DefaultExecutor>();
}
cfg.executor = sdk_client_executor;
cfg.verifySSL = kpl_cfg.verify_certificate();
cfg.caPath = ca_path;
return cfg;
Expand Down
Loading