Skip to content

Commit 5842632

Browse files
committed
prefer atomic values to atomic operations
not sure why there's no atomic.NewUint32, so leave baseConnID alone also update some random code with changes since 1.20 is now required don't use mrand.Intn directly to avoid contention on global rng
1 parent 447a516 commit 5842632

File tree

4 files changed

+10
-17
lines changed

4 files changed

+10
-17
lines changed

canal/canal.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Canal struct {
5151
includeTableRegex []*regexp.Regexp
5252
excludeTableRegex []*regexp.Regexp
5353

54-
delay *uint32
54+
delay atomic.Uint32
5555

5656
ctx context.Context
5757
cancel context.CancelFunc
@@ -85,8 +85,6 @@ func NewCanal(cfg *Config) (*Canal, error) {
8585
}
8686
c.master = &masterInfo{logger: c.cfg.Logger}
8787

88-
c.delay = new(uint32)
89-
9088
var err error
9189

9290
if err = c.prepareDumper(); err != nil {
@@ -195,7 +193,7 @@ func (c *Canal) prepareDumper() error {
195193
}
196194

197195
func (c *Canal) GetDelay() uint32 {
198-
return atomic.LoadUint32(c.delay)
196+
return c.delay.Load()
199197
}
200198

201199
// Run will first try to dump all data from MySQL master `mysqldump`,

canal/sync.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package canal
22

33
import (
44
"log/slog"
5-
"sync/atomic"
65
"time"
76

87
"github.com/go-mysql-org/go-mysql/mysql"
@@ -259,7 +258,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
259258
if now >= ev.Header.Timestamp {
260259
newDelay = now - ev.Header.Timestamp
261260
}
262-
atomic.StoreUint32(c.delay, newDelay)
261+
c.delay.Store(newDelay)
263262
}
264263

265264
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {

mysql/util.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"runtime"
1717
"strconv"
1818
"strings"
19-
"time"
2019

2120
"filippo.io/edwards25519"
2221
"github.com/go-mysql-org/go-mysql/utils"
@@ -171,13 +170,10 @@ func AppendLengthEncodedInteger(b []byte, n uint64) []byte {
171170

172171
func RandomBuf(size int) []byte {
173172
buf := make([]byte, size)
174-
// When this project supports golang 1.20 as a minimum, then this mrand.New(...)
175-
// line can be eliminated and the random number can be generated by simply
176-
// calling mrand.Intn()
177-
random := mrand.New(mrand.NewSource(time.Now().UTC().UnixNano()))
178-
min, max := 30, 127
173+
random := mrand.New(mrand.NewSource(int64(mrand.Uint64())))
174+
min, max := int32(30), int32(127)
179175
for i := 0; i < size; i++ {
180-
buf[i] = byte(min + random.Intn(max-min))
176+
buf[i] = byte(min + random.Int31n(max-min))
181177
}
182178
return buf
183179
}

replication/parser.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type BinlogParser struct {
3333
timestampStringLocation *time.Location
3434

3535
// used to start/stop processing
36-
stopProcessing uint32
36+
stopProcessing atomic.Bool
3737

3838
useDecimal bool
3939
useFloatWithTrailingZero bool
@@ -54,11 +54,11 @@ func NewBinlogParser() *BinlogParser {
5454
}
5555

5656
func (p *BinlogParser) Stop() {
57-
atomic.StoreUint32(&p.stopProcessing, 1)
57+
p.stopProcessing.Store(true)
5858
}
5959

6060
func (p *BinlogParser) Resume() {
61-
atomic.StoreUint32(&p.stopProcessing, 0)
61+
p.stopProcessing.Store(false)
6262
}
6363

6464
func (p *BinlogParser) Reset() {
@@ -166,7 +166,7 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool,
166166
}
167167

168168
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
169-
for atomic.LoadUint32(&p.stopProcessing) != 1 {
169+
for !p.stopProcessing.Load() {
170170
done, err := p.parseSingleEvent(r, onEvent)
171171
if err != nil {
172172
if err == errMissingTableMapEvent {

0 commit comments

Comments
 (0)