Skip to content

Commit

Permalink
Allow passing explicit connection to Subscription API methods.
Browse files Browse the repository at this point in the history
See #825.
  • Loading branch information
tseaver committed May 4, 2015
1 parent 865191b commit b60cf16
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 53 deletions.
105 changes: 73 additions & 32 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,15 @@ def path(self):
project = self.topic.project
return '/projects/%s/subscriptions/%s' % (project, self.name)

def create(self):
def create(self, connection=None):
"""API call: create the subscription via a PUT request
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
data = {'topic': self.topic.full_name}

Expand All @@ -96,36 +100,48 @@ def create(self):
if self.push_endpoint is not None:
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}

conn = self.topic.connection
conn.api_request(method='PUT', path=self.path, data=data)
if connection is None:
connection = self.topic.connection

connection.api_request(method='PUT', path=self.path, data=data)

def exists(self):
def exists(self, connection=None):
"""API call: test existence of the subscription via a GET request
See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
if connection is None:
connection = self.topic.connection
try:
conn.api_request(method='GET', path=self.path)
connection.api_request(method='GET', path=self.path)
except NotFound:
return False
else:
return True

def reload(self):
def reload(self, connection=None):
"""API call: sync local subscription configuration via a GET request
See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
data = conn.api_request(method='GET', path=self.path)
if connection is None:
connection = self.topic.connection
data = connection.api_request(method='GET', path=self.path)
self.ack_deadline = data.get('ackDeadline')
push_config = data.get('pushConfig', {})
self.push_endpoint = push_config.get('pushEndpoint')

def modify_push_configuration(self, push_endpoint):
def modify_push_configuration(self, push_endpoint, connection=None):
"""API call: update the push endpoint for the subscription.
See:
Expand All @@ -135,18 +151,23 @@ def modify_push_configuration(self, push_endpoint):
:param push_endpoint: URL to which messages will be pushed by the
back-end. If None, the application must pull
messages.
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
if connection is None:
connection = self.topic.connection
data = {}
config = data['pushConfig'] = {}
if push_endpoint is not None:
config['pushEndpoint'] = push_endpoint
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyPushConfig' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:modifyPushConfig' % self.path,
data=data)
self.push_endpoint = push_endpoint

def pull(self, return_immediately=False, max_messages=1):
def pull(self, return_immediately=False, max_messages=1, connection=None):
"""API call: retrieve messages for the subscription.
See:
Expand All @@ -161,36 +182,46 @@ def pull(self, return_immediately=False, max_messages=1):
:type max_messages: int
:param max_messages: the maximum number of messages to return.
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
:rtype: list of (ack_id, message) tuples
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
subsequent call to :meth:`acknowledge`, and ``message``
is an instance of :class:`gcloud.pubsub.message.Message`.
"""
if connection is None:
connection = self.topic.connection
data = {'returnImmediately': return_immediately,
'maxMessages': max_messages}
conn = self.topic.connection
response = conn.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
response = connection.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
return [(info['ackId'], Message.from_api_repr(info['message']))
for info in response['receivedMessages']]

def acknowledge(self, ack_ids):
def acknowledge(self, ack_ids, connection=None):
"""API call: acknowledge retrieved messages for the subscription.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
if connection is None:
connection = self.topic.connection
data = {'ackIds': ack_ids}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:acknowledge' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:acknowledge' % self.path,
data=data)

def modify_ack_deadline(self, ack_id, ack_deadline):
def modify_ack_deadline(self, ack_id, ack_deadline, connection=None):
"""API call: update acknowledgement deadline for a retrieved message.
See:
Expand All @@ -201,18 +232,28 @@ def modify_ack_deadline(self, ack_id, ack_deadline):
:type ack_deadline: int
:param ack_deadline: new deadline for the message, in seconds
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
if connection is None:
connection = self.topic.connection
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyAckDeadline' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:modifyAckDeadline' % self.path,
data=data)

def delete(self):
def delete(self, connection=None):
"""API call: delete the subscription via a DELETE request.
See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
conn.api_request(method='DELETE', path=self.path)
if connection is None:
connection = self.topic.connection
connection.api_request(method='DELETE', path=self.path)
Loading

0 comments on commit b60cf16

Please sign in to comment.