Skip to content

Commit

Permalink
chore(rln-relay): clean up nullifier table every MaxEpochGap
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Sep 6, 2023
1 parent 76b0071 commit 27f05fb
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 83 deletions.
100 changes: 100 additions & 0 deletions waku/v2/protocol/rln/nullifier_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package rln

import (
"bytes"
"errors"
"sync"

"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)

// NullifierLog is the log of nullifiers and Shamir shares of the past messages grouped per epoch
type NullifierLog struct {
sync.RWMutex

log *zap.Logger
nullifierLog map[rln.Nullifier][]rln.ProofMetadata // Might make sense to replace this map by a shrinkable map due to https://github.com/golang/go/issues/20135.
nullifierQueue []rln.Nullifier
}

func NewNullifierLog(log *zap.Logger) *NullifierLog {
result := &NullifierLog{
nullifierLog: make(map[rln.Nullifier][]rln.ProofMetadata),
log: log,
}

return result
}

var errAlreadyExists = errors.New("proof already exists")

func (n *NullifierLog) Insert(proofMD rln.ProofMetadata) error {
n.Lock()
defer n.Unlock()

proofs, ok := n.nullifierLog[proofMD.ExternalNullifier]
if ok {
// check if an identical record exists
for _, p := range proofs {
if p.Equals(proofMD) {
// TODO: slashing logic
return errAlreadyExists
}
}
}

n.nullifierLog[proofMD.ExternalNullifier] = append(proofs, proofMD)
n.nullifierQueue = append(n.nullifierQueue, proofMD.ExternalNullifier)
return nil
}

// HasDuplicate returns true if there is another message in the `nullifierLog` with the same
// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
// otherwise, returns false
func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
n.RLock()
defer n.RUnlock()

proofs, ok := n.nullifierLog[proofMD.ExternalNullifier]
if !ok {
// epoch does not exist
return false, nil
}

for _, p := range proofs {
if p.Equals(proofMD) {
// there is an identical record, ignore the msg
return true, nil
}
}

// check for a message with the same nullifier but different secret shares
matched := false
for _, it := range proofs {
if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) {
matched = true
break
}
}

return matched, nil
}

// Cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it
func (n *NullifierLog) Cleanup() {
n.Lock()
defer n.Unlock()

if int64(len(n.nullifierQueue)) < maxEpochGap {
return
}

n.log.Debug("clearing epochs from the nullifier log", zap.Int64("count", maxEpochGap))

toDelete := n.nullifierQueue[0:maxEpochGap]
for _, l := range toDelete {
delete(n.nullifierLog, l)
}
n.nullifierQueue = n.nullifierQueue[maxEpochGap:]
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/rln/onchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
groupManager: gm,
RLN: rlnInstance,
log: utils.Logger(),
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
nullifierLog: NewNullifierLog(utils.Logger()),
}

err = rlnRelay.Start(context.TODO())
Expand Down
16 changes: 7 additions & 9 deletions waku/v2/protocol/rln/rln_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() {
s.Require().NoError(err)

rlnRelay := &WakuRLNRelay{
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
nullifierLog: NewNullifierLog(utils.Logger()),
rootTracker: rootTracker,
}

Expand Down Expand Up @@ -123,27 +123,25 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() {

// check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares
// no duplicate for wm1 should be found, since the log is empty
result1, err := rlnRelay.HasDuplicate(md1)
result1, err := rlnRelay.nullifierLog.HasDuplicate(md1)
s.Require().NoError(err)
s.Require().False(result1) // No duplicate is found

// Add it to the log
added, err := rlnRelay.updateLog(md1)
err = rlnRelay.nullifierLog.Insert(md1)
s.Require().NoError(err)
s.Require().True(added)

// no duplicate for wm2 should be found, its nullifier differs from wm1
result2, err := rlnRelay.HasDuplicate(md2)
result2, err := rlnRelay.nullifierLog.HasDuplicate(md2)
s.Require().NoError(err)
s.Require().False(result2) // No duplicate is found

// Add it to the log
added, err = rlnRelay.updateLog(md2)
err = rlnRelay.nullifierLog.Insert(md2)
s.Require().NoError(err)
s.Require().True(added)

// wm3 has the same nullifier as wm1 but different secret shares, it should be detected as duplicate
result3, err := rlnRelay.HasDuplicate(md3)
result3, err := rlnRelay.nullifierLog.HasDuplicate(md3)
s.Require().NoError(err)
s.Require().True(result3) // It's a duplicate

Expand Down Expand Up @@ -178,7 +176,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() {
groupManager: groupManager,
rootTracker: rootTracker,
RLN: rlnInstance,
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
nullifierLog: NewNullifierLog(utils.Logger()),
log: utils.Logger(),
metrics: newMetrics(prometheus.DefaultRegisterer),
}
Expand Down
81 changes: 8 additions & 73 deletions waku/v2/protocol/rln/waku_rln_relay.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package rln

import (
"bytes"
"context"
"encoding/hex"
"errors"
"math"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -36,11 +34,8 @@ type WakuRLNRelay struct {
groupManager GroupManager
rootTracker *group_manager.MerkleRootTracker

RLN *rln.RLN

// the log of nullifiers and Shamir shares of the past messages grouped per epoch
nullifierLogLock sync.RWMutex
nullifierLog map[rln.Nullifier][]rln.ProofMetadata
RLN *rln.RLN
nullifierLog *NullifierLog

log *zap.Logger
}
Expand Down Expand Up @@ -86,13 +81,14 @@ func New(
metrics: metrics,
log: log,
timesource: timesource,
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
}

return rlnPeer, nil
}

func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error {
rlnRelay.nullifierLog = NewNullifierLog(rlnRelay.log)

err := rlnRelay.groupManager.Start(ctx, rlnRelay.RLN, rlnRelay.rootTracker)
if err != nil {
return err
Expand All @@ -108,72 +104,14 @@ func (rlnRelay *WakuRLNRelay) Stop() error {
return rlnRelay.groupManager.Stop()
}

func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
// returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same
// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
// otherwise, returns false

rlnRelay.nullifierLogLock.RLock()
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]
rlnRelay.nullifierLogLock.RUnlock()

// check if the epoch exists
if !ok {
return false, nil
}

for _, p := range proofs {
if p.Equals(proofMD) {
// there is an identical record, ignore rhe mag
return true, nil
}
}

// check for a message with the same nullifier but different secret shares
matched := false
for _, it := range proofs {
if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) {
matched = true
break
}
}

return matched, nil
}

func (rlnRelay *WakuRLNRelay) updateLog(proofMD rln.ProofMetadata) (bool, error) {
rlnRelay.nullifierLogLock.Lock()
defer rlnRelay.nullifierLogLock.Unlock()
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]

// check if the epoch exists
if !ok {
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = []rln.ProofMetadata{proofMD}
return true, nil
}

// check if an identical record exists
for _, p := range proofs {
if p.Equals(proofMD) {
// TODO: slashing logic
return true, nil
}
}

// add proofMD to the log
proofs = append(proofs, proofMD)
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = proofs

return true, nil
}

// ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e.,
// the message's epoch is within `maxEpochGap` of the current epoch
// the message's has valid rate limit proof
// the message's does not violate the rate limit
// if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used
func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) {
//
rlnRelay.nullifierLog.Cleanup()

if msg == nil {
return validationError, errors.New("nil message")
}
Expand Down Expand Up @@ -237,7 +175,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
}

// check if double messaging has happened
hasDup, err := rlnRelay.HasDuplicate(proofMD)
hasDup, err := rlnRelay.nullifierLog.HasDuplicate(proofMD)
if err != nil {
rlnRelay.log.Debug("validation error", zap.Error(err))
rlnRelay.metrics.RecordError(duplicateCheckErr)
Expand All @@ -249,10 +187,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
return spamMessage, nil
}

// insert the message to the log
// the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e.,
// it will never error out
_, err = rlnRelay.updateLog(proofMD)
err = rlnRelay.nullifierLog.Insert(proofMD)
if err != nil {
rlnRelay.log.Debug("could not insert proof into log")
rlnRelay.metrics.RecordError(logInsertionErr)
Expand Down

0 comments on commit 27f05fb

Please sign in to comment.