Skip to content

Commit

Permalink
Address comments: AuthorizedOperations_parse uses array
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Sep 4, 2023
1 parent 9a6ef10 commit 2c7f5e1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 86 deletions.
2 changes: 1 addition & 1 deletion examples/describe_topics.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void print_node_info(const rd_kafka_Node_t *node) {

printf("\t\tNode [id: %" PRId32
", host: %s"
", port: %" PRIu16 ", rack %s ]\n",
", port: %" PRIu16 ", rack %s]\n",
rd_kafka_Node_id(node), rd_kafka_Node_host(node),
rd_kafka_Node_port(node), rd_kafka_Node_rack_id(node));
}
Expand Down
151 changes: 66 additions & 85 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -7104,28 +7104,44 @@ const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors(
* - DescribeConsumerGroups
* - DescribeTopics
* - DescribeCluster
* @returns list of authorized operations (rd_kafka_AclOperation_t *)
*
* @param authorized_operations returned by RPC, containing operations encoded
* per-bit.
* @param cntp is set to the count of the operations.
* @returns rd_kafka_AclOperation_t *
*/
static rd_list_t *
rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations) {
int i;
rd_list_t *authorized_operations_list = NULL;
static rd_kafka_AclOperation_t *
rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations,
size_t *cntp) {
rd_kafka_AclOperation_t i;
int j = 0;
int count = 0;
rd_kafka_AclOperation_t *operations;

/* In case of authorized_operations not requested, return NULL. */
if (authorized_operations < 0)
if (authorized_operations < 0) {
*cntp = 0;
return NULL;
}

/* Count number of bits set. ALL, ANY and UNKNOWN bits are skipped as
* they are always unset as per KIP-430. */
for (i = RD_KAFKA_ACL_OPERATION_READ; i < RD_KAFKA_ACL_OPERATION__CNT;
i++)
count += ((authorized_operations >> i) & 1);

authorized_operations_list = rd_list_new(0, rd_free);
for (i = 0; i < RD_KAFKA_ACL_OPERATION__CNT; i++) {
int bit = (authorized_operations >> i) & 1;
if (bit) {
rd_kafka_AclOperation_t *entry =
rd_malloc(sizeof(rd_kafka_AclOperation_t));
*entry = (rd_kafka_AclOperation_t)i;
rd_list_add(authorized_operations_list, entry);
j = 0;
operations = rd_malloc(sizeof(rd_kafka_AclOperation_t) * count);
for (i = RD_KAFKA_ACL_OPERATION_READ; i < RD_KAFKA_ACL_OPERATION__CNT;
i++) {
if ((authorized_operations >> i) & 1) {
operations[j] = i;
j++;
}
}
return authorized_operations_list;

*cntp = count;
return operations;
}

/**
Expand Down Expand Up @@ -7284,7 +7300,7 @@ rd_kafka_ConsumerGroupDescription_new(
if (authorized_operations_cnt) {
grpdesc->authorized_operations =
rd_malloc(sizeof(rd_kafka_AclOperation_t) *
grpdesc->authorized_operations_cnt);
authorized_operations_cnt);
memcpy(grpdesc->authorized_operations, authorized_operations,
sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
Expand Down Expand Up @@ -7537,7 +7553,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_error_t *error = NULL;
char *group_id = NULL, *group_state = NULL, *proto_type = NULL,
*proto = NULL, *host = NULL;
rd_list_t *authorized_operations_list = NULL;
rd_kafka_AclOperation_t *operations = NULL;
size_t operation_cnt;

api_version = rd_kafka_buf_ApiVersion(reply);
if (api_version >= 1) {
Expand Down Expand Up @@ -7670,43 +7687,17 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,

if (api_version >= 3) {
rd_kafka_buf_read_i32(reply, &authorized_operations);
/* The least significant 3 bits are never set, as they
* represent the operations ALL, ANY and UNKNOWN. They
* will never be set by the broker as per KIP-430. */
rd_dassert(!((authorized_operations >> 0) & 1));
rd_dassert(!((authorized_operations >> 1) & 1));
rd_dassert(!((authorized_operations >> 2) & 1));

/* Authorized_operations is INT_MIN
* in case of not being requested, and the list is NULL
* that case. */
authorized_operations_list =
rd_kafka_AuthorizedOperations_parse(
authorized_operations);
operations = rd_kafka_AuthorizedOperations_parse(
authorized_operations, &operation_cnt);
}

if (error == NULL) {
rd_kafka_AclOperation_t *authorized_operations = NULL;
size_t authorized_operations_cnt = 0;

if (authorized_operations_list) {
int i;
authorized_operations_cnt =
rd_list_cnt(authorized_operations_list);
rd_kafka_AclOperation_t *acl_operation;
authorized_operations =
rd_alloca(sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
RD_LIST_FOREACH(acl_operation,
authorized_operations_list, i) {
authorized_operations[i] =
*acl_operation;
}
}

grpdesc = rd_kafka_ConsumerGroupDescription_new(
group_id, is_simple_consumer_group, &members, proto,
authorized_operations, authorized_operations_cnt,
operations, operation_cnt,
rd_kafka_consumer_group_state_code(group_state),
node, error);
} else
Expand All @@ -7721,14 +7712,14 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_free(proto_type);
rd_free(proto);
RD_IF_FREE(error, rd_kafka_error_destroy);
RD_IF_FREE(authorized_operations_list, rd_list_destroy);
RD_IF_FREE(operations, rd_free);

error = NULL;
group_id = NULL;
group_state = NULL;
proto_type = NULL;
proto = NULL;
authorized_operations_list = NULL;
error = NULL;
group_id = NULL;
group_state = NULL;
proto_type = NULL;
proto = NULL;
operations = NULL;
}

if (host)
Expand All @@ -7755,7 +7746,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_Node_destroy(node);
if (rko_result)
rd_kafka_op_destroy(rko_result);
RD_IF_FREE(authorized_operations_list, rd_list_destroy);
RD_IF_FREE(operations, rd_free);

rd_snprintf(
errstr, errstr_size,
Expand Down Expand Up @@ -8000,7 +7991,8 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
const struct rd_kafka_metadata_broker *brokers,
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt,
const rd_list_t *authorized_operations,
const rd_kafka_AclOperation_t *authorized_operations,
size_t authorized_operations_cnt,
rd_bool_t is_internal,
rd_kafka_error_t *error) {
rd_kafka_TopicDescription_t *topicdesc;
Expand All @@ -8012,16 +8004,14 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
if (error)
topicdesc->error = rd_kafka_error_copy(error);

if (authorized_operations) {
rd_kafka_AclOperation_t *acl_operation;
topicdesc->authorized_operations_cnt =
rd_list_cnt(authorized_operations);
topicdesc->authorized_operations_cnt = authorized_operations_cnt;
if (authorized_operations_cnt) {
topicdesc->authorized_operations =
rd_malloc(sizeof(rd_kafka_AclOperation_t) *
topicdesc->authorized_operations_cnt);
RD_LIST_FOREACH(acl_operation, authorized_operations, i) {
topicdesc->authorized_operations[i] = *acl_operation;
}
authorized_operations_cnt);
memcpy(topicdesc->authorized_operations, authorized_operations,
sizeof(rd_kafka_AclOperation_t) *
authorized_operations_cnt);
}

if (partitions) {
Expand All @@ -8048,7 +8038,7 @@ static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new_error(const char *topic,
rd_kafka_error_t *error) {
return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, NULL, 0,
NULL, rd_false, error);
NULL, 0, rd_false, error);
}

static void
Expand Down Expand Up @@ -8213,19 +8203,22 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
for (i = 0; i < md->topic_cnt; i++) {
rd_kafka_TopicDescription_t *topicdesc = NULL;
if (md->topics[i].err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_list_t *authorized_operations;
rd_kafka_AclOperation_t *authorized_operations;
size_t authorized_operation_cnt;
authorized_operations =
rd_kafka_AuthorizedOperations_parse(
mdi->topics[i].topic_authorized_operations);
mdi->topics[i].topic_authorized_operations,
&authorized_operation_cnt);
topicdesc = rd_kafka_TopicDescription_new(
md->topics[i].topic, md->topics[i].partitions,
md->topics[i].partition_cnt, md->brokers,
mdi->brokers, md->broker_cnt, authorized_operations,
authorized_operation_cnt,
mdi->topics[i].is_internal, NULL);
RD_IF_FREE(authorized_operations, rd_list_destroy);
RD_IF_FREE(authorized_operations, rd_free);
} else {
rd_kafka_error_t *error = rd_kafka_error_new(
md->topics[i].err,
md->topics[i].err, "%s",
rd_kafka_err2str(md->topics[i].err));
topicdesc = rd_kafka_TopicDescription_new_error(
md->topics[i].topic, error);
Expand Down Expand Up @@ -8373,8 +8366,6 @@ rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) {
const rd_kafka_metadata_t *md = &mdi->metadata;
rd_kafka_ClusterDescription_t *clusterdesc =
rd_calloc(1, sizeof(*clusterdesc));
rd_list_t *authorized_operations = rd_kafka_AuthorizedOperations_parse(
mdi->cluster_authorized_operations);
int i;

clusterdesc->cluster_id = rd_strdup(mdi->cluster_id);
Expand All @@ -8384,18 +8375,10 @@ rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) {
mdi->controller_id, md->brokers, mdi->brokers,
md->broker_cnt);

if (authorized_operations) {
rd_kafka_AclOperation_t *acl_operation;
clusterdesc->authorized_operations_cnt =
rd_list_cnt(authorized_operations);
clusterdesc->authorized_operations =
rd_malloc(sizeof(rd_kafka_AclOperation_t) *
clusterdesc->authorized_operations_cnt);
RD_LIST_FOREACH(acl_operation, authorized_operations, i) {
clusterdesc->authorized_operations[i] = *acl_operation;
}
}
RD_IF_FREE(authorized_operations, rd_list_destroy);
clusterdesc->authorized_operations =
rd_kafka_AuthorizedOperations_parse(
mdi->cluster_authorized_operations,
&clusterdesc->authorized_operations_cnt);

clusterdesc->node_cnt = md->broker_cnt;
clusterdesc->nodes =
Expand All @@ -8413,9 +8396,7 @@ static void rd_kafka_ClusterDescription_destroy(
rd_kafka_ClusterDescription_t *clusterdesc) {
RD_IF_FREE(clusterdesc->cluster_id, rd_free);
RD_IF_FREE(clusterdesc->controller, rd_kafka_Node_free);

if (clusterdesc->authorized_operations_cnt)
rd_free(clusterdesc->authorized_operations);
RD_IF_FREE(clusterdesc->authorized_operations, rd_free);

if (clusterdesc->node_cnt) {
size_t i;
Expand Down

0 comments on commit 2c7f5e1

Please sign in to comment.