Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cpp17 and no need for Boost if you disable examples. #284

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
59 changes: 36 additions & 23 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ set(CPPKAFKA_VERSION "${CPPKAFKA_VERSION_MAJOR}.${CPPKAFKA_VERSION_MINOR}.${CPPK
set(RDKAFKA_MIN_VERSION "0.9.4")
set(RDKAFKA_MIN_VERSION_HEX 0x00090400)

option(USE_CPP17 "Use C++17 features instead of Boost (still, if you don't have boost you can't build examples" OFF)

if (NOT CMAKE_CXX_FLAGS)
# Set default compile flags for the project
if(MSVC)
Expand All @@ -28,10 +30,17 @@ if (NOT CMAKE_CXX_FLAGS)
add_definitions("-DNOGDI=1")
add_definitions("-DNOMINMAX=1")
else()
set(CMAKE_CXX_FLAGS "-std=c++11 -Wall")
set(CMAKE_CXX_FLAGS "-Wall")
endif()
endif()

if(USE_CPP17)
set(CMAKE_CXX_STANDARD 17)
add_definitions("-D_USE_CPP17")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is enough. There needs to be a record somewhere (e.g. in a generated header file) that stores this and code should include that file to figure out this flag.

If you don't do that, you can have the library being compiled with this flag on but an application that uses the library not knowing (for obvious reasons) that they need to set this macro manually, which will break badly because from one's perspective you're using std types and from the other's boost ones.

else()
set(CMAKE_CXX_STANDARD 11)
endif()

# Set output directories
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)
Expand Down Expand Up @@ -102,21 +111,30 @@ if (NOT CPPKAFKA_PKGCONFIG_DIR)
set(CPPKAFKA_PKGCONFIG_DIR share/pkgconfig)
endif()

# Look for Boost (just need boost.optional headers here)
find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET})

if (Boost_FOUND)
find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET})
set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS})
set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED})
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
if (CPPKAFKA_CMAKE_VERBOSE)
message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}")
message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}")
message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}")
message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}")
message(STATUS "Boost libraries: ${Boost_LIBRARIES}")
if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS)
# Look for Boost (just need boost.optional headers here)
find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET})

if (Boost_FOUND)
find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET})
set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS})
set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED})
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
if (CPPKAFKA_CMAKE_VERBOSE)
message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}")
message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}")
message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}")
message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}")
message(STATUS "Boost libraries: ${Boost_LIBRARIES}")
endif()
endif()

# Examples target
if (Boost_PROGRAM_OPTIONS_FOUND)
add_subdirectory(examples)
else()
message(STATUS "Disabling examples")
endif()
endif()

Expand All @@ -139,12 +157,7 @@ endif()
add_subdirectory(src)
add_subdirectory(include/cppkafka)

# Examples target
if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND)
add_subdirectory(examples)
else()
message(STATUS "Disabling examples")
endif()


# Add a target to generate API documentation using Doxygen
find_package(Doxygen ${FIND_PACKAGE_QUIET})
Expand Down Expand Up @@ -186,4 +199,4 @@ if(NOT TARGET uninstall)
# Add uninstall target
add_custom_target(uninstall
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake)
endif()
endif()
8 changes: 4 additions & 4 deletions include/cppkafka/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include <functional>
#include <initializer_list>
#include <chrono>
#include <boost/optional.hpp>
#include "utils/optional.h"
#include <librdkafka/rdkafka.h>
#include "topic_partition_list.h"
#include "topic_configuration.h"
Expand Down Expand Up @@ -226,12 +226,12 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
/**
* Gets the default topic configuration
*/
const boost::optional<TopicConfiguration>& get_default_topic_configuration() const;
const optional<TopicConfiguration>& get_default_topic_configuration() const;

/**
* Gets the default topic configuration
*/
boost::optional<TopicConfiguration>& get_default_topic_configuration();
optional<TopicConfiguration>& get_default_topic_configuration();
private:
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
decltype(&rd_kafka_conf_dup)>;
Expand All @@ -240,7 +240,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
static HandlePtr make_handle(rd_kafka_conf_t* ptr);

HandlePtr handle_;
boost::optional<TopicConfiguration> default_topic_config_;
optional<TopicConfiguration> default_topic_config_;
DeliveryReportCallback delivery_report_callback_;
OffsetCommitCallback offset_commit_callback_;
ErrorCallback error_callback_;
Expand Down
2 changes: 2 additions & 0 deletions include/cppkafka/cppkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@
#include <cppkafka/topic_configuration.h>
#include <cppkafka/topic_partition.h>
#include <cppkafka/topic_partition_list.h>
#include <cppkafka/utils/any.h>
#include <cppkafka/utils/backoff_committer.h>
#include <cppkafka/utils/backoff_performer.h>
#include <cppkafka/utils/buffered_producer.h>
#include <cppkafka/utils/compacted_topic_processor.h>
#include <cppkafka/utils/consumer_dispatcher.h>
#include <cppkafka/utils/optional.h>
#include <cppkafka/utils/poll_interface.h>
#include <cppkafka/utils/poll_strategy_base.h>
#include <cppkafka/utils/roundrobin_poll_strategy.h>
Expand Down
4 changes: 2 additions & 2 deletions include/cppkafka/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include <cstdint>
#include <chrono>
#include <cassert>
#include <boost/optional.hpp>
#include "utils/optional.h"
#include <librdkafka/rdkafka.h>
#include "buffer.h"
#include "macros.h"
Expand Down Expand Up @@ -189,7 +189,7 @@ class CPPKAFKA_API Message {
*
* If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned.
*/
boost::optional<MessageTimestamp> get_timestamp() const;
optional<MessageTimestamp> get_timestamp() const;

#if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION
/**
Expand Down
14 changes: 14 additions & 0 deletions include/cppkafka/utils/any.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifdef _USE_CPP17
#include<any>
#else
#include <boost/any.hpp>
#endif

namespace cppkafka
{
#ifdef _USE_CPP17
using any = std::any;
#else
using any = boost::any;
#endif
}
14 changes: 7 additions & 7 deletions include/cppkafka/utils/compacted_topic_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include <string>
#include <map>
#include <set>
#include <boost/optional.hpp>
#include "optional.h"
#include "../buffer.h"
#include "../consumer.h"
#include "../macros.h"
Expand Down Expand Up @@ -106,8 +106,8 @@ class CPPKAFKA_API CompactedTopicEvent {
EventType type_;
std::string topic_;
int partition_;
boost::optional<Key> key_;
boost::optional<Value> value_;
optional<Key> key_;
optional<Value> value_;
};

template <typename Key, typename Value>
Expand All @@ -121,12 +121,12 @@ class CPPKAFKA_API CompactedTopicProcessor {
/**
* Callback used for decoding key objects
*/
using KeyDecoder = std::function<boost::optional<Key>(const Buffer&)>;
using KeyDecoder = std::function<optional<Key>(const Buffer&)>;

/**
* Callback used for decoding value objects
*/
using ValueDecoder = std::function<boost::optional<Value>(const Key& key, const Buffer&)>;
using ValueDecoder = std::function<optional<Value>(const Key& key, const Buffer&)>;

/**
* Callback used for event handling
Expand Down Expand Up @@ -276,10 +276,10 @@ void CompactedTopicProcessor<Key, Value>::process_event() {
Message message = consumer_.poll();
if (message) {
if (!message.get_error()) {
boost::optional<Key> key = key_decoder_(message.get_key());
optional<Key> key = key_decoder_(message.get_key());
if (key) {
if (message.get_payload()) {
boost::optional<Value> value = value_decoder_(*key, message.get_payload());
optional<Value> value = value_decoder_(*key, message.get_payload());
if (value) {
// If there's a payload and we managed to parse the value, generate a
// SET_ELEMENT event
Expand Down
14 changes: 14 additions & 0 deletions include/cppkafka/utils/optional.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifdef _USE_CPP17
#include<optional>
#else
#include <boost/optional.hpp>
#endif

namespace cppkafka
{
#ifdef _USE_CPP17
using std::optional;
#else
using boost::optional
#endif
}
4 changes: 2 additions & 2 deletions include/cppkafka/utils/poll_strategy_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#define CPPKAFKA_POLL_STRATEGY_BASE_H

#include <map>
#include <boost/any.hpp>
#include "any.h"
#include "../queue.h"
#include "../topic_partition_list.h"
#include "poll_interface.h"
Expand All @@ -45,7 +45,7 @@ namespace cppkafka {
*/
struct QueueData {
Queue queue;
boost::any metadata;
any metadata;
};

/**
Expand Down
7 changes: 6 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ set_target_properties(${TARGET_NAME} PROPERTIES
VERSION ${CPPKAFKA_VERSION}
SOVERSION ${CPPKAFKA_VERSION})
# In CMake >= 3.15 Boost::boost == Boost::headers
target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka Boost::boost)
target_link_libraries(${TARGET_NAME} PUBLIC RdKafka::rdkafka)

if(NOT USE_CPP17 OR NOT CPPKAFKA_DISABLE_EXAMPLES OR NOT CPPKAFKA_DISABLE_TESTS)
target_link_libraries(${TARGET_NAME} PUBLIC Boost::boost)
endif()

if (WIN32)
# On windows ntohs and related are in ws2_32
target_link_libraries(${TARGET_NAME} PUBLIC ws2_32.lib)
Expand Down
1 change: 0 additions & 1 deletion src/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ using std::move;
using std::vector;
using std::initializer_list;
using std::chrono::milliseconds;
using boost::optional;

namespace cppkafka {

Expand Down
2 changes: 1 addition & 1 deletion src/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Message& Message::load_internal() {
return *this;
}

boost::optional<MessageTimestamp> Message::get_timestamp() const {
optional<MessageTimestamp> Message::get_timestamp() const {
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
Expand Down
4 changes: 4 additions & 0 deletions src/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ void Producer::do_produce(const Message& message,
const Buffer& payload = message.get_payload();
const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_);
#ifdef _USE_CPP17
int64_t duration = message.get_timestamp() ? message.get_timestamp().value().get_timestamp().count() : 0;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume both optionals have operator* so rather than doing this you can maybe use that which should make the same expression work for both types?

#else
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
#endif
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()),
Expand Down
4 changes: 4 additions & 0 deletions src/utils/backoff_performer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
*
*/

#ifdef _WIN32
#define NOMINMAX
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be set as a definition at the cmake level if it's needed, rather than here in this particular file

#endif

#include <algorithm>
#include <limits>
#include "utils/backoff_performer.h"
Expand Down
4 changes: 2 additions & 2 deletions src/utils/poll_strategy_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace cppkafka {

PollStrategyBase::PollStrategyBase(Consumer& consumer)
: consumer_(consumer),
consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) {
consumer_queue_(QueueData{consumer.get_consumer_queue(), any()}) {
// get all currently active partition assignments
TopicPartitionList assignment = consumer_.get_assignment();
on_assignment(assignment);
Expand Down Expand Up @@ -93,7 +93,7 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) {
// populate partition queues
for (const auto& partition : partitions) {
// get the queue associated with this partition
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), any()});
}
reset_state();
}
Expand Down