diff --git a/CMakeLists.txt b/CMakeLists.txt index c4e44e66..2c70039e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,8 @@ set(CPPKAFKA_VERSION "${CPPKAFKA_VERSION_MAJOR}.${CPPKAFKA_VERSION_MINOR}.${CPPK set(RDKAFKA_MIN_VERSION "0.9.4") set(RDKAFKA_MIN_VERSION_HEX 0x00090400) +option(USE_CPP17 "Use C++17 features instead of Boost (still, if you don't have boost you can't build examples" OFF) + if (NOT CMAKE_CXX_FLAGS) # Set default compile flags for the project if(MSVC) @@ -28,10 +30,17 @@ if (NOT CMAKE_CXX_FLAGS) add_definitions("-DNOGDI=1") add_definitions("-DNOMINMAX=1") else() - set(CMAKE_CXX_FLAGS "-std=c++11 -Wall") + set(CMAKE_CXX_FLAGS "-Wall") endif() endif() +if(USE_CPP17) + set(CMAKE_CXX_STANDARD 17) + add_definitions("-D_USE_CPP17") +else() + set(CMAKE_CXX_STANDARD 11) +endif() + # Set output directories set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) @@ -102,21 +111,30 @@ if (NOT CPPKAFKA_PKGCONFIG_DIR) set(CPPKAFKA_PKGCONFIG_DIR share/pkgconfig) endif() -# Look for Boost (just need boost.optional headers here) -find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) - -if (Boost_FOUND) - find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) - set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) - set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) - include_directories(${Boost_INCLUDE_DIRS}) - link_directories(${Boost_LIBRARY_DIRS}) - if (CPPKAFKA_CMAKE_VERBOSE) - message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") - message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") - message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") - message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") - message(STATUS "Boost libraries: ${Boost_LIBRARIES}") +if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS) + # Look for Boost (just need boost.optional headers here) + find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) + + if (Boost_FOUND) + find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) + set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) + set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) + include_directories(${Boost_INCLUDE_DIRS}) + link_directories(${Boost_LIBRARY_DIRS}) + if (CPPKAFKA_CMAKE_VERBOSE) + message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") + message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") + message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") + message(STATUS "Boost libraries: ${Boost_LIBRARIES}") + endif() + endif() + + # Examples target + if (Boost_PROGRAM_OPTIONS_FOUND) + add_subdirectory(examples) + else() + message(STATUS "Disabling examples") endif() endif() @@ -139,12 +157,7 @@ endif() add_subdirectory(src) add_subdirectory(include/cppkafka) -# Examples target -if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND) - add_subdirectory(examples) -else() - message(STATUS "Disabling examples") -endif() + # Add a target to generate API documentation using Doxygen find_package(Doxygen ${FIND_PACKAGE_QUIET}) @@ -186,4 +199,4 @@ if(NOT TARGET uninstall) # Add uninstall target add_custom_target(uninstall COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake) -endif() +endif() \ No newline at end of file diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index c97f5a83..0015abbc 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -35,7 +35,7 @@ #include #include #include -#include +#include "utils/optional.h" #include #include "topic_partition_list.h" #include "topic_configuration.h" @@ -226,12 +226,12 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { /** * Gets the default topic configuration */ - const boost::optional& get_default_topic_configuration() const; + const optional& get_default_topic_configuration() const; /** * Gets the default topic configuration */ - boost::optional& get_default_topic_configuration(); + optional& get_default_topic_configuration(); private: using HandlePtr = ClonablePtr; @@ -240,7 +240,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { static HandlePtr make_handle(rd_kafka_conf_t* ptr); HandlePtr handle_; - boost::optional default_topic_config_; + optional default_topic_config_; DeliveryReportCallback delivery_report_callback_; OffsetCommitCallback offset_commit_callback_; ErrorCallback error_callback_; diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 86ac366d..450b4a70 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -57,11 +57,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index db5484ef..8168f436 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include "utils/optional.h" #include #include "buffer.h" #include "macros.h" @@ -189,7 +189,7 @@ class CPPKAFKA_API Message { * * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ - boost::optional get_timestamp() const; + optional get_timestamp() const; #if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION /** diff --git a/include/cppkafka/utils/any.h b/include/cppkafka/utils/any.h new file mode 100644 index 00000000..0750ba16 --- /dev/null +++ b/include/cppkafka/utils/any.h @@ -0,0 +1,14 @@ + #ifdef _USE_CPP17 + #include + #else + #include + #endif + +namespace cppkafka +{ + #ifdef _USE_CPP17 + using any = std::any; + #else + using any = boost::any; + #endif +} \ No newline at end of file diff --git a/include/cppkafka/utils/compacted_topic_processor.h b/include/cppkafka/utils/compacted_topic_processor.h index 166dcfe8..38f95628 100644 --- a/include/cppkafka/utils/compacted_topic_processor.h +++ b/include/cppkafka/utils/compacted_topic_processor.h @@ -34,7 +34,7 @@ #include #include #include -#include +#include "optional.h" #include "../buffer.h" #include "../consumer.h" #include "../macros.h" @@ -106,8 +106,8 @@ class CPPKAFKA_API CompactedTopicEvent { EventType type_; std::string topic_; int partition_; - boost::optional key_; - boost::optional value_; + optional key_; + optional value_; }; template @@ -121,12 +121,12 @@ class CPPKAFKA_API CompactedTopicProcessor { /** * Callback used for decoding key objects */ - using KeyDecoder = std::function(const Buffer&)>; + using KeyDecoder = std::function(const Buffer&)>; /** * Callback used for decoding value objects */ - using ValueDecoder = std::function(const Key& key, const Buffer&)>; + using ValueDecoder = std::function(const Key& key, const Buffer&)>; /** * Callback used for event handling @@ -276,10 +276,10 @@ void CompactedTopicProcessor::process_event() { Message message = consumer_.poll(); if (message) { if (!message.get_error()) { - boost::optional key = key_decoder_(message.get_key()); + optional key = key_decoder_(message.get_key()); if (key) { if (message.get_payload()) { - boost::optional value = value_decoder_(*key, message.get_payload()); + optional value = value_decoder_(*key, message.get_payload()); if (value) { // If there's a payload and we managed to parse the value, generate a // SET_ELEMENT event diff --git a/include/cppkafka/utils/optional.h b/include/cppkafka/utils/optional.h new file mode 100644 index 00000000..d8709e8e --- /dev/null +++ b/include/cppkafka/utils/optional.h @@ -0,0 +1,14 @@ +#ifdef _USE_CPP17 +#include +#else +#include +#endif + +namespace cppkafka +{ +#ifdef _USE_CPP17 + using std::optional; +#else + using boost::optional +#endif +} \ No newline at end of file diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index e8d49287..cc590344 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -31,7 +31,7 @@ #define CPPKAFKA_POLL_STRATEGY_BASE_H #include -#include +#include "any.h" #include "../queue.h" #include "../topic_partition_list.h" #include "poll_interface.h" @@ -45,7 +45,7 @@ namespace cppkafka { */ struct QueueData { Queue queue; - boost::any metadata; + any metadata; }; /** diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5b8649b5..49d5bc1c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,7 +47,12 @@ set_target_properties(${TARGET_NAME} PROPERTIES VERSION ${CPPKAFKA_VERSION} SOVERSION ${CPPKAFKA_VERSION}) # In CMake >= 3.15 Boost::boost == Boost::headers -target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka Boost::boost) +target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka) + +if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS) + target_link_libraries(${TARGET_NAME} PUBLIC Boost::boost) +endif() + if (WIN32) # On windows ntohs and related are in ws2_32 target_link_libraries(${TARGET_NAME} PUBLIC ws2_32.lib) diff --git a/src/configuration.cpp b/src/configuration.cpp index 5a59c517..62f8cf1c 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -41,7 +41,6 @@ using std::move; using std::vector; using std::initializer_list; using std::chrono::milliseconds; -using boost::optional; namespace cppkafka { diff --git a/src/message.cpp b/src/message.cpp index 103bae87..a49f61ae 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -84,7 +84,7 @@ Message& Message::load_internal() { return *this; } -boost::optional Message::get_timestamp() const { +optional Message::get_timestamp() const { rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { diff --git a/src/producer.cpp b/src/producer.cpp index af138d04..6ab6ae18 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -143,7 +143,11 @@ void Producer::do_produce(const Message& message, const Buffer& payload = message.get_payload(); const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); +#ifdef _USE_CPP17 + int64_t duration = message.get_timestamp() ? message.get_timestamp().value().get_timestamp().count() : 0; +#else int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; +#endif auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_PARTITION(message.get_partition()), diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index cafa625e..27b43356 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -27,6 +27,10 @@ * */ +#ifdef _WIN32 +#define NOMINMAX +#endif + #include #include #include "utils/backoff_performer.h" diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 15f75d8d..c295c00b 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -36,7 +36,7 @@ namespace cppkafka { PollStrategyBase::PollStrategyBase(Consumer& consumer) : consumer_(consumer), - consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) { + consumer_queue_(QueueData{consumer.get_consumer_queue(), any()}) { // get all currently active partition assignments TopicPartitionList assignment = consumer_.get_assignment(); on_assignment(assignment); @@ -93,7 +93,7 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) { // populate partition queues for (const auto& partition : partitions) { // get the queue associated with this partition - partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); + partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), any()}); } reset_state(); }