Skip to content

Commit

Permalink
Support pod conditions for pending job count calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Yaron Yarimi <yaron.yarimi@env0.com>
  • Loading branch information
yaronya committed Aug 1, 2021
1 parent 03d0f98 commit bd0163d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970))

### Improvements

Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type ScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
// +optional
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
// +optional
PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
}

func init() {
Expand Down
19 changes: 15 additions & 4 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in
return runningJobs
}

func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job, s *kedav1alpha1.ScalingStrategy) bool {
opts := []client.ListOption{
client.InNamespace(j.GetNamespace()),
client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
Expand All @@ -176,9 +176,20 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return true
if len(s.PendingPodConditions) > 0 {
for _, pendingConditionType := range s.PendingPodConditions {
for _, podCondition := range pod.Status.Conditions {
if string(podCondition.Type) == pendingConditionType && podCondition.Status == corev1.ConditionTrue {
return true
}
}
}
} else {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return true
}
}

}

return false
Expand All @@ -201,7 +212,7 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in

for _, job := range jobs.Items {
job := job
if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job, &scaledJob.Spec.ScalingStrategy) {
pendingJobs++
}
}
Expand Down

0 comments on commit bd0163d

Please sign in to comment.