Skip to content

Commit

Permalink
Remove failed libraries on running clusters
Browse files Browse the repository at this point in the history
Whenever a library fails to get installed on a running cluster, we automatically remove it, so that the clean state of managed libraries is properly maintained. Without this fix users had to manually go to Clusters UI and remove library from a cluster, where it failed to install. Libraries add up to CREATE and UPDATE timeouts of `databricks_cluster` resource.

Fixes #599
  • Loading branch information
nfx committed Dec 11, 2021
1 parent ca42fef commit bea055a
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 44 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

## 0.4.1

* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952))
* Added `databricks_clusters` data resource to list all clusters in the workspace.
* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952)).
* Added `databricks_clusters` data resource to list all clusters in the workspace ([#955](https://github.com/databrickslabs/terraform-provider-databricks/pull/955)).
* Whenever a library fails to get installed on a running `databricks_cluster`, we now automatically remove this library, so that the clean state of managed libraries is properly maintained. Without this fix users had to manually go to Clusters UI and remove library from a cluster, where it failed to install. Libraries add up to CREATE and UPDATE timeouts of `databricks_cluster` resource. ([#599](https://github.com/databrickslabs/terraform-provider-databricks/issues/599)).
* Added new experimental resources and increased test coverage.

## 0.4.0

Expand Down
25 changes: 18 additions & 7 deletions clusters/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func resourceClusterSchema() map[string]*schema.Schema {

func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
var cluster Cluster
start := time.Now()
timeout := d.Timeout(schema.TimeoutCreate)
clusters := NewClustersAPI(ctx, c)
err := common.DataToStructPointer(d, clusterSchema, &cluster)
if err != nil {
Expand All @@ -131,6 +133,7 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
return err
}
cluster.ModifyRequestOnInstancePool()
// TODO: propagate d.Timeout(schema.TimeoutCreate)
clusterInfo, err := clusters.Create(cluster)
if err != nil {
return err
Expand All @@ -153,9 +156,12 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
if err = libs.Install(libraryList); err != nil {
return err
}
// TODO: share the remainder of timeout from clusters.Create
timeout := d.Timeout(schema.TimeoutCreate)
_, err := libs.WaitForLibrariesInstalled(d.Id(), timeout, clusterInfo.IsRunningOrResizing())
_, err := libs.WaitForLibrariesInstalled(libraries.Wait{
ClusterID: d.Id(),
Timeout: timeout - time.Since(start),
IsRunning: clusterInfo.IsRunningOrResizing(),
IsRefresh: false,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -195,8 +201,12 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, c *common.
}
d.Set("url", c.FormatURL("#setting/clusters/", d.Id(), "/configuration"))
librariesAPI := libraries.NewLibrariesAPI(ctx, c)
libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(d.Id(),
d.Timeout(schema.TimeoutRead), clusterInfo.IsRunningOrResizing())
libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(libraries.Wait{
ClusterID: d.Id(),
Timeout: d.Timeout(schema.TimeoutRead),
IsRunning: clusterInfo.IsRunningOrResizing(),
IsRefresh: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -284,9 +294,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
return err
}
}
// clusters.StartAndGetInfo() always returns a running cluster
// clusters.StartAndGetInfo() always returns a running cluster
// or errors out, so we just know the cluster is active.
err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall, true)
err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall,
d.Timeout(schema.TimeoutUpdate))
if err != nil {
return err
}
Expand Down
70 changes: 63 additions & 7 deletions clusters/resource_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,8 +1210,8 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) {
Fixtures: []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/clusters/get?cluster_id=foo",
Response: ClusterInfo {
Resource: "/api/2.0/clusters/get?cluster_id=foo",
Response: ClusterInfo{
State: ClusterStateTerminated,
},
},
Expand All @@ -1220,10 +1220,10 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) {
Resource: "/api/2.0/clusters/events",
},
{
Method: "GET",
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo",
Response: libraries.ClusterLibraryStatuses {
Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo",
Response: libraries.ClusterLibraryStatuses{
ClusterID: "foo",
LibraryStatuses: []libraries.LibraryStatus{
{
Expand All @@ -1237,6 +1237,62 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) {
},
},
Read: true,
ID: "foo",
ID: "foo",
}.ApplyNoError(t)
}
}

// https://github.com/databrickslabs/terraform-provider-databricks/issues/599
func TestRefreshOnRunningClusterWithFailedLibraryUninstallsIt(t *testing.T) {
qa.ResourceFixture{
Resource: ResourceCluster(),
Fixtures: []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/clusters/get?cluster_id=foo",
Response: ClusterInfo{
State: ClusterStateRunning,
},
},
{
Method: "POST",
Resource: "/api/2.0/clusters/events",
},
{
Method: "GET",
Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo",
Response: libraries.ClusterLibraryStatuses{
ClusterID: "foo",
LibraryStatuses: []libraries.LibraryStatus{
{
Status: "FAILED",
Messages: []string{"fails for the test"},
Library: &libraries.Library{
Jar: "foo.bar",
},
},
{
Status: "INSTALLED",
Library: &libraries.Library{
Whl: "bar.whl",
},
},
},
},
},
{
Method: "POST",
Resource: "/api/2.0/libraries/uninstall",
ExpectedRequest: libraries.ClusterLibraryList{
ClusterID: "foo",
Libraries: []libraries.Library{
{
Jar: "foo.bar",
},
},
},
},
},
Read: true,
ID: "foo",
}.ApplyNoError(t)
}
74 changes: 58 additions & 16 deletions libraries/libraries_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ func (a LibrariesAPI) ClusterStatus(clusterID string) (cls ClusterLibraryStatuse
return
}

func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, isActive bool) error {
type Wait struct {
ClusterID string
Timeout time.Duration
IsRunning bool
IsRefresh bool
}

func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, timeout time.Duration) error {
if len(remove.Libraries) > 0 {
err := a.Uninstall(remove)
if err != nil {
Expand All @@ -58,29 +65,33 @@ func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibra
return err
}
}
// TODO: propagate timeout to method signature
_, err := a.WaitForLibrariesInstalled(clusterID, 30*time.Minute, isActive)
_, err := a.WaitForLibrariesInstalled(Wait{
ClusterID: clusterID,
Timeout: timeout,
IsRunning: true,
IsRefresh: false,
})
return err
}

func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.Duration,
isActive bool) (result *ClusterLibraryStatuses, err error) {
err = resource.RetryContext(a.context, timeout, func() *resource.RetryError {
libsClusterStatus, err := a.ClusterStatus(clusterID)
// clusterID string, timeout time.Duration, isActive bool, refresh bool
func (a LibrariesAPI) WaitForLibrariesInstalled(wait Wait) (result *ClusterLibraryStatuses, err error) {
err = resource.RetryContext(a.context, wait.Timeout, func() *resource.RetryError {
libsClusterStatus, err := a.ClusterStatus(wait.ClusterID)
if common.IsMissing(err) {
// eventual consistency error
return resource.RetryableError(err)
}
if err != nil {
return resource.NonRetryableError(err)
}
if !isActive {
if !wait.IsRunning {
log.Printf("[INFO] Cluster %s is currently not running, so just returning list of %d libraries",
clusterID, len(libsClusterStatus.LibraryStatuses))
wait.ClusterID, len(libsClusterStatus.LibraryStatuses))
result = &libsClusterStatus
return nil
}
retry, err := libsClusterStatus.IsRetryNeeded()
retry, err := libsClusterStatus.IsRetryNeeded(wait.IsRefresh)
if retry {
return resource.RetryableError(err)
}
Expand All @@ -90,6 +101,33 @@ func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.D
result = &libsClusterStatus
return nil
})
if err != nil {
return
}
if wait.IsRunning {
installed := []LibraryStatus{}
cleanup := ClusterLibraryList{
ClusterID: wait.ClusterID,
Libraries: []Library{},
}
// cleanup libraries that failed to install
for _, v := range result.LibraryStatuses {
if v.Status == "FAILED" {
log.Printf("[WARN] Removing failed library %s from %s", v.Library, wait.ClusterID)
cleanup.Libraries = append(cleanup.Libraries, *v.Library)
continue
}
installed = append(installed, v)
}
// and result contains only the libraries that were successfully installed
result.LibraryStatuses = installed
if len(cleanup.Libraries) > 0 {
err = a.Uninstall(cleanup)
if err != nil {
err = fmt.Errorf("cannot cleanup libraries: %w", err)
}
}
}
return
}

Expand Down Expand Up @@ -243,10 +281,10 @@ func (cll *ClusterLibraryList) String() string {

// LibraryStatus is the status on a given cluster when using the libraries status api
type LibraryStatus struct {
Library *Library `json:"library,omitempty"`
Status string `json:"status,omitempty"`
IsLibraryInstalledOnAllClusters bool `json:"is_library_for_all_clusters,omitempty"`
Messages []string `json:"messages,omitempty"`
Library *Library `json:"library,omitempty"`
Status string `json:"status,omitempty"`
IsGlobal bool `json:"is_library_for_all_clusters,omitempty"`
Messages []string `json:"messages,omitempty"`
}

// ClusterLibraryStatuses A status will be available for all libraries installed on the cluster via the API or
Expand All @@ -271,12 +309,12 @@ func (cls ClusterLibraryStatuses) ToLibraryList() ClusterLibraryList {
// IsRetryNeeded returns first bool if there needs to be retry.
// If there needs to be retry, error message will explain why.
// If retry does not need to happen and error is not nil - it failed.
func (cls ClusterLibraryStatuses) IsRetryNeeded() (bool, error) {
func (cls ClusterLibraryStatuses) IsRetryNeeded(refresh bool) (bool, error) {
pending := 0
ready := 0
errors := []string{}
for _, lib := range cls.LibraryStatuses {
if lib.IsLibraryInstalledOnAllClusters {
if lib.IsGlobal {
continue
}
switch lib.Status {
Expand All @@ -301,6 +339,10 @@ func (cls ClusterLibraryStatuses) IsRetryNeeded() (bool, error) {
ready++
//Some step in installation failed. More information can be found in the messages field.
case "FAILED":
if refresh {
// we're reading library list on a running cluster and some of the libs failed to install
continue
}
errors = append(errors, fmt.Sprintf("%s failed: %s", lib.Library, strings.Join(lib.Messages, ", ")))
continue
}
Expand Down
Loading

0 comments on commit bea055a

Please sign in to comment.