Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
use init container for worker pod to wait master pod ready
Browse files Browse the repository at this point in the history
fix [186](#186)
  • Loading branch information
zlcnju committed Jul 8, 2019
1 parent 6aa39a4 commit 064188b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
29 changes: 29 additions & 0 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package config

import (
"io/ioutil"
log "github.com/sirupsen/logrus"
)

var initContainerTemplate string

func init() {
bytes, err := ioutil.ReadFile("/etc/config/initContainer.yaml")
if err != nil {
log.Warningf("error while read initContainerTemplate, use default. error: %s", err)
setDefaultInitContainerTemplate()
} else {
initContainerTemplate = string(bytes)
}
}

func setDefaultInitContainerTemplate() {
initContainerTemplate = `
- name: init-pytorch
image: busybox
command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']`
}

func GetInitContainerTemplate() string {
return initContainerTemplate
}
32 changes: 8 additions & 24 deletions pkg/controller.v1/pytorch/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,30 +56,7 @@ func (pc *PyTorchController) reconcilePods(
// Convert PyTorchReplicaType to lower string.
rt := strings.ToLower(string(rtype))
logger := pylogger.LoggerForReplica(job, rt)
// Workers are started only when master pod is in running state
if rtype == pyv1.PyTorchReplicaTypeWorker {
if ContainMasterSpec(job) {
masterPod, err := pc.FilterPodsForReplicaType(pods, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster)))
if err != nil {
return err
}
if len(masterPod) > 1 {
pylogger.LoggerForJob(job).Info("Invalid config: Job must contain only one master pod")
return errors.New("invalid config: Job must contain only one master pod")
} else if len(masterPod) == 1 {
if masterPod[0].Status.Phase != v1.PodRunning {
pylogger.LoggerForJob(job).Info("Master Pod is created but not yet in running phase")
return nil
}
} else {
pylogger.LoggerForJob(job).Info("Master Pod is not yet created")
return nil
}
} else {
pylogger.LoggerForJob(job).Info("Invalid config: Job must contain master replica spec")
return errors.New("invalid config: Job must contain master replica spec")
}
}

// Get all pods for the type rt.
pods, err := pc.FilterPodsForReplicaType(pods, rt)
if err != nil {
Expand Down Expand Up @@ -210,6 +187,13 @@ func (pc *PyTorchController) createNewPod(job *pyv1.PyTorchJob, rtype pyv1.PyTor
pc.Recorder.Event(job, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podTemplate, spec)
if !masterRole {
masterAddr := jobcontroller.GenGeneralName(job.Name, strings.ToLower(string(pyv1.PyTorchReplicaTypeMaster)), strconv.Itoa(0))
err := AddInitContainerForWorkerPod(podTemplate, InitContainerParam{masterAddr})
if err != nil {
return err
}
}

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
Expand Down
37 changes: 37 additions & 0 deletions pkg/controller.v1/pytorch/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"fmt"

pyv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
"k8s.io/api/core/v1"
"gopkg.in/yaml.v2"
"html/template"
"bytes"
"github.com/kubeflow/pytorch-operator/pkg/common/config"
)

var (
Expand All @@ -40,9 +45,41 @@ func GetPortFromPyTorchJob(job *pyv1.PyTorchJob, rtype pyv1.PyTorchReplicaType)
return -1, errPortNotFound
}

type InitContainerParam struct {
MasterAddr string
}

func ContainMasterSpec(job *pyv1.PyTorchJob) bool {
if _, ok := job.Spec.PyTorchReplicaSpecs[pyv1.PyTorchReplicaTypeMaster]; ok {
return true
}
return false
}

func GetInitContainer(containerTemplate string, param InitContainerParam) ([]v1.Container, error) {
var buf bytes.Buffer
tpl, err := template.New("container").Parse(containerTemplate)
if err != nil {
return nil, err
}
if err := tpl.Execute(&buf, param); err != nil {
return nil, err
}

var result []v1.Container
err = yaml.Unmarshal(buf.Bytes(), &result)
if err != nil {
return nil, err
}

return result, nil
}

func AddInitContainerForWorkerPod(podTemplate *v1.PodTemplateSpec, param InitContainerParam) error {
containers, err := GetInitContainer(config.GetInitContainerTemplate(), param)
if err != nil {
return err
}
podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, containers...)
return nil
}
17 changes: 17 additions & 0 deletions pkg/controller.v1/pytorch/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,20 @@ func TestConvertPyTorchJobToUnstructured(t *testing.T) {
t.Errorf("Expected error to be nil while got %v", err)
}
}

func TestGetInitContainer(t *testing.T) {
template := `
- name: init-pytorch
image: busybox
command: ['sh', '-c', 'until nslookup {{.MasterAddr}}; do echo waiting for master; sleep 2; done;']`

initContainer, err := GetInitContainer(template, InitContainerParam{"svc"})
if err != nil {
t.Errorf("Expected error to be nil while got %v", err)
}

expectedCMD := "until nslookup svc; do echo waiting for master; sleep 2; done;"
if initContainer[0].Command[2] != expectedCMD {
t.Errorf("Expected %s , got %s", expectedCMD, initContainer[0].Command[2])
}
}

0 comments on commit 064188b

Please sign in to comment.