Do not recreate container when scaling resources #52

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions cluster/cluster.go
@@ -33,6 +33,7 @@ type Cluster interface {
InspectContainer(id string) (adoc.ContainerDetail, error)
RemoveContainer(id string, force bool, volumes bool) error
RenameContainer(id string, name string) error
UpdateContainer(id string, config interface{}) error

MonitorEvents(filter string, callback adoc.EventCallback) int64
StopMonitor(monitorId int64)
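The new UpdateContainer method lets the engine change a running container's resource limits without recreating it. As a rough sketch of what an implementation behind this interface might do, the snippet below posts the config to Docker's containers/{id}/update endpoint; the dockerCluster type, its dockerAddr field, and the plain net/http call are assumptions for illustration, not the adoc client the project actually uses.

package cluster

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// dockerCluster is a hypothetical Cluster implementation used only for this sketch.
type dockerCluster struct {
    dockerAddr string // e.g. "http://127.0.0.1:2375" (assumed field)
}

// UpdateContainer marshals the resource config and posts it to Docker's
// container update endpoint, returning an error on a non-2xx response.
func (c *dockerCluster) UpdateContainer(id string, config interface{}) error {
    body, err := json.Marshal(config)
    if err != nil {
        return err
    }
    url := fmt.Sprintf("%s/containers/%s/update", c.dockerAddr, id)
    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("update container %s: unexpected status %s", id, resp.Status)
    }
    return nil
}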
11 changes: 9 additions & 2 deletions engine/config.go
@@ -24,9 +24,16 @@ type Guard struct {
Working bool `json:"Working"`
}

type CUpdateConfig struct {
CPUPeriod int64 `json:"CpuPeriod,omitempty"` // CPU CFS (Completely Fair Scheduler) period
CPUQuota int64 `json:"CpuQuota,omitempty"` // CPU CFS (Completely Fair Scheduler) quota
Memory int64 `json:"Memory,omitempty"` // Memory limit (in bytes)
MemorySwap int64 `json:"MemorySwap,omitempty"` // Total memory usage (memory + swap); set `-1` to enable unlimited swap
}

const (
EtcdResourcesKey = "/lain/config/resources"
EtcdGuardSwitchKey = "/lain/config/guardswitch"
EtcdResourcesKey = "/lain/config/resources"
EtcdGuardSwitchKey = "/lain/config/guardswitch"
EtcdCloudVolumeRootKey = "/lain/config/cloud_volumes_root"
EtcdVolumeRootKey = "/lain/config/volumes_root"

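For a concrete sense of what the struct tags above produce, here is a small, self-contained sketch (the struct is copied from the diff; the limit values are arbitrary): Memory equal to MemorySwap disables swap, and CpuQuota over CpuPeriod expresses the CPU share, matching the payload shape Docker's container update API expects.

package main

import (
    "encoding/json"
    "fmt"
)

type CUpdateConfig struct {
    CPUPeriod  int64 `json:"CpuPeriod,omitempty"`
    CPUQuota   int64 `json:"CpuQuota,omitempty"`
    Memory     int64 `json:"Memory,omitempty"`
    MemorySwap int64 `json:"MemorySwap,omitempty"`
}

func main() {
    cfg := CUpdateConfig{
        CPUPeriod:  100000,            // 100ms CFS period, in microseconds
        CPUQuota:   50000,             // 50ms quota per period, roughly half a CPU
        Memory:     512 * 1024 * 1024, // 512 MiB
        MemorySwap: 512 * 1024 * 1024, // equal to Memory, so swap is disabled
    }
    out, _ := json.Marshal(cfg)
    fmt.Println(string(out))
    // {"CpuPeriod":100000,"CpuQuota":50000,"Memory":536870912,"MemorySwap":536870912}
}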
52 changes: 51 additions & 1 deletion engine/pod.go
@@ -148,6 +148,42 @@ func (pc *podController) Drift(cluster cluster.Cluster, fromNode, toNode string,
return true
}

func (pc *podController) Update(cluster cluster.Cluster) error {
log.Infof("%s updating", pc)
start := time.Now()
defer func() {
pc.spec.Filters = []string{} // clear the filter
pc.pod.UpdatedAt = time.Now()
log.Infof("%s updated, state=%+v, duration=%s", pc, pc.pod.ImRuntime, time.Now().Sub(start))
}()
var err error
for i, cSpec := range pc.spec.Containers {
e := pc.updateContainer(cluster, i)
if e != nil {
log.Warnf("%s Cannot update container, error=%q, spec=%+v", pc, err, cSpec)
if err == nil {
err = e
}
}
id := pc.pod.Containers[i].Id
pc.startContainer(cluster, id)
pc.refreshContainer(cluster, i)
if i == 0 && pc.pod.Containers[0].NodeName != "" {
pc.spec.PrevState.NodeName = pc.pod.Containers[i].NodeName
}
pc.spec.PrevState.IPs[i] = pc.pod.Containers[i].ContainerIp
}
if pc.pod.State == RunStatePending {
if err == nil {
pc.pod.State = RunStateSuccess
} else {
pc.pod.State = RunStateError
}
pc.pod.TargetState = ExpectStateRun
}
return err
}

func (pc *podController) Remove(cluster cluster.Cluster) {
log.Infof("%s removing", pc)
start := time.Now()
@@ -338,6 +374,7 @@ func (pc *podController) refreshContainer(kluster cluster.Cluster, index int) {
if network == "" {
network = pc.spec.Namespace
}
log.Infof("pc.spec.PrevState.IPs:%v", pc.spec.PrevState.IPs)
prevIP, nowIP := pc.spec.PrevState.IPs[index], info.NetworkSettings.Networks[network].IPAddress
// NOTE: if the container's ip is not equal to prev ip, try to correct it; if failed, accept new ip
if prevIP != "" && prevIP != nowIP {
@@ -406,6 +443,19 @@ func (pc *podController) createContainer(cluster cluster.Cluster, filters []stri
return cluster.CreateContainer(cc, hc, nc, name)
}

func (pc *podController) updateContainer(cluster cluster.Cluster, index int) error {
podSpec := pc.spec
spec := podSpec.Containers[index]
id := pc.pod.Containers[index].Id
config := &CUpdateConfig{
Memory: spec.MemoryLimit,
MemorySwap: spec.MemoryLimit, // Memory == MemorySwap means disable swap
CPUPeriod: CPUQuota,
CPUQuota: int64(spec.CpuLimit*resource.Cpu*CPUMaxPctg) * CPUQuota / int64(CPUMaxLevel*100),
}
return cluster.UpdateContainer(id, config)
}

func (pc *podController) createContainerConfig(filters []string, index int) adoc.ContainerConfig {
podSpec := pc.spec
spec := podSpec.Containers[index]
@@ -527,7 +577,7 @@ func (pc *podController) createHostConfig(index int) adoc.HostConfig {
Resources: adoc.Resources{
Memory: spec.MemoryLimit,
MemorySwap: spec.MemoryLimit, // Memory == MemorySwap means disable swap
MemorySwappiness: &swappiness,
MemorySwappiness: &swappiness,
CPUPeriod: CPUQuota,
CPUQuota: int64(spec.CpuLimit*resource.Cpu*CPUMaxPctg) * CPUQuota / int64(CPUMaxLevel*100),
BlkioDeviceReadBps: BlkioDeviceReadBps,
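The CpuQuota computed in updateContainer (and already used in createHostConfig) depends on constants such as CPUQuota, CPUMaxPctg, CPUMaxLevel and resource.Cpu whose values are not shown in this diff, so no concrete numbers are claimed here. The underlying CFS relation it encodes is simple, though: a container may consume quota microseconds of CPU time every period microseconds, i.e. quota/period CPUs. A minimal sketch with assumed numbers:

package main

import "fmt"

// effectiveCPUs returns the CPU share granted by a CFS quota/period pair.
func effectiveCPUs(quota, period int64) float64 {
    return float64(quota) / float64(period)
}

func main() {
    const period = 100000 // 100ms period, a common default (assumed value)
    fmt.Println(effectiveCPUs(50000, period))  // 0.5 of one CPU
    fmt.Println(effectiveCPUs(200000, period)) // 2 CPUs spread across cores
}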
33 changes: 27 additions & 6 deletions engine/podgroup.go
@@ -204,21 +204,29 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {
if ok := pgCtrl.updatePodPorts(podSpec); !ok {
return
}
// store oldPodSpec for rollback(with ttl 10min)
pgCtrl.opsChan <- pgOperCacheLastSpec{spec: spec}

oldPodSpec := spec.Pod.Clone()
spec.Pod = spec.Pod.Merge(podSpec)
spec.Version += 1
spec.UpdatedAt = time.Now()
reDeploy := shouldReDeploy(oldPodSpec, podSpec)
if reDeploy {
// store oldPodSpec for rollback(with ttl 10min)
pgCtrl.opsChan <- pgOperCacheLastSpec{spec: spec}
spec.Version += 1
} else {
spec.Pod.Version -= 1
}
pgCtrl.Lock()
pgCtrl.spec = spec
pgCtrl.Unlock()
pgCtrl.opsChan <- pgOperLogOperation{"Start to reschedule spec"}
pgCtrl.opsChan <- pgOperSaveStore{true}
pgCtrl.opsChan <- pgOperSnapshotEagleView{spec.Name}
for i := 0; i < spec.NumInstances; i += 1 {
pgCtrl.opsChan <- pgOperUpgradeInstance{i + 1, spec.Version, oldPodSpec, spec.Pod}
if reDeploy {
pgCtrl.opsChan <- pgOperUpgradeInstance{i + 1, spec.Version, oldPodSpec, spec.Pod}
} else {
pgCtrl.opsChan <- pgOperUpdateInsConfig{i + 1, spec.Version, oldPodSpec, spec.Pod}
}
pgCtrl.opsChan <- pgOperSnapshotGroup{true}
pgCtrl.opsChan <- pgOperSaveStore{true}
}
@@ -336,7 +344,6 @@ func (pgCtrl *podGroupController) Activate(c cluster.Cluster, store storage.Stor
}

func (pgCtrl *podGroupController) LastSpec() *PodGroupSpec {
log.Infof("Fetch LastPodSpec !")
var lastSpec PodGroupSpec
if err := pgCtrl.engine.store.Get(pgCtrl.lastPodSpecKey, &lastSpec); err != nil {
log.Infof("Fetch LastPodSpec with err:%v", err)
@@ -705,3 +712,17 @@ func newPodGroupController(spec PodGroupSpec, states []PodPrevState, pg PodGroup
pgCtrl.Publisher = NewPublisher(true)
return pgCtrl
}

// Assume a reschedule-spec operation that changes MemoryLimit or CpuLimit does not change any other pod spec fields
func shouldReDeploy(oldSpec, newSpec PodSpec) bool {
if len(oldSpec.Containers) != len(newSpec.Containers) {
return true
}
for i := range newSpec.Containers {
if oldSpec.Containers[i].MemoryLimit != newSpec.Containers[i].MemoryLimit ||
oldSpec.Containers[i].CpuLimit != newSpec.Containers[i].CpuLimit {
return false
}
}
return true
}
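To make the redeploy decision above concrete, here is a hedged, self-contained sketch: the ContainerSpec and PodSpec stand-ins below are trimmed to the two fields shouldReDeploy inspects (the real types carry much more), and the limit values are made up.

package main

import "fmt"

// Trimmed stand-ins for the real specs, keeping only the fields the check uses.
type ContainerSpec struct {
    MemoryLimit int64
    CpuLimit    int
}

type PodSpec struct {
    Containers []ContainerSpec
}

func shouldReDeploy(oldSpec, newSpec PodSpec) bool {
    if len(oldSpec.Containers) != len(newSpec.Containers) {
        return true
    }
    for i := range newSpec.Containers {
        if oldSpec.Containers[i].MemoryLimit != newSpec.Containers[i].MemoryLimit ||
            oldSpec.Containers[i].CpuLimit != newSpec.Containers[i].CpuLimit {
            return false
        }
    }
    return true
}

func main() {
    old := PodSpec{Containers: []ContainerSpec{{MemoryLimit: 256 << 20, CpuLimit: 1}}}

    bumped := PodSpec{Containers: []ContainerSpec{{MemoryLimit: 512 << 20, CpuLimit: 1}}}
    fmt.Println(shouldReDeploy(old, bumped)) // false: only limits changed, update containers in place

    scaled := PodSpec{Containers: []ContainerSpec{{MemoryLimit: 256 << 20, CpuLimit: 1}, {MemoryLimit: 256 << 20, CpuLimit: 1}}}
    fmt.Println(shouldReDeploy(old, scaled)) // true: container count changed, recreate

    same := PodSpec{Containers: []ContainerSpec{{MemoryLimit: 256 << 20, CpuLimit: 1}}}
    fmt.Println(shouldReDeploy(old, same)) // true: nothing it checks changed, normal redeploy path
}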
29 changes: 28 additions & 1 deletion engine/podgroup_ops.go
@@ -158,7 +158,7 @@ func (op pgOperRefreshInstance) Do(pgCtrl *podGroupController, c cluster.Cluster
pgCtrl.RUnlock()
}()

if(op.instanceNo > len(pgCtrl.podCtrls)){
if op.instanceNo > len(pgCtrl.podCtrls) {
log.Warnf("Pod is not exists")
return false
}
@@ -314,6 +314,33 @@ func (op pgOperVerifyInstanceCount) Do(pgCtrl *podGroupController, c cluster.Clu
return false
}

type pgOperUpdateInsConfig struct {
instanceNo int
version int
oldPodSpec PodSpec
newPodSpec PodSpec
}

func (op pgOperUpdateInsConfig) Do(pgCtrl *podGroupController, c cluster.Cluster, store storage.Store, ev *RuntimeEagleView) bool {
var runtime ImRuntime
start := time.Now()
defer func() {
pgCtrl.RLock()
log.Infof("%s update instance, op=%+v, runtime=%+v, duration=%s", pgCtrl, op, runtime, time.Now().Sub(start))
pgCtrl.RUnlock()
}()
podCtrl := pgCtrl.podCtrls[op.instanceNo-1]
newPodSpec := op.newPodSpec.Clone()
newPodSpec.PrevState = podCtrl.spec.PrevState.Clone() // in-place update, prev state should not change
podCtrl.spec = newPodSpec
podCtrl.pod.State = RunStatePending
if err := podCtrl.Update(c); err != nil {
lowOp := pgOperUpgradeInstance{op.instanceNo, op.version, op.oldPodSpec, op.newPodSpec}
lowOp.Do(pgCtrl, c, store, ev)
}
return false
}

type pgOperDeployInstance struct {
instanceNo int
version int
4 changes: 2 additions & 2 deletions engine/podgroup_test.go
@@ -129,8 +129,8 @@ func TestEnginePodGroup(t *testing.T) {
time.Sleep(40 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.Spec.Version != 2 {
t.Errorf("We should have version 2 of the pods")
} else if pg.Spec.Version != 1 {
t.Errorf("We should have version 1 of the pods")
}

if err := engine.RemovePodGroup(name); err != nil {
25 changes: 12 additions & 13 deletions engine/runtimes.go
@@ -1,9 +1,9 @@
package engine

import (
"github.com/mijia/adoc"
"github.com/mijia/sweb/log"
"time"

"github.com/mijia/adoc"
)

type RunState int
@@ -14,16 +14,16 @@ type PGOpState int32
var RestartMaxCount int

const (
RunStatePending = iota // waiting for operation
RunStateDrift // drifting from one node to another
RunStateSuccess // ok
RunStateExit // exited
RunStateFail // start failed with error
RunStateInconsistent // container's state is different between deployd and swarm
RunStateMissing // container is missing and need create it. happened when node down .etc
RunStateRemoved // removed
RunStatePaused // paused
RunStateError // call docker interface with error
RunStatePending = iota // waiting for operation
RunStateDrift // drifting from one node to another
RunStateSuccess // ok
RunStateExit // exited
RunStateFail // start failed with error
RunStateInconsistent // container's state is different between deployd and swarm
RunStateMissing // container is missing and need create it. happened when node down .etc
RunStateRemoved // removed
RunStatePaused // paused
RunStateError // call docker interface with error
)

const (
@@ -251,7 +251,6 @@ func (pod Pod) PodIp() string {

func (pod *Pod) ChangeTargetState(state ExpectState) {
pod.TargetState = state
log.Infof("target state:::%v", state)
}

type PodGroup struct {
2 changes: 0 additions & 2 deletions engine/specs.go
@@ -31,7 +31,6 @@ const (

MinPodKillTimeout = 10
MaxPodKillTimeout = 120

)

var (
@@ -441,7 +440,6 @@ func (s PodSpec) Merge(o PodSpec) PodSpec {
s.Stateful = o.Stateful
s.Version += 1
s.UpdatedAt = time.Now()
s.PrevState = o.PrevState
s.SetupTime = o.SetupTime
s.KillTimeout = o.KillTimeout
s.HealthConfig = o.HealthConfig
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion glide.yaml
@@ -10,7 +10,7 @@ import:
- store
- store/etcd
- package: github.com/mijia/adoc
version: 1ef227e439ebbac803b4b9ec6f9a111edd4d6831
version: 61dbc8d45a4512b5e1e5c1ff25773cee578418b9
- package: github.com/mijia/go-generics
- package: github.com/mijia/sweb
subpackages: