diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py index afe907b09cb64..85836eaf33746 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py @@ -104,6 +104,7 @@ def __init__( self.expected_msg_len = expected_msg_len or len(self.expected_msg) self.timeout = timeout self.messages = None + self.messages_all_details = None self.with_attributes = with_attributes self.strip_attributes = strip_attributes self.sleep_time = sleep_time @@ -112,7 +113,7 @@ def __init__( def _matches(self, _): if self.messages is None: - self.messages = self._wait_for_messages( + self.messages, self.messages_all_details = self._wait_for_messages( self.expected_msg_len, self.timeout) if self.expected_msg: return Counter(self.messages) == Counter(self.expected_msg) @@ -122,6 +123,7 @@ def _matches(self, _): def _wait_for_messages(self, expected_num, timeout): """Wait for messages from given subscription.""" total_messages = [] + total_messages_all_details = [] sub_client = pubsub.SubscriberClient() start_time = time.time() @@ -132,8 +134,15 @@ def _wait_for_messages(self, expected_num, timeout): timeout=self.pull_timeout) for rm in response.received_messages: msg = PubsubMessage._from_message(rm.message) + full_message = ( + msg.data, + msg.attributes, + msg.attributes, + msg.publish_time, + msg.ordering_key) if not self.with_attributes: total_messages.append(msg.data) + total_messages_all_details.append(full_message) continue if self.strip_attributes: @@ -145,6 +154,7 @@ def _wait_for_messages(self, expected_num, timeout): 'PubSubMessageMatcher error: ' 'expected attribute not found.') total_messages.append(msg) + total_messages_all_details.append(full_message) ack_ids = [rm.ack_id for rm in response.received_messages] if ack_ids: @@ -159,7 +169,7 @@ def _wait_for_messages(self, expected_num, timeout): timeout, len(total_messages), self.sub_name) - return total_messages + return total_messages, total_messages_all_details def describe_to(self, description): description.append_text('Expected %d messages.' % self.expected_msg_len) @@ -169,11 +179,25 @@ def describe_mismatch(self, _, mismatch_description): c_actual = Counter(self.messages) mismatch_description.append_text("Got %d messages. " % (len(self.messages))) if self.expected_msg: + expected = (c_expected - c_actual).items() + unexpected = (c_actual - c_expected).items() + unexpected_keys = [repr(item[0]) for item in unexpected] + if self.with_attributes: + unexpected_all_details = [ + x for x in self.messages_all_details + if 'PubsubMessage(%s, %s)' % (repr(x[0]), x[1]) in unexpected_keys + ] + else: + unexpected_all_details = [ + x for x in self.messages_all_details + if repr(x[0]) in unexpected_keys + ] mismatch_description.append_text( "Diffs (item, count):\n" " Expected but not in actual: %s\n" - " Unexpected: %s" % ((c_expected - c_actual).items(), - (c_actual - c_expected).items())) + " Unexpected: %s\n" + " Unexpected (with all details): %s" % + (expected, unexpected, unexpected_all_details)) if self.with_attributes and self.strip_attributes: mismatch_description.append_text( '\n Stripped attributes: %r' % self.strip_attributes)