diff --git a/go.mod b/go.mod index a64125cf9a..53a0a5d33c 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/googleapis/gnostic v0.0.0-20190624222214-25d8b0b66985 // indirect github.com/gorilla/mux v1.7.3 // indirect github.com/hashicorp/golang-lru v0.5.3 + github.com/influxdata/influxdb v0.0.0-20180221223340-01288bdb0883 github.com/json-iterator/go v1.1.7 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect diff --git a/pss/pubsub.go b/pss/topicfilter.go similarity index 72% rename from pss/pubsub.go rename to pss/topicfilter.go index 44b9aa02a1..9ddba0c472 100644 --- a/pss/pubsub.go +++ b/pss/topicfilter.go @@ -25,22 +25,22 @@ import ( "github.com/ethersphere/swarm/pss/message" ) -// PubSub implements the pushsync.PubSub interface using pss -type PubSub struct { +// TopicFilter implements the pushsync.TopicFilter interface using pss +type TopicFilter struct { pss *Pss - messageTTL time.Duration // expire duration of a pubsub message. Depends on the use case. + messageTTL time.Duration // expire duration of a topic filter message. Depends on the use case. } -// NewPubSub creates a new PubSub -func NewPubSub(p *Pss, messageTTL time.Duration) *PubSub { - return &PubSub{ +// NewTopicFilter creates a new TopicFilter +func NewTopicFilter(p *Pss, messageTTL time.Duration) *TopicFilter { + return &TopicFilter{ pss: p, messageTTL: messageTTL, } } // BaseAddr returns Kademlia base address -func (p *PubSub) BaseAddr() []byte { +func (p *TopicFilter) BaseAddr() []byte { return p.pss.BaseAddr() } @@ -51,12 +51,12 @@ func isPssPeer(bp *network.BzzPeer) bool { // IsClosestTo returns true is self is the closest known node to addr // as uniquely defined by the MSB XOR distance // among pss capable peers -func (p *PubSub) IsClosestTo(addr []byte) bool { +func (p *TopicFilter) IsClosestTo(addr []byte) bool { return p.pss.IsClosestTo(addr, isPssPeer) } // Register registers a handler -func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() { +func (p *TopicFilter) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() { f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error { return handler(msg, peer) } @@ -69,8 +69,8 @@ func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p } // Send sends a message using pss SendRaw -func (p *PubSub) Send(to []byte, topic string, msg []byte) error { - defer metrics.GetOrRegisterResettingTimer("pss/pubsub/send", nil).UpdateSince(time.Now()) +func (p *TopicFilter) Send(to []byte, topic string, msg []byte) error { + defer metrics.GetOrRegisterResettingTimer("pss/topicfilter/send", nil).UpdateSince(time.Now()) pt := message.NewTopic([]byte(topic)) return p.pss.SendRaw(PssAddress(to), pt, msg, p.messageTTL) } diff --git a/pushsync/protocol.go b/pushsync/protocol.go index 1a1b700948..4bb8ba92f6 100644 --- a/pushsync/protocol.go +++ b/pushsync/protocol.go @@ -30,8 +30,8 @@ const ( pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts ) -// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing -type PubSub interface { +// TopicFilter is a Postal Service interface needed to send/receive chunks and receipts for push syncing +type TopicFilter interface { Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() Send(to []byte, topic string, msg []byte) error BaseAddr() []byte diff --git a/pushsync/protocol_test.go b/pushsync/protocol_test.go index b51c23ffb6..f1059b0723 100644 --- a/pushsync/protocol_test.go +++ b/pushsync/protocol_test.go @@ -29,7 +29,7 @@ import ( ) // TestProtocol tests the push sync protocol -// push syncer node communicate with storers via mock loopback PubSub +// push syncer node communicate with storers via mock loopback TopicFilter func TestProtocol(t *testing.T) { timeout := 10 * time.Second chunkCnt := 1024 @@ -51,7 +51,7 @@ func TestProtocol(t *testing.T) { log.Debug("closest node?", "n", n, "n%storerCnt", n%storerCnt, "storer", j) return n%storerCnt == j } - storers[j] = NewStorer(&testStore{store}, &testPubSub{lb, isClosestTo}) + storers[j] = NewStorer(&testStore{store}, &testTopicFilter{lb, isClosestTo}) } tags, tagIDs := setupTags(chunkCnt, tagCnt) @@ -60,7 +60,7 @@ func TestProtocol(t *testing.T) { // isClosestTo function mocked isClosestTo := func([]byte) bool { return false } // start push syncing in a go routine - p := NewPusher(tp, &testPubSub{lb, isClosestTo}, tags) + p := NewPusher(tp, &testTopicFilter{lb, isClosestTo}, tags) defer p.Close() synced := make(map[int]int) diff --git a/pushsync/pusher.go b/pushsync/pusher.go index fadfe80ec8..24c6695a66 100644 --- a/pushsync/pusher.go +++ b/pushsync/pusher.go @@ -56,7 +56,7 @@ type Pusher struct { syncedAddrs []storage.Address syncedAddrsMu sync.Mutex receipts chan []byte // channel to receive receipts - ps PubSub // PubSub interface to send chunks and receive receipts + ps TopicFilter // TopicFilter interface to send chunks and receive receipts logger log.Logger // custom logger } @@ -73,9 +73,9 @@ type pushedItem struct { // NewPusher constructs a Pusher and starts up the push sync protocol // takes // - a DB interface to subscribe to push sync index to allow iterating over recently stored chunks -// - a pubsub interface to send chunks and receive statements of custody +// - a TopicFilter interface to send chunks and receive statements of custody // - tags that hold the tags -func NewPusher(store DB, ps PubSub, tags *chunk.Tags) *Pusher { +func NewPusher(store DB, ps TopicFilter, tags *chunk.Tags) *Pusher { p := &Pusher{ store: store, tags: tags, @@ -296,7 +296,7 @@ func (p *Pusher) pushReceipt(addr []byte) { } // sendChunkMsg sends chunks to their destination -// using the PubSub interface Send method (e.g., pss neighbourhood addressing) +// using the TopicFilter interface Send method (e.g., pss neighbourhood addressing) func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error { rlpTimer := time.Now() diff --git a/pushsync/pusher_test.go b/pushsync/pusher_test.go index 17fb91a951..7e4fa9c1f5 100644 --- a/pushsync/pusher_test.go +++ b/pushsync/pusher_test.go @@ -95,7 +95,7 @@ func TestPusher(t *testing.T) { // construct the mock push sync index iterator tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent) // start push syncing in a go routine - p := NewPusher(tp, &testPubSub{lb, func([]byte) bool { return false }}, tags) + p := NewPusher(tp, &testTopicFilter{lb, func([]byte) bool { return false }}, tags) defer p.Close() // collect synced chunks until all chunks synced // wait on errc for errors on any thread @@ -121,25 +121,25 @@ func TestPusher(t *testing.T) { } -type testPubSub struct { +type testTopicFilter struct { *loopBack isClosestTo func([]byte) bool } var testBaseAddr = make([]byte, 32) -// BaseAddr needed to implement PubSub interface -// in the testPubSub, this address has no relevant and is given only for logging -func (tps *testPubSub) BaseAddr() []byte { +// BaseAddr needed to implement TopicFilter interface +// in the testTopicFilter, this address has no relevant and is given only for logging +func (tps *testTopicFilter) BaseAddr() []byte { return testBaseAddr } -// IsClosestTo needed to implement PubSub interface -func (tps *testPubSub) IsClosestTo(addr []byte) bool { +// IsClosestTo needed to implement TopicFilter interface +func (tps *testTopicFilter) IsClosestTo(addr []byte) bool { return tps.isClosestTo(addr) } -// loopback implements PubSub as a central subscription engine, +// loopback implements TopicFilter as a central subscription engine, // ie a msg sent is received by all handlers registered for the topic type loopBack struct { handlers map[string][]func(msg []byte, p *p2p.Peer) error diff --git a/pushsync/simulation_test.go b/pushsync/simulation_test.go index da4e8d1839..16db71fec6 100644 --- a/pushsync/simulation_test.go +++ b/pushsync/simulation_test.go @@ -204,13 +204,13 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic r := retrieval.New(kad, netStore, addr, nil) netStore.RemoteGet = r.RequestFromPeers - pubSub := pss.NewPubSub(ps, 1*time.Second) + topicFilter := pss.NewTopicFilter(ps, 1*time.Second) // setup pusher - p := NewPusher(lstore, pubSub, tags) + p := NewPusher(lstore, topicFilter, tags) bucket.Store(bucketKeyPushSyncer, p) // setup storer - s := NewStorer(netStore, pubSub) + s := NewStorer(netStore, topicFilter) cleanup := func() { p.Close() diff --git a/pushsync/storer.go b/pushsync/storer.go index 12b5344d0b..81dd9d25e1 100644 --- a/pushsync/storer.go +++ b/pushsync/storer.go @@ -37,10 +37,10 @@ type Store interface { // Storer is the object used by the push-sync server side protocol type Storer struct { - store Store // store to put chunks in, and retrieve them from - ps PubSub // pubsub interface to receive chunks and send receipts - deregister func() // deregister the registered handler when Storer is closed - logger log.Logger // custom logger + store Store // store to put chunks in, and retrieve them from + ps TopicFilter // TopicFilter interface to receive chunks and send receipts + deregister func() // deregister the registered handler when Storer is closed + logger log.Logger // custom logger } // NewStorer constructs a Storer @@ -50,7 +50,7 @@ type Storer struct { // - the chunks are stored and synced to their nearest neighbours and // - a statement of custody receipt is sent as a response to the originator // it sets a cancel function that deregisters the handler -func NewStorer(store Store, ps PubSub) *Storer { +func NewStorer(store Store, ps TopicFilter) *Storer { s := &Storer{ store: store, ps: ps, diff --git a/swarm.go b/swarm.go index ff109c6bcc..e5add4c0cf 100644 --- a/swarm.go +++ b/swarm.go @@ -271,9 +271,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e if config.PushSyncEnabled { // expire time for push-sync messages should be lower than regular chat-like messages to avoid network flooding - pubsub := pss.NewPubSub(self.ps, 20*time.Second) - self.pushSync = pushsync.NewPusher(localStore, pubsub, self.tags) - self.storer = pushsync.NewStorer(self.netStore, pubsub) + topicFilter := pss.NewTopicFilter(self.ps, 20*time.Second) + self.pushSync = pushsync.NewPusher(localStore, topicFilter, self.tags) + self.storer = pushsync.NewStorer(self.netStore, topicFilter) } self.api = api.NewAPI(self.fileStore, self.dns, self.rns, feedsHandler, self.privateKey, self.tags)