Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io.asyncioreactor: fix deprecated usages for working with python>=3.10 #271

Merged
merged 5 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.11.4", "3.12.0b4"]
python-version: ["3.8.17", "3.11.4", "3.12.0b4"]
event_loop_manager: ["libev", "asyncio", "asyncore"]
exclude:
- python-version: "3.12.0b4"
Expand Down
30 changes: 18 additions & 12 deletions cassandra/io/asyncioreactor.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In push function, self._loop.create_task is called and it's return value is ignored. While the tests may pass now, this code is not correct and this example is called out in docs as a source of bugs: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from cassandra.connection import Connection, ConnectionShutdown
import threading

from cassandra.connection import Connection, ConnectionShutdown
import sys
import asyncio
import logging
import os
Expand Down Expand Up @@ -41,13 +43,12 @@ def end(self):

def __init__(self, timeout, callback, loop):
delayed = self._call_delayed_coro(timeout=timeout,
callback=callback,
loop=loop)
callback=callback)
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)

@staticmethod
async def _call_delayed_coro(timeout, callback, loop):
await asyncio.sleep(timeout, loop=loop)
async def _call_delayed_coro(timeout, callback):
await asyncio.sleep(timeout)
return callback()

def __lt__(self, other):
Expand Down Expand Up @@ -89,9 +90,11 @@ def __init__(self, *args, **kwargs):

self._connect_socket()
self._socket.setblocking(0)

self._write_queue = asyncio.Queue(loop=self._loop)
self._write_queue_lock = asyncio.Lock(loop=self._loop)
loop_args = dict()
if sys.version_info[0] == 3 and sys.version_info[1] < 10:
loop_args['loop'] = self._loop
self._write_queue = asyncio.Queue(**loop_args)
self._write_queue_lock = asyncio.Lock(**loop_args)

# see initialize_reactor -- loop is running in a separate thread, so we
# have to use a threadsafe call
Expand All @@ -109,8 +112,11 @@ def initialize_reactor(cls):
if cls._pid != os.getpid():
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
try:
cls._loop = asyncio.get_running_loop()
except RuntimeError:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
Expand Down Expand Up @@ -163,7 +169,7 @@ def push(self, data):
else:
chunks = [data]

if self._loop_thread.ident != get_ident():
if self._loop_thread != threading.current_thread():
asyncio.run_coroutine_threadsafe(
self._push_msg(chunks),
loop=self._loop
Expand All @@ -174,7 +180,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ def is_windows():

notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows")
notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio")
5 changes: 2 additions & 3 deletions tests/integration/cqlengine/model/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,9 @@ class SensitiveModel(Model):
rows[-1]
rows[-1:]

# Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those
relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)]
# ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.')
relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)]

self.assertEqual(len(relevant_warnings), 4)
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message))
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message))
self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from cassandra import connection
from cassandra.connection import DefaultEndPoint

from tests import notwindows
from tests import notwindows, notasyncio
from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
get_unsupported_upper_protocol, lessthanprotocolv3, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
Expand Down Expand Up @@ -1139,6 +1139,7 @@ def test_stale_connections_after_shutdown(self):
assert False, f'Found stale connections: {result.stdout}'

@notwindows
@notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires
def test_execute_query_timeout(self):
with TestCluster() as cluster:
session = cluster.connect(wait_for_all_pools=True)
Expand Down
Loading