diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index e526892bd4f2..548feb254283 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -82,11 +82,15 @@ def path(self): project = self.topic.project return '/projects/%s/subscriptions/%s' % (project, self.name) - def create(self): + def create(self, connection=None): """API call: create the subscription via a PUT request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ data = {'topic': self.topic.full_name} @@ -96,36 +100,48 @@ def create(self): if self.push_endpoint is not None: data['pushConfig'] = {'pushEndpoint': self.push_endpoint} - conn = self.topic.connection - conn.api_request(method='PUT', path=self.path, data=data) + if connection is None: + connection = self.topic.connection + + connection.api_request(method='PUT', path=self.path, data=data) - def exists(self): + def exists(self, connection=None): """API call: test existence of the subscription via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection + if connection is None: + connection = self.topic.connection try: - conn.api_request(method='GET', path=self.path) + connection.api_request(method='GET', path=self.path) except NotFound: return False else: return True - def reload(self): + def reload(self, connection=None): """API call: sync local subscription configuration via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection - data = conn.api_request(method='GET', path=self.path) + if connection is None: + connection = self.topic.connection + data = connection.api_request(method='GET', path=self.path) self.ack_deadline = data.get('ackDeadline') push_config = data.get('pushConfig', {}) self.push_endpoint = push_config.get('pushEndpoint') - def modify_push_configuration(self, push_endpoint): + def modify_push_configuration(self, push_endpoint, connection=None): """API call: update the push endpoint for the subscription. See: @@ -135,18 +151,23 @@ def modify_push_configuration(self, push_endpoint): :param push_endpoint: URL to which messages will be pushed by the back-end. If None, the application must pull messages. + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + if connection is None: + connection = self.topic.connection data = {} config = data['pushConfig'] = {} if push_endpoint is not None: config['pushEndpoint'] = push_endpoint - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:modifyPushConfig' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:modifyPushConfig' % self.path, + data=data) self.push_endpoint = push_endpoint - def pull(self, return_immediately=False, max_messages=1): + def pull(self, return_immediately=False, max_messages=1, connection=None): """API call: retrieve messages for the subscription. See: @@ -161,21 +182,26 @@ def pull(self, return_immediately=False, max_messages=1): :type max_messages: int :param max_messages: the maximum number of messages to return. + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. + :rtype: list of (ack_id, message) tuples :returns: sequence of tuples: ``ack_id`` is the ID to be used in a subsequent call to :meth:`acknowledge`, and ``message`` is an instance of :class:`gcloud.pubsub.message.Message`. """ + if connection is None: + connection = self.topic.connection data = {'returnImmediately': return_immediately, 'maxMessages': max_messages} - conn = self.topic.connection - response = conn.api_request(method='POST', - path='%s:pull' % self.path, - data=data) + response = connection.api_request(method='POST', + path='%s:pull' % self.path, + data=data) return [(info['ackId'], Message.from_api_repr(info['message'])) for info in response['receivedMessages']] - def acknowledge(self, ack_ids): + def acknowledge(self, ack_ids, connection=None): """API call: acknowledge retrieved messages for the subscription. See: @@ -183,14 +209,19 @@ def acknowledge(self, ack_ids): :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + if connection is None: + connection = self.topic.connection data = {'ackIds': ack_ids} - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:acknowledge' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:acknowledge' % self.path, + data=data) - def modify_ack_deadline(self, ack_id, ack_deadline): + def modify_ack_deadline(self, ack_id, ack_deadline, connection=None): """API call: update acknowledgement deadline for a retrieved message. See: @@ -201,18 +232,28 @@ def modify_ack_deadline(self, ack_id, ack_deadline): :type ack_deadline: int :param ack_deadline: new deadline for the message, in seconds + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + if connection is None: + connection = self.topic.connection data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline} - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:modifyAckDeadline' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:modifyAckDeadline' % self.path, + data=data) - def delete(self): + def delete(self, connection=None): """API call: delete the subscription via a DELETE request. See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection - conn.api_request(method='DELETE', path=self.path) + if connection is None: + connection = self.topic.connection + connection.api_request(method='DELETE', path=self.path) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index d074bc168564..df1a384ef388 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -120,7 +120,7 @@ def test_from_api_repr_w_topics_w_topic_match(self): self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) - def test_create_pull_wo_ack_deadline(self): + def test_create_pull_wo_ack_deadline_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -137,7 +137,7 @@ def test_create_pull_wo_ack_deadline(self): self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_create_push_w_ack_deadline(self): + def test_create_push_w_ack_deadline_w_passed_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -149,16 +149,16 @@ def test_create_push_w_ack_deadline(self): 'ackDeadline': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} conn = _Connection({'name': SUB_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT) - subscription.create() + subscription.create(connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_exists_miss(self): + def test_exists_miss_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -173,23 +173,23 @@ def test_exists_miss(self): self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_exists_hit(self): + def test_exists_hit_w_passed_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - self.assertTrue(subscription.exists()) + self.assertTrue(subscription.exists(connection=conn)) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_reload(self): + def test_reload_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -211,7 +211,29 @@ def test_reload(self): self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) - def test_modify_push_config_w_endpoint(self): + def test_reload_w_passed_connection(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + conn = _Connection({'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.reload(connection=conn) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_modify_push_config_w_endpoint_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -229,16 +251,17 @@ def test_modify_push_config_w_endpoint(self): self.assertEqual(req['data'], {'pushConfig': {'pushEndpoint': ENDPOINT}}) - def test_modify_push_config_wo_endpoint(self): + def test_modify_push_config_wo_endpoint_w_passed_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ENDPOINT = 'https://api.example.com/push' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic, push_endpoint=ENDPOINT) - subscription.modify_push_configuration(push_endpoint=None) + subscription.modify_push_configuration(push_endpoint=None, + connection=conn) self.assertEqual(subscription.push_endpoint, None) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -246,7 +269,7 @@ def test_modify_push_config_wo_endpoint(self): self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH) self.assertEqual(req['data'], {'pushConfig': {}}) - def test_pull_wo_return_immediately_wo_max_messages(self): + def test_pull_wo_return_immediately_wo_max_messages_w_conn_attr(self): import base64 from gcloud.pubsub.message import Message PROJECT = 'PROJECT' @@ -277,7 +300,7 @@ def test_pull_wo_return_immediately_wo_max_messages(self): self.assertEqual(req['data'], {'returnImmediately': False, 'maxMessages': 1}) - def test_pull_w_return_immediately_w_max_messages(self): + def test_pull_w_return_immediately_w_max_messages_w_passed_conn(self): import base64 from gcloud.pubsub.message import Message PROJECT = 'PROJECT' @@ -291,9 +314,10 @@ def test_pull_w_return_immediately_w_max_messages(self): MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - pulled = subscription.pull(return_immediately=True, max_messages=3) + pulled = subscription.pull(return_immediately=True, max_messages=3, + connection=conn) self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -308,7 +332,7 @@ def test_pull_w_return_immediately_w_max_messages(self): self.assertEqual(req['data'], {'returnImmediately': True, 'maxMessages': 3}) - def test_acknowledge(self): + def test_acknowledge_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -325,7 +349,24 @@ def test_acknowledge(self): self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) - def test_modify_ack_deadline(self): + def test_acknowledge_w_passed_connection(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.acknowledge([ACK_ID1, ACK_ID2], connection=conn) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) + self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + + def test_modify_ack_deadline_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -343,7 +384,25 @@ def test_modify_ack_deadline(self): self.assertEqual(req['data'], {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) - def test_delete(self): + def test_modify_ack_deadline_w_passed_connection(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + DEADLINE = 42 + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.modify_ack_deadline(ACK_ID, DEADLINE, connection=conn) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH) + self.assertEqual(req['data'], + {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) + + def test_delete_w_connection_attr(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -357,6 +416,20 @@ def test_delete(self): self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % SUB_PATH) + def test_delete_w_passed_connection(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.delete(connection=conn) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + class _Connection(object): @@ -378,7 +451,7 @@ def api_request(self, **kw): class _Topic(object): - def __init__(self, name, project, connection): + def __init__(self, name, project, connection=None): self.name = name self.project = project self.connection = connection