Skip to content

Commit

Permalink
Support permissions for databricks_pipeline
Browse files Browse the repository at this point in the history
This fixes #1359
  • Loading branch information
alexott committed Jun 7, 2022
1 parent d15fecb commit e23b2ac
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 31 deletions.
66 changes: 66 additions & 0 deletions docs/resources/permissions.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,72 @@ resource "databricks_permissions" "job_usage" {
}
```

## Delta Live Tables usage

There are four assignable [permission levels](https://docs.databricks.com/security/access-control/dlt-acl.html#delta-live-tables-permissions) for [databricks_pipeline](pipeline.md): `CAN_VIEW`, `CAN_RUN`, `CAN_MANAGE`, and `IS_OWNER`. Admins are granted the `CAN_MANAGE` permission by default, and they can assign that permission to non-admin users, and service principals.

- The creator of a DLT Pipeline has `IS_OWNER` permission. Destroying `databricks_permissions` resource for a pipeline would revert ownership to the creator.
- A DLT pipeline must have exactly one owner. If a resource is changed and no owner is specified, the currently authenticated principal would become the new owner of the pipeline. Nothing would change, per se, if the pipeline was created through Terraform.
- A DLT pipeline cannot have a group as an owner.
- DLT Pipelines triggered through _Start_ assume the permissions of the pipeline owner and not the user, and service principal who issued Run Now.
- Read [main documentation](https://docs.databricks.com/security/access-control/dlt-acl.html) for additional detail.

```hcl
resource "databricks_group" "eng" {
display_name = "Engineering"
}
resource "databricks_notebook" "dlt_demo" {
content_base64 = base64encode(<<-EOT
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.table(
comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
return (spark.read.format("json").load(json_path))
EOT
)
language = "PYTHON"
path = "${data.databricks_current_user.me.home}/DLT_Demo"
}
resource "databricks_pipeline" "this" {
name = "DLT Demo Pipeline (${data.databricks_current_user.me.alphanumeric})"
storage = "/test/tf-pipeline"
configuration = {
key1 = "value1"
key2 = "value2"
}
library {
notebook {
path = databricks_notebook.dlt_demo.id
}
}
continuous = false
filters {
include = ["com.databricks.include"]
exclude = ["com.databricks.exclude"]
}
}
resource "databricks_permissions" "dlt_usage" {
pipeline_id = databricks_pipeline.this.id
access_control {
group_name = "users"
permission_level = "CAN_VIEW"
}
access_control {
group_name = databricks_group.eng.display_name
permission_level = "CAN_MANAGE"
}
}
```

## Notebook usage

Valid [permission levels](https://docs.databricks.com/security/access-control/workspace-acl.html#notebook-permissions) for [databricks_notebook](notebook.md) are: `CAN_READ`, `CAN_RUN`, `CAN_EDIT`, and `CAN_MANAGE`.
Expand Down
13 changes: 12 additions & 1 deletion permissions/resource_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/databrickslabs/terraform-provider-databricks/common"
"github.com/databrickslabs/terraform-provider-databricks/jobs"
"github.com/databrickslabs/terraform-provider-databricks/pipelines"
"github.com/databrickslabs/terraform-provider-databricks/scim"

"github.com/databrickslabs/terraform-provider-databricks/workspace"
Expand Down Expand Up @@ -150,7 +151,7 @@ func (a PermissionsAPI) Update(objectID string, objectACL AccessControlChangeLis
PermissionLevel: "CAN_MANAGE",
})
}
if strings.HasPrefix(objectID, "/jobs") {
if strings.HasPrefix(objectID, "/jobs") || strings.HasPrefix(objectID, "/pipelines") {
owners := 0
for _, acl := range objectACL.AccessControlList {
if acl.PermissionLevel == "IS_OWNER" {
Expand Down Expand Up @@ -196,6 +197,15 @@ func (a PermissionsAPI) Delete(objectID string) error {
UserName: job.CreatorUserName,
PermissionLevel: "IS_OWNER",
})
} else if strings.HasPrefix(objectID, "/pipelines") {
job, err := pipelines.NewPipelinesAPI(a.context, a.client).Read(strings.ReplaceAll(objectID, "/pipelines/", ""))
if err != nil {
return err
}
accl.AccessControlList = append(accl.AccessControlList, AccessControlChange{
UserName: job.CreatorUserName,
PermissionLevel: "IS_OWNER",
})
}
return a.put(objectID, accl)
}
Expand Down Expand Up @@ -241,6 +251,7 @@ func permissionsResourceIDFields() []permissionsIDFieldMapping {
{"cluster_policy_id", "cluster-policy", "cluster-policies", []string{"CAN_USE"}, SIMPLE},
{"instance_pool_id", "instance-pool", "instance-pools", []string{"CAN_ATTACH_TO", "CAN_MANAGE"}, SIMPLE},
{"cluster_id", "cluster", "clusters", []string{"CAN_ATTACH_TO", "CAN_RESTART", "CAN_MANAGE"}, SIMPLE},
{"pipeline_id", "pipelines", "pipelines", []string{"CAN_VIEW", "CAN_RUN", "CAN_MANAGE", "IS_OWNER"}, SIMPLE},
{"job_id", "job", "jobs", []string{"CAN_VIEW", "CAN_MANAGE_RUN", "IS_OWNER", "CAN_MANAGE"}, SIMPLE},
{"notebook_id", "notebook", "notebooks", []string{"CAN_READ", "CAN_RUN", "CAN_EDIT", "CAN_MANAGE"}, SIMPLE},
{"notebook_path", "notebook", "notebooks", []string{"CAN_READ", "CAN_RUN", "CAN_EDIT", "CAN_MANAGE"}, PATH},
Expand Down
55 changes: 55 additions & 0 deletions permissions/resource_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,61 @@ func TestShouldKeepAdminsOnAnythingExceptPasswordsAndAssignsOwnerForJob(t *testi
})
}

func TestShouldKeepAdminsOnAnythingExceptPasswordsAndAssignsOwnerForPipeline(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/permissions/pipelines/123",
Response: ObjectACL{
ObjectID: "/pipelines/123",
ObjectType: "pipeline",
AccessControlList: []AccessControl{
{
GroupName: "admins",
AllPermissions: []Permission{
{
PermissionLevel: "CAN_DO_EVERYTHING",
Inherited: true,
},
{
PermissionLevel: "CAN_MANAGE",
Inherited: false,
},
},
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/123",
Response: jobs.Job{
CreatorUserName: "creator@example.com",
},
},
{
Method: "PUT",
Resource: "/api/2.0/permissions/pipelines/123",
ExpectedRequest: ObjectACL{
AccessControlList: []AccessControl{
{
GroupName: "admins",
PermissionLevel: "CAN_MANAGE",
},
{
UserName: "creator@example.com",
PermissionLevel: "IS_OWNER",
},
},
},
},
}, func(ctx context.Context, client *common.DatabricksClient) {
p := NewPermissionsAPI(ctx, client)
err := p.Delete("/pipelines/123")
assert.NoError(t, err)
})
}

func TestCustomizeDiffNoHostYet(t *testing.T) {
assert.Nil(t, ResourcePermissions().CustomizeDiff(context.TODO(), nil, &common.DatabricksClient{}))
}
Expand Down
51 changes: 26 additions & 25 deletions pipelines/resource_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,27 @@ const (
HealthStatusUnhealthy PipelineHealthStatus = "UNHEALTHY"
)

type pipelineInfo struct {
PipelineID string `json:"pipeline_id"`
Spec *pipelineSpec `json:"spec"`
State *PipelineState `json:"state"`
Cause string `json:"cause"`
ClusterID string `json:"cluster_id"`
Name string `json:"name"`
Health *PipelineHealthStatus `json:"health"`
type PipelineInfo struct {
PipelineID string `json:"pipeline_id"`
Spec *pipelineSpec `json:"spec"`
State *PipelineState `json:"state"`
Cause string `json:"cause"`
ClusterID string `json:"cluster_id"`
Name string `json:"name"`
Health *PipelineHealthStatus `json:"health"`
CreatorUserName string `json:"creator_user_name"`
}

type pipelinesAPI struct {
type PipelinesAPI struct {
client *common.DatabricksClient
ctx context.Context
}

func newPipelinesAPI(ctx context.Context, m interface{}) pipelinesAPI {
return pipelinesAPI{m.(*common.DatabricksClient), ctx}
func NewPipelinesAPI(ctx context.Context, m interface{}) PipelinesAPI {
return PipelinesAPI{m.(*common.DatabricksClient), ctx}
}

func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, error) {
func (a PipelinesAPI) Create(s pipelineSpec, timeout time.Duration) (string, error) {
var resp createPipelineResponse
err := a.client.Post(a.ctx, "/pipelines", s, &resp)
if err != nil {
Expand All @@ -128,7 +129,7 @@ func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, err
err = a.waitForState(id, timeout, StateRunning)
if err != nil {
log.Printf("[INFO] Pipeline creation failed, attempting to clean up pipeline %s", id)
err2 := a.delete(id, timeout)
err2 := a.Delete(id, timeout)
if err2 != nil {
log.Printf("[WARN] Unable to delete pipeline %s; this resource needs to be manually cleaned up", id)
return "", fmt.Errorf("multiple errors occurred when creating pipeline. Error while waiting for creation: \"%v\"; error while attempting to clean up failed pipeline: \"%v\"", err, err2)
Expand All @@ -139,27 +140,27 @@ func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, err
return id, nil
}

func (a pipelinesAPI) read(id string) (p pipelineInfo, err error) {
func (a PipelinesAPI) Read(id string) (p PipelineInfo, err error) {
err = a.client.Get(a.ctx, "/pipelines/"+id, nil, &p)
return
}

func (a pipelinesAPI) update(id string, s pipelineSpec, timeout time.Duration) error {
func (a PipelinesAPI) Update(id string, s pipelineSpec, timeout time.Duration) error {
err := a.client.Put(a.ctx, "/pipelines/"+id, s)
if err != nil {
return err
}
return a.waitForState(id, timeout, StateRunning)
}

func (a pipelinesAPI) delete(id string, timeout time.Duration) error {
func (a PipelinesAPI) Delete(id string, timeout time.Duration) error {
err := a.client.Delete(a.ctx, "/pipelines/"+id, map[string]string{})
if err != nil {
return err
}
return resource.RetryContext(a.ctx, timeout,
func() *resource.RetryError {
i, err := a.read(id)
i, err := a.Read(id)
if err != nil {
if common.IsMissing(err) {
return nil
Expand All @@ -172,10 +173,10 @@ func (a pipelinesAPI) delete(id string, timeout time.Duration) error {
})
}

func (a pipelinesAPI) waitForState(id string, timeout time.Duration, desiredState PipelineState) error {
func (a PipelinesAPI) waitForState(id string, timeout time.Duration, desiredState PipelineState) error {
return resource.RetryContext(a.ctx, timeout,
func() *resource.RetryError {
i, err := a.read(id)
i, err := a.Read(id)
if err != nil {
return resource.NonRetryableError(err)
}
Expand Down Expand Up @@ -229,8 +230,8 @@ func ResourcePipeline() *schema.Resource {
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
var s pipelineSpec
common.DataToStructPointer(d, pipelineSchema, &s)
api := newPipelinesAPI(ctx, c)
id, err := api.create(s, d.Timeout(schema.TimeoutCreate))
api := NewPipelinesAPI(ctx, c)
id, err := api.Create(s, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}
Expand All @@ -239,7 +240,7 @@ func ResourcePipeline() *schema.Resource {
return nil
},
Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
i, err := newPipelinesAPI(ctx, c).read(d.Id())
i, err := NewPipelinesAPI(ctx, c).Read(d.Id())
if err != nil {
return err
}
Expand All @@ -251,11 +252,11 @@ func ResourcePipeline() *schema.Resource {
Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
var s pipelineSpec
common.DataToStructPointer(d, pipelineSchema, &s)
return newPipelinesAPI(ctx, c).update(d.Id(), s, d.Timeout(schema.TimeoutUpdate))
return NewPipelinesAPI(ctx, c).Update(d.Id(), s, d.Timeout(schema.TimeoutUpdate))
},
Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
api := newPipelinesAPI(ctx, c)
return api.delete(d.Id(), d.Timeout(schema.TimeoutDelete))
api := NewPipelinesAPI(ctx, c)
return api.Delete(d.Id(), d.Timeout(schema.TimeoutDelete))
},
Timeouts: &schema.ResourceTimeout{
Default: schema.DefaultTimeout(DefaultTimeout),
Expand Down
10 changes: 5 additions & 5 deletions pipelines/resource_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestResourcePipelineRead(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines/abcd",
Response: pipelineInfo{
Response: PipelineInfo{
PipelineID: "abcd",
Spec: &basicPipelineSpec,
},
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestResourcePipelineUpdate(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines/abcd",
Response: pipelineInfo{
Response: PipelineInfo{
PipelineID: "abcd",
Spec: &spec,
State: &state,
Expand All @@ -344,7 +344,7 @@ func TestResourcePipelineUpdate(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines/abcd",
Response: pipelineInfo{
Response: PipelineInfo{
PipelineID: "abcd",
Spec: &spec,
State: &state,
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestResourcePipelineUpdate_FailsAfterUpdate(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines/abcd",
Response: pipelineInfo{
Response: PipelineInfo{
PipelineID: "abcd",
Spec: &spec,
State: &state,
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestResourcePipelineDelete(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines/abcd",
Response: pipelineInfo{
Response: PipelineInfo{
PipelineID: "abcd",
Spec: &basicPipelineSpec,
State: &state,
Expand Down

0 comments on commit e23b2ac

Please sign in to comment.