Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] uamqp switch support #25494

Merged
merged 25 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c546a02
add shared connection back into conn manager
swathipil Aug 1, 2022
a5f0a55
add uamqp switch changes
swathipil Aug 1, 2022
76e11d1
sync tests
swathipil Aug 1, 2022
5622e5b
update changelog
swathipil Aug 1, 2022
8de034d
fix reconn test
swathipil Aug 1, 2022
9625aad
add switch to async
swathipil Aug 2, 2022
0a6d1b8
fix bugs
swathipil Aug 2, 2022
4fbe9db
update consumer code to fix tests
swathipil Aug 2, 2022
0711b86
update conftest with uamqp_transport fixture
swathipil Aug 2, 2022
28a14a0
lint + mypy
swathipil Aug 2, 2022
bef9257
Merge branch 'main' into swathipil/eh/uamqp-switch-support
swathipil Aug 3, 2022
f9e9a28
address Anna/Libba/Kashifs comments
swathipil Aug 9, 2022
fb42f69
Merge branch 'swathipil/eh/uamqp-switch-support' of https://github.co…
swathipil Aug 9, 2022
e19a0d9
Annas comments
swathipil Aug 15, 2022
db695be
remove kwargs from EventDataBatch
swathipil Aug 15, 2022
792bd78
fix bugs
swathipil Aug 15, 2022
10fc8dd
Merge branch 'main' into swathipil/eh/uamqp-switch-support
swathipil Aug 15, 2022
2e4437d
fix lint, mypy, errors
swathipil Aug 15, 2022
ef68450
update tests to take uamqp TransportType as well
swathipil Aug 16, 2022
0ee220c
merge main
swathipil Aug 17, 2022
b2a5914
update changelog
swathipil Aug 17, 2022
40933b3
update uamqp min dep + release date
swathipil Aug 18, 2022
7e90936
Merge branch 'main' into swathipil/eh/uamqp-switch-support
swathipil Aug 18, 2022
73fdaa8
update message prop back to ivar
swathipil Aug 18, 2022
f05f362
set message ivar when creating ED._from_message
swathipil Aug 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release History

## 5.10.1 (Unreleased)

### Bugs Fixed

### Other Changes

- Internal refactoring to support upcoming Pure Python AMQP-based release.
swathipil marked this conversation as resolved.
Show resolved Hide resolved

## 5.10.0 (2022-06-08)

### Features Added
Expand Down
4 changes: 1 addition & 3 deletions sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from uamqp import constants
from ._common import EventData, EventDataBatch
from ._version import VERSION

__version__ = VERSION

from ._constants import TransportType
from ._producer_client import EventHubProducerClient
from ._consumer_client import EventHubConsumerClient
from ._client_base import EventHubSharedKeyCredential
Expand All @@ -19,8 +19,6 @@
EventHubConnectionStringProperties,
)

TransportType = constants.TransportType

__all__ = [
"EventData",
"EventDataBatch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import time
import queue
import logging
Expand All @@ -14,6 +15,7 @@
from ..exceptions import OperationTimeoutError

if TYPE_CHECKING:
from .._transport._base import AmqpTransport
from .._producer_client import SendEventTypes

_LOGGER = logging.getLogger(__name__)
Expand All @@ -31,7 +33,8 @@ def __init__(
executor: ThreadPoolExecutor,
*,
max_wait_time: float = 1,
max_buffer_length: int
max_buffer_length: int,
amqp_transport: AmqpTransport
):
self._buffered_queue: queue.Queue = queue.Queue()
self._max_buffer_len = max_buffer_length
Expand All @@ -47,11 +50,12 @@ def __init__(
self._cur_batch: Optional[EventDataBatch] = None
self._max_message_size_on_link = max_message_size_on_link
self._check_max_wait_time_future = None
self._amqp_transport = amqp_transport
self.partition_id = partition_id

def start(self):
with self._lock:
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
self._running = True
if self._max_wait_time:
self._last_send_time = time.time()
Expand Down Expand Up @@ -111,11 +115,11 @@ def put_events(self, events, timeout_time=None):
self._buffered_queue.put(self._cur_batch)
self._buffered_queue.put(events)
# create a new batch for incoming events
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
except ValueError:
# add single event exceeds the cur batch size, create new batch
self._buffered_queue.put(self._cur_batch)
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
self._cur_batch.add(events)
self._cur_buffered_len += new_events_len

Expand Down Expand Up @@ -182,7 +186,7 @@ def flush(self, timeout_time=None, raise_error=True):
break
# after finishing flushing, reset cur batch and put it into the buffer
self._last_send_time = time.time()
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
_LOGGER.info("Partition %r finished flushing.", self.partition_id)

def check_max_wait_time_worker(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import logging
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -14,6 +15,7 @@

if TYPE_CHECKING:
from .._producer_client import SendEventTypes
from .._transport._base import AmqpTransport

_LOGGER = logging.getLogger(__name__)

Expand All @@ -31,7 +33,8 @@ def __init__(
*,
max_buffer_length: int = 1500,
max_wait_time: float = 1,
executor: Optional[Union[ThreadPoolExecutor, int]] = None
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
amqp_transport: AmqpTransport
swathipil marked this conversation as resolved.
Show resolved Hide resolved
):
self._buffered_producers: Dict[str, BufferedProducer] = {}
self._partition_ids: List[str] = partitions
Expand All @@ -45,6 +48,7 @@ def __init__(
self._max_wait_time = max_wait_time
self._max_buffer_length = max_buffer_length
self._existing_executor = False
self._amqp_transport = amqp_transport

if not executor:
self._executor = ThreadPoolExecutor()
Expand Down Expand Up @@ -86,6 +90,7 @@ def enqueue_events(
executor=self._executor,
max_wait_time=self._max_wait_time,
max_buffer_length=self._max_buffer_length,
amqp_transport=self._amqp_transport
)
buffered_producer.start()
self._buffered_producers[pid] = buffered_producer
Expand Down
Loading