Skip to content

Commit 827d98a

Browse files
committed
add relay multiple topics to retrieve messages persisted on server
1 parent 249ae3a commit 827d98a

File tree

4 files changed

+301
-28
lines changed

4 files changed

+301
-28
lines changed

client.go

Lines changed: 171 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"log"
88
"net"
9+
"strconv"
910
"sync"
1011
"sync/atomic"
1112
"time"
@@ -38,12 +39,16 @@ type Client interface {
3839
// Publish will publish a message with the specified DeliveryMode and content
3940
// to the specified topic.
4041
Publish(topic string, payload []byte, pubOpts ...PubOptions) Result
42+
// Relay sends a request to relay messages for one or more topics those are persisted on the server.
43+
// Provide a MessageHandler to be executed when a message is published on the topic provided,
44+
// or nil for the default handler.
45+
Relay(topics []string, relOpts ...RelOptions) Result
4146
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
4247
// a message is published on the topic provided, or nil for the default handler.
43-
// Relay sends a relay request to server. Provide a MessageHandler to be executed when
44-
// a message is published on the topic provided, or nil for the default handler.
45-
Relay(topic string, relOpts ...RelOptions) Result
4648
Subscribe(topic string, subOpts ...SubOptions) Result
49+
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to be executed when
50+
// a message is published on the topic provided, or nil for the default handler.
51+
SubscribeMultiple(subs []string, subOpts ...SubOptions) Result
4752
// Unsubscribe will end the subscription from each of the topics provided.
4853
// Messages published to those topics from other clients will no longer be
4954
// received.
@@ -312,7 +317,7 @@ func (c *client) serverDisconnect(err error) {
312317

313318
// Publish will publish a message with the specified DeliveryMode and content
314319
// to the specified topic.
315-
func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Result {
320+
func (c *client) Publish(pubTopic string, payload []byte, pubOpts ...PubOptions) Result {
316321
r := &PublishResult{result: result{complete: make(chan struct{})}}
317322
if err := c.ok(); err != nil {
318323
r.setError(errors.New("error not connected"))
@@ -324,22 +329,52 @@ func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Re
324329
opt.set(opts)
325330
}
326331

332+
deliveryMode := opts.deliveryMode
333+
delay := opts.delay
334+
ttl := opts.ttl
335+
t := new(topic)
336+
337+
// parse the topic.
338+
if ok := t.parse(pubTopic); !ok {
339+
r.setError(errors.New("publish: unable to parse topic"))
340+
return r
341+
}
342+
343+
if dMode, ok := t.getOption("delivery_mode"); ok {
344+
val, err := strconv.ParseInt(dMode, 10, 64)
345+
if err == nil {
346+
deliveryMode = uint8(val)
347+
}
348+
}
349+
350+
if d, ok := t.getOption("delay"); ok {
351+
val, err := strconv.ParseInt(d, 10, 64)
352+
if err == nil {
353+
delay = int32(val)
354+
}
355+
}
356+
357+
if dur, ok := t.getOption("ttl"); ok {
358+
ttl = dur
359+
}
360+
327361
pubMsg := &utp.PublishMessage{
328-
Topic: topic,
362+
Topic: t.topic,
329363
Payload: payload,
330-
Ttl: opts.ttl,
364+
Ttl: ttl,
331365
}
332366

333367
// Check batch or delay delivery.
334-
if opts.deliveryMode == 2 || opts.delay > 0 {
335-
// timeID := c.TimeID(opts.delay)
336-
return c.batchManager.add(opts.delay, pubMsg)
368+
if deliveryMode == 2 || delay > 0 {
369+
return c.batchManager.add(delay, pubMsg)
337370
}
338-
pub := &utp.Publish{DeliveryMode: opts.deliveryMode, Messages: []*utp.PublishMessage{pubMsg}}
371+
pub := &utp.Publish{DeliveryMode: deliveryMode, Messages: []*utp.PublishMessage{pubMsg}}
372+
339373
if pub.MessageID == 0 {
340374
mID := c.nextID(r)
341375
pub.MessageID = c.outboundID(mID)
342376
}
377+
343378
publishWaitTimeout := c.opts.writeTimeout
344379
if publishWaitTimeout == 0 {
345380
publishWaitTimeout = time.Second * 30
@@ -354,35 +389,56 @@ func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Re
354389
r.setError(errors.New("publish timeout error occurred"))
355390
return r
356391
}
392+
357393
return r
358394
}
359395

360396
// Relay send a new relay request. Provide a MessageHandler to be executed when
361397
// a message is published on the topic provided.
362-
func (c *client) Relay(topic string, relOpts ...RelOptions) Result {
398+
func (c *client) Relay(topics []string, relOpts ...RelOptions) Result {
363399
r := &RelayResult{result: result{complete: make(chan struct{})}}
364400
if err := c.ok(); err != nil {
365401
r.setError(errors.New("error not connected"))
366402
return r
367403
}
404+
368405
opts := new(relOptions)
369406
for _, opt := range relOpts {
370407
opt.set(opts)
371408
}
372409

373410
relMsg := &utp.Relay{}
374-
relMsg.RelayRequests = append(relMsg.RelayRequests, &utp.RelayRequest{Topic: topic, Last: opts.last})
411+
412+
for _, relTopic := range topics {
413+
last := opts.last
414+
t := new(topic)
415+
416+
// parse the topic.
417+
if ok := t.parse(relTopic); !ok {
418+
r.setError(errors.New("relay: unable to parse topic"))
419+
return r
420+
}
421+
422+
if dur, ok := t.getOption("last"); ok {
423+
last = dur
424+
}
425+
426+
relMsg.RelayRequests = append(relMsg.RelayRequests, &utp.RelayRequest{Topic: t.topic, Last: last})
427+
}
375428

376429
if relMsg.MessageID == 0 {
377430
mID := c.nextID(r)
378431
relMsg.MessageID = c.outboundID(mID)
379432
}
433+
380434
relayWaitTimeout := c.opts.writeTimeout
381435
if relayWaitTimeout == 0 {
382436
relayWaitTimeout = time.Second * 30
383437
}
438+
384439
// persist outbound
385440
c.storeOutbound(relMsg)
441+
386442
select {
387443
case c.send <- &MessageAndResult{m: relMsg, r: r}:
388444
case <-time.After(relayWaitTimeout):
@@ -395,30 +451,125 @@ func (c *client) Relay(topic string, relOpts ...RelOptions) Result {
395451

396452
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
397453
// a message is published on the topic provided.
398-
func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
454+
func (c *client) Subscribe(subTopic string, subOpts ...SubOptions) Result {
399455
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
400456
if err := c.ok(); err != nil {
401457
r.setError(errors.New("error not connected"))
402458
return r
403459
}
460+
404461
opts := new(subOptions)
405462
for _, opt := range subOpts {
406463
opt.set(opts)
407464
}
408465

409466
subMsg := &utp.Subscribe{}
410-
subMsg.Subscriptions = append(subMsg.Subscriptions, &utp.Subscription{DeliveryMode: opts.deliveryMode, Delay: opts.delay, Topic: topic})
467+
468+
deliveryMode := opts.deliveryMode
469+
delay := opts.delay
470+
t := new(topic)
471+
472+
// parse the topic.
473+
if ok := t.parse(subTopic); !ok {
474+
r.setError(errors.New("subscribe: unable to parse topic"))
475+
return r
476+
}
477+
478+
if dMode, ok := t.getOption("delivery_mode"); ok {
479+
val, err := strconv.ParseInt(dMode, 10, 64)
480+
if err == nil {
481+
deliveryMode = uint8(val)
482+
}
483+
}
484+
485+
if d, ok := t.getOption("delay"); ok {
486+
val, err := strconv.ParseInt(d, 10, 64)
487+
if err == nil {
488+
delay = int32(val)
489+
}
490+
}
491+
492+
subMsg.Subscriptions = append(subMsg.Subscriptions, &utp.Subscription{DeliveryMode: deliveryMode, Delay: delay, Topic: t.topic})
411493

412494
if subMsg.MessageID == 0 {
413495
mID := c.nextID(r)
414496
subMsg.MessageID = c.outboundID(mID)
415497
}
498+
416499
subscribeWaitTimeout := c.opts.writeTimeout
417500
if subscribeWaitTimeout == 0 {
418501
subscribeWaitTimeout = time.Second * 30
419502
}
503+
420504
// persist outbound
421505
c.storeOutbound(subMsg)
506+
507+
select {
508+
case c.send <- &MessageAndResult{m: subMsg, r: r}:
509+
case <-time.After(subscribeWaitTimeout):
510+
r.setError(errors.New("subscribe timeout error occurred"))
511+
return r
512+
}
513+
514+
return r
515+
}
516+
517+
// SubscribeMultiple starts a new subscription. Provide a MessageHandler to be executed when
518+
// a message is published on the topic provided.
519+
func (c *client) SubscribeMultiple(topics []string, subOpts ...SubOptions) Result {
520+
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
521+
if err := c.ok(); err != nil {
522+
r.setError(errors.New("error not connected"))
523+
return r
524+
}
525+
526+
opts := new(subOptions)
527+
for _, opt := range subOpts {
528+
opt.set(opts)
529+
}
530+
531+
subMsg := &utp.Subscribe{}
532+
for _, subTopic := range topics {
533+
deliveryMode := opts.deliveryMode
534+
delay := opts.delay
535+
t := new(topic)
536+
537+
// parse the topic.
538+
if ok := t.parse(subTopic); !ok {
539+
r.setError(errors.New("SubscribeMultiple: unable to parse topic"))
540+
return r
541+
}
542+
543+
if dMode, ok := t.getOption("delivery_mode"); ok {
544+
val, err := strconv.ParseInt(dMode, 10, 64)
545+
if err == nil {
546+
deliveryMode = uint8(val)
547+
}
548+
}
549+
550+
if d, ok := t.getOption("delay"); ok {
551+
val, err := strconv.ParseInt(d, 10, 64)
552+
if err == nil {
553+
delay = int32(val)
554+
}
555+
}
556+
557+
subMsg.Subscriptions = append(subMsg.Subscriptions, &utp.Subscription{DeliveryMode: deliveryMode, Delay: delay, Topic: t.topic})
558+
}
559+
560+
if subMsg.MessageID == 0 {
561+
mID := c.nextID(r)
562+
subMsg.MessageID = c.outboundID(mID)
563+
}
564+
565+
subscribeWaitTimeout := c.opts.writeTimeout
566+
if subscribeWaitTimeout == 0 {
567+
subscribeWaitTimeout = time.Second * 30
568+
}
569+
570+
// persist outbound
571+
c.storeOutbound(subMsg)
572+
422573
select {
423574
case c.send <- &MessageAndResult{m: subMsg, r: r}:
424575
case <-time.After(subscribeWaitTimeout):
@@ -434,29 +585,35 @@ func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
434585
// received.
435586
func (c *client) Unsubscribe(topics ...string) Result {
436587
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
588+
437589
unsubMsg := &utp.Unsubscribe{}
438590
var subs []*utp.Subscription
439591
for _, topic := range topics {
440592
sub := &utp.Subscription{Topic: topic}
441593
subs = append(subs, sub)
442594
}
443595
unsubMsg.Subscriptions = subs
596+
444597
if unsubMsg.MessageID == 0 {
445598
mID := c.nextID(r)
446599
unsubMsg.MessageID = c.outboundID(mID)
447600
}
601+
448602
unsubscribeWaitTimeout := c.opts.writeTimeout
449603
if unsubscribeWaitTimeout == 0 {
450604
unsubscribeWaitTimeout = time.Second * 30
451605
}
606+
452607
// persist outbound
453608
c.storeOutbound(unsubMsg)
609+
454610
select {
455611
case c.send <- &MessageAndResult{m: unsubMsg, r: r}:
456612
case <-time.After(unsubscribeWaitTimeout):
457613
r.setError(errors.New("unsubscribe timeout error occurred"))
458614
return r
459615
}
616+
460617
return r
461618
}
462619

cmd/sample/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func main() {
236236
if err != nil {
237237
log.Fatalf("err: %s", err)
238238
}
239-
r := client.Relay(*topic, unitdb.WithLast("1m"))
239+
r := client.Relay([]string{*topic}, unitdb.WithLast("1m"))
240240
if _, err := r.Get(ctx, 1*time.Second); err != nil {
241241
fmt.Println(err)
242242
os.Exit(1)

cmd/simple/main.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,28 @@ import (
77
"os"
88
"time"
99

10-
unitdb "github.com/unit-io/unitdb-go"
10+
udb "github.com/unit-io/unitdb-go"
1111
)
1212

13-
var f unitdb.MessageHandler = func(client unitdb.Client, pubMsg unitdb.PubMessage) {
13+
var f udb.MessageHandler = func(client udb.Client, pubMsg udb.PubMessage) {
1414
for _, msg := range pubMsg.Messages() {
1515
fmt.Printf("TOPIC: %s\n", msg.Topic)
1616
fmt.Printf("MSG: %s\n", msg.Payload)
1717
}
1818
}
1919

2020
func main() {
21-
client, err := unitdb.NewClient(
21+
client, err := udb.NewClient(
2222
//"tcp://localhost:6060",
2323
// "ws://localhost:6080",
2424
"grpc://localhost:6080",
2525
"UCBFDONCNJLaKMCAIeJBaOVfbAXUZHNPLDKKLDKLHZHKYIZLCDPQ",
26-
unitdb.WithCleanSession(),
27-
unitdb.WithInsecure(),
28-
unitdb.WithBatchDuration(1*time.Second),
29-
unitdb.WithKeepAlive(2*time.Second),
30-
unitdb.WithPingTimeout(1*time.Second),
31-
unitdb.WithDefaultMessageHandler(f),
26+
udb.WithCleanSession(),
27+
udb.WithInsecure(),
28+
udb.WithBatchDuration(1*time.Second),
29+
udb.WithKeepAlive(2*time.Second),
30+
udb.WithPingTimeout(1*time.Second),
31+
udb.WithDefaultMessageHandler(f),
3232
)
3333
if err != nil {
3434
log.Fatalf("err: %s", err)
@@ -39,23 +39,23 @@ func main() {
3939
log.Fatalf("err: %s", err)
4040
}
4141

42-
var r unitdb.Result
42+
var r udb.Result
4343

44-
r = client.Relay("ADcABeFRBDJKe/groups.private.673651407196578720.message", unitdb.WithLast("10m"))
44+
r = client.Relay([]string{"ADcABeFRBDJKe/groups.private.673651407196578720.message"}, udb.WithLast("10m"))
4545
if _, err := r.Get(ctx, 1*time.Second); err != nil {
4646
fmt.Println(err)
4747
os.Exit(1)
4848
}
4949

50-
r = client.Subscribe("ADcABeFRBDJKe/groups.private.673651407196578720.message", unitdb.WithSubDeliveryMode(0))
50+
r = client.Subscribe("ADcABeFRBDJKe/groups.private.673651407196578720.message", udb.WithSubDeliveryMode(0))
5151
if _, err := r.Get(ctx, 1*time.Second); err != nil {
5252
fmt.Println(err)
5353
os.Exit(1)
5454
}
5555

5656
for i := 0; i < 2; i++ {
5757
msg := fmt.Sprintf("Hi #%d time!", i)
58-
r := client.Publish("ADcABeFRBDJKe/groups.private.673651407196578720.message", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(0))
58+
r := client.Publish("ADcABeFRBDJKe/groups.private.673651407196578720.message", []byte(msg), udb.WithTTL("1m"), udb.WithPubDeliveryMode(0))
5959
if _, err := r.Get(ctx, 1*time.Second); err != nil {
6060
log.Fatalf("err: %s", err)
6161
}

0 commit comments

Comments
 (0)