Skip to content

Commit

Permalink
add syncLevel to send send announce/listen early
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Aug 27, 2023
1 parent 899d189 commit bc5e311
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
2 changes: 1 addition & 1 deletion include/opendht/dht.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
* @param sr The search for which we want to announce a value.
* @param announce The 'announce' structure.
*/
void searchSendAnnounceValue(const Sp<Search>& sr);
void searchSendAnnounceValue(const Sp<Search>& sr, unsigned syncLevel = TARGET_NODES);

/**
* Main process of a Search's operations. This function will demand the
Expand Down
25 changes: 22 additions & 3 deletions src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,9 @@ Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update)
return nullptr;
}

void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
void Dht::searchSendAnnounceValue(const Sp<Search>& sr, unsigned syncLevel) {
if (sr->announce.empty())
return;
unsigned i = 0;
std::weak_ptr<Search> ws = sr;

auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer)
Expand Down Expand Up @@ -552,6 +551,7 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
static const auto PROBE_QUERY = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum));

const auto& now = scheduler.time();
unsigned i = 0;
for (auto& np : sr->nodes) {
auto& n = *np;
if (not n.isSynced(now))
Expand Down Expand Up @@ -592,7 +592,7 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, PROBE_QUERY));
n.getStatus[PROBE_QUERY] = std::move(req);
}
if (not n.candidate and ++i == TARGET_NODES)
if (not n.candidate and ++i == syncLevel)
break;
}
}
Expand Down Expand Up @@ -675,6 +675,10 @@ Dht::searchStep(std::weak_ptr<Search> ws)
if (not sr or sr->expired or sr->done) return;

const auto& now = scheduler.time();
auto level = sr->syncLevel(now);
constexpr auto MARGIN = 1;
bool preSynced = level > MARGIN;
auto syncLevel = preSynced ? level - MARGIN : 0;
/*if (auto req_count = sr->currentlySolicitedNodeCount())
if (logger_)
logger_->d(sr->id, "[search %s IPv%c] step (%d requests)",
Expand Down Expand Up @@ -735,6 +739,21 @@ Dht::searchStep(std::weak_ptr<Search> ws)

if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->setDone();
} else if (preSynced) {
if (not sr->listeners.empty()) {
auto nl = std::min(syncLevel, LISTEN_NODES);
unsigned i = 0;
for (auto& n : sr->nodes) {
if (not n->isSynced(now))
break;
searchSynchedNodeListen(sr, *n);
if (++i == nl)
break;
}
}

// Announce requests
searchSendAnnounceValue(sr, syncLevel);
}

while (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES and searchSendGetValues(sr));
Expand Down
17 changes: 17 additions & 0 deletions src/search.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ struct Dht::Search {
*/
bool isSynced(time_point now) const;

unsigned syncLevel(time_point now) const;

/**
* Get the time of the last "get" operation performed on this search,
* or time_point::min() if no such operation have been performed.
Expand Down Expand Up @@ -895,6 +897,21 @@ Dht::Search::isSynced(time_point now) const
return i > 0;
}

unsigned
Dht::Search::syncLevel(time_point now) const
{
unsigned i = 0;
for (const auto& n : nodes) {
if (n->isBad())
continue;
if (not n->isSynced(now))
return i;
if (++i == TARGET_NODES)
break;
}
return i;
}

time_point
Dht::Search::getLastGetTime(const Query& q) const
{
Expand Down

0 comments on commit bc5e311

Please sign in to comment.