Skip to content

Commit

Permalink
changes to describeCG tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jainruchir committed Mar 31, 2023
1 parent d2c902b commit c28152a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 30 deletions.
2 changes: 1 addition & 1 deletion examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
0, " ");
}
}
printf("\n");
}
return 0;
}
Expand Down Expand Up @@ -410,6 +411,5 @@ int main(int argc, char **argv) {
}

cmd_describe_consumer_groups(conf, argc - optind, &argv[optind]);

return 0;
}
46 changes: 35 additions & 11 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6196,6 +6196,7 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
const rd_kafka_Node_t *coordinator,
rd_kafka_error_t *error) {
rd_kafka_ConsumerGroupDescription_t *grpdesc;
int i;
grpdesc = rd_calloc(1, sizeof(*grpdesc));
grpdesc->group_id = rd_strdup(group_id);
grpdesc->is_simple_consumer_group = is_simple_consumer_group;
Expand All @@ -6210,7 +6211,19 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
grpdesc->partition_assignor = !partition_assignor
? (char *)partition_assignor
: rd_strdup(partition_assignor);
grpdesc->authorized_operations = authorized_operations;
if(authorized_operations == NULL)
grpdesc->authorized_operations = authorized_operations;
else{
grpdesc->authorized_operations =
rd_list_new(rd_list_cnt(authorized_operations), rd_free);
for(i=0;i<rd_list_cnt(authorized_operations); i++){
int* entry = rd_list_elem(authorized_operations, i);
int* oper = rd_malloc(sizeof(int));
*oper = *entry;
rd_list_add(grpdesc->authorized_operations,
oper);
}
}
grpdesc->state = state;
if (coordinator != NULL)
grpdesc->coordinator = rd_kafka_Node_copy(coordinator);
Expand Down Expand Up @@ -6272,6 +6285,8 @@ static void rd_kafka_ConsumerGroupDescription_destroy(
rd_kafka_error_destroy(grpdesc->error);
if (grpdesc->coordinator)
rd_kafka_Node_destroy(grpdesc->coordinator);
if(likely(grpdesc->authorized_operations != NULL))
rd_list_destroy(grpdesc->authorized_operations);
rd_free(grpdesc);
}

Expand Down Expand Up @@ -6311,8 +6326,11 @@ size_t rd_kafka_ConsumerGroupDescription_authorized_operations_count(
int rd_kafka_ConsumerGroupDescription_authorized_operation(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t idx) {
rd_kafka_AclOperation_t* entry = rd_list_elem(grpdesc->authorized_operations, idx);
return *entry;
if(grpdesc->authorized_operations){
rd_kafka_AclOperation_t* entry = rd_list_elem(grpdesc->authorized_operations, idx);
return *entry;
}
return 0;
}

rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state(
Expand Down Expand Up @@ -6440,8 +6458,11 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
}

/**
* @brief Parse authorized_operations returned in DescribeConsumerGroups
* @returns array of rd_bool_t of size RD_KAFKA_ACL_OPERATIONS__CNT
* @brief Parse authorized_operations returned in
* - DescribeConsumerGroups
* - DescribeTopics
* - DescribeCluster
* @returns list of acl operations
*/
rd_list_t* rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations){
int i, bit;
Expand All @@ -6450,11 +6471,11 @@ rd_list_t* rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations){
/* in case of authorized_operations not requested, return NULL*/
if(authorized_operations<0)
return NULL;
authorized_operations_list = rd_list_new(0, NULL);
authorized_operations_list = rd_list_new(0, rd_free);
for(i=0; i<RD_KAFKA_ACL_OPERATION__CNT; i++){
bit = (authorized_operations >> i) & 1;
if(bit){
entry = malloc(sizeof(rd_kafka_AclOperation_t));
entry = rd_malloc(sizeof(rd_kafka_AclOperation_t));
*entry = i;
rd_list_add(authorized_operations_list, entry);
}
Expand Down Expand Up @@ -6503,7 +6524,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
while (cnt-- > 0) {
int16_t error_code;
int32_t authorized_operations = 0;
rd_list_t* authorized_operations_list;
rd_list_t* authorized_operations_list = NULL;
rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData;
rd_bool_t is_simple_consumer_group, is_consumer_protocol_type;
int32_t member_cnt;
Expand Down Expand Up @@ -6613,14 +6634,15 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
}

if (api_version >= 3) {
/* TODO: implement KIP-430 */
rd_kafka_buf_read_i32(reply, &authorized_operations);
/* assert that the last 3 bits are never set*/
rd_assert(!((authorized_operations >> 0) & 1));
rd_assert(!((authorized_operations >> 1) & 1));
rd_assert(!((authorized_operations >> 2) & 1));
/* authorized_operations is -2147483648 in case of not requested, list has no elements in that case*/
authorized_operations_list = rd_kafka_AuthorizedOperations_parse(authorized_operations);
/* authorized_operations is -2147483648
* in case of not requested, list has no elements in that case*/
authorized_operations_list =
rd_kafka_AuthorizedOperations_parse(authorized_operations);
}

if (error == NULL) {
Expand All @@ -6629,6 +6651,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
authorized_operations_list,
rd_kafka_consumer_group_state_code(group_state),
node, error);
if(authorized_operations_list)
rd_list_destroy(authorized_operations_list);
} else {
grpdesc = rd_kafka_ConsumerGroupDescription_new_error(
group_id, error);
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,8 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
* with the groups (const char *) in \p groups.
* Uses \p max_ApiVersion as maximum API version,
* pass -1 to use the maximum available version.
* Uses \p include_authorized_operations to get
* group ACL authorized operations, 1 to request.
*
* The response (unparsed) will be enqueued on \p replyq
* for handling by \p resp_cb (with \p opaque passed).
Expand Down Expand Up @@ -2066,7 +2068,6 @@ rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,

/* write IncludeAuthorizedOperations */
if (ApiVersion >= 3) {
/* TODO: implement KIP-430 */
rd_kafka_buf_write_bool(rkbuf, include_authorized_operations);
}

Expand Down
16 changes: 15 additions & 1 deletion tests/0080-admin_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,10 @@ static void do_test_DescribeConsumerGroups(const char *what,
char errstr[512];
const char *errstr2;
rd_kafka_resp_err_t err;
rd_kafka_error_t *error;
test_timing_t timing;
rd_kafka_event_t *rkev;
const rd_kafka_DeleteGroups_result_t *res;
const rd_kafka_DescribeConsumerGroups_result_t *res;
const rd_kafka_ConsumerGroupDescription_t **resgroups;
size_t resgroup_cnt;
void *my_opaque = NULL, *opaque;
Expand All @@ -657,6 +658,15 @@ static void do_test_DescribeConsumerGroups(const char *what,
err = rd_kafka_AdminOptions_set_request_timeout(
options, exp_timeout, errstr, sizeof(errstr));
TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
if ((error =
rd_kafka_AdminOptions_set_include_authorized_operations(
options, 0))) {
fprintf(stderr,
"%% Failed to set require authorized operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_FAIL("Failed to set include authorized operations\n");
}

if (useq) {
my_opaque = (void *)456;
Expand Down Expand Up @@ -724,6 +734,10 @@ static void do_test_DescribeConsumerGroups(const char *what,
group_names[i],
rd_kafka_error_string(
rd_kafka_ConsumerGroupDescription_error(resgroups[i])));
TEST_ASSERT(
rd_kafka_ConsumerGroupDescription_authorized_operations_count(
resgroups[i]) == 0, "Got authorized operations"
"when not requested");
}

rd_kafka_event_destroy(rkev);
Expand Down
29 changes: 13 additions & 16 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,8 @@ static void do_test_DescribeConsumerGroups(const char *what,
rd_free(expected[i].group_id);
}

test_DeleteTopics_simple(rk, q, &topic, 1, NULL);

rd_free(topic);

if (options)
Expand Down Expand Up @@ -3465,11 +3467,11 @@ static void do_test_DescribeConsumerGroups_with_authorized_ops(const char *what,
TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
if ((error = rd_kafka_AdminOptions_set_include_authorized_operations(
options, 1))) {
fprintf(stderr,
"%% Failed to set require authorized operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_FAIL("Failed to set include authorized operations\n");
fprintf(stderr,
"%% Failed to set require authorized operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_FAIL("Failed to set include authorized operations\n");
}
}

Expand Down Expand Up @@ -3600,12 +3602,10 @@ static void do_test_DescribeConsumerGroups_with_authorized_ops(const char *what,
TEST_ASSERT(
rd_kafka_ConsumerGroupDescription_authorized_operations_count(act) != 0,
"Authorized operations not returned when requested\n");
for(j=0;j< rd_kafka_ConsumerGroupDescription_authorized_operations_count(act);j++){
acl_operation =
rd_kafka_ConsumerGroupDescription_authorized_operation(act,j);
TEST_SAY("%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
}
TEST_ASSERT(
rd_kafka_ConsumerGroupDescription_authorized_operations_count(act) < 3,
"Expected only READ and DESCRIBE operations after createAcl(), got DELETE"
"as well\n");
}

}
Expand Down Expand Up @@ -3645,6 +3645,8 @@ static void do_test_DescribeConsumerGroups_with_authorized_ops(const char *what,
rd_free(expected[i].group_id);
}

test_DeleteTopics_simple(rk, q, &topic, 1, NULL);

rd_free(topic);

if (options)
Expand Down Expand Up @@ -4468,11 +4470,6 @@ static void do_test_apis(rd_kafka_type_t cltype) {
do_test_unclean_destroy(cltype, 1 /*mainq*/);

test_conf_init(&conf, NULL, 180);
// test_conf_set(conf, "sasl.username", "broker");
// test_conf_set(conf, "sasl.password", "broker");
// test_conf_set(conf, "sasl.mechanism", "SCRAM-SHA-256");
// test_conf_set(conf, "security.protocol", "SASL_PLAINTEXT");
// test_conf_set(conf, "bootstrap.servers", "localhost:9092");
test_conf_set(conf, "socket.timeout.ms", "10000");
rk = test_create_handle(cltype, conf);

Expand Down

0 comments on commit c28152a

Please sign in to comment.