Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack messages when data not json serializable #141

Merged
merged 1 commit into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions rele/subscription.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import time
from concurrent import futures

from .middleware import run_middleware_hook

Expand Down Expand Up @@ -64,7 +63,19 @@ def __call__(self, message):
run_middleware_hook("pre_process_message", self._subscription, message)
start_time = time.time()

data = json.loads(message.data.decode("utf-8"))
try:
data = json.loads(message.data.decode("utf-8"))
except json.JSONDecodeError as e:
message.ack()
run_middleware_hook(
"post_process_message_failure",
self._subscription,
e,
start_time,
message,
)
run_middleware_hook("post_process_message")
return

try:
res = self._subscription(data, **dict(message.attributes))
Expand Down
52 changes: 50 additions & 2 deletions tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,33 @@ def message_wrapper(self, published_at):
message_id="1",
)

return pubsub_v1.subscriber.message.Message(rele_message, "ack-id", MagicMock())
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

@pytest.fixture
def message_wrapper_empty(self):
rele_message = pubsub_v1.types.PubsubMessage(
data=b"", attributes={"lang": "es"}, message_id="1"
)
return pubsub_v1.subscriber.message.Message(rele_message, "ack-id", MagicMock())
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

@pytest.fixture
def message_wrapper_invalid_json(self):
rele_message = pubsub_v1.types.PubsubMessage(
data=b"foobar", attributes={}, message_id="1"
)
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

def test_log_start_processing_when_callback_called(
self, caplog, message_wrapper, published_at
Expand Down Expand Up @@ -129,6 +148,7 @@ def test_acks_message_when_execution_successful(self, caplog, message_wrapper):
res = callback(message_wrapper)

assert res == 123
message_wrapper.ack.assert_called_once()
assert len(caplog.records) == 3
message_wrapper_log = caplog.records[1]
assert message_wrapper_log.message == (
Expand Down Expand Up @@ -168,6 +188,7 @@ def crashy_sub_stub(data, **kwargs):
res = callback(message_wrapper)

assert res is None
message_wrapper.ack.assert_not_called()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
Expand All @@ -187,6 +208,33 @@ def crashy_sub_stub(data, **kwargs):
}
assert failed_log.subscription_message == message_wrapper

def test_log_acks_called_message_when_not_json_serializable(
self, caplog, message_wrapper_invalid_json, published_at
):
callback = Callback(sub_stub)
res = callback(message_wrapper_invalid_json)

assert res is None
message_wrapper_invalid_json.ack.assert_called_once()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
"message for rele-some-cool-topic - "
"sub_stub: JSONDecodeError"
)
assert failed_log.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "failed",
"subscription": "rele-some-cool-topic",
"duration_seconds": pytest.approx(0.5, abs=0.5),
"attributes": {},
},
}
assert failed_log.subscription_message == message_wrapper_invalid_json

def test_published_time_as_message_attribute(self, message_wrapper, caplog):
callback = Callback(sub_published_time_type)
callback(message_wrapper)
Expand Down