-
Notifications
You must be signed in to change notification settings - Fork 11
/
heartbeat.go
129 lines (115 loc) · 2.71 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Copyright (c) 2019 Zenichi Amano
*
* This file is part of go-push-receiver, which is MIT licensed.
* See http://opensource.org/licenses/MIT
*/
package pushreceiver
import (
"context"
"time"
)
// Heartbeat holds the keep-alive settings for a connection and drives the
// loop that sends heartbeat pings and watches for acknowledgements.
type Heartbeat struct {
	// clientInterval is the period between pings sent by the client.
	// A value <= 0 disables client-initiated pings.
	clientInterval time.Duration
	// serverInterval is the heartbeat interval associated with the server side.
	serverInterval time.Duration
	// deadmanTimeout is how long to wait for a heartbeat acknowledgement
	// before the connection is forcibly closed.
	deadmanTimeout time.Duration
	// adaptive records whether adaptive heartbeat is enabled.
	adaptive bool
}
// HeartbeatOption is a functional option that configures a Heartbeat.
// An option receives the Heartbeat being configured and mutates it in place.
type HeartbeatOption func(*Heartbeat)
// WithClientInterval returns an option that stores interval as the
// heartbeat's client interval. The value is stored as given, without
// validation.
func WithClientInterval(interval time.Duration) HeartbeatOption {
	apply := func(hb *Heartbeat) {
		hb.clientInterval = interval
	}
	return apply
}
// WithServerInterval returns an option that sets the heartbeat's server
// interval. Values of one minute or less are raised to the one-minute minimum.
func WithServerInterval(interval time.Duration) HeartbeatOption {
	return func(hb *Heartbeat) {
		// Enforce the one-minute floor before storing the value.
		effective := interval
		if effective <= 1*time.Minute {
			effective = 1 * time.Minute
		}
		hb.serverInterval = effective
	}
}
// WithDeadmanTimeout returns an option that stores timeout as the
// heartbeat's deadman timeout. The value is stored as given, without
// validation.
func WithDeadmanTimeout(timeout time.Duration) HeartbeatOption {
	apply := func(hb *Heartbeat) {
		hb.deadmanTimeout = timeout
	}
	return apply
}
// WithAdaptive returns an option that sets the heartbeat's adaptive flag
// to enabled.
func WithAdaptive(enabled bool) HeartbeatOption {
	apply := func(hb *Heartbeat) {
		hb.adaptive = enabled
	}
	return apply
}
// newHeartbeat builds a Heartbeat from its zero value and applies the given
// options to it in the order they were passed.
func newHeartbeat(options ...HeartbeatOption) *Heartbeat {
	hb := new(Heartbeat)
	for i := range options {
		options[i](hb)
	}
	return hb
}
// start runs the heartbeat loop for the given connection. It returns when ctx
// is done, when the deadman timer expires (after disconnecting mcs), or when
// sending a heartbeat ping fails.
//
// If no positive deadman timeout is configured, one is derived from the larger
// of the client and server intervals via durationDeadmanTimeout and stored
// back on h. The deadman timer and the ping ticker are only created when their
// durations are positive; otherwise their channels stay nil and the
// corresponding select cases never fire.
func (h *Heartbeat) start(ctx context.Context, mcs *mcs) {
	if h.deadmanTimeout <= 0 {
		// Base the default on whichever interval is longer.
		base := h.clientInterval
		if base < h.serverInterval {
			base = h.serverInterval
		}
		h.deadmanTimeout = durationDeadmanTimeout(base)
	}

	var (
		pingDeadman  *time.Timer
		pingDeadmanC <-chan time.Time
	)
	if h.deadmanTimeout > 0 {
		pingDeadman = time.NewTimer(h.deadmanTimeout)
		defer pingDeadman.Stop()
		pingDeadmanC = pingDeadman.C
	}

	var pingTickerC <-chan time.Time
	if h.clientInterval > 0 {
		pingTicker := time.NewTicker(h.clientInterval)
		defer pingTicker.Stop()
		pingTickerC = pingTicker.C
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-mcs.heartbeatAck:
			if pingDeadman != nil {
				// Stop the timer and discard any tick that is already
				// queued before re-arming it. Without the drain, a timer
				// that fired at the same moment the ack arrived would leave
				// a stale value in its channel and trigger a disconnect on
				// the next iteration even though the peer just answered.
				if !pingDeadman.Stop() {
					select {
					case <-pingDeadman.C:
					default:
					}
				}
				pingDeadman.Reset(h.deadmanTimeout)
			}
		case <-pingDeadmanC:
			// No acknowledgement arrived within the deadman timeout:
			// force disconnect.
			mcs.log.Print("force disconnect by heartbeat")
			mcs.disconnect()
			return
		case <-pingTickerC:
			// send heartbeat to FCM
			if err := mcs.SendHeartbeatPingPacket(); err != nil {
				return
			}
		}
	}
}
// durationDeadmanTimeout returns the default deadman timeout for a heartbeat
// interval, which is four times that interval.
func durationDeadmanTimeout(interval time.Duration) time.Duration {
	const deadmanMultiplier = 4
	return deadmanMultiplier * interval
}