Skip to content

Commit

Permalink
Retrieve jwt token from token provider
Browse files Browse the repository at this point in the history
  • Loading branch information
jliunyu committed Sep 27, 2021
1 parent 0b27423 commit 04dc84c
Show file tree
Hide file tree
Showing 8 changed files with 608 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ if(WITH_SASL_OAUTHBEARER)
list(APPEND sources rdkafka_sasl_oauthbearer.c)
endif()

if(WITH_CURL)
list(APPEND sources rdkafka_sasl_oauthbearer_oidc.c)
endif()

if(WITH_ZLIB)
list(APPEND sources rdgz.c)
endif()
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SRCS_$(WITH_ZSTD) += rdkafka_zstd.c
SRCS_$(WITH_HDRHISTOGRAM) += rdhdrhistogram.c
SRCS_$(WITH_SSL) += rdkafka_ssl.c
SRCS_$(WITH_CURL) += rdhttp.c
SRCS_$(WITH_CURL) += rdkafka_sasl_oauthbearer_oidc.c

SRCS_LZ4 = rdxxhash.c
ifneq ($(WITH_LZ4_EXT), y)
Expand Down
149 changes: 149 additions & 0 deletions src/rdhttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,155 @@ rd_http_error_t *rd_http_get (const char *url, rd_buf_t **rbufp) {
return NULL;
}

/**
* @brief Extract the jwt from \p *cjson.
*
* @returns The token will be responsed in \p *res.
* Returns NULL on success or an error
* object on error - this error object must be destroyed
* by calling rd_http_error_destroy().
*/
rd_http_error_t *rd_http_extract_jwt_from_json (cJSON **cjson,
char **res,
const char *key) {
char *token;
cJSON *jval;
cJSON *parsed_token = NULL;
rd_http_error_t *herr;

cJSON_ArrayForEach(jval, *cjson) {
parsed_token = cJSON_GetObjectItem(*cjson, key);
break;
}
if (parsed_token == NULL) {
herr = rd_http_error_new(-1,
"Expected non-empty JSON response");
return herr;
}

token = cJSON_Print(parsed_token);
*res = rd_malloc(strlen(token) + 1);
strcpy(*res, token);
rd_free(token);
return NULL;
}


/**
* @brief Extract the json string that includes jwt from \p hreq.
*
* @returns Returns the response (even if there's a HTTP error code returned)
* in \p *json.
*/
rd_http_error_t *rd_http_parse_token_to_json (rd_http_req_t *hreq,
cJSON **jsonp) {
size_t len = 0;
char *raw_json = NULL;
const char *end = NULL;
rd_slice_t slice;
rd_http_error_t *herr = NULL;
*jsonp = NULL;

/* cJSON requires the entire input to parse in contiguous memory. */
rd_slice_init_full(&slice, hreq->hreq_buf);
len = rd_buf_len(hreq->hreq_buf);

raw_json = rd_malloc(len + 1);
rd_slice_read(&slice, raw_json, len);
raw_json[len] = '\0';

/* Parse JSON */
end = NULL;
*jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);

if (!*jsonp && !herr)
herr = rd_http_error_new(hreq->hreq_code,
"Failed to parse JSON response "
"at %"PRIusz"/%"PRIusz,
(size_t)(end - raw_json), len);
rd_free(raw_json);
rd_http_req_destroy(hreq);
return herr;
}


/**
* @brief Perform a blocking HTTP(S) request to \p url with HTTP(S)
* headers and data with 20s timeout.
* If the HTTP(S) request fails, will retry another \p retry times.
*
* @returns The result will be responsed in \p *jsonp.
* Returns NULL on success (HTTP response code < 400), or an error
* object on transport or HTTP error - this error object must be
* destroyed by calling rd_http_error_destroy().
*
* @locality Any thread.
*/
rd_http_error_t
*rd_http_post_json (const char *url,
struct curl_slist **headers,
const char *data_to_token,
const size_t data_to_token_size,
const long timeout,
const size_t retry,
cJSON **jsonp) {
rd_http_error_t *herr;
rd_http_req_t hreq;
size_t i;
size_t len;
const char *content_type;

herr = rd_http_req_init(&hreq, url);
if (unlikely(herr != NULL))
return herr;

curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, *headers);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout);

curl_easy_setopt(hreq.hreq_curl,
CURLOPT_POSTFIELDSIZE,
data_to_token_size);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDS, data_to_token);

for (i = 0; i <= retry; i++) {
herr = rd_http_req_perform_sync(&hreq);
len = rd_buf_len(hreq.hreq_buf);
if (!herr && len > 0) {
break;
}
}

if (herr && len == 0) {
rd_http_req_destroy(&hreq);
return herr;
}

if (len == 0) {
/* Empty response: create empty JSON object */
*jsonp = cJSON_CreateObject();
rd_http_req_destroy(&hreq);
return NULL;
}

content_type = rd_http_req_get_content_type(&hreq);

if (!content_type ||
rd_strncasecmp(content_type,
"application/json", strlen("application/json"))) {
if (!herr)
herr = rd_http_error_new(
hreq.hreq_code,
"Response is not JSON encoded: %s",
content_type ? content_type : "(n/a)");
rd_http_req_destroy(&hreq);
return herr;
}

herr = rd_http_parse_token_to_json(&hreq, &*jsonp);

return herr;
}


/**
* @brief Same as rd_http_get() but requires a JSON response.
Expand Down
15 changes: 14 additions & 1 deletion src/rdhttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,22 @@ typedef struct rd_http_req_s {
* write to. */
} rd_http_req_t;

static void rd_http_req_destroy (rd_http_req_t *hreq);
rd_http_error_t *rd_http_req_init (rd_http_req_t *hreq, const char *url);
rd_http_error_t *rd_http_req_perform_sync (rd_http_req_t *hreq);
rd_http_error_t *rd_http_parse_token_to_json (rd_http_req_t *hreq,
cJSON **jsonp);
rd_http_error_t *rd_http_extract_jwt_from_json (cJSON **cjson,
char **jwt_token,
const char *key);
rd_http_error_t
*rd_http_post_json (const char *url,
struct curl_slist **headers,
const char *data_to_token,
const size_t data_to_token_size,
const long timeout,
const size_t retry,
cJSON **jsonp);

#endif


Expand Down
Loading

0 comments on commit 04dc84c

Please sign in to comment.