Skip to content

Commit

Permalink
Support for Single Node Clusters #411 #454
Browse files Browse the repository at this point in the history
  • Loading branch information
alexott committed Jan 22, 2021
1 parent 7a01703 commit dacd91d
Show file tree
Hide file tree
Showing 24 changed files with 629 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* `databricks_me` data source was added to represent `user_name`, `home` & `id` of the caller user (or service principal).
* Added validation for secret scope name in `databricks_secret`, `databricks_secret_scope` and `databricks_secret_acl`. Non-compliant names may cause errors.
* Added [databricks_spark_version](https://github.com/databrickslabs/terraform-provider-databricks/issues/347) data source.
* Fixed support for [single node clusters](https://docs.databricks.com/clusters/single-node.html) by allowing [`num_workers` to be `0`](https://github.com/databrickslabs/terraform-provider-databricks/pull/454).

**Behavior changes**

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ snapshot:
@echo "✓ Making Snapshot ..."
@goreleaser release --rm-dist --snapshot

.PHONY: build fmt python-setup docs vendor build fmt coverage test lint
.PHONY: build fmt python-setup docs vendor build fmt coverage test lint
10 changes: 2 additions & 8 deletions access/acceptance/permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ func TestAccDatabricksPermissionsResourceFullLifecycle(t *testing.T) {
{
Config: fmt.Sprintf(`
resource "databricks_notebook" "this" {
content = base64encode("# Databricks notebook source\nprint(1)")
content_base64 = base64encode("# Databricks notebook source\nprint(1)")
path = "/Beginning/%[1]s/Init"
overwrite = true
mkdirs = true
language = "PYTHON"
format = "SOURCE"
}
resource "databricks_group" "first" {
display_name = "First %[1]s"
Expand Down Expand Up @@ -55,12 +52,9 @@ func TestAccDatabricksPermissionsResourceFullLifecycle(t *testing.T) {
{
Config: fmt.Sprintf(`
resource "databricks_notebook" "this" {
content = base64encode("# Databricks notebook source\nprint(1)")
content_base64 = base64encode("# Databricks notebook source\nprint(1)")
path = "/Beginning/%[1]s/Init"
overwrite = true
mkdirs = true
language = "PYTHON"
format = "SOURCE"
}
resource "databricks_group" "first" {
display_name = "First %[1]s"
Expand Down
6 changes: 3 additions & 3 deletions access/acceptance/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func TestAccSecretResource(t *testing.T) {
},
Config: config,
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("databricks_secret.my_secret", "scope", scope),
resource.TestCheckResourceAttr("databricks_secret.my_secret", "key", key),
resource.TestCheckResourceAttr("databricks_secret.my_secret", "string_value", secret),
resource.TestCheckResourceAttr("databricks_secret.this", "scope", scope),
resource.TestCheckResourceAttr("databricks_secret.this", "key", key),
resource.TestCheckResourceAttr("databricks_secret.this", "string_value", secret),
),
},
},
Expand Down
59 changes: 49 additions & 10 deletions compute/acceptance/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ resource "databricks_instance_pool" "%[1]s" {
%[3]s
idle_instance_autotermination_minutes = 10
disk_spec {
%[4]s
disk_type {
%[4]s
}
disk_size = 80
disk_count = 1
}
Expand Down Expand Up @@ -106,14 +108,18 @@ func (i *instancePoolHCLBuilder) withAwsAttributes(attributesMap map[string]stri

func getCommonLibraries() string {
return `
library_maven {
coordinates = "org.jsoup:jsoup:1.7.2"
repo = "https://mavencentral.org"
exclusions = ["slf4j:slf4j"]
library {
maven {
coordinates = "org.jsoup:jsoup:1.7.2"
repo = "https://mavencentral.org"
exclusions = ["slf4j:slf4j"]
}
}
library_pypi {
package = "faker"
repo = "https://pypi.org"
library {
pypi {
package = "Faker"
repo = "https://pypi.org/simple"
}
}
library {
pypi {
Expand Down Expand Up @@ -172,11 +178,13 @@ func (c *clusterHCLBuilder) withCloudDiskSpec() *clusterHCLBuilder {
}

func (c *clusterHCLBuilder) build() string {
clusterAPI := NewClustersAPI(context.Background(), common.CommonEnvironmentClient())
sparkVersion := clusterAPI.LatestSparkVersionOrDefault(SparkVersionRequest{Latest: true, LongTermSupport: true})
return fmt.Sprintf(`
resource "databricks_cluster" "%[1]s" {
cluster_name = "%[1]s"
%[2]s
spark_version = "6.6.x-scala2.11"
spark_version = "%[6]s"
autoscale {
min_workers = 1
max_workers = 2
Expand All @@ -195,7 +203,7 @@ resource "databricks_cluster" "%[1]s" {
custom_tags = {
"ResourceClass" = "Serverless"
}
}`, c.Name, c.instancePool, c.awsAttributes, c.libraries, c.nodeTypeID, c.diskSpec)
}`, c.Name, c.instancePool, c.awsAttributes, c.libraries, c.nodeTypeID, sparkVersion)
}

func TestAwsAccClusterResource_ValidatePlan(t *testing.T) {
Expand Down Expand Up @@ -300,6 +308,9 @@ func TestAwsAccClusterResource_CreateClusterViaInstancePool(t *testing.T) {
}

func TestAzureAccClusterResource_CreateClusterViaInstancePool(t *testing.T) {
if os.Getenv("CLOUD_ENV") == "" {
return
}
randomInstancePoolName := fmt.Sprintf("pool_%s", acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum))
randomClusterName := fmt.Sprintf("cluster_%s", acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum))
defaultAzureInstancePoolClusterTest := newInstancePoolHCLBuilder(randomInstancePoolName).withCloudEnv().build() +
Expand Down Expand Up @@ -350,6 +361,34 @@ func TestAccClusterResource_CreateClusterWithLibraries(t *testing.T) {
})
}

// TestAccClusterResource_CreateSingleNodeCluster provisions a cluster with
// num_workers = 0 plus the single-node Spark conf profile, verifying that the
// provider accepts zero workers for single-node clusters.
// See https://docs.databricks.com/clusters/single-node.html.
func TestAccClusterResource_CreateSingleNodeCluster(t *testing.T) {
	// Skip (rather than silently pass) outside acceptance environments,
	// consistent with the other acceptance tests in this package.
	if _, ok := os.LookupEnv("CLOUD_ENV"); !ok {
		t.Skip("Acceptance tests skipped unless env 'CLOUD_ENV' is set")
	}
	randomName := fmt.Sprintf("cluster-%s", acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum))
	// Resolve node type and Spark version dynamically so the config does not
	// hard-code cloud- or release-specific identifiers.
	clusterAPI := NewClustersAPI(context.Background(), common.CommonEnvironmentClient())

	acceptance.AccTest(t, resource.TestCase{
		Steps: []resource.TestStep{
			{
				Config: fmt.Sprintf(`
resource "databricks_cluster" "%[1]s" {
cluster_name = "%[1]s"
spark_version = "%[3]s"
node_type_id = "%[2]s"
num_workers = 0
autotermination_minutes = 10
spark_conf = {
"spark.databricks.cluster.profile" = "singleNode"
"spark.master" = "local[*]"
}
}`, randomName, clusterAPI.GetSmallestNodeType(NodeTypeRequest{LocalDisk: true}),
					clusterAPI.LatestSparkVersionOrDefault(SparkVersionRequest{Latest: true, LongTermSupport: true})),
			},
		},
	})
}

func testClusterCheckAndTerminateForFutureTests(n string, t *testing.T) resource.TestCheckFunc {
return acceptance.ResourceCheck(n,
func(ctx context.Context, client *common.DatabricksClient, id string) error {
Expand Down
68 changes: 68 additions & 0 deletions compute/acceptance/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,74 @@ import (
"github.com/stretchr/testify/assert"
)

// TestAwsAccJobsCreate exercises the full lifecycle of the Jobs API against a
// live AWS workspace: create a job, read it back, update its cluster's Spark
// version, read again, and delete it on exit.
func TestAwsAccJobsCreate(t *testing.T) {
// Acceptance-only: requires a configured workspace via environment variables.
if _, ok := os.LookupEnv("CLOUD_ENV"); !ok {
t.Skip("Acceptance tests skipped unless env 'CLOUD_ENV' is set")
}

client := common.NewClientFromEnvironment()
jobsAPI := NewJobsAPI(context.Background(), client)
clustersAPI := NewClustersAPI(context.Background(), client)
// Pin the latest LTS runtime so the test does not depend on a hard-coded
// Spark version string.
// NOTE(review): SparkVersionRequest is qualified with `compute.` here but
// NodeTypeRequest below is not — presumably both resolve via the file's
// imports; verify they refer to the same package.
sparkVersion := clustersAPI.LatestSparkVersionOrDefault(compute.SparkVersionRequest{Latest: true, LongTermSupport: true})

jobSettings := JobSettings{
NewCluster: &Cluster{
NumWorkers: 2,
SparkVersion: sparkVersion,
SparkConf: nil,
AwsAttributes: &AwsAttributes{
Availability: "ON_DEMAND",
},
// Smallest available node type keeps the acceptance run cheap.
NodeTypeID: clustersAPI.GetSmallestNodeType(NodeTypeRequest{}),
},
NotebookTask: &NotebookTask{
NotebookPath: "/tf-test/demo-terraform/demo-notebook",
},
Name: "1-test-job",
Libraries: []Library{
{
Maven: &Maven{
Coordinates: "org.jsoup:jsoup:1.7.2",
},
},
},
EmailNotifications: &JobEmailNotifications{
OnStart: []string{},
OnSuccess: []string{},
OnFailure: []string{},
},
TimeoutSeconds: 3600,
MaxRetries: 1,
Schedule: &CronSchedule{
QuartzCronExpression: "0 15 22 ? * *",
TimezoneID: "America/Los_Angeles",
},
MaxConcurrentRuns: 1,
}

job, err := jobsAPI.Create(jobSettings)
assert.NoError(t, err, err)
id := job.ID()
// Clean up the job even if a later assertion fails.
defer func() {
err := jobsAPI.Delete(id)
assert.NoError(t, err, err)
}()
t.Log(id)
// Round-trip: the created job must carry the Spark version we requested.
job, err = jobsAPI.Read(id)
assert.NoError(t, err, err)
assert.True(t, job.Settings.NewCluster.SparkVersion == sparkVersion, "Something is wrong with spark version")

// Update the job's cluster to the latest (non-LTS) runtime and verify the
// change is persisted.
newSparkVersion := clustersAPI.LatestSparkVersionOrDefault(compute.SparkVersionRequest{Latest: true})
jobSettings.NewCluster.SparkVersion = newSparkVersion

err = jobsAPI.Update(id, jobSettings)
assert.NoError(t, err, err)

job, err = jobsAPI.Read(id)
assert.NoError(t, err, err)
assert.True(t, job.Settings.NewCluster.SparkVersion == newSparkVersion, "Something is wrong with spark version")
}

func TestAccJobResource(t *testing.T) {
if _, ok := os.LookupEnv("CLOUD_ENV"); !ok {
t.Skip("Acceptance tests skipped unless env 'CLOUD_ENV' is set")
Expand Down
2 changes: 1 addition & 1 deletion compute/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ type Cluster struct {
ClusterName string `json:"cluster_name,omitempty"`

SparkVersion string `json:"spark_version"` // TODO: perhaps make a default
NumWorkers int32 `json:"num_workers,omitempty" tf:"group:size"`
NumWorkers int32 `json:"num_workers" tf:"group:size"`
Autoscale *AutoScale `json:"autoscale,omitempty" tf:"group:size"`
EnableElasticDisk bool `json:"enable_elastic_disk,omitempty" tf:"computed"`
EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty"`
Expand Down
29 changes: 29 additions & 0 deletions compute/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package compute

import (
"context"
"fmt"
"log"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

Expand Down Expand Up @@ -82,17 +86,38 @@ func resourceClusterSchema() map[string]*schema.Schema {
Type: schema.TypeMap,
Computed: true,
}
s["num_workers"] = &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 0,
ValidateDiagFunc: validation.ToDiagFunc(validation.IntAtLeast(0)),
}
return s
})
}

// validateClusterDefinition rejects cluster specs that have neither workers
// nor autoscaling unless they are configured as single-node clusters
// (singleNode profile with a local Spark master).
func validateClusterDefinition(cluster Cluster) error {
	switch {
	case cluster.NumWorkers > 0, cluster.Autoscale != nil:
		// A sized or autoscaling cluster needs no further checks here.
		return nil
	case cluster.SparkConf["spark.databricks.cluster.profile"] == "singleNode" &&
		strings.HasPrefix(cluster.SparkConf["spark.master"], "local"):
		// Zero workers is valid when the Spark conf marks this as a
		// single-node cluster running a local master.
		return nil
	default:
		return fmt.Errorf("NumWorkers could be 0 only for SingleNode clusters. See https://docs.databricks.com/clusters/single-node.html for more details")
	}
}

func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
var cluster Cluster
clusters := NewClustersAPI(ctx, c)
err := internal.DataToStructPointer(d, clusterSchema, &cluster)
if err != nil {
return err
}
if err = validateClusterDefinition(cluster); err != nil {
return err
}
modifyClusterRequest(&cluster)
clusterInfo, err := clusters.Create(cluster)
if err != nil {
Expand Down Expand Up @@ -218,6 +243,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
var clusterInfo ClusterInfo
if hasClusterConfigChanged(d) {
log.Printf("[DEBUG] Cluster state has changed!")
err = validateClusterDefinition(cluster)
if err != nil {
return err
}
modifyClusterRequest(&cluster)
clusterInfo, err = clusters.Edit(cluster)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion compute/resource_cluster_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func NewClusterPoliciesAPI(ctx context.Context, m interface{}) ClusterPoliciesAP
return ClusterPoliciesAPI{m.(*common.DatabricksClient), ctx}
}

// Listing can be performed by any user and is limited to policies accessible by that user.
// ClusterPoliciesAPI struct for cluster policies API
type ClusterPoliciesAPI struct {
client *common.DatabricksClient
context context.Context
Expand Down
Loading

0 comments on commit dacd91d

Please sign in to comment.