Skip to content

Commit

Permalink
Merge pull request #10 from ApplauseOSS/add-job-runs-support
Browse files Browse the repository at this point in the history
add support for run history
  • Loading branch information
tmad committed Jan 24, 2020
2 parents 04f4718 + 8e7da79 commit d107516
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 16 deletions.
1 change: 1 addition & 0 deletions example/kube-manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rules:
- ""
resources:
- namespaces
- pods
verbs:
- get
- list
Expand Down
37 changes: 27 additions & 10 deletions src/api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"fmt"

"github.com/applauseoss/metronomikon/helpers"
"github.com/applauseoss/metronomikon/kube"
"github.com/gin-gonic/gin"
Expand All @@ -21,8 +22,18 @@ func handleGetJobs(c *gin.Context) {
return
}
for _, job := range jobs {
tmp_job := helpers.CronJobKubernetesToMetronome(&job)
ret = append(ret, tmp_job)
metronomeJob := helpers.CronJobKubernetesToMetronome(&job)

var ginErrorMessage *helpers.GinErrorMessage
embed := c.Query("embed")
if embed != "" {
metronomeJob, ginErrorMessage = helpers.HandleGetJobEmbed(embed, metronomeJob)
if ginErrorMessage != nil {
JsonError(c, ginErrorMessage.HTTPCode, ginErrorMessage.Message)
}
}

ret = append(ret, metronomeJob)
}
}
c.JSON(200, ret)
Expand All @@ -39,13 +50,23 @@ func handleGetJob(c *gin.Context) {
JsonError(c, 500, err.Error())
return
}
job, err := kube.GetCronJob(namespace, name)
cronJob, err := kube.GetCronJob(namespace, name)
if err != nil {
JsonError(c, 404, fmt.Sprintf("cannot retrieve job: %s", err))
return
}
tmp_job := helpers.CronJobKubernetesToMetronome(job)
c.JSON(200, tmp_job)

metronomeJob := helpers.CronJobKubernetesToMetronome(cronJob)
var ginErrorMessage *helpers.GinErrorMessage
embed := c.Query("embed")
if embed != "" {
metronomeJob, ginErrorMessage = helpers.HandleGetJobEmbed(embed, metronomeJob)
if ginErrorMessage != nil {
JsonError(c, ginErrorMessage.HTTPCode, ginErrorMessage.Message)
}
}

c.JSON(200, metronomeJob)
}

func handleUpdateJob(c *gin.Context) {
Expand All @@ -62,11 +83,7 @@ func handleDeleteJob(c *gin.Context) {

job, err := kube.DeleteCronJob(namespace, name)
if job == nil {
var msg struct {
message string `json:message`
}
msg.message = fmt.Sprintf("Job '%s' does not exist", jobId)
c.JSON(404, msg)
c.JSON(404, gin.H{"message": fmt.Sprintf("Job '%s' does not exist", jobId)})
return
} else if err != nil {
JsonError(c, 500, fmt.Sprintf("failed to delete job: %s", err))
Expand Down
18 changes: 17 additions & 1 deletion src/api/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,29 @@ package api

import (
"fmt"

"github.com/applauseoss/metronomikon/helpers"
"github.com/applauseoss/metronomikon/kube"
"github.com/gin-gonic/gin"
)

func handleGetJobRuns(c *gin.Context) {
c.String(200, "TODO")
jobId := c.Param("jobid")
namespace, cronJobName, err := helpers.SplitMetronomeJobId(jobId)
if err != nil {
JsonError(c, 500, err.Error())
return
}
jobs, err := kube.GetJobsFromCronJob(namespace, cronJobName)
if err != nil {
JsonError(c, 500, err.Error())
return
}
res := []helpers.MetronomeJobRun{}
for _, job := range jobs {
res = append(res, *helpers.JobKubernetesToMetronome(&job))
}
c.JSON(200, res)
}

func handleTriggerJobRun(c *gin.Context) {
Expand Down
1 change: 1 addition & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down
151 changes: 147 additions & 4 deletions src/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@ package helpers

import (
"fmt"
"strings"
"time"

"github.com/applauseoss/metronomikon/config"
"github.com/applauseoss/metronomikon/kube"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"strings"
corev1 "k8s.io/api/core/v1"
)

// MetronomeErrorMessage supports Metronome API fileds when returning error
type GinErrorMessage struct {
HTTPCode int
Message string `json:"message"`
}

// Struct for handling parsing/generation of Metronome job JSON schema
type MetronomeJob struct {
Id string `json:"id"`
Description string `json:"description,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Id string `json:"id"`
Description string `json:"description,omitempty"`
History *MetronomeJobHistory `json:"history,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Run struct {
Args []string `json:"args,omitempty"`
Artifacts []struct {
Expand Down Expand Up @@ -50,6 +61,24 @@ type MetronomeJob struct {
} `json:"run"`
}

// MetronomeJobHistory is used for embeding job history into MetronomeJob
type MetronomeJobHistory struct {
SuccessCount int `json:"successCount"`
FailureCount int `json:"failureCount"`
LastSuccessAt *string `json:"lastSuccessAt"`
LastFailureAt *string `json:"lastFailureAt"`
SuccessfulFinishedRuns []MetronomeJobHistoryEntry `json:"successfulFinishedRuns"`
FailedFinishedRuns []MetronomeJobHistoryEntry `json:"failedFinishedRuns"`
}

// MetronomeJobHistoryEntry represents single metronome job run
type MetronomeJobHistoryEntry struct {
ID string `json:"id"`
CreatedAt string `json:"createdAt"`
FinishedAt *string `json:"finishedAt"`
Tasks []string `json:"tasks"`
}

type MetronomeJobRun struct {
CompletedAt *string `json:"completedAt"` // we use a pointer so that we can get a null in the JSON if not populated
CreatedAt string `json:"createdAt"`
Expand Down Expand Up @@ -101,6 +130,120 @@ func JobKubernetesToMetronome(job *batchv1.Job) *MetronomeJobRun {
return ret
}

// GetMaxTime returns larger from given times, first is time
// second is string, given layout: 2006-01-02 15:04:05.999999999 -0700 MST
func GetMaxTime(time1 time.Time, time2 string) (*time.Time, error) {
dateTimeLayout := "2006-01-02 15:04:05.999999999 -0700 MST"
parsedTime2, err := time.Parse(dateTimeLayout, time2)
if err != nil {
return nil, err
}
if time1.After(parsedTime2) {
return &time1, nil
}
return &parsedTime2, nil
}

// HandleGetJobEmbed handles embed parameter in the query
func HandleGetJobEmbed(embed string, metronomeJob *MetronomeJob) (*MetronomeJob, *GinErrorMessage) {
namespace, name, err := SplitMetronomeJobId(metronomeJob.Id)
if err != nil {
return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()}
}
switch embed {
case "history":
jobs, err := kube.GetJobsFromCronJob(namespace, name)
if err != nil {
return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()}
}
pods, err := kube.GetPods(namespace, "Job")
if err != nil {
return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()}
}
metronomeJob, err = AppendHistoryToMetronomeFromKubeJobs(metronomeJob, jobs, pods)
if err != nil {
return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()}
}
return metronomeJob, nil
case "activeRuns":
return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"}
case "schedules":
return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"}
case "historySummary":
return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"}
}
return nil, &GinErrorMessage{
HTTPCode: 404,
Message: "Unknown embed options. Valid options are: activeRuns, schedules, history, historySummary"}
}

// MatchKubeJobWithPods matches pod with its job, assuming jobId is formatted namespace.jobid
// pased on pods ownerReference
func MatchKubeJobWithPods(jobId string, pods []corev1.Pod) []string {
result := []string{}
for _, pod := range pods {
for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() {
if fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, ownerReference.Name) == jobId {
result = append(result, fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, pod.Name))
break
}
}
}
return result
}

// AppendHistoryToMetronomeFromKubeJobs appends job run history to existing MetronomeJob
func AppendHistoryToMetronomeFromKubeJobs(metronomeJob *MetronomeJob, kubeJobs []batchv1.Job, pods []corev1.Pod) (*MetronomeJob, error) {
failureCount := 0
successCount := 0
lastSuccessAtTime := time.Unix(0, 0)
lastFailureAtTime := time.Unix(0, 0)
jobHistory := MetronomeJobHistory{
SuccessfulFinishedRuns: []MetronomeJobHistoryEntry{},
FailedFinishedRuns: []MetronomeJobHistoryEntry{}}

for _, kubeJob := range kubeJobs {
metronomeJob := JobKubernetesToMetronome(&kubeJob)
jobHistoryEntry := MetronomeJobHistoryEntry{
ID: metronomeJob.Id,
CreatedAt: metronomeJob.CreatedAt,
FinishedAt: metronomeJob.CompletedAt,
Tasks: MatchKubeJobWithPods(metronomeJob.Id, pods)}

switch metronomeJob.Status {
case "COMPLETED":
jobHistory.SuccessfulFinishedRuns = append(jobHistory.SuccessfulFinishedRuns, jobHistoryEntry)
successCount++
successAtTime, err := GetMaxTime(lastSuccessAtTime, *jobHistoryEntry.FinishedAt)
if err != nil {
return nil, err
}
lastSuccessAtTime = *successAtTime
case "FAILED":
jobHistory.FailedFinishedRuns = append(jobHistory.FailedFinishedRuns, jobHistoryEntry)
failureCount++
failureAtTime, err := GetMaxTime(lastSuccessAtTime, *jobHistoryEntry.FinishedAt)
if err != nil {
return nil, err
}
lastFailureAtTime = *failureAtTime
}
}

jobHistory.SuccessCount = successCount
jobHistory.FailureCount = failureCount
if successCount > 0 {
lastSuccessAtString := lastSuccessAtTime.String()
jobHistory.LastSuccessAt = &lastSuccessAtString
}
if failureCount > 0 {
lastFailureAtString := lastFailureAtTime.String()
jobHistory.LastFailureAt = &lastFailureAtString
}
metronomeJob.History = &jobHistory
return metronomeJob, nil
}

// Split Metronome job ID into Kubernetes namespace and job name
func SplitMetronomeJobId(jobId string) (string, string, error) {
parts := strings.SplitN(jobId, ".", 2)
Expand Down
39 changes: 38 additions & 1 deletion src/kube/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package kube

import (
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)

// Delete specific CronJob
Expand Down Expand Up @@ -43,6 +45,41 @@ func GetCronJob(namespace string, name string) (*batchv1beta1.CronJob, error) {
return job, nil
}

// GetPods returns array of pods where owner kind is given
func GetPods(namespace string, ownerKind string) ([]corev1.Pod, error) {
pods, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("could not get pods from job: %s", err)
}
result := []corev1.Pod{}
for _, pod := range pods.Items {
for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() {
if ownerReference.Kind == ownerKind {
result = append(result, pod)
break
}
}
}
return result, nil
}

func GetJobsFromCronJob(namespace string, cronJobName string) ([]batchv1.Job, error) {
jobs, err := client.BatchV1().Jobs(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("could not get jobs from cronjob: %s", err)
}
res := []batchv1.Job{}
for _, job := range jobs.Items {
for _, ownerReference := range job.ObjectMeta.GetOwnerReferences() {
if ownerReference.Name == cronJobName {
res = append(res, job)
break
}
}
}
return res, nil
}

// Create Job from CronJob
func CreateJobFromCronjob(cronJob *batchv1beta1.CronJob) (*batchv1.Job, error) {
// This duplicates the logic used by kubectl to create a Job from a CronJob
Expand Down
11 changes: 11 additions & 0 deletions test/run-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@

def compare_structures(data1, data2, use_regex=False):
if type(data1) != type(data2):
if use_regex and isinstance(data1, int) and isinstance(data2, string_types):
if re.match(data2, str(data1)) is None:
return False
return True
return False
if isinstance(data1, list):
if any(["METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" in i for i in data1 + data2]):
for idx1 in range(0, len(data1)):
for idx2 in range(0, len(data2)):
if compare_structures(data1[idx1], data2[idx2], use_regex=use_regex):
return True
return False
if len(data1) != len(data2):
return False
for idx in range(0, len(data1)):
Expand Down Expand Up @@ -104,3 +114,4 @@ def perform_request(test_data, url):
continue
else:
sys.exit(1)
print('Test succeded')
9 changes: 9 additions & 0 deletions test/tests/07_runjob_test1_hello1/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
"useRegex": true,
"retries": 10,
"responseJsonFile": "response2.json"
},
{
"name": "runjob_test1_hello1_runsget",
"urlPath": "/v1/jobs/metronomikon-test1.hello1/runs",
"method": "GET",
"responseCode": 200,
"useRegex": true,
"preserveKeys": true,
"responseJsonFile": "response3.json"
}
]
}
Loading

0 comments on commit d107516

Please sign in to comment.