Skip to content

Commit

Permalink
CLI (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgy8 authored May 25, 2020
1 parent e0a7b3d commit 8bec6c5
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 31 deletions.
2 changes: 1 addition & 1 deletion docs/guides/unrecoverable_middleware.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.. _unrecoverable_middleware:

Unrecoverable Middleware
===========
========================

To acknowledge and ignore incompatible messages that your subscription is unable to handle, you can use the `UnrecoverableMiddleware`.

Expand Down
39 changes: 39 additions & 0 deletions rele/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import argparse
import logging
import os
import sys

from rele import config, discover, subscription

from rele.worker import create_and_run

logger = logging.getLogger(__name__)


def main():
# modify path so we can import modules and packages
sys.path.insert(0, os.getcwd())

parser = argparse.ArgumentParser(
prog="Relé", description="Harness the power of Relé from the command line"
)

subparsers = parser.add_subparsers(help="Select a command", dest="command")

run_parser = subparsers.add_parser(
"run",
help="Run a Relé worker with auto-discovery of subs modules in the "
"current path. Auto-discovery will include all subs "
"and settings modules. If no settings module is discovered, "
"defaults will be used.",
)

args = parser.parse_args()

if args.command == "run":
settings, module_paths = discover.sub_modules()
configuration = config.setup(settings.RELE if settings else None)
subs = config.load_subscriptions_from_paths(
module_paths, configuration.sub_prefix, configuration.filter_by
)
create_and_run(subs, configuration)
1 change: 1 addition & 0 deletions rele/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, setting):
self._encoder_path = setting.get("ENCODER_PATH", DEFAULT_ENCODER_PATH)
self.publisher_timeout = setting.get("PUBLISHER_TIMEOUT", 3.0)
self.threads_per_subscription = setting.get("THREADS_PER_SUBSCRIPTION", 2)
self.filter_by = setting.get("FILTER_SUBS_BY")

@property
def encoder(self):
Expand Down
46 changes: 46 additions & 0 deletions rele/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging
import pkgutil
from importlib.util import find_spec as importlib_find

logger = logging.getLogger(__name__)


def module_has_submodule(package, module_name):
"""
See if 'module' is in 'package'.
Taken from https://github.com/django/django/blob/master/django/utils/module_loading.py#L63
"""
package = __import__(package)
package_name = package.__name__
package_path = package.__path__
full_module_name = package_name + "." + module_name

try:
return importlib_find(full_module_name, package_path) is not None
except (ModuleNotFoundError, AttributeError):
# When module_name is an invalid dotted path, Python raises
# ModuleNotFoundError.
return False


def sub_modules():
"""
In the current PYTHONPATH, we can traverse all modules and determine if they
have a settings.py or directory with a subs.py module. If either one of
those exists, we import it, and return the settings module, and
paths to the subs file.
If a settings module is not found, we return None.
:return: (settings module, List[string: subs module paths])
"""
settings = None
module_paths = []
for f, package, is_package in pkgutil.iter_modules(path=["."]):
if package == "settings":
settings = __import__(package)
if is_package and module_has_submodule(package, "subs"):
module = package + ".subs"
module_paths.append(module)
logger.debug(" * Discovered subs module: %r" % module)
return settings, module_paths
36 changes: 7 additions & 29 deletions rele/management/commands/runrele.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import logging
import signal

from django.conf import settings
from django.core.management import BaseCommand

import rele
from rele import Worker
from rele.config import Config

from rele.worker import create_and_run
from rele import config
from rele.management.discover import discover_subs_modules

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Start subscriber threads to consume messages from Relé topics."
config = Config(settings.RELE)
config = config.Config(settings.RELE)

def handle(self, *args, **options):
if all(map(lambda x: x.get("CONN_MAX_AGE"), settings.DATABASES.values())):
Expand All @@ -26,27 +23,8 @@ def handle(self, *args, **options):
"be exhausted."
)
)
subs = self._autodiscover_subs()
self.stdout.write(f"Configuring worker with {len(subs)} " f"subscription(s)...")
for sub in subs:
self.stdout.write(f" {sub}")
worker = Worker(
subs,
self.config.gc_project_id,
self.config.credentials,
self.config.ack_deadline,
self.config.threads_per_subscription,
)

signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, worker.stop)
signal.signal(signal.SIGTSTP, worker.stop)

worker.run_forever()

def _autodiscover_subs(self):
return rele.config.load_subscriptions_from_paths(
discover_subs_modules(),
self.config.sub_prefix,
settings.RELE.get("FILTER_SUBS_BY"),
subs = config.load_subscriptions_from_paths(
discover_subs_modules(), self.config.sub_prefix, self.config.filter_by
)
self.stdout.write(f"Configuring worker with {len(subs)} " f"subscription(s)...")
create_and_run(subs, self.config)
1 change: 1 addition & 0 deletions rele/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import time

import rele
from .middleware import run_middleware_hook

logger = logging.getLogger(__name__)
Expand Down
30 changes: 30 additions & 0 deletions rele/worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import signal
import sys
import time
from concurrent import futures
Expand Down Expand Up @@ -108,3 +109,32 @@ def _wait_forever(self, sleep_interval):
logger.info("Consuming subscriptions...")
while True:
time.sleep(sleep_interval)


def create_and_run(subs, config):
"""
Create and run a worker from a list of Subscription objects and a config
while waiting forever, until the process is stopped.
We stop a worker process on:
- SIGINT
- SIGTSTP
:param subs: List :class:`~rele.subscription.Subscription`
:param config: :class:`~rele.config.Config`
"""
print(f"Configuring worker with {len(subs)} subscription(s)...")
for sub in subs:
print(f" {sub}")
worker = Worker(
subs,
config.gc_project_id,
config.credentials,
config.ack_deadline,
config.threads_per_subscription,
)

signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, worker.stop)
signal.signal(signal.SIGTSTP, worker.stop)
worker.run_forever()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ def get_version(*file_paths):
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
],
entry_points={"console_scripts": ["rele-cli=rele.__main__:main"]},
)
2 changes: 1 addition & 1 deletion tests/commands/test_runrele.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def worker_wait_forever(self):

@pytest.fixture
def mock_worker(self):
with patch("rele.management.commands.runrele.Worker", autospec=True) as p:
with patch("rele.worker.Worker", autospec=True) as p:
yield p

def test_calls_worker_start_and_setup_when_runrele(self, mock_worker):
Expand Down
22 changes: 22 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from rele import Subscriber, Worker, sub
from rele.middleware import register_middleware
from rele.worker import create_and_run


@sub(topic="some-cool-topic", prefix="rele")
Expand Down Expand Up @@ -130,3 +131,24 @@ def test_creates_without_config(self):

assert worker._subscriber._ack_deadline == 60
assert worker._subscriber._gc_project_id == "rele"


class TestCreateAndRun:
@pytest.fixture(autouse=True)
def worker_wait_forever(self):
with patch.object(Worker, "_wait_forever", return_value=None) as p:
yield p

@pytest.fixture
def mock_worker(self):
with patch("rele.worker.Worker", autospec=True) as p:
yield p

def test_waits_forever_when_called_with_config_and_subs(self, config, mock_worker):
subscriptions = (sub_stub,)
create_and_run(subscriptions, config)

mock_worker.assert_called_with(
subscriptions, "test-project-id", "my-credentials", 60, 2
)
mock_worker.return_value.run_forever.assert_called_once_with()

0 comments on commit 8bec6c5

Please sign in to comment.