Skip to content

Commit

Permalink
Worker run method (#118)
Browse files Browse the repository at this point in the history
* Add worker run command

* Test that the run function accepts and uses the wait parameter

* Add docs

* Move redundant patches to fixtures

* Rename function to run_forever.
Add a wait_forever function

* Update basics guide
  • Loading branch information
andrewgy8 authored Nov 22, 2019
1 parent 2b4c2c2 commit 7bf0d96
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 25 deletions.
4 changes: 1 addition & 3 deletions docs/guides/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ and will begin to pull the messages from the topic.
config.credentials,
config.ack_deadline,
)
worker.setup()
worker.start()
sleep(120)
worker.run_forever()
Once the sub and worker are created, we can start our worker by running ``python worker.py``.
Expand Down
10 changes: 1 addition & 9 deletions rele/management/commands/runrele.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import signal
import time

from django.conf import settings
from django.core.management import BaseCommand
Expand Down Expand Up @@ -37,23 +36,16 @@ def handle(self, *args, **options):
self.config.credentials,
self.config.ack_deadline,
)
worker.setup()
worker.start()

signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, worker.stop)
signal.signal(signal.SIGTSTP, worker.stop)

self._wait_forever()
worker.run_forever(sleep_interval=None)

def _autodiscover_subs(self):
return rele.config.load_subscriptions_from_paths(
discover_subs_modules(),
self.config.sub_prefix,
settings.RELE.get("FILTER_SUBS_BY"),
)

def _wait_forever(self):
self.stdout.write("Consuming subscriptions...")
while True:
time.sleep(1)
17 changes: 16 additions & 1 deletion rele/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import sys
import time

from .client import Subscriber
from .middleware import run_middleware_hook
Expand Down Expand Up @@ -34,7 +35,7 @@ def setup(self):
def start(self):
"""Begin consuming all subscriptions.
When consuming a subscription, a `StreamingPullFuture` is returned from
When consuming a subscription, a ``StreamingPullFuture`` is returned from
the Google PubSub client library. This future can be used to
manage the background stream.
Expand All @@ -50,6 +51,15 @@ def start(self):
)
run_middleware_hook("post_worker_start")

def run_forever(self, sleep_interval=1):
"""Shortcut for calling setup, start, and _wait_forever.
:param sleep_interval: Number of seconds to sleep in the ``while True`` loop
"""
self.setup()
self.start()
self._wait_forever(sleep_interval=sleep_interval)

def stop(self, signal=None, frame=None):
"""Manage the shutdown process of the worker.
Expand All @@ -73,3 +83,8 @@ def stop(self, signal=None, frame=None):

run_middleware_hook("post_worker_stop")
sys.exit(0)

def _wait_forever(self, sleep_interval):
logger.info("Consuming subscriptions...")
while True:
time.sleep(sleep_interval)
12 changes: 5 additions & 7 deletions tests/commands/test_runrele.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import pytest
from django.core.management import call_command

from rele.management.commands.runrele import Command
from rele import Worker


class TestRunReleCommand:
@pytest.fixture(autouse=True)
def wait_forever(self):
with patch.object(Command, "_wait_forever", return_value=None) as p:
def worker_wait_forever(self):
with patch.object(Worker, "_wait_forever", return_value=None) as p:
yield p

@pytest.fixture
Expand All @@ -21,8 +21,7 @@ def test_calls_worker_start_and_setup_when_runrele(self, mock_worker):
call_command("runrele")

mock_worker.assert_called_with([], "SOME-PROJECT-ID", ANY, 60)
mock_worker.return_value.setup.assert_called()
mock_worker.return_value.start.assert_called()
mock_worker.return_value.run_forever.assert_called()

def test_prints_warning_when_conn_max_age_not_set_to_zero(
self, mock_worker, capsys, settings
Expand All @@ -37,5 +36,4 @@ def test_prints_warning_when_conn_max_age_not_set_to_zero(
"be exhausted." in err
)
mock_worker.assert_called_with([], "SOME-PROJECT-ID", ANY, 60)
mock_worker.return_value.setup.assert_called()
mock_worker.return_value.start.assert_called()
mock_worker.return_value.run_forever.assert_called()
42 changes: 37 additions & 5 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from unittest.mock import ANY, patch

import pytest
Expand All @@ -18,8 +17,19 @@ def worker(project_id, credentials):
return Worker(subscriptions, project_id, credentials, 60)


@pytest.fixture
def mock_consume():
with patch.object(Subscriber, "consume") as m:
yield m


@pytest.fixture
def mock_create_subscription():
with patch.object(Subscriber, "create_subscription") as m:
yield m


class TestWorker:
@patch.object(Subscriber, "consume")
def test_start_subscribes_and_saves_futures_when_subscriptions_given(
self, mock_consume, worker
):
Expand All @@ -29,7 +39,6 @@ def test_start_subscribes_and_saves_futures_when_subscriptions_given(
subscription_name="rele-some-cool-topic", callback=ANY
)

@patch.object(Subscriber, "create_subscription")
def test_setup_creates_subscription_when_topic_given(
self, mock_create_subscription, worker
):
Expand All @@ -39,6 +48,29 @@ def test_setup_creates_subscription_when_topic_given(
subscription = "rele-some-cool-topic"
mock_create_subscription.assert_called_once_with(subscription, topic)

@patch.object(Worker, "_wait_forever")
def test_run_sets_up_and_creates_subscriptions_when_called(
self, mock_wait_forever, mock_consume, mock_create_subscription, worker
):
worker.run_forever()

topic = "some-cool-topic"
subscription = "rele-some-cool-topic"
mock_create_subscription.assert_called_once_with(subscription, topic)
mock_consume.assert_called_once_with(
subscription_name="rele-some-cool-topic", callback=ANY
)
mock_wait_forever.assert_called_once()

@patch.object(Worker, "_wait_forever")
@pytest.mark.usefixtures("mock_consume", "mock_create_subscription")
def test_wait_forevers_for_custom_time_period_when_called_with_argument(
self, mock_wait_forever, worker
):
worker.run_forever(sleep_interval=127)

mock_wait_forever.assert_called_once()

@patch("rele.contrib.django_db_middleware.db.connections.close_all")
def test_stop_closes_db_connections(self, mock_db_close_all, config, worker):
config.middleware = ["rele.contrib.DjangoDBMiddleware"]
Expand All @@ -49,9 +81,9 @@ def test_stop_closes_db_connections(self, mock_db_close_all, config, worker):

mock_db_close_all.assert_called_once()

@patch.object(Subscriber, "create_subscription")
@pytest.mark.usefixtures("mock_create_subscription")
def test_creates_subscription_with_custom_ack_deadline_from_environment(
self, mock_create_subscription, project_id, credentials
self, project_id, credentials
):
subscriptions = (sub_stub,)
custom_ack_deadline = 234
Expand Down

0 comments on commit 7bf0d96

Please sign in to comment.