Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lightclient err handling #1160

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func (wf *WakuFilterLightNode) Stop() {
})
}

func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) {
err := wf.request(
wf.Context(),
protocol.GenerateRequestID(),
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
cf, peerID)
if err != nil {
wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
}

func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
Expand All @@ -156,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
//This could be happening due to https://github.com/waku-org/go-waku/issues/1124
go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
Expand Down Expand Up @@ -199,30 +213,32 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
}

logger = messagePush.WakuMessage.Logger(logger, pubSubTopic)

if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)
if !wf.subscriptions.Has(peerID, cf) {
logger.Warn("received messagepush with invalid subscription parameters")
//Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124
go wf.unsubscribeWithoutSubscription(cf, peerID)
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}

wf.metrics.RecordMessage()

wf.notify(peerID, pubSubTopic, messagePush.WakuMessage)
wf.notify(ctx, peerID, pubSubTopic, messagePush.WakuMessage)

logger.Info("received message push")
}
}

func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)

if wf.broadcaster != nil {
// Broadcasting message so it's stored
wf.broadcaster.Submit(envelope)
}
// Notify filter subscribers
wf.subscriptions.Notify(remotePeerID, envelope)
wf.subscriptions.Notify(ctx, remotePeerID, envelope)
}

func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
Expand Down
8 changes: 1 addition & 7 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,6 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu

// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
if params == nil {
return nil, errors.New("lightpush params are mandatory")
}

if len(params.requestID) == 0 {
return nil, ErrInvalidID
}

logger := wakuLP.log.With(logging.HostID("peer", peer))

Expand Down Expand Up @@ -336,6 +329,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
for _, peerID := range params.selectedPeers {
wg.Add(1)
go func(id peer.ID) {
params.requestID = protocol.GenerateRequestID()
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
response, err := wakuLP.request(ctx, req, params, id)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion waku/v2/protocol/lightpush/waku_lightpush_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func WithAutomaticRequestID() RequestOption {
// DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{
WithAutomaticRequestID(),
WithAutomaticPeerSelection(),
WithMaxPeers(1), //keeping default as 2 for status use-case
}
Expand Down
9 changes: 6 additions & 3 deletions waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package subscription

import (
"context"
"errors"
"sync"

Expand Down Expand Up @@ -178,17 +179,17 @@ func (sub *SubscriptionsMap) Clear() {
sub.clear()
}

func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) {
func (sub *SubscriptionsMap) Notify(ctx context.Context, peerID peer.ID, envelope *protocol.Envelope) {
sub.RLock()
defer sub.RUnlock()

subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(sub.logger, subscriptions, envelope)
iterateSubscriptionSet(ctx, sub.logger, subscriptions, envelope)
}
}

func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
func iterateSubscriptionSet(ctx context.Context, logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
for _, subscription := range subscriptions {
func(subscription *SubscriptionDetails) {
subscription.RLock()
Expand All @@ -201,6 +202,8 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e

if !subscription.Closed {
select {
case <-ctx.Done():
return
case subscription.C <- envelope:
default:
logger.Warn("can't deliver message to subscription. subscriber too slow")
Expand Down
12 changes: 6 additions & 6 deletions waku/v2/protocol/subscription/subscriptions_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1)
fmap.Notify(p2, envTopic1Ct1)
fmap.Notify(ctx, p1, envTopic1Ct1)
fmap.Notify(ctx, p2, envTopic1Ct1)
}()

<-successChan
Expand All @@ -177,8 +177,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct2)
fmap.Notify(p2, envTopic1Ct2)
fmap.Notify(ctx, p1, envTopic1Ct2)
fmap.Notify(ctx, p2, envTopic1Ct2)
}()

<-successChan
Expand Down Expand Up @@ -207,8 +207,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1_2)
fmap.Notify(p2, envTopic1Ct1_2)
fmap.Notify(ctx, p1, envTopic1Ct1_2)
fmap.Notify(ctx, p2, envTopic1Ct1_2)
}()

<-successChan // One of these successes is for closing the subscription
Expand Down
Loading