Skip to content

Commit

Permalink
Added webhook_notifications support to databricks_job (databricks…
Browse files Browse the repository at this point in the history
…#1674)

* Add support for job webhooks
* Do not sort empty webhooks
* Move logic to WebhookNotifications receiver
  • Loading branch information
akolar-db committed Oct 21, 2022
1 parent 3567a0a commit 63545f6
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 5 deletions.
42 changes: 37 additions & 5 deletions jobs/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type DbtTask struct {
WarehouseId string `json:"warehouse_id,omitempty"`
}

// EmailNotifications contains the information for email notifications after job completion
// EmailNotifications contains the information for email notifications after job or task run start or completion
type EmailNotifications struct {
OnStart []string `json:"on_start,omitempty"`
OnSuccess []string `json:"on_success,omitempty"`
Expand All @@ -97,6 +97,31 @@ type EmailNotifications struct {
AlertOnLastAttempt bool `json:"alert_on_last_attempt,omitempty"`
}

// WebhookNotifications contains the information for webhook notifications sent after job start or completion.
type WebhookNotifications struct {
OnStart []Webhook `json:"on_start,omitempty"`
OnSuccess []Webhook `json:"on_success,omitempty"`
OnFailure []Webhook `json:"on_failure,omitempty"`
}

func (wn *WebhookNotifications) Sort() {
if wn == nil {
return
}

notifs := [][]Webhook{wn.OnStart, wn.OnFailure, wn.OnSuccess}
for _, ns := range notifs {
sort.Slice(ns, func(i, j int) bool {
return ns[i].ID < ns[j].ID
})
}
}

// Webhook contains a reference by id to one of the centrally configured webhooks.
type Webhook struct {
ID string `json:"id"`
}

// CronSchedule contains the information for the quartz cron expression
type CronSchedule struct {
QuartzCronExpression string `json:"quartz_cron_expression"`
Expand Down Expand Up @@ -179,10 +204,11 @@ type JobSettings struct {
GitSource *GitSource `json:"git_source,omitempty"`
// END Jobs + Repo integration preview

Schedule *CronSchedule `json:"schedule,omitempty"`
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"`
EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"`
Tags map[string]string `json:"tags,omitempty"`
Schedule *CronSchedule `json:"schedule,omitempty"`
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"`
EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"`
WebhookNotifications *WebhookNotifications `json:"webhook_notifications,omitempty" tf:"suppress_diff"`
Tags map[string]string `json:"tags,omitempty"`
}

func (js *JobSettings) isMultiTask() bool {
Expand All @@ -195,6 +221,10 @@ func (js *JobSettings) sortTasksByKey() {
})
}

func (js *JobSettings) sortWebhooksByID() {
js.WebhookNotifications.Sort()
}

// JobList returns a list of all jobs
type JobList struct {
Jobs []Job `json:"jobs"`
Expand Down Expand Up @@ -381,6 +411,7 @@ func (a JobsAPI) Restart(id string, timeout time.Duration) error {
func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) {
var job Job
jobSettings.sortTasksByKey()
jobSettings.sortWebhooksByID()
var gitSource *GitSource = jobSettings.GitSource
if gitSource != nil && gitSource.Provider == "" {
gitSource.Provider = repos.GetGitProviderFromUrl(gitSource.Url)
Expand Down Expand Up @@ -418,6 +449,7 @@ func (a JobsAPI) Read(id string) (job Job, err error) {
}, &job), id)
if job.Settings != nil {
job.Settings.sortTasksByKey()
job.Settings.sortWebhooksByID()
}
return
}
Expand Down
87 changes: 87 additions & 0 deletions jobs/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,93 @@ func TestResourceJobCreateNWorkers(t *testing.T) {
assert.Equal(t, "789", d.Id())
}

func TestResourceJobCreateWithWebhooks(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.0/jobs/create",
ExpectedRequest: JobSettings{
ExistingClusterID: "abc",
MaxConcurrentRuns: 1,
SparkJarTask: &SparkJarTask{
MainClassName: "com.labs.BarMain",
},
Libraries: []libraries.Library{
{
Jar: "dbfs://aa/bb/cc.jar",
},
},
Name: "Featurizer",
WebhookNotifications: &WebhookNotifications{
OnStart: []Webhook{{ID: "id1"}, {ID: "id2"}, {ID: "id3"}},
OnSuccess: []Webhook{{ID: "id2"}},
OnFailure: []Webhook{{ID: "id3"}},
},
},
Response: Job{
JobID: 789,
},
},
{
Method: "GET",
Resource: "/api/2.0/jobs/get?job_id=789",
Response: Job{
JobID: 789,
Settings: &JobSettings{
ExistingClusterID: "abc",
MaxConcurrentRuns: 1,
SparkJarTask: &SparkJarTask{
MainClassName: "com.labs.BarMain",
},
Libraries: []libraries.Library{
{
Jar: "dbfs://aa/bb/cc.jar",
},
},
Name: "Featurizer",
WebhookNotifications: &WebhookNotifications{
OnStart: []Webhook{{ID: "id1"}, {ID: "id2"}, {ID: "id3"}},
OnSuccess: []Webhook{{ID: "id2"}},
OnFailure: []Webhook{{ID: "id3"}},
},
},
},
},
},
Create: true,
Resource: ResourceJob(),
HCL: `existing_cluster_id = "abc"
name = "Featurizer"
max_concurrent_runs = 1
spark_jar_task {
main_class_name = "com.labs.BarMain"
}
library {
jar = "dbfs://aa/bb/cc.jar"
}
webhook_notifications {
on_start {
id = "id3"
}
on_start {
id = "id1"
}
on_start {
id = "id2"
}
on_success {
id = "id2"
}
on_failure {
id = "id3"
}
}`,
}.Apply(t)
assert.NoError(t, err, err)
assert.Equal(t, "789", d.Id())
}

func TestResourceJobCreateFromGitSource(t *testing.T) {
qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
Expand Down

0 comments on commit 63545f6

Please sign in to comment.