From c06f18d6c41835be16075abb7f3031d3010e699c Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Tue, 31 Oct 2023 13:11:52 -0500 Subject: [PATCH 1/4] Work in progress --- cassandra/cluster.py | 16 ++++++++++++---- cassandra/io/asyncorereactor.py | 9 ++++++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6514838050..21067f028f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -128,6 +128,15 @@ def _is_gevent_monkey_patched(): import gevent.socket return socket.socket is gevent.socket.socket +def try_import_libev(): + try: + from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA + return True + except ImportError: + return False + +def try_import_asyncore(): + from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA # default to gevent when we are monkey patched with gevent, eventlet when # monkey patched with eventlet, otherwise if libev is available, use that as @@ -136,11 +145,10 @@ def _is_gevent_monkey_patched(): from cassandra.io.geventreactor import GeventConnection as DefaultConnection elif _is_eventlet_monkey_patched(): from cassandra.io.eventletreactor import EventletConnection as DefaultConnection +elif try_import_libev(): + pass else: - try: - from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA - except ImportError: - from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA + try_import_asyncore(); # Forces load of utf8 encoding module to avoid deadlock that occurs # if code that is being imported tries to import the module in a seperate diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index a45d657828..f737a033ca 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -30,7 +30,14 @@ except ImportError: from cassandra.util import WeakSet # noqa -import asyncore +try: + import asyncore +except ModuleNotFoundError: + raise ImportError( + "Unable to import asyncore module. Note that this module has been removed in Python 3.12 " + "so when using the driver with this version (or anything newer) you will need to use on of the " + "other event loop implementations." + ) from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager From 79d1de7d7e857e279cb4f514711ef3110c2bdbce Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Mon, 6 Nov 2023 15:21:56 -0600 Subject: [PATCH 2/4] Implementation of warning messages which appears to work with 3.12 --- cassandra/__init__.py | 16 ++++ cassandra/cluster.py | 75 ++++++++++++------- cassandra/io/asyncorereactor.py | 5 +- cassandra/io/libevreactor.py | 4 +- tests/__init__.py | 7 +- tests/integration/standard/test_connection.py | 13 ++-- 6 files changed, 82 insertions(+), 38 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index b048bd9358..4398c86f69 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -728,3 +728,19 @@ class UnresolvableContactPoints(DriverException): contact points, only when lookup fails for all hosts """ pass + +class DependencyException(Exception): + """ + Specific exception class for handling issues with driver dependencies + """ + + excs = [] + """ + A sequence of child exceptions + """ + + def __init__(self, msg, excs=[]): + complete_msg = msg + if excs: + complete_msg += ("The following exceptions were observed: \n" + '\n'.join(str(e) for e in excs)) + Exception.__init__(self, complete_msg) \ No newline at end of file diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 21067f028f..e0c09ca64f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -24,7 +24,7 @@ from collections.abc import Mapping from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures from copy import copy -from functools import partial, wraps +from functools import partial, reduce, wraps from itertools import groupby, count, chain import json import logging @@ -44,7 +44,7 @@ from cassandra import (ConsistencyLevel, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, SchemaTargetType, DriverException, ProtocolVersion, - UnresolvableContactPoints) + UnresolvableContactPoints, DependencyException) from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider from cassandra.connection import (ConnectionException, ConnectionShutdown, ConnectionHeartbeat, ProtocolVersionUnsupported, @@ -111,6 +111,19 @@ except ImportError: from cassandra.util import WeakSet # NOQA +def _is_gevent_monkey_patched(): + if 'gevent.monkey' not in sys.modules: + return False + import gevent.socket + return socket.socket is gevent.socket.socket + +def _try_gevent_import(): + if _is_gevent_monkey_patched(): + from cassandra.io.geventreactor import GeventConnection + return (GeventConnection,None) + else: + return (None,None) + def _is_eventlet_monkey_patched(): if 'eventlet.patcher' not in sys.modules: return False @@ -121,34 +134,42 @@ def _is_eventlet_monkey_patched(): except AttributeError: return False +def _try_eventlet_import(): + if _is_eventlet_monkey_patched(): + from cassandra.io.eventletreactor import EventletConnection + return (EventletConnection,None) + else: + return (None,None) -def _is_gevent_monkey_patched(): - if 'gevent.monkey' not in sys.modules: - return False - import gevent.socket - return socket.socket is gevent.socket.socket - -def try_import_libev(): +def _try_libev_import(): try: - from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA - return True - except ImportError: - return False + from cassandra.io.libevreactor import LibevConnection + return (LibevConnection,None) + except DependencyException as e: + return (None, e) -def try_import_asyncore(): - from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA - -# default to gevent when we are monkey patched with gevent, eventlet when -# monkey patched with eventlet, otherwise if libev is available, use that as -# the default because it's fastest. Otherwise, use asyncore. -if _is_gevent_monkey_patched(): - from cassandra.io.geventreactor import GeventConnection as DefaultConnection -elif _is_eventlet_monkey_patched(): - from cassandra.io.eventletreactor import EventletConnection as DefaultConnection -elif try_import_libev(): - pass -else: - try_import_asyncore(); +def _try_asyncore_import(): + try: + from cassandra.io.asyncorereactor import AsyncoreConnection + return (AsyncoreConnection,None) + except DependencyException as e: + return (None, e) + +def _connection_reduce_fn(val,import_fn): + (rv, excs) = val + # If we've already found a workable Connection class return immediately + if rv: + return val + (import_result, exc) = import_fn() + if exc: + excs.append(exc) + return (rv or import_result, excs) + +conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import) +(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[])) +if excs: + raise DependencyException("Exception loading connection class dependencies", excs) +DefaultConnection = conn_class # Forces load of utf8 encoding module to avoid deadlock that occurs # if code that is being imported tries to import the module in a seperate diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index f737a033ca..a50b719c5d 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -30,12 +30,13 @@ except ImportError: from cassandra.util import WeakSet # noqa +from cassandra import DependencyException try: import asyncore except ModuleNotFoundError: - raise ImportError( + raise DependencyException( "Unable to import asyncore module. Note that this module has been removed in Python 3.12 " - "so when using the driver with this version (or anything newer) you will need to use on of the " + "so when using the driver with this version (or anything newer) you will need to use one of the " "other event loop implementations." ) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 484690da89..4d4098ca7b 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -21,13 +21,13 @@ from threading import Lock, Thread import time - +from cassandra import DependencyException from cassandra.connection import (Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager) try: import cassandra.io.libevwrapper as libev except ImportError: - raise ImportError( + raise DependencyException( "The C extension needed to use libev was not found. This " "probably means that you didn't have the required build dependencies " "when installing the driver. See " diff --git a/tests/__init__.py b/tests/__init__.py index 48c589c424..90e4a14bec 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,6 +20,8 @@ import os from concurrent.futures import ThreadPoolExecutor +from cassandra import DependencyException + log = logging.getLogger() log.setLevel('DEBUG') # if nose didn't already attach a log handler, add one here @@ -86,17 +88,18 @@ def is_monkey_patched(): elif "asyncio" in EVENT_LOOP_MANAGER: from cassandra.io.asyncioreactor import AsyncioConnection connection_class = AsyncioConnection - else: + log.debug("Using default event loop (libev)") try: from cassandra.io.libevreactor import LibevConnection connection_class = LibevConnection - except ImportError as e: + except DependencyException as e: log.debug('Could not import LibevConnection, ' 'using connection_class=None; ' 'failed with error:\n {}'.format( repr(e) )) + log.debug("Will attempt to set connection class at cluster initialization") connection_class = None diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 3323baf20b..761a5505f9 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -23,12 +23,9 @@ import time from unittest import SkipTest -from cassandra import ConsistencyLevel, OperationTimedOut +from cassandra import ConsistencyLevel, OperationTimedOut, DependencyException from cassandra.cluster import NoHostAvailable, ConnectionShutdown, ExecutionProfile, EXEC_PROFILE_DEFAULT -import cassandra.io.asyncorereactor -from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.protocol import QueryMessage -from cassandra.connection import Connection from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, HostStateListener from cassandra.pool import HostConnectionPool @@ -36,10 +33,16 @@ from tests.integration import use_singledc, get_node, CASSANDRA_IP, local, \ requiresmallclockgranularity, greaterthancass20, TestCluster +try: + import cassandra.io.asyncorereactor + from cassandra.io.asyncorereactor import AsyncoreConnection +except DependencyException: + AsyncoreConnection = None + try: from cassandra.io.libevreactor import LibevConnection import cassandra.io.libevreactor -except ImportError: +except DependencyException: LibevConnection = None From f971249e4f5364efa2ac38675a1ea34a5b0ec670 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Mon, 6 Nov 2023 16:18:44 -0600 Subject: [PATCH 3/4] Add fix for PYTHON-1364 to yet another spot where it's needed --- tests/__init__.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 90e4a14bec..4735bbd383 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -34,9 +34,12 @@ def is_eventlet_monkey_patched(): if 'eventlet.patcher' not in sys.modules: return False - import eventlet.patcher - return eventlet.patcher.is_monkey_patched('socket') - + try: + import eventlet.patcher + return eventlet.patcher.is_monkey_patched('socket') + # Yet another case related to PYTHON-1364 + except AttributeError: + return False def is_gevent_monkey_patched(): if 'gevent.monkey' not in sys.modules: From 14511a26c590e2ff78216ff4b209f7b112dbd919 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Wed, 8 Nov 2023 10:46:39 -0600 Subject: [PATCH 4/4] Add more robust check for installed asyncore in order to skip tests that rely on it when it isn't present --- tests/integration/standard/test_connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 761a5505f9..463080fc32 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -443,6 +443,8 @@ class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase): def setUp(self): if is_monkey_patched(): raise unittest.SkipTest("Can't test asyncore with monkey patching") + if AsyncoreConnection is None: + raise unittest.SkipTest('Unable to import asyncore module') ConnectionTests.setUp(self) def clean_global_loop(self):