From 0dc379f54fba4a8e0facc71d1fdee01d5689dfed Mon Sep 17 00:00:00 2001
From: supermeng
Date: Thu, 1 Mar 2018 15:38:02 +0800
Subject: [PATCH] Do not recreate containers when scaling resources

---
 cluster/cluster.go      |  1 +
 engine/config.go        | 11 +++++++--
 engine/pod.go           | 52 ++++++++++++++++++++++++++++++++++++++++-
 engine/podgroup.go      | 33 +++++++++++++++++++++-----
 engine/podgroup_ops.go  | 29 ++++++++++++++++++++++-
 engine/podgroup_test.go |  4 ++--
 engine/runtimes.go      | 25 ++++++++++----------
 engine/specs.go         |  2 --
 glide.lock              |  2 +-
 glide.yaml              |  2 +-
 10 files changed, 132 insertions(+), 29 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index 711db96..ce64294 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -33,6 +33,7 @@ type Cluster interface {
 	InspectContainer(id string) (adoc.ContainerDetail, error)
 	RemoveContainer(id string, force bool, volumes bool) error
 	RenameContainer(id string, name string) error
+	UpdateContainer(id string, config interface{}) error
 
 	MonitorEvents(filter string, callback adoc.EventCallback) int64
 	StopMonitor(monitorId int64)
diff --git a/engine/config.go b/engine/config.go
index 459680d..e2e185a 100644
--- a/engine/config.go
+++ b/engine/config.go
@@ -24,9 +24,16 @@ type Guard struct {
 	Working bool `json:"Working"`
 }
 
+type CUpdateConfig struct {
+	CPUPeriod  int64 `json:"CpuPeriod,omitempty"`  // CPU CFS (Completely Fair Scheduler) period
+	CPUQuota   int64 `json:"CpuQuota,omitempty"`   // CPU CFS (Completely Fair Scheduler) quota
+	Memory     int64 `json:"Memory,omitempty"`     // Memory limit (in bytes)
+	MemorySwap int64 `json:"MemorySwap,omitempty"` // Total memory usage (memory + swap); set `-1` to enable unlimited swap
+}
+
 const (
-	EtcdResourcesKey   = "/lain/config/resources"
-	EtcdGuardSwitchKey = "/lain/config/guardswitch"
+	EtcdResourcesKey       = "/lain/config/resources"
+	EtcdGuardSwitchKey     = "/lain/config/guardswitch"
 	EtcdCloudVolumeRootKey = "/lain/config/cloud_volumes_root"
 	EtcdVolumeRootKey      = "/lain/config/volumes_root"
 
diff --git a/engine/pod.go b/engine/pod.go
index 66636f1..3a2a679 100644
--- a/engine/pod.go
+++ b/engine/pod.go
@@ -148,6 +148,42 @@ func (pc *podController) Drift(cluster cluster.Cluster, fromNode, toNode string,
 	return true
 }
 
+func (pc *podController) Update(cluster cluster.Cluster) error {
+	log.Infof("%s updating", pc)
+	start := time.Now()
+	defer func() {
+		pc.spec.Filters = []string{} // clear the filters
+		pc.pod.UpdatedAt = time.Now()
+		log.Infof("%s updated, state=%+v, duration=%s", pc, pc.pod.ImRuntime, time.Now().Sub(start))
+	}()
+	var err error
+	for i, cSpec := range pc.spec.Containers {
+		e := pc.updateContainer(cluster, i)
+		if e != nil {
+			log.Warnf("%s Cannot update container, error=%q, spec=%+v", pc, e, cSpec)
+			if err == nil {
+				err = e
+			}
+		}
+		id := pc.pod.Containers[i].Id
+		pc.startContainer(cluster, id)
+		pc.refreshContainer(cluster, i)
+		if i == 0 && pc.pod.Containers[0].NodeName != "" {
+			pc.spec.PrevState.NodeName = pc.pod.Containers[i].NodeName
+		}
+		pc.spec.PrevState.IPs[i] = pc.pod.Containers[i].ContainerIp
+	}
+	if pc.pod.State == RunStatePending {
+		if err == nil {
+			pc.pod.State = RunStateSuccess
+		} else {
+			pc.pod.State = RunStateError
+		}
+		pc.pod.TargetState = ExpectStateRun
+	}
+	return err
+}
+
 func (pc *podController) Remove(cluster cluster.Cluster) {
 	log.Infof("%s removing", pc)
 	start := time.Now()
@@ -338,6 +374,7 @@ func (pc *podController) refreshContainer(kluster cluster.Cluster, index int) {
 	if network == "" {
 		network = pc.spec.Namespace
 	}
+	log.Infof("pc.spec.PrevState.IPs:%v", pc.spec.PrevState.IPs)
 	prevIP, nowIP := pc.spec.PrevState.IPs[index], info.NetworkSettings.Networks[network].IPAddress
 	// NOTE: if the container's ip is not equal to prev ip, try to correct it; if failed, accpet new ip
 	if prevIP != "" && prevIP != nowIP {
@@ -406,6 +443,19 @@ func (pc *podController) createContainer(cluster cluster.Cluster, filters []stri
 	return cluster.CreateContainer(cc, hc, nc, name)
 }
 
+func (pc *podController) updateContainer(cluster cluster.Cluster, index int) error {
+	podSpec := pc.spec
+	spec := podSpec.Containers[index]
+	id := pc.pod.Containers[index].Id
+	config := &CUpdateConfig{
+		Memory:     spec.MemoryLimit,
+		MemorySwap: spec.MemoryLimit, // Memory == MemorySwap means disable swap
+		CPUPeriod:  CPUQuota,
+		CPUQuota:   int64(spec.CpuLimit*resource.Cpu*CPUMaxPctg) * CPUQuota / int64(CPUMaxLevel*100),
+	}
+	return cluster.UpdateContainer(id, config)
+}
+
 func (pc *podController) createContainerConfig(filters []string, index int) adoc.ContainerConfig {
 	podSpec := pc.spec
 	spec := podSpec.Containers[index]
@@ -527,7 +577,7 @@ func (pc *podController) createHostConfig(index int) adoc.HostConfig {
 		Resources: adoc.Resources{
 			Memory:     spec.MemoryLimit,
 			MemorySwap: spec.MemoryLimit, // Memory == MemorySwap means disable swap
-			MemorySwappiness: &swappiness,
+			MemorySwappiness:   &swappiness,
 			CPUPeriod:  CPUQuota,
 			CPUQuota:   int64(spec.CpuLimit*resource.Cpu*CPUMaxPctg) * CPUQuota / int64(CPUMaxLevel*100),
 			BlkioDeviceReadBps:  BlkioDeviceReadBps,
diff --git a/engine/podgroup.go b/engine/podgroup.go
index 9119d15..712d6f1 100644
--- a/engine/podgroup.go
+++ b/engine/podgroup.go
@@ -204,13 +204,17 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {
 	if ok := pgCtrl.updatePodPorts(podSpec); !ok {
 		return
 	}
-	// store oldPodSpec for rollback(with ttl 10min)
-	pgCtrl.opsChan <- pgOperCacheLastSpec{spec: spec}
-
 	oldPodSpec := spec.Pod.Clone()
 	spec.Pod = spec.Pod.Merge(podSpec)
-	spec.Version += 1
 	spec.UpdatedAt = time.Now()
+	reDeploy := shouldReDeploy(oldPodSpec, podSpec)
+	if reDeploy {
+		// store oldPodSpec for rollback (with ttl 10min)
+		pgCtrl.opsChan <- pgOperCacheLastSpec{spec: spec}
+		spec.Version += 1
+	} else {
+		spec.Pod.Version -= 1
+	}
 	pgCtrl.Lock()
 	pgCtrl.spec = spec
 	pgCtrl.Unlock()
@@ -218,7 +222,11 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {
 	pgCtrl.opsChan <- pgOperSaveStore{true}
 	pgCtrl.opsChan <- pgOperSnapshotEagleView{spec.Name}
 	for i := 0; i < spec.NumInstances; i += 1 {
-		pgCtrl.opsChan <- pgOperUpgradeInstance{i + 1, spec.Version, oldPodSpec, spec.Pod}
+		if reDeploy {
+			pgCtrl.opsChan <- pgOperUpgradeInstance{i + 1, spec.Version, oldPodSpec, spec.Pod}
+		} else {
+			pgCtrl.opsChan <- pgOperUpdateInsConfig{i + 1, spec.Version, oldPodSpec, spec.Pod}
+		}
 		pgCtrl.opsChan <- pgOperSnapshotGroup{true}
 		pgCtrl.opsChan <- pgOperSaveStore{true}
 	}
@@ -336,7 +344,6 @@ func (pgCtrl *podGroupController) Activate(c cluster.Cluster, store storage.Stor
 }
 
 func (pgCtrl *podGroupController) LastSpec() *PodGroupSpec {
-	log.Infof("Fetch LastPodSpec !")
 	var lastSpec PodGroupSpec
 	if err := pgCtrl.engine.store.Get(pgCtrl.lastPodSpecKey, &lastSpec); err != nil {
 		log.Infof("Fetch LastPodSpec with err:%v", err)
@@ -705,3 +712,17 @@ func newPodGroupController(spec PodGroupSpec, states []PodPrevState, pg PodGroup
 	pgCtrl.Publisher = NewPublisher(true)
 	return pgCtrl
 }
+
+// Assume a RescheduleSpec call that changes MemoryLimit or CpuLimit does not change any other pod spec fields
+func shouldReDeploy(oldSpec, newSpec PodSpec) bool {
+	if len(oldSpec.Containers) != len(newSpec.Containers) {
+		return true
+	}
+	for i := range newSpec.Containers {
+		if oldSpec.Containers[i].MemoryLimit != newSpec.Containers[i].MemoryLimit ||
+			oldSpec.Containers[i].CpuLimit != newSpec.Containers[i].CpuLimit {
+			return false
+		}
+	}
+	return true
+}
diff --git a/engine/podgroup_ops.go b/engine/podgroup_ops.go
index c95529e..1215f09 100644
--- a/engine/podgroup_ops.go
+++ b/engine/podgroup_ops.go
@@ -158,7 +158,7 @@ func (op pgOperRefreshInstance) Do(pgCtrl *podGroupController, c cluster.Cluster
 		pgCtrl.RUnlock()
 	}()
 
-	if(op.instanceNo > len(pgCtrl.podCtrls)){
+	if op.instanceNo > len(pgCtrl.podCtrls) {
 		log.Warnf("Pod is not exists")
 		return false
 	}
@@ -314,6 +314,33 @@ func (op pgOperVerifyInstanceCount) Do(pgCtrl *podGroupController, c cluster.Clu
 	return false
 }
 
+type pgOperUpdateInsConfig struct {
+	instanceNo int
+	version    int
+	oldPodSpec PodSpec
+	newPodSpec PodSpec
+}
+
+func (op pgOperUpdateInsConfig) Do(pgCtrl *podGroupController, c cluster.Cluster, store storage.Store, ev *RuntimeEagleView) bool {
+	var runtime ImRuntime
+	start := time.Now()
+	defer func() {
+		pgCtrl.RLock()
+		log.Infof("%s update instance, op=%+v, runtime=%+v, duration=%s", pgCtrl, op, runtime, time.Now().Sub(start))
+		pgCtrl.RUnlock()
+	}()
+	podCtrl := pgCtrl.podCtrls[op.instanceNo-1]
+	newPodSpec := op.newPodSpec.Clone()
+	newPodSpec.PrevState = podCtrl.spec.PrevState.Clone() // update in place, the previous state should not change
+	podCtrl.spec = newPodSpec
+	podCtrl.pod.State = RunStatePending
+	if err := podCtrl.Update(c); err != nil {
+		lowOp := pgOperUpgradeInstance{op.instanceNo, op.version, op.oldPodSpec, op.newPodSpec}
+		lowOp.Do(pgCtrl, c, store, ev)
+	}
+	return false
+}
+
 type pgOperDeployInstance struct {
 	instanceNo int
 	version    int
diff --git a/engine/podgroup_test.go b/engine/podgroup_test.go
index f9752f5..1c9bc3c 100644
--- a/engine/podgroup_test.go
+++ b/engine/podgroup_test.go
@@ -129,8 +129,8 @@ func TestEnginePodGroup(t *testing.T) {
 	time.Sleep(40 * time.Second)
 	if pg, ok := engine.InspectPodGroup(name); !ok {
 		t.Errorf("We should have the pod group, but we don't get it")
-	} else if pg.Spec.Version != 2 {
-		t.Errorf("We should have version 2 of the pods")
+	} else if pg.Spec.Version != 1 {
+		t.Errorf("We should have version 1 of the pods")
 	}
 
 	if err := engine.RemovePodGroup(name); err != nil {
diff --git a/engine/runtimes.go b/engine/runtimes.go
index 9515764..b732b40 100644
--- a/engine/runtimes.go
+++ b/engine/runtimes.go
@@ -1,9 +1,9 @@
 package engine
 
 import (
-	"github.com/mijia/adoc"
-	"github.com/mijia/sweb/log"
 	"time"
+
+	"github.com/mijia/adoc"
 )
 
 type RunState int
@@ -14,16 +14,16 @@ type PGOpState int32
 var RestartMaxCount int
 
 const (
-	RunStatePending = iota // waiting for operation
-	RunStateDrift // drifting from one node to another
-	RunStateSuccess // ok
-	RunStateExit // exited
-	RunStateFail // start failed with error
-	RunStateInconsistent // container's state is different between deployd and swarm
-	RunStateMissing // container is missing and need create it. happened when node down .etc
-	RunStateRemoved // removed
-	RunStatePaused // paused
-	RunStateError // call docker interface with error
+	RunStatePending      = iota // waiting for operation
+	RunStateDrift               // drifting from one node to another
+	RunStateSuccess             // ok
+	RunStateExit                // exited
+	RunStateFail                // start failed with error
+	RunStateInconsistent        // container's state is different between deployd and swarm
+	RunStateMissing             // container is missing and needs to be recreated, e.g. when a node goes down
+	RunStateRemoved             // removed
+	RunStatePaused              // paused
+	RunStateError               // call docker interface with error
 )
 
 const (
@@ -251,7 +251,6 @@ func (pod Pod) PodIp() string {
 
 func (pod *Pod) ChangeTargetState(state ExpectState) {
 	pod.TargetState = state
-	log.Infof("target state:::%v", state)
 }
 
 type PodGroup struct {
diff --git a/engine/specs.go b/engine/specs.go
index 479a236..3a3e83c 100644
--- a/engine/specs.go
+++ b/engine/specs.go
@@ -31,7 +31,6 @@ const (
 
 	MinPodKillTimeout = 10
 	MaxPodKillTimeout = 120
-
 )
 
 var (
@@ -441,7 +440,6 @@ func (s PodSpec) Merge(o PodSpec) PodSpec {
 	s.Stateful = o.Stateful
 	s.Version += 1
 	s.UpdatedAt = time.Now()
-	s.PrevState = o.PrevState
 	s.SetupTime = o.SetupTime
 	s.KillTimeout = o.KillTimeout
 	s.HealthConfig = o.HealthConfig
diff --git a/glide.lock b/glide.lock
index d9d36dd..16cd42b 100644
--- a/glide.lock
+++ b/glide.lock
@@ -98,7 +98,7 @@ imports:
   - jlexer
   - jwriter
 - name: github.com/mijia/adoc
-  version: 1ef227e439ebbac803b4b9ec6f9a111edd4d6831
+  version: 61dbc8d45a4512b5e1e5c1ff25773cee578418b9
 - name: github.com/mijia/go-generics
   version: 2278a5f0de143e1d17ea16d56e7f85391bdb85a3
 - name: github.com/mijia/sweb
diff --git a/glide.yaml b/glide.yaml
index d56d3a8..fa91456 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -10,7 +10,7 @@ import:
   - store
   - store/etcd
 - package: github.com/mijia/adoc
-  version: 1ef227e439ebbac803b4b9ec6f9a111edd4d6831
+  version: 61dbc8d45a4512b5e1e5c1ff25773cee578418b9
 - package: github.com/mijia/go-generics
 - package: github.com/mijia/sweb
   subpackages:
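
Illustration (not part of the patch): the values that updateContainer() sends through UpdateContainer reuse the resource arithmetic already present in createHostConfig. The short Go sketch below walks through that calculation once, in isolation. The constant values used here (the CFS period, CPUMaxPctg, CPUMaxLevel, and the node CPU count standing in for resource.Cpu) are assumptions chosen only for illustration; the real constants are defined elsewhere in the deployd repository.

package main

import "fmt"

// Assumed stand-ins for deployd's real constants (illustrative values only).
const (
	cfsPeriod   = int64(100000) // CPU CFS period in microseconds (assumed)
	cpuMaxPctg  = 80            // assumed
	cpuMaxLevel = 8             // assumed
	nodeCPUs    = 4             // stand-in for resource.Cpu (assumed)
)

func main() {
	memoryLimit := int64(512 * 1024 * 1024) // spec.MemoryLimit: 512 MiB
	cpuLimit := 2                           // spec.CpuLimit

	memory := memoryLimit
	memorySwap := memoryLimit // Memory == MemorySwap means swap is disabled
	cpuQuota := int64(cpuLimit*nodeCPUs*cpuMaxPctg) * cfsPeriod / int64(cpuMaxLevel*100)

	fmt.Printf("Memory=%d MemorySwap=%d CPUPeriod=%d CPUQuota=%d\n",
		memory, memorySwap, cfsPeriod, cpuQuota)
}

With these assumed numbers the quota works out to 80000 against a period of 100000, i.e. 0.8 of a CPU, and the memory figures are passed through unchanged with swap effectively disabled.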