From f1b514ba664a1eb447a62e56c9fb37fc50ca8f42 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 13 Jul 2024 15:03:29 +0530 Subject: [PATCH 1/3] chore: generate different reqID for lightpush towards diff peers --- waku/v2/protocol/lightpush/waku_lightpush.go | 8 +------- waku/v2/protocol/lightpush/waku_lightpush_option.go | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index b8f3c0f39..a49a94370 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -189,13 +189,6 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu // request sends a message via lightPush protocol to either a specified peer or peer that is selected. func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { - if params == nil { - return nil, errors.New("lightpush params are mandatory") - } - - if len(params.requestID) == 0 { - return nil, ErrInvalidID - } logger := wakuLP.log.With(logging.HostID("peer", peer)) @@ -336,6 +329,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa for _, peerID := range params.selectedPeers { wg.Add(1) go func(id peer.ID) { + params.requestID = protocol.GenerateRequestID() defer wg.Done() response, err := wakuLP.request(ctx, req, params, id) if err != nil { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index b192fd17a..7ed043705 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -133,7 +133,6 @@ func WithAutomaticRequestID() RequestOption { // DefaultOptions are the default options to be used when using the lightpush protocol func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ - WithAutomaticRequestID(), WithAutomaticPeerSelection(), WithMaxPeers(1), //keeping default as 2 for status use-case } From d7e853c5947635ff7c6801c596c75cddbbee3c56 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 13 Jul 2024 15:19:18 +0530 Subject: [PATCH 2/3] fix: unsubscribe in case invalid push requests are received #1124 --- waku/v2/protocol/filter/client.go | 26 +++++++++++++++---- .../subscription/subscriptions_map.go | 9 ++++--- .../subscription/subscriptions_map_test.go | 12 ++++----- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 5909bbbd8..3342dc299 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -147,6 +147,17 @@ func (wf *WakuFilterLightNode) Stop() { }) } +func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) { + err := wf.request( + wf.Context(), + protocol.GenerateRequestID(), + pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, + cf, peerID) + if err != nil { + wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + } +} + func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { peerID := stream.Conn().RemotePeer() @@ -156,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) + //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us + //This could be happening due to https://github.com/waku-org/go-waku/issues/1124 + go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID) if err := stream.Reset(); err != nil { wf.log.Error("resetting connection", zap.Error(err)) } @@ -199,22 +213,24 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } logger = messagePush.WakuMessage.Logger(logger, pubSubTopic) - - if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { + cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic) + if !wf.subscriptions.Has(peerID, cf) { logger.Warn("received messagepush with invalid subscription parameters") + //Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124 + go wf.unsubscribeWithoutSubscription(cf, peerID) wf.metrics.RecordError(invalidSubscriptionMessage) return } wf.metrics.RecordMessage() - wf.notify(peerID, pubSubTopic, messagePush.WakuMessage) + wf.notify(ctx, peerID, pubSubTopic, messagePush.WakuMessage) logger.Info("received message push") } } -func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { +func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) if wf.broadcaster != nil { @@ -222,7 +238,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, wf.broadcaster.Submit(envelope) } // Notify filter subscribers - wf.subscriptions.Notify(remotePeerID, envelope) + wf.subscriptions.Notify(ctx, remotePeerID, envelope) } func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 4692d9538..a6621f131 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -1,6 +1,7 @@ package subscription import ( + "context" "errors" "sync" @@ -178,17 +179,17 @@ func (sub *SubscriptionsMap) Clear() { sub.clear() } -func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) { +func (sub *SubscriptionsMap) Notify(ctx context.Context, peerID peer.ID, envelope *protocol.Envelope) { sub.RLock() defer sub.RUnlock() subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()] if ok { - iterateSubscriptionSet(sub.logger, subscriptions, envelope) + iterateSubscriptionSet(ctx, sub.logger, subscriptions, envelope) } } -func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) { +func iterateSubscriptionSet(ctx context.Context, logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) { for _, subscription := range subscriptions { func(subscription *SubscriptionDetails) { subscription.RLock() @@ -201,6 +202,8 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e if !subscription.Closed { select { + case <-ctx.Done(): + return case subscription.C <- envelope: default: logger.Warn("can't deliver message to subscription. subscriber too slow") diff --git a/waku/v2/protocol/subscription/subscriptions_map_test.go b/waku/v2/protocol/subscription/subscriptions_map_test.go index f5c6d21ed..01a3b788f 100644 --- a/waku/v2/protocol/subscription/subscriptions_map_test.go +++ b/waku/v2/protocol/subscription/subscriptions_map_test.go @@ -153,8 +153,8 @@ func TestSubscriptionsNotify(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - fmap.Notify(p1, envTopic1Ct1) - fmap.Notify(p2, envTopic1Ct1) + fmap.Notify(ctx, p1, envTopic1Ct1) + fmap.Notify(ctx, p2, envTopic1Ct1) }() <-successChan @@ -177,8 +177,8 @@ func TestSubscriptionsNotify(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - fmap.Notify(p1, envTopic1Ct2) - fmap.Notify(p2, envTopic1Ct2) + fmap.Notify(ctx, p1, envTopic1Ct2) + fmap.Notify(ctx, p2, envTopic1Ct2) }() <-successChan @@ -207,8 +207,8 @@ func TestSubscriptionsNotify(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - fmap.Notify(p1, envTopic1Ct1_2) - fmap.Notify(p2, envTopic1Ct1_2) + fmap.Notify(ctx, p1, envTopic1Ct1_2) + fmap.Notify(ctx, p2, envTopic1Ct1_2) }() <-successChan // One of these successes is for closing the subscription From c3fd4622b18927165a73cf5e53d14c0f488a41fe Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 15 Jul 2024 19:40:26 +0530 Subject: [PATCH 3/3] chore: address review comments --- waku/v2/protocol/lightpush/waku_lightpush.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 79e1b088c..de7bce8b4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -329,9 +329,10 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa for _, peerID := range params.selectedPeers { wg.Add(1) go func(id peer.ID) { - params.requestID = protocol.GenerateRequestID() + paramsValue := *params + paramsValue.requestID = protocol.GenerateRequestID() defer wg.Done() - response, err := wakuLP.request(ctx, req, params, id) + response, err := wakuLP.request(ctx, req, ¶msValue, id) if err != nil { logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) }