-
Notifications
You must be signed in to change notification settings - Fork 776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support describeacls #1166
Support describeacls #1166
Changes from 18 commits
cbc8f78
ef59217
2609002
ec01572
fdfe93a
5622f4f
a865cc5
ca11cb9
6805667
f6e2d37
b8746fa
565bc7a
fbea1d3
28383ab
cf63056
9947470
6870645
283760d
3f32979
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,15 +15,18 @@ func TestClientCreateACLs(t *testing.T) { | |
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ | ||
topic := makeTopic() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made these non-deterministic to avoid collisions with other tests |
||
group := makeGroupID() | ||
|
||
createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ | ||
ACLs: []ACLEntry{ | ||
{ | ||
Principal: "User:alice", | ||
PermissionType: ACLPermissionTypeAllow, | ||
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeTopic, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: "fake-topic-for-alice", | ||
ResourceName: topic, | ||
Host: "*", | ||
}, | ||
{ | ||
|
@@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) { | |
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeGroup, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: "fake-group-for-bob", | ||
ResourceName: group, | ||
Host: "*", | ||
}, | ||
}, | ||
|
@@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) { | |
t.Fatal(err) | ||
} | ||
|
||
for _, err := range res.Errors { | ||
for _, err := range createRes.Errors { | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/describeacls" | ||
) | ||
|
||
// DescribeACLsRequest represents a request sent to a kafka broker to describe | ||
// existing ACLs. | ||
type DescribeACLsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// Filter to filter ACLs on. | ||
Filter ACLFilter | ||
} | ||
|
||
type ACLFilter struct { | ||
ResourceTypeFilter ResourceType | ||
ResourceNameFilter string | ||
ResourcePatternTypeFilter PatternType | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really sure how to handle this field since it was added in v1. We could use a pointer to make this nullable or have separate data structures and code paths for different versions. WDYT @rhansen2 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this works as is but we should probably have a comment explaining that it won't be used if the only supported API version is v0. I think the encoding will leave it out as appropriate based on the api version being used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, I added a comment |
||
PrincipalFilter string | ||
HostFilter string | ||
Operation ACLOperationType | ||
PermissionType ACLPermissionType | ||
} | ||
|
||
// DescribeACLsResponse represents a response from a kafka broker to an ACL | ||
// describe request. | ||
type DescribeACLsResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// Error that occurred while attempting to describe | ||
// the ACLs. | ||
Error error | ||
|
||
// ACL resources returned from the describe request. | ||
Resources []ACLResource | ||
} | ||
|
||
type ACLResource struct { | ||
ResourceType ResourceType | ||
ResourceName string | ||
PatternType PatternType | ||
ACLs []ACLDescription | ||
} | ||
|
||
type ACLDescription struct { | ||
Principal string | ||
Host string | ||
Operation ACLOperationType | ||
PermissionType ACLPermissionType | ||
} | ||
|
||
func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) { | ||
m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{ | ||
Filter: describeacls.ACLFilter{ | ||
ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter), | ||
ResourceNameFilter: req.Filter.ResourceNameFilter, | ||
ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter), | ||
PrincipalFilter: req.Filter.PrincipalFilter, | ||
HostFilter: req.Filter.HostFilter, | ||
Operation: int8(req.Filter.Operation), | ||
PermissionType: int8(req.Filter.PermissionType), | ||
}, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err) | ||
} | ||
|
||
res := m.(*describeacls.Response) | ||
resources := make([]ACLResource, len(res.Resources)) | ||
|
||
for resourceIdx, respResource := range res.Resources { | ||
descriptions := make([]ACLDescription, len(respResource.ACLs)) | ||
|
||
for descriptionIdx, respDescription := range respResource.ACLs { | ||
descriptions[descriptionIdx] = ACLDescription{ | ||
Principal: respDescription.Principal, | ||
Host: respDescription.Host, | ||
Operation: ACLOperationType(respDescription.Operation), | ||
PermissionType: ACLPermissionType(respDescription.PermissionType), | ||
} | ||
} | ||
|
||
resources[resourceIdx] = ACLResource{ | ||
ResourceType: ResourceType(respResource.ResourceType), | ||
ResourceName: respResource.ResourceName, | ||
PatternType: PatternType(respResource.PatternType), | ||
ACLs: descriptions, | ||
} | ||
} | ||
|
||
ret := &DescribeACLsResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Error: makeError(res.ErrorCode, res.ErrorMessage), | ||
Resources: resources, | ||
} | ||
|
||
return ret, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestClientDescribeACLs(t *testing.T) { | ||
if !ktesting.KafkaIsAtLeast("2.0.1") { | ||
return | ||
} | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
topic := makeTopic() | ||
group := makeGroupID() | ||
|
||
createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ | ||
ACLs: []ACLEntry{ | ||
{ | ||
Principal: "User:alice", | ||
PermissionType: ACLPermissionTypeAllow, | ||
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeTopic, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: topic, | ||
Host: "*", | ||
}, | ||
{ | ||
Principal: "User:bob", | ||
PermissionType: ACLPermissionTypeAllow, | ||
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeGroup, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: group, | ||
Host: "*", | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
for _, err := range createRes.Errors { | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
} | ||
|
||
describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{ | ||
Filter: ACLFilter{ | ||
ResourceTypeFilter: ResourceTypeTopic, | ||
ResourceNameFilter: topic, | ||
ResourcePatternTypeFilter: PatternTypeLiteral, | ||
Operation: ACLOperationTypeRead, | ||
PermissionType: ACLPermissionTypeAllow, | ||
}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
expectedDescribeResp := DescribeACLsResponse{ | ||
Throttle: 0, | ||
Error: makeError(0, ""), | ||
Resources: []ACLResource{ | ||
{ | ||
ResourceType: ResourceTypeTopic, | ||
ResourceName: topic, | ||
PatternType: PatternTypeLiteral, | ||
ACLs: []ACLDescription{ | ||
{ | ||
Principal: "User:alice", | ||
Host: "*", | ||
Operation: ACLOperationTypeRead, | ||
PermissionType: ACLPermissionTypeAllow, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
assert.Equal(t, expectedDescribeResp, *describeResp) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this file to match the file name convention of the corresponding source code file https://github.com/segmentio/kafka-go/blob/main/createacls.go