Skip to content

Commit

Permalink
relay/syncer: relay notifies syncer of new write to reduce sync laten…
Browse files Browse the repository at this point in the history
…cy (pingcap#2225)
  • Loading branch information
D3Hunter authored and ti-chi-bot committed Oct 27, 2021
1 parent b05cbcc commit 151cc17
Show file tree
Hide file tree
Showing 30 changed files with 653 additions and 488 deletions.
5 changes: 3 additions & 2 deletions cmd/dm-portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"net/http"
"os"

"github.com/rakyll/statik/fs"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/portal"
_ "github.com/pingcap/dm/dm/portal/statik"
"github.com/pingcap/dm/pkg/log"
"github.com/rakyll/statik/fs"
"go.uber.org/zap"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/dm-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
log.L().Info("", zap.Stringer("dm-syncer conf", conf))
})

sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer.
sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL for singleton syncer.
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

Expand Down
3 changes: 2 additions & 1 deletion cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"
)

func main() {
Expand Down
3 changes: 2 additions & 1 deletion dm/pbmock/dmmaster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion dm/pbmock/dmworker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
// RegisterListener registers a relay listener
RegisterListener(el relay.Listener)
// UnRegisterListener unregisters a relay listener
UnRegisterListener(el relay.Listener)
}

// NewRelayHolder is relay holder initializer
Expand Down Expand Up @@ -307,6 +311,14 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

func (h *realRelayHolder) RegisterListener(el relay.Listener) {
h.relay.RegisterListener(el)
}

func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
h.relay.UnRegisterListener(el)
}

/******************** dummy relay holder ********************/

type dummyRelayHolder struct {
Expand Down Expand Up @@ -424,3 +436,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
defer d.Unlock()
return d.stage
}

func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
}

func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
}
6 changes: 6 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type DummyRelay struct {
reloadErr error
}

func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

func (d *DummyRelay) UnRegisterListener(el relay.Listener) {
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
Expand Down
3 changes: 2 additions & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)
Expand Down Expand Up @@ -120,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
3 changes: 3 additions & 0 deletions dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

w.relayHolder.RegisterListener(w.subTaskHolder)

w.relayEnabled.Store(true)
w.l.Info("relay enabled")
w.subTaskHolder.resetAllSubTasks(true)
Expand Down Expand Up @@ -385,6 +387,7 @@ func (w *SourceWorker) DisableRelay() {
if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
Expand Down
5 changes: 3 additions & 2 deletions dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
)

Expand Down Expand Up @@ -241,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -416,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
30 changes: 26 additions & 4 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/syncer"
Expand All @@ -44,12 +45,22 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

type relayNotifier struct {
// ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
ch chan interface{}
}

// Notified implements streamer.EventNotifier.
func (r relayNotifier) Notified() chan interface{} {
return r.ch
}

// createRealUnits is subtask units initializer
// it can be used for testing.
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand All @@ -64,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
} else {
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, dumpling.NewDumpling(cfg))
Expand All @@ -74,7 +85,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
case config.ModeIncrement:
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
default:
log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode))
}
Expand Down Expand Up @@ -108,6 +119,8 @@ type SubTask struct {
etcdClient *clientv3.Client

workerName string

notifier streamer.EventNotifier
}

// NewSubTask is subtask initializer
Expand All @@ -130,14 +143,15 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
workerName: workerName,
notifier: &relayNotifier{ch: make(chan interface{}, 1)},
}
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName)
return &st
}

// initUnits initializes the sub task processing units.
func (st *SubTask) initUnits() error {
st.units = createUnits(st.cfg, st.etcdClient, st.workerName)
st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier)
if len(st.units) < 1 {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}
Expand Down Expand Up @@ -724,3 +738,11 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

func (st *SubTask) relayNotify() {
// skip if there's pending notify
select {
case st.notifier.Notified() <- struct{}{}:
default:
}
}
13 changes: 13 additions & 0 deletions dm/worker/subtask_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package worker
import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
)

// subTaskHolder holds subtask instances.
Expand Down Expand Up @@ -88,3 +90,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
h.mu.RLock()
defer h.mu.RUnlock()
for _, s := range h.subTasks {
s.relayNotify()
}
}
19 changes: 10 additions & 9 deletions dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/syncer"

Expand All @@ -47,24 +48,24 @@ func (t *testSubTask) TestCreateUnits(c *C) {
Mode: "xxx",
}
worker := "worker"
c.Assert(createUnits(cfg, nil, worker), HasLen, 0)
c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0)

cfg.Mode = config.ModeFull
unitsFull := createUnits(cfg, nil, worker)
unitsFull := createUnits(cfg, nil, worker, nil)
c.Assert(unitsFull, HasLen, 2)
_, ok := unitsFull[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
_, ok = unitsFull[1].(*loader.Loader)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeIncrement
unitsIncr := createUnits(cfg, nil, worker)
unitsIncr := createUnits(cfg, nil, worker, nil)
c.Assert(unitsIncr, HasLen, 1)
_, ok = unitsIncr[0].(*syncer.Syncer)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeAll
unitsAll := createUnits(cfg, nil, worker)
unitsAll := createUnits(cfg, nil, worker, nil)
c.Assert(unitsAll, HasLen, 3)
_, ok = unitsAll[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
Expand Down Expand Up @@ -176,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -185,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -296,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -420,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -445,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
Loading

0 comments on commit 151cc17

Please sign in to comment.