Skip to content

Commit

Permalink
Ack messages when data not json serializable (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobami authored Dec 31, 2019
1 parent 56966a3 commit 0f2e678
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
15 changes: 13 additions & 2 deletions rele/subscription.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import time
from concurrent import futures

from .middleware import run_middleware_hook

Expand Down Expand Up @@ -64,7 +63,19 @@ def __call__(self, message):
run_middleware_hook("pre_process_message", self._subscription, message)
start_time = time.time()

data = json.loads(message.data.decode("utf-8"))
try:
data = json.loads(message.data.decode("utf-8"))
except json.JSONDecodeError as e:
message.ack()
run_middleware_hook(
"post_process_message_failure",
self._subscription,
e,
start_time,
message,
)
run_middleware_hook("post_process_message")
return

try:
res = self._subscription(data, **dict(message.attributes))
Expand Down
52 changes: 50 additions & 2 deletions tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,33 @@ def message_wrapper(self, published_at):
message_id="1",
)

return pubsub_v1.subscriber.message.Message(rele_message, "ack-id", MagicMock())
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

@pytest.fixture
def message_wrapper_empty(self):
rele_message = pubsub_v1.types.PubsubMessage(
data=b"", attributes={"lang": "es"}, message_id="1"
)
return pubsub_v1.subscriber.message.Message(rele_message, "ack-id", MagicMock())
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

@pytest.fixture
def message_wrapper_invalid_json(self):
rele_message = pubsub_v1.types.PubsubMessage(
data=b"foobar", attributes={}, message_id="1"
)
message = pubsub_v1.subscriber.message.Message(
rele_message, "ack-id", MagicMock()
)
message.ack = MagicMock(autospec=True)
return message

def test_log_start_processing_when_callback_called(
self, caplog, message_wrapper, published_at
Expand Down Expand Up @@ -129,6 +148,7 @@ def test_acks_message_when_execution_successful(self, caplog, message_wrapper):
res = callback(message_wrapper)

assert res == 123
message_wrapper.ack.assert_called_once()
assert len(caplog.records) == 3
message_wrapper_log = caplog.records[1]
assert message_wrapper_log.message == (
Expand Down Expand Up @@ -168,6 +188,7 @@ def crashy_sub_stub(data, **kwargs):
res = callback(message_wrapper)

assert res is None
message_wrapper.ack.assert_not_called()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
Expand All @@ -187,6 +208,33 @@ def crashy_sub_stub(data, **kwargs):
}
assert failed_log.subscription_message == message_wrapper

def test_log_acks_called_message_when_not_json_serializable(
self, caplog, message_wrapper_invalid_json, published_at
):
callback = Callback(sub_stub)
res = callback(message_wrapper_invalid_json)

assert res is None
message_wrapper_invalid_json.ack.assert_called_once()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
"message for rele-some-cool-topic - "
"sub_stub: JSONDecodeError"
)
assert failed_log.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "failed",
"subscription": "rele-some-cool-topic",
"duration_seconds": pytest.approx(0.5, abs=0.5),
"attributes": {},
},
}
assert failed_log.subscription_message == message_wrapper_invalid_json

def test_published_time_as_message_attribute(self, message_wrapper, caplog):
callback = Callback(sub_published_time_type)
callback(message_wrapper)
Expand Down

0 comments on commit 0f2e678

Please sign in to comment.