diff --git a/docs/api/recipe/watchers.rst b/docs/api/recipe/watchers.rst index fe93a5a3..7731f626 100644 --- a/docs/api/recipe/watchers.rst +++ b/docs/api/recipe/watchers.rst @@ -15,6 +15,12 @@ Public API .. automethod:: __call__ + .. autoclass:: ExistingDataWatch + :members: + + .. automethod:: __init__ + + .. automethod:: __call__ .. autoclass:: ChildrenWatch :members: diff --git a/kazoo/client.py b/kazoo/client.py index 25baa683..928a970c 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -63,7 +63,7 @@ from kazoo.recipe.partitioner import SetPartitioner from kazoo.recipe.party import Party, ShallowParty from kazoo.recipe.queue import Queue, LockingQueue -from kazoo.recipe.watchers import ChildrenWatch, DataWatch +from kazoo.recipe.watchers import ChildrenWatch, DataWatch, ExistingDataWatch string_types = six.string_types @@ -352,6 +352,7 @@ def _retry(*args, **kwargs): self.DoubleBarrier = partial(DoubleBarrier, self) self.ChildrenWatch = partial(ChildrenWatch, self) self.DataWatch = partial(DataWatch, self) + self.ExistingDataWatch = partial(ExistingDataWatch, self) self.Election = partial(Election, self) self.NonBlockingLease = partial(NonBlockingLease, self) self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self) diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index 96ec4fe6..2c7bb23d 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -217,6 +217,67 @@ def _session_watcher(self, state): self._client.handler.spawn(self._get_data) +class ExistingDataWatch(DataWatch): + """Watches a node for data updates and calls the specified + function each time it changes + + Similar to :class:`~kazoo.recipes.watchers.DataWatch`, but it does + not operate on nodes which do not exist. + + The function will also be called the very first time its + registered to get the data. + + Returning `False` from the registered function will disable future + data change calls. If the client connection is closed (using the + close command), the DataWatch will no longer get updates. + + If the function supplied takes three arguments, then the third one + will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will + only be set if the change to the data occurs as a result of the + server notifying the watch that there has been a change. Events + like reconnection or the first call will not include an event. + + If the node does not exist on creation then the function will be + called with ``None`` for all values and no futher callbacks will + occur. If the node is deleted after the watch is created, the + function will be called with the event argument indicating a + delete event and no further callbacks will occur. + """ + + @_ignore_closed + def _get_data(self, event=None): + # Ensure this runs one at a time, possible because the session + # watcher may trigger a run + with self._run_lock: + if self._stopped: + return + + initial_version = self._version + + try: + data, stat = self._retry(self._client.get, + self._path, self._watcher) + except NoNodeError: + data = stat = None + + # No node data, clear out version + if stat is None: + self._version = None + else: + self._version = stat.mzxid + + # Call our function if its the first time ever, or if the + # version has changed + if initial_version != self._version or not self._ever_called: + self._log_func_exception(data, stat, event) + + # If the node doesn't exist, we won't be watching any more + if stat is None: + self._stopped = True + self._func = None + self._client.remove_listener(self._session_watcher) + + class ChildrenWatch(object): """Watches a node for children updates and calls the specified function each time it changes diff --git a/kazoo/tests/test_watchers.py b/kazoo/tests/test_watchers.py index 69d8fce5..5c0249a0 100644 --- a/kazoo/tests/test_watchers.py +++ b/kazoo/tests/test_watchers.py @@ -279,6 +279,82 @@ def changed(val, stat): assert b is False +class KazooExistingDataWatcherTests(KazooTestCase): + def setUp(self): + super(KazooExistingDataWatcherTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + self.client.ensure_path(self.path) + + def test_data_watcher_non_existent_path(self): + update = threading.Event() + data = [True] + + # Make it a non-existent path + self.path += 'f' + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data == [None] + update.clear() + + # We should not get an update + self.client.create(self.path, b'fred') + update.wait(0.2) + assert data == [None] + update.clear() + + def test_data_watcher_existing_path(self): + update = threading.Event() + data = [True] + + # Make it an existing path + self.path += 'f' + self.client.create(self.path, b'fred') + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data[0] == b'fred' + update.clear() + + def test_data_watcher_delete(self): + update = threading.Event() + data = [True] + + # Make it an existing path + self.path += 'f' + self.client.create(self.path, b'fred') + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data[0] == b'fred' + update.clear() + + self.client.delete(self.path) + update.wait(10) + assert data == [None] + update.clear() + + self.client.create(self.path, b'ginger') + update.wait(0.2) + assert data == [None] + update.clear() + + class KazooChildrenWatcherTests(KazooTestCase): def setUp(self): super(KazooChildrenWatcherTests, self).setUp()