Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pyamqp] Pyampq debug build Linkedin #25296

Merged
6 changes: 4 additions & 2 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Release History

## 5.8.0b5 (Unreleased)
## 5.8.0b5 (2022-07-19)
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved

### Features Added

### Breaking Changes

### Bugs Fixed

- Bug fixes that would prevent token refresh at regular intervals
- Pass in the proper kwarg, so that debug levels will be output when requested
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
### Other Changes
- Specific logging added in to track error scenarios
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved

## 5.8.0a4 (2022-06-07)

Expand Down
16 changes: 14 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import functools
import collections
from typing import Any, Dict, Tuple, List, Optional, TYPE_CHECKING, cast, Union
from datetime import timedelta
from datetime import timedelta, datetime
from urllib.parse import urlparse
import six

Expand Down Expand Up @@ -386,7 +386,19 @@ def _management_request(self, mgmt_msg, op_type):
mgmt_client.open()
while not mgmt_client.client_ready():
time.sleep(0.05)
mgmt_msg.application_properties["security_token"] = mgmt_auth.get_token()
access_token = mgmt_auth.get_token()

if not access_token:
_LOGGER.info("Management client received an access token object")
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved

elif not access_token.token:
_LOGGER.info("Management client received an empty token")

else:
_LOGGER.info(f"Management client token expires on: {datetime.fromtimestamp(access_token.expires_on)}")
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved

mgmt_msg.application_properties["security_token"] = access_token.token

response = mgmt_client.mgmt_request(
mgmt_msg,
operation=READ_OPERATION.decode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __exit__(self, *args):
def _get_partitions(self):
# type: () -> None
if not self._partition_ids:
_LOGGER.info("Populating partition IDs so producers can be started.")
self._partition_ids = self.get_partition_ids() # type: ignore
for p_id in cast(List[str], self._partition_ids):
self._producers[p_id] = None
Expand Down Expand Up @@ -311,7 +312,9 @@ def send_batch(self, event_data_batch, **kwargs):
cast(EventHubProducer, self._producers[partition_id]).send(
to_send_batch, timeout=send_timeout
)
except (KeyError, AttributeError, EventHubError):
except (KeyError, AttributeError, EventHubError) as e:
_LOGGER.info(
"Producer for partition ID '{}' not available: {}. Rebuilding new producer.".format(partition_id, e))
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
self._start_producer(partition_id, send_timeout)
cast(EventHubProducer, self._producers[partition_id]).send(
to_send_batch, timeout=send_timeout
Expand Down Expand Up @@ -431,6 +434,7 @@ def close(self):
:caption: Close down the client.

"""
_LOGGER.info("Closing ProducerClient")
with self._lock:
for pid in self._producers:
if self._producers[pid]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ async def close(self):
async def update_token(self):
self.auth_state = CbsAuthState.IN_PROGRESS
access_token = await self._auth.get_token()
if not access_token.token:
_LOGGER.debug("update_token received an empty token")
self._expires_on = access_token.expires_on
expires_in = self._expires_on - int(utc_now().timestamp())
self._refresh_window = int(float(expires_in) * 0.1)
Expand Down
8 changes: 8 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/cbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,15 @@ def _on_execute_operation_complete(

def _update_status(self):
if self.auth_state == CbsAuthState.OK or self.auth_state == CbsAuthState.REFRESH_REQUIRED:
_LOGGER.info('update_status In refresh required or OK.')
is_expired, is_refresh_required = check_expiration_and_refresh_status(self._expires_on, self._refresh_window)
_LOGGER.info('is expired == %r, is refresh required == %r', is_expired, is_refresh_required)
if is_expired:
self.auth_state = CbsAuthState.EXPIRED
elif is_refresh_required:
self.auth_state = CbsAuthState.REFRESH_REQUIRED
elif self.auth_state == CbsAuthState.IN_PROGRESS:
_LOGGER.info('In update status, in progress. token put time: %r', self._token_put_time)
put_timeout = check_put_timeout_status(self._auth_timeout, self._token_put_time)
if put_timeout:
self.auth_state = CbsAuthState.TIMEOUT
Expand Down Expand Up @@ -186,7 +189,12 @@ def close(self):
def update_token(self):
self.auth_state = CbsAuthState.IN_PROGRESS
access_token = self._auth.get_token()
if not access_token.token:
_LOGGER.info("Update_token received an empty token")
self._expires_on = access_token.expires_on
_LOGGER.info('Update_token after token has been updated')
_LOGGER.info('Current time: %r', datetime.now())
_LOGGER.info('Token expiry: %r', datetime.fromtimestamp(self._expires_on))
expires_in = self._expires_on - int(utc_now().timestamp())
self._refresh_window = int(float(expires_in) * 0.1)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,12 @@ async def _management_request_async(self, mgmt_msg: Message, op_type: bytes) ->
await mgmt_client.open_async()
while not (await mgmt_client.client_ready_async()):
await asyncio.sleep(0.05)
mgmt_msg.application_properties["security_token"] = await mgmt_auth.get_token()
access_token = await mgmt_auth.get_token()
mgmt_msg.application_properties["security_token"] = access_token.token

if not access_token.token:
_LOGGER.info("update_token received an empty token")

response = await mgmt_client.mgmt_request_async(
mgmt_msg,
operation=READ_OPERATION.decode(),
Expand Down
9 changes: 7 additions & 2 deletions sdk/eventhub/azure-eventhub/samples/sync_samples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
Examples to show sending events with different options to an Event Hub partition.
"""

import logging
import sys
import time
import os
from tkinter.tix import Tree
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError
from azure.identity import DefaultAzureCredential

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']


def send_event_data_batch(producer):
# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.
Expand Down Expand Up @@ -82,11 +85,13 @@ def send_event_data_list(producer):
print("Sending error: ", eh_err)



producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)


start_time = time.time()
with producer:
send_event_data_batch(producer)
Expand All @@ -96,4 +101,4 @@ def send_event_data_list(producer):
send_event_data_batch_with_properties(producer)
send_event_data_list(producer)

print("Send messages in {} seconds.".format(time.time() - start_time))
print("Send messages in {} seconds.".format(time.time() - start_time))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like no changes were made to this sample, so you can probably revert it? I think the CI pylint might fail since this file doesn't have a newline at the end.