Skip to content

Commit

Permalink
peer discovery: add a timer to schedule connectivityChanged with a …
Browse files Browse the repository at this point in the history
…maximum interval of one minute
  • Loading branch information
AmnaSnene authored and aberaud committed Jul 16, 2024
1 parent 76a2bd2 commit 38454ea
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
35 changes: 21 additions & 14 deletions include/opendht/peer_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include "sockaddr.h"
#include "infohash.h"
#include "logger.h"
#include "utils.h"

#include <asio/steady_timer.hpp>
#include <thread>

namespace asio {
Expand All @@ -36,54 +38,59 @@ class OPENDHT_PUBLIC PeerDiscovery
{
public:
static constexpr in_port_t DEFAULT_PORT = 8888;
using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>;
using ServiceDiscoveredCallback = std::function<void(msgpack::object &&, SockAddr &&)>;

PeerDiscovery(in_port_t port = DEFAULT_PORT, std::shared_ptr<asio::io_context> ioContext = {}, std::shared_ptr<Logger> logger = {});
PeerDiscovery(in_port_t port = DEFAULT_PORT,
std::shared_ptr<asio::io_context> ioContext = {},
std::shared_ptr<Logger> logger = {});
~PeerDiscovery();

/**
* startDiscovery - Keep Listening data from the sender until node is joinned or stop is called
*/
* startDiscovery - Keep Listening data from the sender until node is joinned
* or stop is called
*/
void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback);

template<typename T>
void startDiscovery(const std::string &type, std::function<void(T&&, SockAddr&&)> cb) {
startDiscovery(type, [cb](msgpack::object&& ob, SockAddr&& addr) {
template <typename T>
void startDiscovery(const std::string &type, std::function<void(T &&, SockAddr &&)> cb) {
startDiscovery(type, [cb](msgpack::object &&ob, SockAddr &&addr) {
cb(ob.as<T>(), std::move(addr));
});
}

/**
* startPublish - Keeping sending data until node is joinned or stop is called
*/
*/
void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf);
void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf);

template<typename T>
void startPublish(const std::string &type, const T& object) {
template <typename T>
void startPublish(const std::string &type, const T &object) {
msgpack::sbuffer buf;
msgpack::pack(buf, object);
startPublish(type, buf);
}

/**
* Thread Stopper
*/
*/
void stop();

/**
* Remove possible callBack to discovery
*/
*/
bool stopDiscovery(const std::string &type);

/**
* Remove different serivce message to send
*/
*/
bool stopPublish(const std::string &type);
bool stopPublish(sa_family_t domain, const std::string &type);

void connectivityChanged();

void stopConnectivityChanged();

private:
class DomainPeerDiscovery;
std::unique_ptr<DomainPeerDiscovery> peerDiscovery4_;
Expand All @@ -92,4 +99,4 @@ class OPENDHT_PUBLIC PeerDiscovery
std::thread ioRunnner_;
};

}
} // namespace dht
4 changes: 4 additions & 0 deletions src/dhtrunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ DhtRunner::run(const Config& config, Context&& context)
if (status4 == NodeStatus::Disconnected && status6 == NodeStatus::Disconnected) {
peerDiscovery_->connectivityChanged();
}
else if (status4 != NodeStatus::Connected || status6 != NodeStatus::Connected) {
peerDiscovery_->stopConnectivityChanged();
}

});
}
#endif
Expand Down
35 changes: 35 additions & 0 deletions src/peer_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,22 @@ class PeerDiscovery::DomainPeerDiscovery

void connectivityChanged();

void stopConnectivityChanged();
private:
Sp<Logger> logger_;
//dmtx_ for callbackmap_ and drunning_ (write)
std::mutex dmtx_;
//mtx_ for messages_ and lrunning (listen)
std::mutex mtx_;
std::shared_ptr<asio::io_context> ioContext_;

static constexpr dht::duration PeerDiscovery_PERIOD_MAX{
std::chrono::minutes(1)};
static constexpr std::chrono::seconds PeerDiscovery_PERIOD{10};
asio::steady_timer peerDiscoveryTimer;
std::chrono::steady_clock::duration peerDiscovery_period{
PeerDiscovery_PERIOD};

asio::ip::udp::socket sockFd_;
asio::ip::udp::endpoint sockAddrSend_;

Expand Down Expand Up @@ -86,6 +95,7 @@ class PeerDiscovery::DomainPeerDiscovery
PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(asio::ip::udp domain, in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger)
: logger_(logger)
, ioContext_(ioContext)
, peerDiscoveryTimer(*ioContext_)
, sockFd_(*ioContext_, domain)
, sockAddrSend_(asio::ip::address::from_string(domain.family() == AF_INET ? MULTICAST_ADDRESS_IPV4
: MULTICAST_ADDRESS_IPV6), port)
Expand Down Expand Up @@ -323,6 +333,24 @@ PeerDiscovery::DomainPeerDiscovery::connectivityChanged()
});
if (logger_)
logger_->d("PeerDiscovery: connectivity changed");

if (peerDiscovery_period == PeerDiscovery_PERIOD_MAX ){
peerDiscovery_period = PeerDiscovery_PERIOD;
}
else{
peerDiscoveryTimer.expires_after(peerDiscovery_period);
peerDiscoveryTimer.async_wait([this](const asio::error_code& ec) {
if (ec == asio::error::operation_aborted)
return;
connectivityChanged();
});
peerDiscovery_period = std::min(peerDiscovery_period * 2, PeerDiscovery_PERIOD_MAX);
}
}

void PeerDiscovery::DomainPeerDiscovery::stopConnectivityChanged() {
peerDiscoveryTimer.cancel();
peerDiscovery_period = PeerDiscovery_PERIOD;
}

PeerDiscovery::PeerDiscovery(in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger)
Expand Down Expand Up @@ -435,4 +463,11 @@ PeerDiscovery::connectivityChanged()
peerDiscovery6_->connectivityChanged();
}

void PeerDiscovery::stopConnectivityChanged() {
if (peerDiscovery4_)
peerDiscovery4_->stopConnectivityChanged();
if (peerDiscovery6_)
peerDiscovery6_->stopConnectivityChanged();
}

} /* namespace dht */

0 comments on commit 38454ea

Please sign in to comment.