Skip to content

Commit f6f5e59

Browse files
committed
add topic filter to handle incoming messages for specific topic
1 parent 827d98a commit f6f5e59

File tree

9 files changed

+542
-149
lines changed

9 files changed

+542
-149
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"type": "go",
1010
"request": "launch",
1111
"mode": "auto",
12-
"program": "${workspaceFolder}/cmd/simple/main.go",
12+
"program": "${workspaceFolder}/examples/simple/main.go",
1313
"env": {},
1414
"args": []
1515
}

client.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Client interface {
3636
// the client wait group is done.
3737
// The context used grpc stream to signal context done.
3838
DisconnectContext(ctx context.Context) error
39+
// TopicFilter is used to receive filtered messages on specififc topic.
40+
TopicFilter(subTopic string) (*TopicFilter, error)
3941
// Publish will publish a message with the specified DeliveryMode and content
4042
// to the specified topic.
4143
Publish(topic string, payload []byte, pubOpts ...PubOptions) Result
@@ -54,6 +56,7 @@ type Client interface {
5456
// received.
5557
Unsubscribe(topics ...string) Result
5658
}
59+
5760
type client struct {
5861
opts *options
5962
context context.Context // context for the client
@@ -66,7 +69,7 @@ type client struct {
6669
send chan *MessageAndResult
6770
recv chan lp.MessagePack
6871
pub chan *utp.Publish
69-
callbacks map[uint64]MessageHandler
72+
notifier *notifier
7073

7174
// Time when the keepalive session was last refreshed.
7275
lastTouched atomic.Value
@@ -88,11 +91,11 @@ func NewClient(target, clientID string, opts ...Options) (Client, error) {
8891
opts: new(options),
8992
context: ctx,
9093
cancel: cancel,
91-
messageIds: messageIds{index: make(map[MID]Result)},
94+
messageIds: messageIds{index: make(map[MID]Result), resumedIds: make(map[MID]struct{})},
9295
send: make(chan *MessageAndResult, 1), // buffered
9396
recv: make(chan lp.MessagePack),
9497
pub: make(chan *utp.Publish),
95-
callbacks: make(map[uint64]MessageHandler),
98+
notifier: newNotifier(100), // Notifier with Queue size 100
9699
// close
97100
closeC: make(chan struct{}),
98101
}
@@ -103,7 +106,6 @@ func NewClient(target, clientID string, opts ...Options) (Client, error) {
103106
// set default options
104107
c.opts.addServer(target)
105108
c.opts.setClientID(clientID)
106-
c.callbacks[0] = c.opts.defaultMessageHandler
107109

108110
// Open database connection
109111
path := c.opts.storePath
@@ -150,6 +152,7 @@ func (c *client) close() error {
150152
// close(c.ack)
151153
close(c.recv)
152154
close(c.pub)
155+
c.notifier.close()
153156
store.Close()
154157
if c.cancel != nil {
155158
c.cancel()
@@ -315,6 +318,21 @@ func (c *client) serverDisconnect(err error) {
315318
}
316319
}
317320

321+
func (c *client) TopicFilter(subscriptionTopic string) (*TopicFilter, error) {
322+
topic := new(topic)
323+
topic.parse(subscriptionTopic)
324+
if err := topic.validate(validateMinLength,
325+
validateMaxLenth,
326+
validateMaxDepth,
327+
validateTopicParts); err != nil {
328+
return nil, err
329+
}
330+
t := &TopicFilter{subscriptionTopic: topic, updates: make(chan []PubMessage)}
331+
c.notifier.addFilter(t.filter)
332+
333+
return t, nil
334+
}
335+
318336
// Publish will publish a message with the specified DeliveryMode and content
319337
// to the specified topic.
320338
func (c *client) Publish(pubTopic string, payload []byte, pubOpts ...PubOptions) Result {
@@ -340,6 +358,13 @@ func (c *client) Publish(pubTopic string, payload []byte, pubOpts ...PubOptions)
340358
return r
341359
}
342360

361+
if err := t.validate(validateMinLength,
362+
validateMaxLenth,
363+
validateMaxDepth); err != nil {
364+
r.setError(err)
365+
return r
366+
}
367+
343368
if dMode, ok := t.getOption("delivery_mode"); ok {
344369
val, err := strconv.ParseInt(dMode, 10, 64)
345370
if err == nil {
@@ -419,6 +444,15 @@ func (c *client) Relay(topics []string, relOpts ...RelOptions) Result {
419444
return r
420445
}
421446

447+
if err := t.validate(validateMinLength,
448+
validateMaxLenth,
449+
validateMaxDepth,
450+
validateMultiWildcard,
451+
validateTopicParts); err != nil {
452+
r.setError(err)
453+
return r
454+
}
455+
422456
if dur, ok := t.getOption("last"); ok {
423457
last = dur
424458
}
@@ -475,6 +509,15 @@ func (c *client) Subscribe(subTopic string, subOpts ...SubOptions) Result {
475509
return r
476510
}
477511

512+
if err := t.validate(validateMinLength,
513+
validateMaxLenth,
514+
validateMaxDepth,
515+
validateMultiWildcard,
516+
validateTopicParts); err != nil {
517+
r.setError(err)
518+
return r
519+
}
520+
478521
if dMode, ok := t.getOption("delivery_mode"); ok {
479522
val, err := strconv.ParseInt(dMode, 10, 64)
480523
if err == nil {
@@ -540,6 +583,15 @@ func (c *client) SubscribeMultiple(topics []string, subOpts ...SubOptions) Resul
540583
return r
541584
}
542585

586+
if err := t.validate(validateMinLength,
587+
validateMaxLenth,
588+
validateMaxDepth,
589+
validateMultiWildcard,
590+
validateTopicParts); err != nil {
591+
r.setError(err)
592+
return r
593+
}
594+
543595
if dMode, ok := t.getOption("delivery_mode"); ok {
544596
val, err := strconv.ParseInt(dMode, 10, 64)
545597
if err == nil {
@@ -725,7 +777,7 @@ func (c *client) isClosed() bool {
725777
// Check read ok status.
726778
func (c *client) ok() error {
727779
if c.isClosed() {
728-
return errors.New("client connection is closed.")
780+
return errors.New("client connection is closed")
729781
}
730782
return nil
731783
}

0 commit comments

Comments
 (0)