Skip to content

Commit

Permalink
Differentiate between 0 and unrequested authorized ops
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Sep 25, 2023
1 parent adc0c4f commit 7205354
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 59 deletions.
9 changes: 6 additions & 3 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -8147,7 +8147,8 @@ const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas(
* @param topicdesc The topic description.
* @param cntp is updated with authorized ACL operations count.
*
* @return The topic authorized operations.
* @return The topic authorized operations. Is NULL if operations were not
* requested.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p topicdesc object.
Expand Down Expand Up @@ -8242,7 +8243,8 @@ const rd_kafka_Node_t **rd_kafka_DescribeCluster_result_nodes(
* @param result The result of DescribeCluster.
* @param cntp is updated with authorized ACL operations count.
*
* @return The cluster authorized operations.
* @return The cluster authorized operations. Is NULL if operations were not
* requested.
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p result object.
*/
Expand Down Expand Up @@ -8509,7 +8511,8 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor(
* @param grpdesc The group description.
* @param cntp is updated with authorized ACL operations count.
*
* @return The group authorized operations.
* @return The group authorized operations. Is NULL if operations were not
* requested.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p grpdesc object.
Expand Down
87 changes: 55 additions & 32 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -7107,20 +7107,20 @@ const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors(
*
* @param authorized_operations returned by RPC, containing operations encoded
* per-bit.
* @param cntp is set to the count of the operations.
* @returns rd_kafka_AclOperation_t *
* @param cntp is set to the count of the operations, or -1 if the operations
* were not requested.
* @returns rd_kafka_AclOperation_t *. May be NULL.
*/
static rd_kafka_AclOperation_t *
rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations,
size_t *cntp) {
rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, int *cntp) {
rd_kafka_AclOperation_t i;
int j = 0;
int count = 0;
rd_kafka_AclOperation_t *operations = NULL;

/* In case of authorized_operations not requested, return NULL. */
if (authorized_operations < 0) {
*cntp = 0;
*cntp = -1;
return NULL;
}

Expand All @@ -7131,11 +7131,14 @@ rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations,
count += ((authorized_operations >> i) & 1);
*cntp = count;

if (!count)
return operations;
/* In case no operations exist, allocate 1 byte so that the returned
* pointer is non-NULL. A NULL pointer implies that authorized
* operations were not requested. */
if (count == 0)
return rd_malloc(1);

j = 0;
operations = rd_malloc(sizeof(rd_kafka_AclOperation_t) * count);
j = 0;
for (i = RD_KAFKA_ACL_OPERATION_READ; i < RD_KAFKA_ACL_OPERATION__CNT;
i++) {
if ((authorized_operations >> i) & 1) {
Expand All @@ -7147,6 +7150,38 @@ rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations,
return operations;
}

/**
* @brief Copy a list of rd_kafka_AclOperation_t.
*
* @param src Array of rd_kafka_AclOperation_t to copy from. May be NULL if
* authorized operations were not requested.
* @param authorized_operations_cnt Count of \p src. May be -1 if authorized
* operations were not requested.
* @returns Copy of \p src. May be NULL.
*/
static rd_kafka_AclOperation_t *
rd_kafka_AuthorizedOperations_copy(const rd_kafka_AclOperation_t *src,
int authorized_operations_cnt) {
size_t copy_bytes = 0;
rd_kafka_AclOperation_t *dst = NULL;

if (authorized_operations_cnt == -1 || src == NULL)
return NULL;

/* Allocate and copy 1 byte so that the returned pointer
* is non-NULL. A NULL pointer implies that authorized operations were
* not requested. */
if (authorized_operations_cnt == 0)
copy_bytes = 1;
else
copy_bytes =
sizeof(rd_kafka_AclOperation_t) * authorized_operations_cnt;

dst = rd_malloc(copy_bytes);
memcpy(dst, src, copy_bytes);
return dst;
}

/**
* @brief Create a new MemberDescription object. This object is used for
* creating a ConsumerGroupDescription.
Expand Down Expand Up @@ -7279,7 +7314,7 @@ rd_kafka_ConsumerGroupDescription_new(
const rd_list_t *members,
const char *partition_assignor,
const rd_kafka_AclOperation_t *authorized_operations,
size_t authorized_operations_cnt,
int authorized_operations_cnt,
rd_kafka_consumer_group_state_t state,
const rd_kafka_Node_t *coordinator,
rd_kafka_error_t *error) {
Expand All @@ -7300,14 +7335,8 @@ rd_kafka_ConsumerGroupDescription_new(
: rd_strdup(partition_assignor);

grpdesc->authorized_operations_cnt = authorized_operations_cnt;
if (authorized_operations_cnt) {
grpdesc->authorized_operations =
rd_malloc(sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
memcpy(grpdesc->authorized_operations, authorized_operations,
sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
}
grpdesc->authorized_operations = rd_kafka_AuthorizedOperations_copy(
authorized_operations, authorized_operations_cnt);

grpdesc->state = state;
if (coordinator != NULL)
Expand Down Expand Up @@ -7406,7 +7435,7 @@ const rd_kafka_AclOperation_t *
rd_kafka_ConsumerGroupDescription_authorized_operations(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t *cntp) {
*cntp = grpdesc->authorized_operations_cnt;
*cntp = RD_MAX(grpdesc->authorized_operations_cnt, 0);
return grpdesc->authorized_operations;
}

Expand Down Expand Up @@ -7557,7 +7586,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
char *group_id = NULL, *group_state = NULL, *proto_type = NULL,
*proto = NULL, *host = NULL;
rd_kafka_AclOperation_t *operations = NULL;
size_t operation_cnt;
int operation_cnt;

api_version = rd_kafka_buf_ApiVersion(reply);
if (api_version >= 1) {
Expand All @@ -7579,7 +7608,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
node = rd_kafka_Node_new(nodeid, host, port, NULL);
while (cnt-- > 0) {
int16_t error_code;
int32_t authorized_operations = 0;
int32_t authorized_operations = -1;
rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData;
rd_bool_t is_simple_consumer_group, is_consumer_protocol_type;
int32_t member_cnt;
Expand Down Expand Up @@ -7996,7 +8025,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt,
const rd_kafka_AclOperation_t *authorized_operations,
size_t authorized_operations_cnt,
int authorized_operations_cnt,
rd_bool_t is_internal,
rd_kafka_error_t *error) {
rd_kafka_TopicDescription_t *topicdesc;
Expand All @@ -8009,14 +8038,8 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
topicdesc->error = rd_kafka_error_copy(error);

topicdesc->authorized_operations_cnt = authorized_operations_cnt;
if (authorized_operations_cnt) {
topicdesc->authorized_operations =
rd_malloc(sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
memcpy(topicdesc->authorized_operations, authorized_operations,
sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
}
topicdesc->authorized_operations = rd_kafka_AuthorizedOperations_copy(
authorized_operations, authorized_operations_cnt);

if (partitions) {
topicdesc->partitions =
Expand Down Expand Up @@ -8099,7 +8122,7 @@ const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions(
const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations(
const rd_kafka_TopicDescription_t *topicdesc,
size_t *cntp) {
*cntp = topicdesc->authorized_operations_cnt;
*cntp = RD_MAX(topicdesc->authorized_operations_cnt, 0);
return topicdesc->authorized_operations;
}

Expand Down Expand Up @@ -8211,7 +8234,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,

if (md->topics[i].err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_AclOperation_t *authorized_operations;
size_t authorized_operation_cnt;
int authorized_operation_cnt;
authorized_operations =
rd_kafka_AuthorizedOperations_parse(
mdi->topics[i].topic_authorized_operations,
Expand Down Expand Up @@ -8365,7 +8388,7 @@ rd_kafka_DescribeCluster_result_authorized_operations(
size_t *cntp) {
const rd_kafka_ClusterDescription_t *clusterdesc =
rd_kafka_DescribeCluster_result_description(result);
*cntp = clusterdesc->authorized_operations_cnt;
*cntp = RD_MAX(clusterdesc->authorized_operations_cnt, 0);
return clusterdesc->authorized_operations;
}

Expand Down
34 changes: 20 additions & 14 deletions src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,11 @@ struct rd_kafka_ConsumerGroupDescription_s {
rd_kafka_consumer_group_state_t state;
/** Consumer group coordinator. */
rd_kafka_Node_t *coordinator;
/** Count of operations allowed for topic.*/
size_t authorized_operations_cnt;
/** Operations allowed for topic. */
/** Count of operations allowed for topic. -1 indicates operations not
* requested.*/
int authorized_operations_cnt;
/** Operations allowed for topic. May be NULL if operations were not
* requested */
rd_kafka_AclOperation_t *authorized_operations;
/** Group specific error. */
rd_kafka_error_t *error;
Expand Down Expand Up @@ -527,11 +529,13 @@ struct rd_kafka_TopicDescription_s {
int partition_cnt; /**< Number of partitions in \p partitions*/
rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */
rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */
rd_kafka_error_t *error; /**< Topic error reported by broker */
size_t authorized_operations_cnt; /**< Count of operations allowed for
topic.*/
rd_kafka_error_t *error; /**< Topic error reported by broker */
int authorized_operations_cnt; /**< Count of operations allowed for
* topic. -1 indicates operations not
* requested. */
rd_kafka_AclOperation_t
*authorized_operations; /**< Operations allowed for topic. */
*authorized_operations; /**< Operations allowed for topic. May be
* NULL if operations were not requested */
};

/**@}*/
Expand All @@ -544,14 +548,16 @@ struct rd_kafka_TopicDescription_s {
* @struct DescribeCluster result - internal type.
*/
typedef struct rd_kafka_ClusterDescription_s {
char *cluster_id; /**< Cluster id */
rd_kafka_Node_t *controller; /**< Current controller. */
size_t node_cnt; /**< Count of brokers in the cluster. */
rd_kafka_Node_t **nodes; /**< Brokers in the cluster. */
size_t authorized_operations_cnt; /**< Count of operations allowed for
cluster.*/
char *cluster_id; /**< Cluster id */
rd_kafka_Node_t *controller; /**< Current controller. */
size_t node_cnt; /**< Count of brokers in the cluster. */
rd_kafka_Node_t **nodes; /**< Brokers in the cluster. */
int authorized_operations_cnt; /**< Count of operations allowed for
* cluster. -1 indicates operations not
* requested. */
rd_kafka_AclOperation_t
*authorized_operations; /**< Operations allowed for cluster. */
*authorized_operations; /**< Operations allowed for cluster. May be
* NULL if operations were not requested */

} rd_kafka_ClusterDescription_t;

Expand Down
41 changes: 31 additions & 10 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2987,11 +2987,16 @@ static void do_test_DescribeConsumerGroups(const char *what,
rd_kafka_ConsumerGroupDescription_error(act));
rd_kafka_consumer_group_state_t state =
rd_kafka_ConsumerGroupDescription_state(act);
rd_kafka_ConsumerGroupDescription_authorized_operations(
act, &authorized_operation_cnt);
const rd_kafka_AclOperation_t *authorized_operations =
rd_kafka_ConsumerGroupDescription_authorized_operations(
act, &authorized_operation_cnt);
TEST_ASSERT(
authorized_operation_cnt == 0,
"Authorized operations returned when not requested\n");
"Authorized operation count should be 0, is %" PRIusz,
authorized_operation_cnt);
TEST_ASSERT(
authorized_operations == NULL,
"Authorized operations should be NULL when not requested");
TEST_ASSERT(
strcmp(exp->group_id,
rd_kafka_ConsumerGroupDescription_group_id(act)) ==
Expand Down Expand Up @@ -3266,6 +3271,8 @@ static void do_test_DescribeTopics(const char *what,
"Expected partion id to be %d, got %d", 0,
rd_kafka_TopicPartitionInfo_partition(partitions[0]));

authorized_operations = rd_kafka_TopicDescription_authorized_operations(
result_topics[0], &authorized_operations_cnt);
if (include_authorized_operations) {
const rd_kafka_AclOperation_t expected[] = {
RD_KAFKA_ACL_OPERATION_ALTER,
Expand All @@ -3277,14 +3284,19 @@ static void do_test_DescribeTopics(const char *what,
RD_KAFKA_ACL_OPERATION_READ,
RD_KAFKA_ACL_OPERATION_WRITE};

authorized_operations =
rd_kafka_TopicDescription_authorized_operations(
result_topics[0], &authorized_operations_cnt);

test_match_authorized_operations(expected, 8,
authorized_operations,
authorized_operations_cnt);
} else {
TEST_ASSERT(
authorized_operations_cnt == 0,
"Authorized operation count should be 0, is %" PRIusz,
authorized_operations_cnt);
TEST_ASSERT(
authorized_operations == NULL,
"Authorized operations should be NULL when not requested");
}

rd_kafka_event_destroy(rkev);

/* If we don't have authentication/authorization set up in our
Expand Down Expand Up @@ -3481,6 +3493,9 @@ static void do_test_DescribeCluster(const char *what,
TEST_ASSERT(rd_kafka_Node_port(nodes[0]),
"Expected first node of cluster to have a port");

authorized_operations =
rd_kafka_DescribeCluster_result_authorized_operations(
res, &authorized_operations_cnt);
if (include_authorized_operations) {
const rd_kafka_AclOperation_t expected[] = {
RD_KAFKA_ACL_OPERATION_ALTER,
Expand All @@ -3490,12 +3505,18 @@ static void do_test_DescribeCluster(const char *what,
RD_KAFKA_ACL_OPERATION_DESCRIBE,
RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS,
RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE};
authorized_operations =
rd_kafka_DescribeCluster_result_authorized_operations(
res, &authorized_operations_cnt);

test_match_authorized_operations(expected, 7,
authorized_operations,
authorized_operations_cnt);
} else {
TEST_ASSERT(
authorized_operations_cnt == 0,
"Authorized operation count should be 0, is %" PRIusz,
authorized_operations_cnt);
TEST_ASSERT(
authorized_operations == NULL,
"Authorized operations should be NULL when not requested");
}

rd_kafka_event_destroy(rkev);
Expand Down

0 comments on commit 7205354

Please sign in to comment.