From 8e7da79f0c2fc197beae813c0dc6de1fdb0a2b62 Mon Sep 17 00:00:00 2001 From: Tomasz Madycki Date: Thu, 23 Jan 2020 11:47:23 +0100 Subject: [PATCH] add support for run history --- example/kube-manifest.yaml | 1 + src/api/jobs.go | 37 +++-- src/api/runs.go | 18 ++- src/go.sum | 1 + src/helpers/helpers.go | 151 +++++++++++++++++- src/kube/jobs.go | 39 ++++- test/run-test.py | 11 ++ .../07_runjob_test1_hello1/metadata.json | 9 ++ .../07_runjob_test1_hello1/response3.json | 11 ++ .../metadata.json | 12 ++ .../response.json | 36 +++++ test/tests/09_getjobshistory/metadata.json | 13 ++ test/tests/09_getjobshistory/response.json | 146 +++++++++++++++++ 13 files changed, 469 insertions(+), 16 deletions(-) create mode 100644 test/tests/07_runjob_test1_hello1/response3.json create mode 100644 test/tests/08_getjobhistory_test1_hello1/metadata.json create mode 100644 test/tests/08_getjobhistory_test1_hello1/response.json create mode 100644 test/tests/09_getjobshistory/metadata.json create mode 100644 test/tests/09_getjobshistory/response.json diff --git a/example/kube-manifest.yaml b/example/kube-manifest.yaml index e571159..8702315 100644 --- a/example/kube-manifest.yaml +++ b/example/kube-manifest.yaml @@ -8,6 +8,7 @@ rules: - "" resources: - namespaces + - pods verbs: - get - list diff --git a/src/api/jobs.go b/src/api/jobs.go index db0d6b1..286783a 100644 --- a/src/api/jobs.go +++ b/src/api/jobs.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "github.com/applauseoss/metronomikon/helpers" "github.com/applauseoss/metronomikon/kube" "github.com/gin-gonic/gin" @@ -21,8 +22,18 @@ func handleGetJobs(c *gin.Context) { return } for _, job := range jobs { - tmp_job := helpers.CronJobKubernetesToMetronome(&job) - ret = append(ret, tmp_job) + metronomeJob := helpers.CronJobKubernetesToMetronome(&job) + + var ginErrorMessage *helpers.GinErrorMessage + embed := c.Query("embed") + if embed != "" { + metronomeJob, ginErrorMessage = helpers.HandleGetJobEmbed(embed, metronomeJob) + if 
ginErrorMessage != nil { + JsonError(c, ginErrorMessage.HTTPCode, ginErrorMessage.Message) + return // error response already written; do not continue to the 200 payload + } + } + ret = append(ret, metronomeJob) } } c.JSON(200, ret) @@ -39,13 +50,23 @@ func handleGetJob(c *gin.Context) { JsonError(c, 500, err.Error()) return } - job, err := kube.GetCronJob(namespace, name) + cronJob, err := kube.GetCronJob(namespace, name) if err != nil { JsonError(c, 404, fmt.Sprintf("cannot retrieve job: %s", err)) return } - tmp_job := helpers.CronJobKubernetesToMetronome(job) - c.JSON(200, tmp_job) + + metronomeJob := helpers.CronJobKubernetesToMetronome(cronJob) + var ginErrorMessage *helpers.GinErrorMessage + embed := c.Query("embed") + if embed != "" { + metronomeJob, ginErrorMessage = helpers.HandleGetJobEmbed(embed, metronomeJob) + if ginErrorMessage != nil { + JsonError(c, ginErrorMessage.HTTPCode, ginErrorMessage.Message) + return // error response already written; do not continue to the 200 payload + } + } + c.JSON(200, metronomeJob) } func handleUpdateJob(c *gin.Context) { @@ -62,11 +83,7 @@ func handleDeleteJob(c *gin.Context) { job, err := kube.DeleteCronJob(namespace, name) if job == nil { - var msg struct { - message string `json:message` - } - msg.message = fmt.Sprintf("Job '%s' does not exist", jobId) - c.JSON(404, msg) + c.JSON(404, gin.H{"message": fmt.Sprintf("Job '%s' does not exist", jobId)}) return } else if err != nil { JsonError(c, 500, fmt.Sprintf("failed to delete job: %s", err)) diff --git a/src/api/runs.go b/src/api/runs.go index b2e04c8..d5d49b3 100644 --- a/src/api/runs.go +++ b/src/api/runs.go @@ -2,13 +2,29 @@ package api import ( "fmt" + "github.com/applauseoss/metronomikon/helpers" "github.com/applauseoss/metronomikon/kube" "github.com/gin-gonic/gin" ) func handleGetJobRuns(c *gin.Context) { - c.String(200, "TODO") + jobId := c.Param("jobid") + namespace, cronJobName, err := helpers.SplitMetronomeJobId(jobId) + if err != nil { + JsonError(c, 500, err.Error()) + return + } + jobs, err := kube.GetJobsFromCronJob(namespace, cronJobName) + if err != nil { + JsonError(c, 500, err.Error()) + return + } 
+ res := []helpers.MetronomeJobRun{} + for _, job := range jobs { + res = append(res, *helpers.JobKubernetesToMetronome(&job)) + } + c.JSON(200, res) } func handleTriggerJobRun(c *gin.Context) { diff --git a/src/go.sum b/src/go.sum index 85594b6..1032940 100644 --- a/src/go.sum +++ b/src/go.sum @@ -10,6 +10,7 @@ github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= +github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ= github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/src/helpers/helpers.go b/src/helpers/helpers.go index 23e5ba6..7321bdd 100644 --- a/src/helpers/helpers.go +++ b/src/helpers/helpers.go @@ -2,17 +2,28 @@ package helpers import ( "fmt" + "strings" + "time" + "github.com/applauseoss/metronomikon/config" + "github.com/applauseoss/metronomikon/kube" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" - "strings" + corev1 "k8s.io/api/core/v1" ) +// GinErrorMessage supports Metronome API fields when returning an error +type GinErrorMessage struct { + HTTPCode int + Message string `json:"message"` +} + // Struct for handling parsing/generation of Metronome job JSON schema type MetronomeJob struct { - Id string `json:"id"` - Description string `json:"description,omitempty"` - Labels map[string]string `json:"labels,omitempty"` + Id string `json:"id"` + Description string `json:"description,omitempty"` + 
History *MetronomeJobHistory `json:"history,omitempty"` + Labels map[string]string `json:"labels,omitempty"` Run struct { Args []string `json:"args,omitempty"` Artifacts []struct { @@ -50,6 +61,24 @@ type MetronomeJob struct { } `json:"run"` } +// MetronomeJobHistory is used for embedding job history into MetronomeJob +type MetronomeJobHistory struct { + SuccessCount int `json:"successCount"` + FailureCount int `json:"failureCount"` + LastSuccessAt *string `json:"lastSuccessAt"` + LastFailureAt *string `json:"lastFailureAt"` + SuccessfulFinishedRuns []MetronomeJobHistoryEntry `json:"successfulFinishedRuns"` + FailedFinishedRuns []MetronomeJobHistoryEntry `json:"failedFinishedRuns"` +} + +// MetronomeJobHistoryEntry represents a single Metronome job run +type MetronomeJobHistoryEntry struct { + ID string `json:"id"` + CreatedAt string `json:"createdAt"` + FinishedAt *string `json:"finishedAt"` + Tasks []string `json:"tasks"` +} + type MetronomeJobRun struct { CompletedAt *string `json:"completedAt"` // we use a pointer so that we can get a null in the JSON if not populated CreatedAt string `json:"createdAt"` @@ -101,6 +130,120 @@ func JobKubernetesToMetronome(job *batchv1.Job) *MetronomeJobRun { return ret } +// GetMaxTime returns the later of the two given times; time1 is a time.Time and +// time2 is a string in the layout: 2006-01-02 15:04:05.999999999 -0700 MST +func GetMaxTime(time1 time.Time, time2 string) (*time.Time, error) { + dateTimeLayout := "2006-01-02 15:04:05.999999999 -0700 MST" + parsedTime2, err := time.Parse(dateTimeLayout, time2) + if err != nil { + return nil, err + } + if time1.After(parsedTime2) { + return &time1, nil + } + return &parsedTime2, nil +} + +// HandleGetJobEmbed handles the embed parameter in the query +func HandleGetJobEmbed(embed string, metronomeJob *MetronomeJob) (*MetronomeJob, *GinErrorMessage) { + namespace, name, err := SplitMetronomeJobId(metronomeJob.Id) + if err != nil { + return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()} + } + 
switch embed { + case "history": + jobs, err := kube.GetJobsFromCronJob(namespace, name) + if err != nil { + return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()} + } + pods, err := kube.GetPods(namespace, "Job") + if err != nil { + return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()} + } + metronomeJob, err = AppendHistoryToMetronomeFromKubeJobs(metronomeJob, jobs, pods) + if err != nil { + return nil, &GinErrorMessage{HTTPCode: 500, Message: err.Error()} + } + return metronomeJob, nil + case "activeRuns": + return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"} + case "schedules": + return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"} + case "historySummary": + return nil, &GinErrorMessage{HTTPCode: 200, Message: "TODO"} + } + return nil, &GinErrorMessage{ + HTTPCode: 404, + Message: "Unknown embed options. Valid options are: activeRuns, schedules, history, historySummary"} +} + +// MatchKubeJobWithPods matches pods with their job, assuming jobId is formatted namespace.jobid, +// based on the pods' ownerReferences +func MatchKubeJobWithPods(jobId string, pods []corev1.Pod) []string { + result := []string{} + for _, pod := range pods { + for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() { + if fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, ownerReference.Name) == jobId { + result = append(result, fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, pod.Name)) + break + } + } + } + return result +} + +// AppendHistoryToMetronomeFromKubeJobs appends job run history to an existing MetronomeJob +func AppendHistoryToMetronomeFromKubeJobs(metronomeJob *MetronomeJob, kubeJobs []batchv1.Job, pods []corev1.Pod) (*MetronomeJob, error) { + failureCount := 0 + successCount := 0 + lastSuccessAtTime := time.Unix(0, 0) + lastFailureAtTime := time.Unix(0, 0) + jobHistory := MetronomeJobHistory{ + SuccessfulFinishedRuns: []MetronomeJobHistoryEntry{}, + FailedFinishedRuns: []MetronomeJobHistoryEntry{}} + + for _, kubeJob := range kubeJobs { + 
metronomeJob := JobKubernetesToMetronome(&kubeJob) + jobHistoryEntry := MetronomeJobHistoryEntry{ + ID: metronomeJob.Id, + CreatedAt: metronomeJob.CreatedAt, + FinishedAt: metronomeJob.CompletedAt, + Tasks: MatchKubeJobWithPods(metronomeJob.Id, pods)} + + switch metronomeJob.Status { + case "COMPLETED": + jobHistory.SuccessfulFinishedRuns = append(jobHistory.SuccessfulFinishedRuns, jobHistoryEntry) + successCount++ + successAtTime, err := GetMaxTime(lastSuccessAtTime, *jobHistoryEntry.FinishedAt) + if err != nil { + return nil, err + } + lastSuccessAtTime = *successAtTime + case "FAILED": + jobHistory.FailedFinishedRuns = append(jobHistory.FailedFinishedRuns, jobHistoryEntry) + failureCount++ + failureAtTime, err := GetMaxTime(lastFailureAtTime, *jobHistoryEntry.FinishedAt) // latest failure seen so far vs. this run + if err != nil { + return nil, err + } + lastFailureAtTime = *failureAtTime + } + } + + jobHistory.SuccessCount = successCount + jobHistory.FailureCount = failureCount + if successCount > 0 { + lastSuccessAtString := lastSuccessAtTime.String() + jobHistory.LastSuccessAt = &lastSuccessAtString + } + if failureCount > 0 { + lastFailureAtString := lastFailureAtTime.String() + jobHistory.LastFailureAt = &lastFailureAtString + } + metronomeJob.History = &jobHistory + return metronomeJob, nil +} + // Split Metronome job ID into Kubernetes namespace and job name func SplitMetronomeJobId(jobId string) (string, string, error) { parts := strings.SplitN(jobId, ".", 2) diff --git a/src/kube/jobs.go b/src/kube/jobs.go index 7e05278..82aad56 100644 --- a/src/kube/jobs.go +++ b/src/kube/jobs.go @@ -2,11 +2,13 @@ package kube import ( "fmt" + "time" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "time" ) // Delete specific CronJob @@ -43,6 +45,41 @@ func GetCronJob(namespace string, name string) (*batchv1beta1.CronJob, error) { return job, nil } +// GetPods returns array of pods 
where owner kind is given +func GetPods(namespace string, ownerKind string) ([]corev1.Pod, error) { + pods, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("could not get pods from job: %s", err) + } + result := []corev1.Pod{} + for _, pod := range pods.Items { + for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() { + if ownerReference.Kind == ownerKind { + result = append(result, pod) + break + } + } + } + return result, nil +} + +func GetJobsFromCronJob(namespace string, cronJobName string) ([]batchv1.Job, error) { + jobs, err := client.BatchV1().Jobs(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("could not get jobs from cronjob: %s", err) + } + res := []batchv1.Job{} + for _, job := range jobs.Items { + for _, ownerReference := range job.ObjectMeta.GetOwnerReferences() { + if ownerReference.Name == cronJobName { + res = append(res, job) + break + } + } + } + return res, nil +} + // Create Job from CronJob func CreateJobFromCronjob(cronJob *batchv1beta1.CronJob) (*batchv1.Job, error) { // This duplicates the logic used by kubectl to create a Job from a CronJob diff --git a/test/run-test.py b/test/run-test.py index 5a0c8f6..984ace9 100755 --- a/test/run-test.py +++ b/test/run-test.py @@ -21,8 +21,18 @@ def compare_structures(data1, data2, use_regex=False): if type(data1) != type(data2): + if use_regex and isinstance(data1, int) and isinstance(data2, string_types): + if re.match(data2, str(data1)) is None: + return False + return True return False if isinstance(data1, list): + if any(["METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" in i for i in data1 + data2]): + for idx1 in range(0, len(data1)): + for idx2 in range(0, len(data2)): + if compare_structures(data1[idx1], data2[idx2], use_regex=use_regex): + return True + return False if len(data1) != len(data2): return False for idx in range(0, len(data1)): @@ -104,3 +114,4 @@ def perform_request(test_data, 
url): continue else: sys.exit(1) +print('Test succeded') diff --git a/test/tests/07_runjob_test1_hello1/metadata.json b/test/tests/07_runjob_test1_hello1/metadata.json index 7b75f01..b1391db 100644 --- a/test/tests/07_runjob_test1_hello1/metadata.json +++ b/test/tests/07_runjob_test1_hello1/metadata.json @@ -17,6 +17,15 @@ "useRegex": true, "retries": 10, "responseJsonFile": "response2.json" + }, + { + "name": "runjob_test1_hello1_runsget", + "urlPath": "/v1/jobs/metronomikon-test1.hello1/runs", + "method": "GET", + "responseCode": 200, + "useRegex": true, + "preserveKeys": true, + "responseJsonFile": "response3.json" } ] } diff --git a/test/tests/07_runjob_test1_hello1/response3.json b/test/tests/07_runjob_test1_hello1/response3.json new file mode 100644 index 0000000..54b3b2d --- /dev/null +++ b/test/tests/07_runjob_test1_hello1/response3.json @@ -0,0 +1,11 @@ +[ + { + "completedAt": "20.*", + "createdAt": "20.*", + "id": "metronomikon-test1.hello1-[0-9]+", + "jobId": "metronomikon-test1.hello1", + "status": "COMPLETED", + "tasks": [] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" +] diff --git a/test/tests/08_getjobhistory_test1_hello1/metadata.json b/test/tests/08_getjobhistory_test1_hello1/metadata.json new file mode 100644 index 0000000..fe5d05f --- /dev/null +++ b/test/tests/08_getjobhistory_test1_hello1/metadata.json @@ -0,0 +1,12 @@ +{ + "steps": [ + { + "name": "getjobhistory_test1_hello1", + "urlPath": "/v1/jobs/metronomikon-test1.hello1?embed=history", + "method": "GET", + "responseCode": 200, + "useRegex": true, + "responseJsonFile": "response.json" + } + ] +} diff --git a/test/tests/08_getjobhistory_test1_hello1/response.json b/test/tests/08_getjobhistory_test1_hello1/response.json new file mode 100644 index 0000000..009908c --- /dev/null +++ b/test/tests/08_getjobhistory_test1_hello1/response.json @@ -0,0 +1,36 @@ +{ + "id": "metronomikon-test1.hello1", + "history": { + "successCount": "[1-9][0-9]*", + "failureCount": 0, + "lastSuccessAt": 
"20.*", + "lastFailureAt": null, + "successfulFinishedRuns": [ + { + "id": "metronomikon-test1.hello1-[0-9]+", + "createdAt": "20.*", + "finishedAt": "20.*", + "tasks": [ + "metronomikon-test1.hello1-[0-9]+-.+" + ] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" + ], + "failedFinishedRuns": [] + }, + "run": { + "args": [ + "/bin/sh", + "-c", + "date; echo Hello from the Kubernetes cluster hello1" + ], + "cpus": 1, + "disk": 5, + "docker": { + "image": "busybox" + }, + "mem": 512, + "placement": {}, + "restart": {} + } +} diff --git a/test/tests/09_getjobshistory/metadata.json b/test/tests/09_getjobshistory/metadata.json new file mode 100644 index 0000000..b3ccae4 --- /dev/null +++ b/test/tests/09_getjobshistory/metadata.json @@ -0,0 +1,13 @@ +{ + "steps": [ + { + "name": "getjobhistory", + "urlPath": "/v1/jobs?embed=history", + "method": "GET", + "responseCode": 200, + "useRegex": true, + "responseJsonFile": "response.json", + "retries": 60 + } + ] +} diff --git a/test/tests/09_getjobshistory/response.json b/test/tests/09_getjobshistory/response.json new file mode 100644 index 0000000..3dbd35c --- /dev/null +++ b/test/tests/09_getjobshistory/response.json @@ -0,0 +1,146 @@ +[ + { + "id": "metronomikon-test1.hello1", + "history": { + "successCount": "[1-9][0-9]*", + "failureCount": 0, + "lastSuccessAt": "20.*", + "lastFailureAt": null, + "successfulFinishedRuns": [ + { + "id": "metronomikon-test1.hello1-[0-9]+", + "createdAt": "20.*", + "finishedAt": "20.*", + "tasks": [ + "metronomikon-test1.hello1-[0-9]+-.+" + ] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" + ], + "failedFinishedRuns": [] + }, + "run": { + "args": [ + "/bin/sh", + "-c", + "date; echo Hello from the Kubernetes cluster hello1" + ], + "cpus": 1, + "disk": 5, + "docker": { + "image": "busybox" + }, + "mem": 512, + "placement": {}, + "restart": {} + } + }, + { + "id": "metronomikon-test1.hello2", + "history": { + "successCount": "[1-9][0-9]*", + "failureCount": 0, + "lastSuccessAt": 
"20.*", + "lastFailureAt": null, + "successfulFinishedRuns": [ + { + "id": "metronomikon-test1.hello2-[0-9]+", + "createdAt": "20.*", + "finishedAt": "20.*", + "tasks": [ + "metronomikon-test1.hello2-[0-9]+-.+" + ] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" + ], + "failedFinishedRuns": [] + }, + "run": { + "args": [ + "/bin/sh", + "-c", + "date; echo Hello from the Kubernetes cluster hello2" + ], + "cpus": 1, + "disk": 5, + "docker": { + "image": "busybox" + }, + "mem": 512, + "placement": {}, + "restart": {} + } + }, + { + "id": "metronomikon-test2.hello3", + "history": { + "successCount": "[1-9][0-9]*", + "failureCount": 0, + "lastSuccessAt": "20.*", + "lastFailureAt": null, + "successfulFinishedRuns": [ + { + "id": "metronomikon-test2.hello3-[0-9]+", + "createdAt": "20.*", + "finishedAt": "20.*", + "tasks": [ + "metronomikon-test2.hello3-[0-9]+-.+" + ] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" + ], + "failedFinishedRuns": [] + }, + "run": { + "args": [ + "/bin/sh", + "-c", + "date; echo Hello from the Kubernetes cluster hello3" + ], + "cpus": 1, + "disk": 5, + "docker": { + "image": "busybox" + }, + "mem": 512, + "placement": {}, + "restart": {} + } + }, + { + "id": "metronomikon-test2.hello4", + "history": { + "successCount": "[1-9][0-9]*", + "failureCount": 0, + "lastSuccessAt": "20.*", + "lastFailureAt": null, + "successfulFinishedRuns": [ + { + "id": "metronomikon-test2.hello4-[0-9]+", + "createdAt": "20.*", + "finishedAt": "20.*", + "tasks": [ + "metronomikon-test2.hello4-[0-9]+-.+" + ] + }, + "METRONOMIKON_TEST_REQUIRE_ONE_LIST_MATCH_ONLY" + ], + "failedFinishedRuns": [] + }, + "run": { + "args": [ + "/bin/sh", + "-c", + "date; echo Hello from the Kubernetes cluster hello4" + ], + "cpus": 1, + "disk": 5, + "docker": { + "image": "busybox" + }, + "mem": 512, + "placement": {}, + "restart": {} + } + } +]