-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Added] Added a Logging Middleware that does not truncate mesage payl…
…oad. (#218) * Add verbose logging middleware * Add VerboseLoggingMiddleware to contrib/__init__.py * Fix linting
- Loading branch information
Showing
4 changed files
with
185 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import json | ||
|
||
from rele.contrib.logging_middleware import LoggingMiddleware | ||
|
||
|
||
class VerboseLoggingMiddleware(LoggingMiddleware): | ||
def post_process_message_failure( | ||
self, subscription, exception, start_time, message | ||
): | ||
super().post_process_message_failure( | ||
subscription, exception, start_time, _VerboseMessage(message) | ||
) | ||
|
||
def post_publish_failure(self, topic, exception, message): | ||
super().post_publish_failure(topic, exception, _VerboseMessage(message)) | ||
|
||
|
||
class _VerboseMessage: | ||
def __init__(self, message): | ||
self._message = message | ||
self.attributes = message.attributes | ||
|
||
def __repr__(self): | ||
_MESSAGE_REPR = """\ | ||
Message {{ | ||
data: {!r} | ||
ordering_key: {!r} | ||
attributes: {} | ||
}}""" | ||
|
||
data = self._message._message.data | ||
attrs = self._message_attrs_repr() | ||
ordering_key = str(self._message.ordering_key) | ||
|
||
return _MESSAGE_REPR.format(data, ordering_key, attrs) | ||
|
||
def _message_attrs_repr(self): | ||
message_attrs = json.dumps( | ||
dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True | ||
) | ||
|
||
indented = [] | ||
for line in message_attrs.split("\n"): | ||
indented.append(" " + line) | ||
|
||
message_attrs = "\n".join(indented) | ||
message_attrs = message_attrs.lstrip() | ||
|
||
return message_attrs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
import queue | ||
from unittest.mock import MagicMock | ||
|
||
import pytest | ||
from google.cloud import pubsub_v1 | ||
from tests.subs import sub_stub | ||
|
||
from rele.contrib.logging_middleware import LoggingMiddleware | ||
from rele.contrib.verbose_logging_middleware import VerboseLoggingMiddleware | ||
|
||
|
||
@pytest.fixture | ||
def long_message_wrapper(published_at, publish_time): | ||
long_string = "A" * 100 | ||
rele_message = pubsub_v1.types.PubsubMessage( | ||
data=long_string.encode("utf-8"), | ||
attributes={"lang": "es", "published_at": str(published_at)}, | ||
message_id="1", | ||
publish_time=publish_time, | ||
) | ||
|
||
message = pubsub_v1.subscriber.message.Message( | ||
rele_message._pb, | ||
"ack-id", | ||
delivery_attempt=1, | ||
request_queue=queue.Queue(), | ||
) | ||
message.ack = MagicMock(autospec=True) | ||
return message | ||
|
||
|
||
@pytest.fixture | ||
def message_wrapper(published_at, publish_time): | ||
rele_message = pubsub_v1.types.PubsubMessage( | ||
data="ABCDE".encode("utf-8"), | ||
attributes={"lang": "es", "published_at": str(published_at)}, | ||
message_id="1", | ||
publish_time=publish_time, | ||
) | ||
|
||
message = pubsub_v1.subscriber.message.Message( | ||
rele_message._pb, | ||
"ack-id", | ||
delivery_attempt=1, | ||
request_queue=queue.Queue(), | ||
) | ||
message.ack = MagicMock(autospec=True) | ||
return message | ||
|
||
|
||
@pytest.fixture | ||
def expected_message_log(): | ||
return 'Message {\n data: b\'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\'\n ordering_key: \'\'\n attributes: {\n "lang": "es",\n "published_at": "1560244246.863829"\n }\n}' | ||
|
||
|
||
class TestVerboseLoggingMiddleware: | ||
@pytest.fixture | ||
def verbose_logging_middleware(self, config): | ||
verbose_logging_middleware = VerboseLoggingMiddleware() | ||
verbose_logging_middleware.setup(config) | ||
return verbose_logging_middleware | ||
|
||
@pytest.fixture | ||
def logging_middleware(self, config): | ||
logging_middleware = LoggingMiddleware() | ||
logging_middleware.setup(config) | ||
return logging_middleware | ||
|
||
def test_message_payload_log_is_not_truncated_on_post_process_failure( | ||
self, | ||
verbose_logging_middleware, | ||
caplog, | ||
long_message_wrapper, | ||
expected_message_log, | ||
): | ||
verbose_logging_middleware.post_process_message_failure( | ||
sub_stub, RuntimeError("💩"), 1, long_message_wrapper | ||
) | ||
|
||
message_log = caplog.records[0].subscription_message.__repr__() | ||
|
||
assert message_log == expected_message_log | ||
|
||
def test_post_process_failure_message_payload_format_matches_logging_middleware_format( | ||
self, verbose_logging_middleware, logging_middleware, caplog, message_wrapper | ||
): | ||
verbose_logging_middleware.post_process_message_failure( | ||
sub_stub, RuntimeError("💩"), 1, message_wrapper | ||
) | ||
logging_middleware.post_process_message_failure( | ||
sub_stub, RuntimeError("💩"), 1, message_wrapper | ||
) | ||
|
||
verbose_message_log = caplog.records[0].subscription_message.__repr__() | ||
message_log = caplog.records[1].subscription_message.__repr__() | ||
|
||
assert verbose_message_log == message_log | ||
|
||
def test_message_payload_log_is_not_truncated_on_post_publish_failure( | ||
self, | ||
verbose_logging_middleware, | ||
caplog, | ||
long_message_wrapper, | ||
expected_message_log, | ||
): | ||
verbose_logging_middleware.post_publish_failure( | ||
sub_stub, RuntimeError("💩"), long_message_wrapper | ||
) | ||
|
||
message_log = caplog.records[0].subscription_message.__repr__() | ||
|
||
assert message_log == expected_message_log | ||
|
||
def test_post_publish_failure_message_payload_format_matches_logging_middleware_format( | ||
self, verbose_logging_middleware, logging_middleware, caplog, message_wrapper | ||
): | ||
verbose_logging_middleware.post_publish_failure( | ||
sub_stub, RuntimeError("💩"), message_wrapper | ||
) | ||
logging_middleware.post_publish_failure( | ||
sub_stub, RuntimeError("💩"), message_wrapper | ||
) | ||
|
||
verbose_message_log = caplog.records[0].subscription_message.__repr__() | ||
message_log = caplog.records[1].subscription_message.__repr__() | ||
|
||
assert verbose_message_log == message_log |