-
Notifications
You must be signed in to change notification settings - Fork 388
feat: add support for persistent recursive watches #715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
from kazoo.protocol.connection import ConnectionHandler | ||
from kazoo.protocol.paths import _prefix_root, normpath | ||
from kazoo.protocol.serialization import ( | ||
AddWatch, | ||
Auth, | ||
CheckVersion, | ||
CloseInstance, | ||
|
@@ -38,6 +39,7 @@ | |
SetACL, | ||
GetData, | ||
Reconfig, | ||
RemoveWatches, | ||
SetData, | ||
Sync, | ||
Transaction, | ||
|
@@ -48,6 +50,8 @@ | |
KazooState, | ||
KeeperState, | ||
WatchedEvent, | ||
AddWatchMode, | ||
WatcherType, | ||
) | ||
from kazoo.retry import KazooRetry | ||
from kazoo.security import ACL, OPEN_ACL_UNSAFE | ||
|
@@ -248,6 +252,8 @@ def __init__( | |
self.state_listeners = set() | ||
self._child_watchers = defaultdict(set) | ||
self._data_watchers = defaultdict(set) | ||
self._persistent_watchers = defaultdict(set) | ||
self._persistent_recursive_watchers = defaultdict(set) | ||
self._reset() | ||
self.read_only = read_only | ||
|
||
|
@@ -416,8 +422,16 @@ def _reset_watchers(self): | |
for data_watchers in self._data_watchers.values(): | ||
watchers.extend(data_watchers) | ||
|
||
for persistent_watchers in self._persistent_watchers.values(): | ||
watchers.extend(persistent_watchers) | ||
|
||
for pr_watchers in self._persistent_recursive_watchers.values(): | ||
watchers.extend(pr_watchers) | ||
|
||
self._child_watchers = defaultdict(set) | ||
self._data_watchers = defaultdict(set) | ||
self._persistent_watchers = defaultdict(set) | ||
self._persistent_recursive_watchers = defaultdict(set) | ||
|
||
ev = WatchedEvent(EventType.NONE, self._state, None) | ||
for watch in watchers: | ||
|
@@ -1644,8 +1658,111 @@ def reconfig_async(self, joining, leaving, new_members, from_config): | |
|
||
return async_result | ||
|
||
def add_watch(self, path, watch, mode): | ||
"""Add a watch. | ||
|
||
This method adds persistent watches. Unlike the data and | ||
child watches which may be set by calls to | ||
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and | ||
:meth:`KazooClient.get_children`, persistent watches are not | ||
removed after being triggered. | ||
|
||
To remove a persistent watch, use | ||
:meth:`KazooClient.remove_all_watches` with an argument of | ||
:attr:`~kazoo.protocol.states.WatcherType.ANY`. | ||
|
||
The `mode` argument determines whether or not the watch is | ||
recursive. To set a persistent watch, use | ||
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT`. To set a | ||
persistent recursive watch, use | ||
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE`. | ||
|
||
:param path: Path of node to watch. | ||
:param watch: Watch callback to set for future changes | ||
to this path. | ||
:param mode: The mode to use. | ||
:type mode: int | ||
|
||
:raises: | ||
:exc:`~kazoo.exceptions.MarshallingError` if mode is | ||
unknown. | ||
|
||
:exc:`~kazoo.exceptions.ZookeeperError` if the server | ||
returns a non-zero error code. | ||
""" | ||
return self.add_watch_async(path, watch, mode).get() | ||
|
||
def add_watch_async(self, path, watch, mode): | ||
"""Asynchronously add a watch. Takes the same arguments as | ||
:meth:`add_watch`. | ||
""" | ||
if not isinstance(path, str): | ||
raise TypeError("Invalid type for 'path' (string expected)") | ||
if not callable(watch): | ||
raise TypeError("Invalid type for 'watch' (must be a callable)") | ||
if not isinstance(mode, int): | ||
raise TypeError("Invalid type for 'mode' (int expected)") | ||
if mode not in ( | ||
AddWatchMode.PERSISTENT, | ||
AddWatchMode.PERSISTENT_RECURSIVE, | ||
): | ||
raise ValueError("Invalid value for 'mode'") | ||
|
||
async_result = self.handler.async_result() | ||
self._call( | ||
AddWatch(_prefix_root(self.chroot, path), watch, mode), | ||
async_result, | ||
) | ||
return async_result | ||
|
||
def remove_all_watches(self, path, watcher_type): | ||
"""Remove watches from a path. | ||
|
||
This removes all watches of a specified type (data, child, | ||
any) from a given path. | ||
|
||
The `watcher_type` argument specifies which type to use. It | ||
may be one of: | ||
|
||
* :attr:`~kazoo.protocol.states.WatcherType.DATA` | ||
* :attr:`~kazoo.protocol.states.WatcherType.CHILDREN` | ||
* :attr:`~kazoo.protocol.states.WatcherType.ANY` | ||
|
||
To remove persistent watches, specify a watcher type of | ||
:attr:`~kazoo.protocol.states.WatcherType.ANY`. | ||
|
||
:param path: Path of watch to remove. | ||
:param watcher_type: The type of watch to remove. | ||
:type watcher_type: int | ||
""" | ||
|
||
return self.remove_all_watches_async(path, watcher_type).get() | ||
|
||
def remove_all_watches_async(self, path, watcher_type): | ||
"""Asynchronously remove watches. Takes the same arguments as | ||
:meth:`remove_all_watches`. | ||
""" | ||
if not isinstance(path, str): | ||
raise TypeError("Invalid type for 'path' (string expected)") | ||
if not isinstance(watcher_type, int): | ||
StephenSorriaux marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise TypeError("Invalid type for 'watcher_type' (int expected)") | ||
if watcher_type not in ( | ||
WatcherType.ANY, | ||
WatcherType.CHILDREN, | ||
WatcherType.DATA, | ||
): | ||
raise ValueError("Invalid value for 'watcher_type'") | ||
|
||
async_result = self.handler.async_result() | ||
self._call( | ||
RemoveWatches(_prefix_root(self.chroot, path), watcher_type), | ||
async_result, | ||
) | ||
return async_result | ||
|
||
|
||
class TransactionRequest(object): | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this blank line extraneous? I don't normally recall a blank line between class definition and docstring... |
||
"""A Zookeeper Transaction Request | ||
|
||
A Transaction provides a builder object that can be used to | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
) | ||
from kazoo.loggingsupport import BLATHER | ||
from kazoo.protocol.serialization import ( | ||
AddWatch, | ||
Auth, | ||
Close, | ||
Connect, | ||
|
@@ -28,17 +29,20 @@ | |
GetChildren2, | ||
Ping, | ||
PingInstance, | ||
RemoveWatches, | ||
ReplyHeader, | ||
SASL, | ||
Transaction, | ||
Watch, | ||
int_struct, | ||
) | ||
from kazoo.protocol.states import ( | ||
AddWatchMode, | ||
Callback, | ||
KeeperState, | ||
WatchedEvent, | ||
EVENT_TYPE_MAP, | ||
WatcherType, | ||
) | ||
from kazoo.retry import ( | ||
ForceRetryError, | ||
|
@@ -363,6 +367,18 @@ def _write(self, msg, timeout): | |
raise ConnectionDropped("socket connection broken") | ||
sent += bytes_sent | ||
|
||
def _find_persistent_recursive_watchers(self, path): | ||
parts = path.split("/") | ||
watchers = [] | ||
for count in range(len(parts)): | ||
candidate = "/".join(parts[: count + 1]) | ||
if not candidate: | ||
continue | ||
watchers.extend( | ||
self.client._persistent_recursive_watchers.get(candidate, []) | ||
) | ||
return watchers | ||
|
||
def _read_watch_event(self, buffer, offset): | ||
client = self.client | ||
watch, offset = Watch.deserialize(buffer, offset) | ||
|
@@ -374,9 +390,13 @@ def _read_watch_event(self, buffer, offset): | |
|
||
if watch.type in (CREATED_EVENT, CHANGED_EVENT): | ||
watchers.extend(client._data_watchers.pop(path, [])) | ||
watchers.extend(client._persistent_watchers.get(path, [])) | ||
watchers.extend(self._find_persistent_recursive_watchers(path)) | ||
elif watch.type == DELETED_EVENT: | ||
watchers.extend(client._data_watchers.pop(path, [])) | ||
watchers.extend(client._child_watchers.pop(path, [])) | ||
watchers.extend(client._persistent_watchers.get(path, [])) | ||
watchers.extend(self._find_persistent_recursive_watchers(path)) | ||
elif watch.type == CHILD_EVENT: | ||
watchers.extend(client._child_watchers.pop(path, [])) | ||
else: | ||
|
@@ -448,13 +468,35 @@ def _read_response(self, header, buffer, offset): | |
|
||
async_object.set(response) | ||
|
||
# Determine if watchers should be registered | ||
watcher = getattr(request, "watcher", None) | ||
if not client._stopped.is_set() and watcher: | ||
if isinstance(request, (GetChildren, GetChildren2)): | ||
client._child_watchers[request.path].add(watcher) | ||
else: | ||
client._data_watchers[request.path].add(watcher) | ||
# Determine if watchers should be registered or unregistered | ||
if not client._stopped.is_set(): | ||
watcher = getattr(request, "watcher", None) | ||
if watcher: | ||
if isinstance(request, AddWatch): | ||
if request.mode == AddWatchMode.PERSISTENT: | ||
client._persistent_watchers[request.path].add( | ||
watcher | ||
) | ||
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE: | ||
client._persistent_recursive_watchers[ | ||
request.path | ||
].add(watcher) | ||
Comment on lines
+476
to
+483
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this might benefit from an I realize it's a no-op, but it still took me a moment to realize that's because it might be a non-persistant watch that's being added... so might be easier for a reader to understand with a code comment... But code comments can go out of sync, so a simple assertion that it's the expected mode might be useful like: else:
if request.mode not in [expected modes]:
raise <relevant exception> that way both clarifies the code and also catches stupid errors if somehow a case was missed down the road. Alternatively, this might read even simpler as a switch statement with a default case... but I realize those require Python 3.10 so maybe leave a: # TODO: switch these if/else clauses to switch statements where appropriate once we drop Python 3.9 support |
||
elif isinstance(request, (GetChildren, GetChildren2)): | ||
client._child_watchers[request.path].add(watcher) | ||
else: | ||
client._data_watchers[request.path].add(watcher) | ||
if isinstance(request, RemoveWatches): | ||
if request.watcher_type == WatcherType.CHILDREN: | ||
client._child_watchers.pop(request.path, None) | ||
elif request.watcher_type == WatcherType.DATA: | ||
client._data_watchers.pop(request.path, None) | ||
elif request.watcher_type == WatcherType.ANY: | ||
Comment on lines
+489
to
+493
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, NVM, just realized switch statements won't work in Python 3.8/3.9 😢 |
||
client._child_watchers.pop(request.path, None) | ||
client._data_watchers.pop(request.path, None) | ||
client._persistent_watchers.pop(request.path, None) | ||
client._persistent_recursive_watchers.pop( | ||
request.path, None | ||
) | ||
|
||
if isinstance(request, Close): | ||
self.logger.log(BLATHER, "Read close response") | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -251,3 +251,44 @@ def data_length(self): | |||||
@property | ||||||
def children_count(self): | ||||||
return self.numChildren | ||||||
|
||||||
|
||||||
class AddWatchMode(object): | ||||||
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch` | ||||||
|
||||||
.. attribute:: PERSISTENT | ||||||
|
||||||
The watch is not removed when trigged. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
.. attribute:: PERSISTENT_RECURSIVE | ||||||
|
||||||
The watch is not removed when trigged, and applies to all | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
paths underneath the supplied path as well. | ||||||
""" | ||||||
|
||||||
PERSISTENT = 0 | ||||||
PERSISTENT_RECURSIVE = 1 | ||||||
|
||||||
|
||||||
class WatcherType(object): | ||||||
"""Watcher types for use with | ||||||
:meth:`~kazoo.client.KazooClient.remove_all_watches` | ||||||
|
||||||
.. attribute:: CHILDREN | ||||||
|
||||||
Child watches. | ||||||
|
||||||
.. attribute:: DATA | ||||||
|
||||||
Data watches. | ||||||
|
||||||
.. attribute:: ANY | ||||||
|
||||||
Any type of watch (child, data, persistent, or persistent | ||||||
recursive). | ||||||
|
||||||
""" | ||||||
|
||||||
CHILDREN = 1 | ||||||
DATA = 2 | ||||||
ANY = 3 |
Uh oh!
There was an error while loading. Please reload this page.