Skip to content

Commit

Permalink
refactor: usage of rlnInstance,rootTracker,groupManager
Browse files Browse the repository at this point in the history
rlnInstance, rootTrack were previously created while creating rlnRelay
but were assigned to groupManager on Start of rlnRelay. This created
unncessary dependency of passing them to static and dynamic group
manager.
Web3Config uses interface EthClientI for client, so that we can pass
mock client for testing MembershipFetcher.
  • Loading branch information
harsh-98 committed Sep 6, 2023
1 parent 8b014c7 commit 9ad1355
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 120 deletions.
13 changes: 11 additions & 2 deletions waku/v2/node/wakunode2_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/keystore"
Expand All @@ -23,12 +24,14 @@ func (w *WakuNode) RLNRelay() RLNRelay {

func (w *WakuNode) setupRLNRelay() error {
var err error
var groupManager rln.GroupManager

if !w.opts.enableRLN {
return nil
}

var groupManager rln.GroupManager

rlnInstance, rootTracker := rln.GetRLNInstanceAndRootTracker(w.opts.rlnTreePath)
if !w.opts.rlnRelayDynamic {
w.log.Info("setting up waku-rln-relay in off-chain mode")

Expand Down Expand Up @@ -58,13 +61,19 @@ func (w *WakuNode) setupRLNRelay() error {
w.opts.keystorePassword,
w.opts.prometheusReg,
w.log,
rlnInstance,
rootTracker,
)
if err != nil {
return err
}
}

rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.opts.prometheusReg, w.log)
rlnRelay, err := rln.New(group_manager.GMDetails{
GroupManager: groupManager,
RootTracker: rootTracker,
RLN: rlnInstance,
}, w.timesource, w.opts.prometheusReg, w.log)
if err != nil {
return err
}
Expand Down
33 changes: 12 additions & 21 deletions waku/v2/protocol/rln/group_manager/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -28,28 +27,21 @@ var RLNAppInfo = keystore.AppInfo{
}

type DynamicGroupManager struct {
rln *rln.RLN
log *zap.Logger
MembershipFetcher
metrics Metrics

cancel context.CancelFunc
wg sync.WaitGroup

identityCredential *rln.IdentityCredential
membershipIndex rln.MembershipIndex

web3Config *web3.Config
lastBlockProcessed uint64

eventHandler RegistrationEventHandler

appKeystore *keystore.AppKeystore
keystorePassword string

rootTracker *group_manager.MerkleRootTracker
}

func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) error {
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error {
toRemoveTable := om.New()
toInsertTable := om.New()

Expand Down Expand Up @@ -117,17 +109,18 @@ func NewDynamicGroupManager(
keystorePassword string,
reg prometheus.Registerer,
log *zap.Logger,
rlnInstance *rln.RLN,
rootTracker *group_manager.MerkleRootTracker,
) (*DynamicGroupManager, error) {
log = log.Named("rln-dynamic")

web3Config := web3.NewConfig(ethClientAddr, memContractAddr)
return &DynamicGroupManager{
membershipIndex: membershipIndex,
web3Config: web3.NewConfig(ethClientAddr, memContractAddr),
eventHandler: handler,
appKeystore: appKeystore,
keystorePassword: keystorePassword,
log: log,
metrics: newMetrics(reg),
membershipIndex: membershipIndex,
appKeystore: appKeystore,
keystorePassword: keystorePassword,
MembershipFetcher: NewMembershipFetcher(web3Config, rlnInstance, rootTracker, log),
metrics: newMetrics(reg),
}, nil
}

Expand Down Expand Up @@ -164,7 +157,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN,
return err
}

if err = gm.HandleGroupUpdates(ctx, gm.eventHandler); err != nil {
if err = gm.MembershipFetcher.HandleGroupUpdates(ctx, gm.handler); err != nil {
return err
}

Expand Down Expand Up @@ -265,9 +258,7 @@ func (gm *DynamicGroupManager) Stop() error {
return err
}

gm.web3Config.ETHClient.Close()

gm.wg.Wait()
gm.MembershipFetcher.Stop()

return nil
}
31 changes: 15 additions & 16 deletions waku/v2/protocol/rln/group_manager/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dynamic
import (
"context"
"math/big"
"sync"
"testing"

"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -35,21 +34,21 @@ func TestHandler(t *testing.T) {
rootTracker, err := group_manager.NewMerkleRootTracker(5, rlnInstance)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.TODO())
_, cancel := context.WithCancel(context.TODO())
defer cancel()

_ = ctx

gm := &DynamicGroupManager{
rln: rlnInstance,
log: utils.Logger(),
MembershipFetcher: NewMembershipFetcher(
&web3.Config{
ChainID: big.NewInt(1),
},
rlnInstance,
rootTracker,
utils.Logger(),
),
cancel: cancel,
wg: sync.WaitGroup{},
web3Config: &web3.Config{
ChainID: big.NewInt(1),
},
rootTracker: rootTracker,
metrics: newMetrics(prometheus.DefaultRegisterer),

metrics: newMetrics(prometheus.DefaultRegisterer),
}

root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33}
Expand All @@ -59,7 +58,7 @@ func TestHandler(t *testing.T) {

events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)}

err = handler(gm, events)
err = gm.handler(events)
require.NoError(t, err)

roots = gm.rootTracker.Roots()
Expand All @@ -75,7 +74,7 @@ func TestHandler(t *testing.T) {
eventBuilder(4, false, 0xeeee, 5),
}

err = handler(gm, events)
err = gm.handler(events)
require.NoError(t, err)

// Root[1] should become [0]
Expand All @@ -99,7 +98,7 @@ func TestHandler(t *testing.T) {
eventBuilder(3, false, 0xeeee, 5),
}

err = handler(gm, events)
err = gm.handler(events)
require.NoError(t, err)

roots = gm.rootTracker.Roots()
Expand All @@ -113,7 +112,7 @@ func TestHandler(t *testing.T) {
// Adding multiple events for same block
events = []*contracts.RLNMemberRegistered{}

err = handler(gm, events)
err = gm.handler(events)
require.NoError(t, err)

roots = gm.rootTracker.Roots()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,44 @@ import (
"bytes"
"context"
"errors"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/web3"
"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)

// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error

type MembershipFetcher struct {
web3Config *web3.Config
rln *rln.RLN
log *zap.Logger
rootTracker *group_manager.MerkleRootTracker
wg sync.WaitGroup
}

func NewMembershipFetcher(web3Config *web3.Config, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker, log *zap.Logger) MembershipFetcher {
return MembershipFetcher{
web3Config: web3Config,
rln: rln,
log: log,
rootTracker: rootTracker,
}
}

// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract
// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract`
// and collects all the events, for every received event, it calls the `handler`
func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error {
func (gm *MembershipFetcher) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error {
fromBlock := gm.web3Config.RLNContract.DeployedBlockNumber
metadata, err := gm.GetMetadata()
if err != nil {
Expand All @@ -45,7 +66,7 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R
return err
}
//
err = gm.loadOldEvents(ctx, gm.web3Config.RLNContract.RLN, fromBlock, latestBlockNumber, handler)
err = gm.loadOldEvents(ctx, fromBlock, latestBlockNumber, handler)
if err != nil {
return err
}
Expand All @@ -57,13 +78,13 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R
return <-errCh
}

func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock, toBlock uint64, handler RegistrationEventHandler) error {
func (gm *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlock uint64, handler RegistrationEventHandler) error {
for ; fromBlock+maxBatchSize < toBlock; fromBlock += maxBatchSize + 1 { // check if the end of the batch is within the toBlock range
events, err := gm.getEvents(ctx, fromBlock, fromBlock+maxBatchSize)
if err != nil {
return err
}
if err := handler(gm, events); err != nil {
if err := handler(events); err != nil {
return err
}
}
Expand All @@ -74,10 +95,10 @@ func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *c
return err
}
// process all the fetched events
return handler(gm, events)
return handler(events)
}

func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) {
func (gm *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) {
defer gm.wg.Done()

// Watch for new events
Expand Down Expand Up @@ -113,7 +134,7 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, fromBlock uin
// update the last processed block
fromBlock = toBlock + 1

err = handler(gm, events)
err = handler(events)
if err != nil {
gm.log.Error("processing rln log", zap.Error(err))
}
Expand All @@ -135,7 +156,7 @@ func tooMuchDataRequestedError(err error) bool {
return err.Error() == "query returned more than 10000 results"
}

func (gm *DynamicGroupManager) latestBlockNumber(ctx context.Context) (uint64, error) {
func (gm *MembershipFetcher) latestBlockNumber(ctx context.Context) (uint64, error) {
block, err := gm.web3Config.ETHClient.BlockByNumber(ctx, nil)
if err != nil {
return 0, err
Expand All @@ -144,7 +165,7 @@ func (gm *DynamicGroupManager) latestBlockNumber(ctx context.Context) (uint64, e
return block.Number().Uint64(), nil
}

func (gm *DynamicGroupManager) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) {
func (gm *MembershipFetcher) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) {
evts, err := gm.fetchEvents(ctx, fromBlock, toBlock)
if err != nil {
if tooMuchDataRequestedError(err) { // divide the range and try again
Expand All @@ -164,7 +185,7 @@ func (gm *DynamicGroupManager) getEvents(ctx context.Context, fromBlock uint64,
return evts, nil
}

func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) {
func (gm *MembershipFetcher) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) {
logIterator, err := gm.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx})
if err != nil {
return nil, err
Expand All @@ -186,3 +207,19 @@ func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to

return results, nil
}

// GetMetadata retrieves metadata from the zerokit's RLN database
func (gm *MembershipFetcher) GetMetadata() (RLNMetadata, error) {
b, err := gm.rln.GetMetadata()
if err != nil {
return RLNMetadata{}, err
}

return DeserializeMetadata(b)
}

func (gm *MembershipFetcher) Stop() {
gm.web3Config.ETHClient.Close()
// wait for the watchNewEvents goroutine to finish
gm.wg.Wait()
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/web3"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
Expand All @@ -16,7 +17,7 @@ import (
var NULL_ADDR common.Address = common.HexToAddress("0x0000000000000000000000000000000000000000")

func TestFetchingLogic(t *testing.T) {
client := NewMockClient(t, "web3_test.json")
client := NewMockClient(t, "membership_fetcher.json")

rlnContract, err := contracts.NewRLN(NULL_ADDR, client)
if err != nil {
Expand All @@ -26,20 +27,24 @@ func TestFetchingLogic(t *testing.T) {
if err != nil {
t.Fatal(err)
}

dgm := DynamicGroupManager{
rootTracker, err := group_manager.NewMerkleRootTracker(1, rlnInstance)
if err != nil {
t.Fatal(err)
}
dgm := MembershipFetcher{
web3Config: &web3.Config{
RLNContract: web3.RLNContract{
RLN: rlnContract,
},
ETHClient: client,
},
rln: rlnInstance,
log: utils.Logger(),
rln: rlnInstance,
log: utils.Logger(),
rootTracker: rootTracker,
}

counts := []int{}
mockFn := func(_ *DynamicGroupManager, events []*contracts.RLNMemberRegistered) error {
mockFn := func(events []*contracts.RLNMemberRegistered) error {
counts = append(counts, len(events))
return nil
}
Expand Down
Loading

0 comments on commit 9ad1355

Please sign in to comment.