Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

rename/pubsub-topicfilter #2170

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/googleapis/gnostic v0.0.0-20190624222214-25d8b0b66985 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/hashicorp/golang-lru v0.5.3
github.com/influxdata/influxdb v0.0.0-20180221223340-01288bdb0883
github.com/json-iterator/go v1.1.7 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down
22 changes: 11 additions & 11 deletions pss/pubsub.go → pss/topicfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ import (
"github.com/ethersphere/swarm/pss/message"
)

// PubSub implements the pushsync.PubSub interface using pss
type PubSub struct {
// TopicFilter implements the pushsync.TopicFilter interface using pss
type TopicFilter struct {
pss *Pss
messageTTL time.Duration // expire duration of a pubsub message. Depends on the use case.
messageTTL time.Duration // expire duration of a topic filter message. Depends on the use case.
}

// NewPubSub creates a new PubSub
func NewPubSub(p *Pss, messageTTL time.Duration) *PubSub {
return &PubSub{
// NewTopicFilter creates a new TopicFilter
func NewTopicFilter(p *Pss, messageTTL time.Duration) *TopicFilter {
return &TopicFilter{
pss: p,
messageTTL: messageTTL,
}
}

// BaseAddr returns Kademlia base address
func (p *PubSub) BaseAddr() []byte {
func (p *TopicFilter) BaseAddr() []byte {
return p.pss.BaseAddr()
}

Expand All @@ -51,12 +51,12 @@ func isPssPeer(bp *network.BzzPeer) bool {
// IsClosestTo returns true is self is the closest known node to addr
// as uniquely defined by the MSB XOR distance
// among pss capable peers
func (p *PubSub) IsClosestTo(addr []byte) bool {
func (p *TopicFilter) IsClosestTo(addr []byte) bool {
return p.pss.IsClosestTo(addr, isPssPeer)
}

// Register registers a handler
func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
func (p *TopicFilter) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
return handler(msg, peer)
}
Expand All @@ -69,8 +69,8 @@ func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p
}

// Send sends a message using pss SendRaw
func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
defer metrics.GetOrRegisterResettingTimer("pss/pubsub/send", nil).UpdateSince(time.Now())
func (p *TopicFilter) Send(to []byte, topic string, msg []byte) error {
defer metrics.GetOrRegisterResettingTimer("pss/topicfilter/send", nil).UpdateSince(time.Now())
pt := message.NewTopic([]byte(topic))
return p.pss.SendRaw(PssAddress(to), pt, msg, p.messageTTL)
}
4 changes: 2 additions & 2 deletions pushsync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const (
pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts
)

// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing
type PubSub interface {
// TopicFilter is a Postal Service interface needed to send/receive chunks and receipts for push syncing
type TopicFilter interface {
Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func()
Send(to []byte, topic string, msg []byte) error
BaseAddr() []byte
Expand Down
6 changes: 3 additions & 3 deletions pushsync/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// TestProtocol tests the push sync protocol
// push syncer node communicate with storers via mock loopback PubSub
// push syncer node communicate with storers via mock loopback TopicFilter
func TestProtocol(t *testing.T) {
timeout := 10 * time.Second
chunkCnt := 1024
Expand All @@ -51,7 +51,7 @@ func TestProtocol(t *testing.T) {
log.Debug("closest node?", "n", n, "n%storerCnt", n%storerCnt, "storer", j)
return n%storerCnt == j
}
storers[j] = NewStorer(&testStore{store}, &testPubSub{lb, isClosestTo})
storers[j] = NewStorer(&testStore{store}, &testTopicFilter{lb, isClosestTo})
}

tags, tagIDs := setupTags(chunkCnt, tagCnt)
Expand All @@ -60,7 +60,7 @@ func TestProtocol(t *testing.T) {
// isClosestTo function mocked
isClosestTo := func([]byte) bool { return false }
// start push syncing in a go routine
p := NewPusher(tp, &testPubSub{lb, isClosestTo}, tags)
p := NewPusher(tp, &testTopicFilter{lb, isClosestTo}, tags)
defer p.Close()

synced := make(map[int]int)
Expand Down
8 changes: 4 additions & 4 deletions pushsync/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Pusher struct {
syncedAddrs []storage.Address
syncedAddrsMu sync.Mutex
receipts chan []byte // channel to receive receipts
ps PubSub // PubSub interface to send chunks and receive receipts
ps TopicFilter // TopicFilter interface to send chunks and receive receipts
logger log.Logger // custom logger
}

Expand All @@ -73,9 +73,9 @@ type pushedItem struct {
// NewPusher constructs a Pusher and starts up the push sync protocol
// takes
// - a DB interface to subscribe to push sync index to allow iterating over recently stored chunks
// - a pubsub interface to send chunks and receive statements of custody
// - a TopicFilter interface to send chunks and receive statements of custody
// - tags that hold the tags
func NewPusher(store DB, ps PubSub, tags *chunk.Tags) *Pusher {
func NewPusher(store DB, ps TopicFilter, tags *chunk.Tags) *Pusher {
p := &Pusher{
store: store,
tags: tags,
Expand Down Expand Up @@ -296,7 +296,7 @@ func (p *Pusher) pushReceipt(addr []byte) {
}

// sendChunkMsg sends chunks to their destination
// using the PubSub interface Send method (e.g., pss neighbourhood addressing)
// using the TopicFilter interface Send method (e.g., pss neighbourhood addressing)
func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error {
rlpTimer := time.Now()

Expand Down
16 changes: 8 additions & 8 deletions pushsync/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestPusher(t *testing.T) {
// construct the mock push sync index iterator
tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
// start push syncing in a go routine
p := NewPusher(tp, &testPubSub{lb, func([]byte) bool { return false }}, tags)
p := NewPusher(tp, &testTopicFilter{lb, func([]byte) bool { return false }}, tags)
defer p.Close()
// collect synced chunks until all chunks synced
// wait on errc for errors on any thread
Expand All @@ -121,25 +121,25 @@ func TestPusher(t *testing.T) {

}

type testPubSub struct {
type testTopicFilter struct {
*loopBack
isClosestTo func([]byte) bool
}

var testBaseAddr = make([]byte, 32)

// BaseAddr needed to implement PubSub interface
// in the testPubSub, this address has no relevant and is given only for logging
func (tps *testPubSub) BaseAddr() []byte {
// BaseAddr needed to implement TopicFilter interface
// in the testTopicFilter, this address has no relevant and is given only for logging
func (tps *testTopicFilter) BaseAddr() []byte {
return testBaseAddr
}

// IsClosestTo needed to implement PubSub interface
func (tps *testPubSub) IsClosestTo(addr []byte) bool {
// IsClosestTo needed to implement TopicFilter interface
func (tps *testTopicFilter) IsClosestTo(addr []byte) bool {
return tps.isClosestTo(addr)
}

// loopback implements PubSub as a central subscription engine,
// loopback implements TopicFilter as a central subscription engine,
// ie a msg sent is received by all handlers registered for the topic
type loopBack struct {
handlers map[string][]func(msg []byte, p *p2p.Peer) error
Expand Down
6 changes: 3 additions & 3 deletions pushsync/simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic
r := retrieval.New(kad, netStore, addr, nil)
netStore.RemoteGet = r.RequestFromPeers

pubSub := pss.NewPubSub(ps, 1*time.Second)
topicFilter := pss.NewTopicFilter(ps, 1*time.Second)
// setup pusher
p := NewPusher(lstore, pubSub, tags)
p := NewPusher(lstore, topicFilter, tags)
bucket.Store(bucketKeyPushSyncer, p)

// setup storer
s := NewStorer(netStore, pubSub)
s := NewStorer(netStore, topicFilter)

cleanup := func() {
p.Close()
Expand Down
10 changes: 5 additions & 5 deletions pushsync/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type Store interface {

// Storer is the object used by the push-sync server side protocol
type Storer struct {
store Store // store to put chunks in, and retrieve them from
ps PubSub // pubsub interface to receive chunks and send receipts
deregister func() // deregister the registered handler when Storer is closed
logger log.Logger // custom logger
store Store // store to put chunks in, and retrieve them from
ps TopicFilter // TopicFilter interface to receive chunks and send receipts
deregister func() // deregister the registered handler when Storer is closed
logger log.Logger // custom logger
}

// NewStorer constructs a Storer
Expand All @@ -50,7 +50,7 @@ type Storer struct {
// - the chunks are stored and synced to their nearest neighbours and
// - a statement of custody receipt is sent as a response to the originator
// it sets a cancel function that deregisters the handler
func NewStorer(store Store, ps PubSub) *Storer {
func NewStorer(store Store, ps TopicFilter) *Storer {
s := &Storer{
store: store,
ps: ps,
Expand Down
6 changes: 3 additions & 3 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e

if config.PushSyncEnabled {
// expire time for push-sync messages should be lower than regular chat-like messages to avoid network flooding
pubsub := pss.NewPubSub(self.ps, 20*time.Second)
self.pushSync = pushsync.NewPusher(localStore, pubsub, self.tags)
self.storer = pushsync.NewStorer(self.netStore, pubsub)
topicFilter := pss.NewTopicFilter(self.ps, 20*time.Second)
self.pushSync = pushsync.NewPusher(localStore, topicFilter, self.tags)
self.storer = pushsync.NewStorer(self.netStore, topicFilter)
}

self.api = api.NewAPI(self.fileStore, self.dns, self.rns, feedsHandler, self.privateKey, self.tags)
Expand Down