From 1b0fd821e0c7bc9494135718009dfc6fdf4ae610 Mon Sep 17 00:00:00 2001 From: Jordi Chulia Date: Wed, 13 Apr 2022 09:37:00 +0200 Subject: [PATCH] [Added] Added a Logging Middleware that does not truncate mesage payload. (#218) * Add verbose logging middleware * Add VerboseLoggingMiddleware to contrib/__init__.py * Fix linting --- rele/contrib/__init__.py | 1 + rele/contrib/verbose_logging_middleware.py | 49 +++++++ tests/conftest.py | 8 ++ .../test_verbose_logging_middleware.py | 127 ++++++++++++++++++ 4 files changed, 185 insertions(+) create mode 100644 rele/contrib/verbose_logging_middleware.py create mode 100644 tests/contrib/test_verbose_logging_middleware.py diff --git a/rele/contrib/__init__.py b/rele/contrib/__init__.py index 8bea450..b47f00f 100644 --- a/rele/contrib/__init__.py +++ b/rele/contrib/__init__.py @@ -1,6 +1,7 @@ from .flask_middleware import FlaskMiddleware # noqa from .logging_middleware import LoggingMiddleware # noqa from .unrecoverable_middleware import UnrecoverableMiddleWare # noqa +from .verbose_logging_middleware import VerboseLoggingMiddleware # noqa try: from .django_db_middleware import DjangoDBMiddleware # noqa diff --git a/rele/contrib/verbose_logging_middleware.py b/rele/contrib/verbose_logging_middleware.py new file mode 100644 index 0000000..f3ee01b --- /dev/null +++ b/rele/contrib/verbose_logging_middleware.py @@ -0,0 +1,49 @@ +import json + +from rele.contrib.logging_middleware import LoggingMiddleware + + +class VerboseLoggingMiddleware(LoggingMiddleware): + def post_process_message_failure( + self, subscription, exception, start_time, message + ): + super().post_process_message_failure( + subscription, exception, start_time, _VerboseMessage(message) + ) + + def post_publish_failure(self, topic, exception, message): + super().post_publish_failure(topic, exception, _VerboseMessage(message)) + + +class _VerboseMessage: + def __init__(self, message): + self._message = message + self.attributes = message.attributes + + def __repr__(self): + _MESSAGE_REPR = """\ +Message {{ + data: {!r} + ordering_key: {!r} + attributes: {} +}}""" + + data = self._message._message.data + attrs = self._message_attrs_repr() + ordering_key = str(self._message.ordering_key) + + return _MESSAGE_REPR.format(data, ordering_key, attrs) + + def _message_attrs_repr(self): + message_attrs = json.dumps( + dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True + ) + + indented = [] + for line in message_attrs.split("\n"): + indented.append(" " + line) + + message_attrs = "\n".join(indented) + message_attrs = message_attrs.lstrip() + + return message_attrs diff --git a/tests/conftest.py b/tests/conftest.py index 120493d..62e9ca3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,7 @@ import pytest from google.cloud.pubsub_v1 import PublisherClient from google.cloud.pubsub_v1.exceptions import TimeoutError +from google.protobuf import timestamp_pb2 from rele import Publisher from rele.client import Subscriber @@ -94,3 +95,10 @@ def mock_post_publish_failure(): "rele.contrib.logging_middleware.LoggingMiddleware.post_publish_failure" ) as mock: yield mock + + +@pytest.fixture +def publish_time(): + timestamp = timestamp_pb2.Timestamp() + timestamp.GetCurrentTime() + return timestamp diff --git a/tests/contrib/test_verbose_logging_middleware.py b/tests/contrib/test_verbose_logging_middleware.py new file mode 100644 index 0000000..51fda16 --- /dev/null +++ b/tests/contrib/test_verbose_logging_middleware.py @@ -0,0 +1,127 @@ +import queue +from unittest.mock import MagicMock + +import pytest +from google.cloud import pubsub_v1 +from tests.subs import sub_stub + +from rele.contrib.logging_middleware import LoggingMiddleware +from rele.contrib.verbose_logging_middleware import VerboseLoggingMiddleware + + +@pytest.fixture +def long_message_wrapper(published_at, publish_time): + long_string = "A" * 100 + rele_message = pubsub_v1.types.PubsubMessage( + data=long_string.encode("utf-8"), + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) + + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + +@pytest.fixture +def message_wrapper(published_at, publish_time): + rele_message = pubsub_v1.types.PubsubMessage( + data="ABCDE".encode("utf-8"), + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) + + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + +@pytest.fixture +def expected_message_log(): + return 'Message {\n data: b\'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\'\n ordering_key: \'\'\n attributes: {\n "lang": "es",\n "published_at": "1560244246.863829"\n }\n}' + + +class TestVerboseLoggingMiddleware: + @pytest.fixture + def verbose_logging_middleware(self, config): + verbose_logging_middleware = VerboseLoggingMiddleware() + verbose_logging_middleware.setup(config) + return verbose_logging_middleware + + @pytest.fixture + def logging_middleware(self, config): + logging_middleware = LoggingMiddleware() + logging_middleware.setup(config) + return logging_middleware + + def test_message_payload_log_is_not_truncated_on_post_process_failure( + self, + verbose_logging_middleware, + caplog, + long_message_wrapper, + expected_message_log, + ): + verbose_logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, long_message_wrapper + ) + + message_log = caplog.records[0].subscription_message.__repr__() + + assert message_log == expected_message_log + + def test_post_process_failure_message_payload_format_matches_logging_middleware_format( + self, verbose_logging_middleware, logging_middleware, caplog, message_wrapper + ): + verbose_logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + + verbose_message_log = caplog.records[0].subscription_message.__repr__() + message_log = caplog.records[1].subscription_message.__repr__() + + assert verbose_message_log == message_log + + def test_message_payload_log_is_not_truncated_on_post_publish_failure( + self, + verbose_logging_middleware, + caplog, + long_message_wrapper, + expected_message_log, + ): + verbose_logging_middleware.post_publish_failure( + sub_stub, RuntimeError("💩"), long_message_wrapper + ) + + message_log = caplog.records[0].subscription_message.__repr__() + + assert message_log == expected_message_log + + def test_post_publish_failure_message_payload_format_matches_logging_middleware_format( + self, verbose_logging_middleware, logging_middleware, caplog, message_wrapper + ): + verbose_logging_middleware.post_publish_failure( + sub_stub, RuntimeError("💩"), message_wrapper + ) + logging_middleware.post_publish_failure( + sub_stub, RuntimeError("💩"), message_wrapper + ) + + verbose_message_log = caplog.records[0].subscription_message.__repr__() + message_log = caplog.records[1].subscription_message.__repr__() + + assert verbose_message_log == message_log