Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
PyTorch Operator Prometheus Metrics (#175)
Browse files Browse the repository at this point in the history
* Implements all PyTorch Operator Prometheus Metrics
Adds README for documentation

* Corrects naming for is leader metric

* Corrects README pods to be monitored
  • Loading branch information
krishnadurai authored and k8s-ci-robot committed Jun 10, 2019
1 parent 6d689cb commit f8d09ba
Show file tree
Hide file tree
Showing 90 changed files with 16,987 additions and 0 deletions.
63 changes: 63 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/pytorch-operator.v1/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ServerOption struct {
JSONLogFormat bool
EnableGangScheduling bool
Namespace string
MonitoringPort int
ResyncPeriod time.Duration
}

Expand Down Expand Up @@ -61,5 +62,7 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {

fs.BoolVar(&s.EnableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling by kube-batch.")

fs.IntVar(&s.MonitoringPort, "monitoring-port", 8443, `Endpoint port for displaying monitoring metrics`)

fs.DurationVar(&s.ResyncPeriod, "resyc-period", DefaultResyncPeriod, "Resync interval of the tf-operator")
}
8 changes: 8 additions & 0 deletions cmd/pytorch-operator.v1/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -52,6 +54,10 @@ var (
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
isLeader = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pytorch_operator_is_leader",
Help: "Is this client the leader of this pytorch-operator client set?",
})
)

const RecommendedKubeConfigPathEnv = "KUBECONFIG"
Expand Down Expand Up @@ -112,6 +118,7 @@ func Run(opt *options.ServerOption) error {

// Set leader election start function.
run := func(<-chan struct{}) {
isLeader.Set(1)
if err := tc.Run(opt.Threadiness, stopCh); err != nil {
log.Errorf("Failed to run the controller: %v", err)
}
Expand Down Expand Up @@ -152,6 +159,7 @@ func Run(opt *options.ServerOption) error {
Callbacks: election.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
isLeader.Set(0)
log.Fatalf("leader election lost")
},
},
Expand Down
17 changes: 17 additions & 0 deletions cmd/pytorch-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@ package main

import (
"flag"
"fmt"
"net/http"
"strconv"

"github.com/onrik/logrus/filename"
log "github.com/sirupsen/logrus"

"github.com/kubeflow/pytorch-operator/cmd/pytorch-operator.v1/app"
"github.com/kubeflow/pytorch-operator/cmd/pytorch-operator.v1/app/options"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func startMonitoring(monitoringPort int) {
go func() {
log.Infof("Setting up client for monitoring on port: %s", strconv.Itoa(monitoringPort))
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(fmt.Sprintf(":%s", strconv.Itoa(monitoringPort)), nil)
if err != nil {
log.Error("Monitoring endpoint setup failure.")
}
}()
}

func init() {
// Add filename as one of the fields of the structured log message.
filenameHook := filename.NewHook()
Expand All @@ -42,6 +57,8 @@ func main() {
log.SetFormatter(&log.JSONFormatter{})
}

startMonitoring(s.MonitoringPort)

if err := app.Run(s); err != nil {
log.Fatalf("%v\n", err)
}
Expand Down
87 changes: 87 additions & 0 deletions docs/monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Prometheus Monitoring for Pytorch operator

## Available Metrics

Currently available metrics to monitor are listed below.

### Metrics for Each Component Container for PyTorch operator

Component Containers:
* pytorch-master
* pytorch-worker

#### Each Container Reports on its:

Use prometheus graph to run the following example commands to visualize metrics.

*Note*: These metrics are derived from [cAdvisor](https://github.com/google/cadvisor) kubelet integration which reports to Prometheus through our prometheus-operator installation. You may see a complete list of metrics available in `\metrics` page of your Prometheus web UI which you can further use to compose your own queries.

**CPU usage**
```
sum (rate (container_cpu_usage_seconds_total{pod_name=~"pytorchjob-name-.*"}[1m])) by (pod_name)
```

**GPU Usage**
```
sum (rate (container_accelerator_memory_used_bytes{pod_name=~"pytorchjob-name-.*"}[1m])) by (pod_name)
```

**Memory Usage**
```
sum (rate (container_memory_usage_bytes{pod_name=~"pytorchjob-name-.*"}[1m])) by (pod_name)
```

**Network Usage**
```
sum (rate (container_network_transmit_bytes_total{pod_name=~"pytorchjob-name-.*"}[1m])) by (pod_name)
```

**I/O Usage**
```
sum (rate (container_fs_write_seconds_total{pod_name=~"pytorchjob-name-.*"}[1m])) by (pod_name)
```

**Keep-Alive check**
```
up
```
This is maintained by Prometheus on its own with its `up` metric detailed in the documentation [here](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series).

**Is Leader check**
```
pytorch_operator_is_leader
```

*Note*: Replace `pytorchjob-name` with your own PyTorch Job name you want to monitor for the example queries above.

### Report PyTorch Job metrics:

**Job Creation**
```
pytorch_operator_jobs_created
```

**Job Creation**
```
sum (rate (pytorch_operator_jobs_created[60m]))
```

**Job Deletion**
```
pytorch_operator_jobs_deleted
```

**Successful Job Completions**
```
pytorch_operator_jobs_successful
```

**Failed Jobs**
```
pytorch_operator_jobs_failed
```

**Restarted Jobs**
```
pytorch_operator_jobs_restarted
```
9 changes: 9 additions & 0 deletions pkg/controller.v1/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/kubeflow/tf-operator/pkg/common/jobcontroller"
pylogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/kubeflow/tf-operator/pkg/util/k8sutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Expand All @@ -68,6 +70,11 @@ var (
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second},
EnableGangScheduling: false,
}

pytorchJobsDeletedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "pytorch_operator_jobs_deleted",
Help: "Counts number of PyTorch jobs deleted",
})
)

// PyTorchController is the type for PyTorchJob Controller, which manages
Expand Down Expand Up @@ -236,6 +243,7 @@ func (pc *PyTorchController) processNextWorkItem() bool {
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
pytorchJobsDeletedCount.Inc()
return true
}

Expand Down Expand Up @@ -298,6 +306,7 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) {
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
pytorchJobsDeletedCount.Inc()
// jm.expectations.DeleteExpectations(key)
return true, nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller.v1/pytorch/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ import (
common "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
pylogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/kubeflow/tf-operator/pkg/util/k8sutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
failedMarshalPyTorchJobReason = "InvalidPyTorchJobSpec"
)

var (
pytorchJobsCreatedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "pytorch_operator_jobs_created",
Help: "Counts number of PyTorch jobs created",
})
)

// When a pod is added, set the defaults and enqueue the current pytorchjob.
func (pc *PyTorchController) addPyTorchJob(obj interface{}) {
// Convert from unstructured object.
Expand Down Expand Up @@ -97,6 +106,7 @@ func (pc *PyTorchController) addPyTorchJob(obj interface{}) {
return
}
pc.enqueuePyTorchJob(obj)
pytorchJobsCreatedCount.Inc()
}

// When a pod is updated, enqueue the current pytorchjob.
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller.v1/pytorch/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
pyv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
common "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
pylogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Expand All @@ -42,6 +44,21 @@ const (
pytorchJobRestartingReason = "PyTorchJobRestarting"
)

var (
pytorchJobsSuccessCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "pytorch_operator_jobs_successful",
Help: "Counts number of PyTorch jobs successful",
})
pytorchJobsFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "pytorch_operator_jobs_failed",
Help: "Counts number of PyTorch jobs failed",
})
pytorchJobsRestartCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "pytorch_operator_jobs_restarted",
Help: "Counts number of PyTorch jobs restarted",
})
)

// updateStatus updates the status of the job.
func (pc *PyTorchController) updateStatusSingle(job *pyv1.PyTorchJob, rtype pyv1.PyTorchReplicaType, replicas int, restart bool) error {
jobKey, err := KeyFunc(job)
Expand Down Expand Up @@ -91,6 +108,7 @@ func (pc *PyTorchController) updateStatusSingle(job *pyv1.PyTorchJob, rtype pyv1
pylogger.LoggerForJob(job).Infof("Append job condition error: %v", err)
return err
}
pytorchJobsSuccessCount.Inc()
}
}
} else {
Expand All @@ -107,6 +125,8 @@ func (pc *PyTorchController) updateStatusSingle(job *pyv1.PyTorchJob, rtype pyv1
pylogger.LoggerForJob(job).Infof("Append job condition error: %v", err)
return err
}
pytorchJobsFailureCount.Inc()
pytorchJobsRestartCount.Inc()
} else {
msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", job.Name, failed, rtype)
pc.Recorder.Event(job, v1.EventTypeNormal, pytorchJobFailedReason, msg)
Expand All @@ -119,6 +139,7 @@ func (pc *PyTorchController) updateStatusSingle(job *pyv1.PyTorchJob, rtype pyv1
pylogger.LoggerForJob(job).Infof("Append job condition error: %v", err)
return err
}
pytorchJobsFailureCount.Inc()
}
}
return nil
Expand Down
Loading

0 comments on commit f8d09ba

Please sign in to comment.