From 3e314e3726707851c288a027647f11e92dd35b18 Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Thu, 12 Oct 2023 23:31:48 +0300 Subject: [PATCH 1/5] io.asyncioreactor: fix deprecated usages for working with python>=3.10 * stop using the loop argument for `asyncio.Lock` and asyncio.Quoue` * on the lock replace `with await` with `async with`, which is the correct syntax for using that lock --- cassandra/io/asyncioreactor.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index ab0e90ae09..6372ab398d 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,5 +1,5 @@ from cassandra.connection import Connection, ConnectionShutdown - +import sys import asyncio import logging import os @@ -89,9 +89,11 @@ def __init__(self, *args, **kwargs): self._connect_socket() self._socket.setblocking(0) - - self._write_queue = asyncio.Queue(loop=self._loop) - self._write_queue_lock = asyncio.Lock(loop=self._loop) + loop_args = dict() + if sys.version_info[0] == 3 and sys.version_info[1] < 10: + loop_args['loop'] = self._loop + self._write_queue = asyncio.Queue(**loop_args) + self._write_queue_lock = asyncio.Lock(**loop_args) # see initialize_reactor -- loop is running in a separate thread, so we # have to use a threadsafe call @@ -174,7 +176,7 @@ def push(self, data): async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue - with await self._write_queue_lock: + async with self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) From 0e15955fe994f39eb4550aa1951fe253a490a400 Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Thu, 12 Oct 2023 23:36:07 +0300 Subject: [PATCH 2/5] tests: ignore asyncio related warning in test_deprecation_warnings since python3.8 we have this warning: ``` DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.') ``` and it's o.k. to have it since on Python 3.10 and up, we stop using that argument --- tests/integration/cqlengine/model/test_model.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/cqlengine/model/test_model.py b/tests/integration/cqlengine/model/test_model.py index 859facf0e1..73096e1b5d 100644 --- a/tests/integration/cqlengine/model/test_model.py +++ b/tests/integration/cqlengine/model/test_model.py @@ -256,10 +256,9 @@ class SensitiveModel(Model): rows[-1] rows[-1:] - # Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those - relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)] + # ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.') + relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)] - self.assertEqual(len(relevant_warnings), 4) self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message)) self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message)) self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.", From f204a926fb54d19884b13dcdeea8956b34584128 Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Wed, 18 Oct 2023 10:10:33 +0300 Subject: [PATCH 3/5] tests: skip `test_execute_query_timeout` if running with asyncio asyncio can't do timeouts smaller than 1ms, as this test requires it's a limitation of `asyncio.sleep` Fixes: https://github.com/scylladb/python-driver/issues/263 --- tests/__init__.py | 1 + tests/integration/standard/test_cluster.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/__init__.py b/tests/__init__.py index 2d19d29276..1d0d9fe34c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -105,3 +105,4 @@ def is_windows(): notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows") notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy") +notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio") diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 43a1d080ee..36a54aedae 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -39,7 +39,7 @@ from cassandra import connection from cassandra.connection import DefaultEndPoint -from tests import notwindows +from tests import notwindows, notasyncio from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \ execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \ get_unsupported_upper_protocol, lessthanprotocolv3, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \ @@ -1139,6 +1139,7 @@ def test_stale_connections_after_shutdown(self): assert False, f'Found stale connections: {result.stdout}' @notwindows + @notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires def test_execute_query_timeout(self): with TestCluster() as cluster: session = cluster.connect(wait_for_all_pools=True) From f764ac647db26ef13abed1fcc0c3b1eeace2d41f Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Fri, 10 Nov 2023 01:33:08 +0200 Subject: [PATCH 4/5] asyncio: stop using the loop variable when not needed there are some places were we don't need to pass or create the asyncio loop, and we should avoid it --- cassandra/io/asyncioreactor.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 6372ab398d..fc02392511 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,3 +1,5 @@ +import threading + from cassandra.connection import Connection, ConnectionShutdown import sys import asyncio @@ -41,13 +43,12 @@ def end(self): def __init__(self, timeout, callback, loop): delayed = self._call_delayed_coro(timeout=timeout, - callback=callback, - loop=loop) + callback=callback) self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) @staticmethod - async def _call_delayed_coro(timeout, callback, loop): - await asyncio.sleep(timeout, loop=loop) + async def _call_delayed_coro(timeout, callback): + await asyncio.sleep(timeout) return callback() def __lt__(self, other): @@ -111,8 +112,11 @@ def initialize_reactor(cls): if cls._pid != os.getpid(): cls._loop = None if cls._loop is None: - cls._loop = asyncio.new_event_loop() - asyncio.set_event_loop(cls._loop) + try: + cls._loop = asyncio.get_running_loop() + except RuntimeError: + cls._loop = asyncio.new_event_loop() + asyncio.set_event_loop(cls._loop) if not cls._loop_thread: # daemonize so the loop will be shut down on interpreter @@ -165,7 +169,7 @@ def push(self, data): else: chunks = [data] - if self._loop_thread.ident != get_ident(): + if self._loop_thread != threading.current_thread(): asyncio.run_coroutine_threadsafe( self._push_msg(chunks), loop=self._loop From a2b5e9f2775e90b69683daeee45940624a1d51a4 Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Fri, 10 Nov 2023 01:34:02 +0200 Subject: [PATCH 5/5] CI: add integration tests for python3.8 --- .github/workflows/integration-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 35463078fe..a8ee628a8d 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.11.4", "3.12.0b4"] + python-version: ["3.8.17", "3.11.4", "3.12.0b4"] event_loop_manager: ["libev", "asyncio", "asyncore"] exclude: - python-version: "3.12.0b4"