Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: add back time_zone config and change default time_zone to downstream timezone #3403

Merged
merged 15 commits into from
Nov 11, 2021
2 changes: 1 addition & 1 deletion dm/chaos/cases/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var mustExecSQLs = []string{
}

// setInstancesState sets the state (like global sql_mode) for upstream and downstream DB instances.
func setInstancesState(ctx context.Context, targetCfg config2.DBConfig, sourcesCfg ...config2.DBConfig) error {
func setInstancesState(ctx context.Context, targetCfg *config2.DBConfig, sourcesCfg ...*config2.DBConfig) error {
targetDB, err := conn.DefaultDBProvider.Apply(targetCfg)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func main() {
}

// set upstream and downstream instances state.
err = setInstancesState(ctx, cfg.Target, cfg.Source1, cfg.Source2, cfg.Source3)
err = setInstancesState(ctx, &cfg.Target, &cfg.Source1, &cfg.Source2, &cfg.Source3)
if err != nil {
log.L().Error("fail to set instances state", zap.Error(err))
code = 2
Expand Down
4 changes: 2 additions & 2 deletions dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
}

cfg := sourcesCfg[i]
db, err2 := conn.DefaultDBProvider.Apply(cfg)
db, err2 := conn.DefaultDBProvider.Apply(&cfg)
if err2 != nil {
return nil, err2
}
Expand All @@ -110,7 +110,7 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
res = append(res, singleResult{})
}

targetDB, err := conn.DefaultDBProvider.Apply(targetCfg)
targetDB, err := conn.DefaultDBProvider.Apply(&targetCfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
dbCfg := instance.cfg.From
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.sourceDB, err = conn.DefaultDBProvider.Apply(dbCfg)
instance.sourceDB, err = conn.DefaultDBProvider.Apply(&dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream)
}
Expand All @@ -157,7 +157,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
dbCfg = instance.cfg.To
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.targetDB, err = conn.DefaultDBProvider.Apply(dbCfg)
instance.targetDB, err = conn.DefaultDBProvider.Apply(&dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream)
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (c *Checker) Resume(ctx context.Context, pr chan pb.ProcessResult) {
}

// Update implements Unit.Update.
func (c *Checker) Update(cfg *config.SubTaskConfig) error {
func (c *Checker) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
// not support update configuration now
return nil
}
Expand Down
14 changes: 0 additions & 14 deletions dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ func (c *SourceConfig) YamlForDowngrade() (string, error) {
}
s.From.Password = cipher

// omit default values, so we can ignore them for later marshal
s.omitDefaultVals()

// not write this field when exporting
s.EnableRelay = false
return s.Yaml()
Expand Down Expand Up @@ -448,17 +445,6 @@ func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngr
}
}

// omitDefaultVals change default value to empty value for new config item.
// If any default value for new config item is not empty(0 or false or nil),
// we should change it to empty.
func (c *SourceConfigForDowngrade) omitDefaultVals() {
if len(c.From.Session) > 0 {
if timeZone, ok := c.From.Session["time_zone"]; ok && timeZone == defaultTimeZone {
delete(c.From.Session, "time_zone")
}
}
}

// Yaml returns YAML format representation of the config.
func (c *SourceConfigForDowngrade) Yaml() (string, error) {
b, err := yaml.Marshal(c)
Expand Down
1 change: 1 addition & 0 deletions dm/dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (t *testConfig) TestConfig(c *C) {

clone6, err := ParseYaml(clone4yaml)
c.Assert(err, IsNil)
clone6.From.Session = nil
c.Assert(clone6, DeepEquals, clone4)

// test invalid config
Expand Down
23 changes: 11 additions & 12 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,18 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
// force set session time zone to UTC here.
AdjustTargetDBTimeZone(db)
if len(db.Password) > 0 {
db.Password = utils.DecryptOrPlaintext(db.Password)
}
}

func (db *DBConfig) AdjustWithTimeZone(timeZone string) {
if timeZone != "" {
AdjustDBTimeZone(db, timeZone)
}
db.Adjust()
}

// Clone returns a deep copy of DBConfig. This function only fixes data race when adjusting Session.
func (db *DBConfig) Clone() *DBConfig {
if db == nil {
Expand Down Expand Up @@ -238,9 +243,8 @@ type SubTaskConfig struct {
// deprecated
HeartbeatReportInterval int `toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
// deprecated
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
// deprecated
Timezone string `toml:"timezone" json:"timezone"`
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
Timezone string `toml:"timezone" json:"timezone"`

Meta *Meta `toml:"meta" json:"meta"`

Expand Down Expand Up @@ -415,11 +419,6 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.MetaSchema = defaultMetaSchema
}

if c.Timezone != "" {
log.L().Warn("'timezone' is deprecated, please remove this field.")
c.Timezone = ""
}

dirSuffix := "." + c.Name
if !strings.HasSuffix(c.LoaderConfig.Dir, dirSuffix) { // check to support multiple times calling
// if not ends with the task name, we append the task name to the tail
Expand All @@ -433,8 +432,8 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.SyncerConfig.CheckpointFlushInterval = defaultCheckpointFlushInterval
}

c.From.Adjust()
c.To.Adjust()
c.From.AdjustWithTimeZone(c.Timezone)
c.To.AdjustWithTimeZone(c.Timezone)

if verifyDecryptPassword {
_, err1 := c.DecryptPassword()
Expand Down
40 changes: 15 additions & 25 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ var (
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds
// force use UTC time_zone.
defaultTimeZone = "+00:00"

// TargetDBConfig.
defaultSessionCfg = []struct {
Expand Down Expand Up @@ -296,9 +294,8 @@ type TaskConfig struct {
// deprecated
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval" toml:"heartbeat-update-interval" json:"heartbeat-update-interval"`
// deprecated
HeartbeatReportInterval int `yaml:"heartbeat-report-interval" toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
// deprecated
Timezone string `yaml:"timezone" toml:"timezone" json:"timezone"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval" toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
Timezone string `yaml:"timezone" toml:"timezone" json:"timezone"`

// handle schema/table name mode, and only for schema/table name
// if case insensitive, we would convert schema/table name to lower case
Expand Down Expand Up @@ -696,9 +693,11 @@ func (c *TaskConfig) adjust() error {
sort.Strings(unusedConfigs)
return terror.ErrConfigGlobalConfigsUnused.Generate(unusedConfigs)
}
// we postpone default time_zone init in each unit so we won't change the config value in task/sub_task config
if c.Timezone != "" {
log.L().Warn("`timezone` is deprecated and useless anymore, please remove it.")
c.Timezone = ""
if _, err := utils.ParseTimeZone(c.Timezone); err != nil {
return err
}
}
if c.RemoveMeta {
log.L().Warn("`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead")
Expand Down Expand Up @@ -760,29 +759,25 @@ func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version *semver.Version) {
lowerMap[cfg.key] = cfg.val
}
}
// force set time zone to UTC
if tz, ok := lowerMap["time_zone"]; ok {
log.L().Warn("session variable 'time_zone' is overwritten with UTC timezone.",
zap.String("time_zone", tz))
}
lowerMap["time_zone"] = defaultTimeZone
dbConfig.Session = lowerMap
}

// AdjustTargetDBTimeZone force adjust session `time_zone` to UTC.
func AdjustTargetDBTimeZone(config *DBConfig) {
for k := range config.Session {
// AdjustDBTimeZone force adjust session `time_zone`.
func AdjustDBTimeZone(config *DBConfig, timeZone string) {
for k, v := range config.Session {
if strings.ToLower(k) == "time_zone" {
log.L().Warn("session variable 'time_zone' is overwritten by default UTC timezone.",
zap.String("time_zone", config.Session[k]))
config.Session[k] = defaultTimeZone
if v != timeZone {
log.L().Warn("session variable 'time_zone' is overwritten by task config's timezone",
zap.String("time_zone", config.Session[k]))
config.Session[k] = timeZone
}
return
}
}
if config.Session == nil {
config.Session = make(map[string]string, 1)
}
config.Session["time_zone"] = defaultTimeZone
config.Session["time_zone"] = timeZone
}

var defaultParser = parser.New()
Expand Down Expand Up @@ -979,11 +974,6 @@ func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
// If any default value for new config item is not empty(0 or false or nil),
// we should change it to empty.
func (c *TaskConfigForDowngrade) omitDefaultVals() {
if len(c.TargetDB.Session) > 0 {
if timeZone, ok := c.TargetDB.Session["time_zone"]; ok && timeZone == defaultTimeZone {
delete(c.TargetDB.Session, "time_zone")
}
}
if len(c.ShadowTableRules) == 1 && c.ShadowTableRules[0] == DefaultShadowTableRules {
c.ShadowTableRules = nil
}
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*
cfg.EnableHeartbeat = false
cfg.HeartbeatUpdateInterval = c.HeartbeatUpdateInterval
cfg.HeartbeatReportInterval = c.HeartbeatReportInterval
cfg.Timezone = c.Timezone
cfg.Meta = inst.Meta

fromClone := dbCfg.Clone()
Expand Down Expand Up @@ -285,6 +286,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.EnableHeartbeat = stCfg0.EnableHeartbeat
c.HeartbeatUpdateInterval = stCfg0.HeartbeatUpdateInterval
c.HeartbeatReportInterval = stCfg0.HeartbeatReportInterval
c.Timezone = stCfg0.Timezone
c.CaseSensitive = stCfg0.CaseSensitive
c.TargetDB = &stCfg0.To // just ref
c.OnlineDDL = stCfg0.OnlineDDL
Expand Down
8 changes: 4 additions & 4 deletions dm/dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,22 +910,22 @@ func (t *testConfig) TestAdjustTargetDBConfig(c *C) {
}{
{
DBConfig{},
DBConfig{Session: map[string]string{"time_zone": "+00:00"}},
DBConfig{Session: map[string]string{}},
semver.New("0.0.0"),
},
{
DBConfig{Session: map[string]string{"SQL_MODE": "ANSI_QUOTES"}},
DBConfig{Session: map[string]string{"sql_mode": "ANSI_QUOTES", "time_zone": "+00:00"}},
DBConfig{Session: map[string]string{"sql_mode": "ANSI_QUOTES"}},
semver.New("2.0.7"),
},
{
DBConfig{},
DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic, "time_zone": "+00:00"}},
DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic}},
semver.New("3.0.1"),
},
{
DBConfig{Session: map[string]string{"SQL_MODE": "", tidbTxnMode: "pessimistic"}},
DBConfig{Session: map[string]string{"sql_mode": "", tidbTxnMode: "pessimistic", "time_zone": "+00:00"}},
DBConfig{Session: map[string]string{"sql_mode": "", tidbTxnMode: "pessimistic"}},
semver.New("4.0.0-beta.2"),
},
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (s *Server) upgradeDBSchemaV1Import(tctx *tcontext.Context, cfgs map[string
return err
}
if targetDB == nil {
targetDB, err = conn.DefaultDBProvider.Apply(cfg2.To)
targetDB, err = conn.DefaultDBProvider.Apply(&cfg2.To)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions dm/dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
// collect again, two configs exist.
cfgs, err = s.collectSourceConfigFilesV1Import(tctx)
c.Assert(err, IsNil)
for _, cfg := range cfgs {
cfg.From.Session = nil
}
c.Assert(cfgs, HasLen, 2)
c.Assert(cfgs[cfg1.SourceID], DeepEquals, cfg1)
c.Assert(cfgs[cfg2.SourceID], DeepEquals, cfg2)
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (s *Server) getBaseDBBySourceName(sourceName string) (*conn.BaseDB, error)
return nil, terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
}
dbCfg := sourceCfg.GenerateDBConfig()
return conn.DefaultDBProvider.Apply(*dbCfg)
return conn.DefaultDBProvider.Apply(dbCfg)
}

// DMAPIGetSourceSchemaList get source schema list url is: (GET /api/v1/sources/{source-name}/schemas).
Expand Down
7 changes: 3 additions & 4 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
dbConfig := cfg.GenerateDBConfig()
fromDB, err := conn.DefaultDBProvider.Apply(*dbConfig)
fromDB, err := conn.DefaultDBProvider.Apply(dbConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -1165,7 +1165,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error {
failpoint.Return(nil)
})

toDB, err := conn.DefaultDBProvider.Apply(cfg)
toDB, err := conn.DefaultDBProvider.Apply(&cfg)
if err != nil {
return err
}
Expand All @@ -1182,7 +1182,6 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error {
config.AdjustTargetDBSessionCfg(dbConfig, version)
} else {
log.L().Warn("get tidb version", log.ShortError(err))
config.AdjustTargetDBTimeZone(dbConfig)
}
return nil
}
Expand Down Expand Up @@ -1439,7 +1438,7 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
}

// set up db and clear meta data in downstream db
baseDB, err := conn.DefaultDBProvider.Apply(*toDBCfg)
baseDB, err := conn.DefaultDBProvider.Apply(toDBCfg)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Unit interface {
// Resume resumes the paused process and its returning must send a result to pr channel.
Resume(ctx context.Context, pr chan pb.ProcessResult)
// Update updates the configuration
Update(cfg *config.SubTaskConfig) error
Update(ctx context.Context, cfg *config.SubTaskConfig) error

// Status returns the unit's current status. The result may need calculation with source status, like estimated time
// to catch up. If sourceStatus is nil, the calculation should be skipped.
Expand Down
8 changes: 4 additions & 4 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (w *SourceWorker) Start() {
}

var err error
w.sourceDB, err = conn.DefaultDBProvider.Apply(w.cfg.DecryptPassword().From)
w.sourceDB, err = conn.DefaultDBProvider.Apply(&w.cfg.DecryptPassword().From)
if err != nil {
w.l.Error("can't connected to upstream", zap.Error(err))
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
w.sourceDBMu.Lock()
if w.sourceDB == nil {
var err error
w.sourceDB, err = conn.DefaultDBProvider.Apply(w.cfg.DecryptPassword().From)
w.sourceDB, err = conn.DefaultDBProvider.Apply(&w.cfg.DecryptPassword().From)
if err != nil {
w.sourceDBMu.Unlock()
return err
Expand Down Expand Up @@ -524,7 +524,7 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.St
}

// UpdateSubTask update config for a sub task.
func (w *SourceWorker) UpdateSubTask(cfg *config.SubTaskConfig) error {
func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskConfig) error {
w.Lock()
defer w.Unlock()

Expand All @@ -538,7 +538,7 @@ func (w *SourceWorker) UpdateSubTask(cfg *config.SubTaskConfig) error {
}

w.l.Info("update sub task", zap.String("task", cfg.Name))
return st.Update(cfg)
return st.Update(ctx, cfg)
}

// OperateSubTask stop/resume/pause sub task.
Expand Down
Loading