Skip to content

Commit

Permalink
Re-factoring Operation base class.
Browse files Browse the repository at this point in the history
This is in preparation to support JSON/HTTP operations
as well and also to ensure that **all** of the operation
PB is parsed when polling.
  • Loading branch information
dhermes committed Oct 31, 2016
1 parent e1fd690 commit 5e26112
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 79 deletions.
8 changes: 4 additions & 4 deletions bigtable/google/cloud/bigtable/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
bigtable_instance_admin_pb2 as messages_v2_pb2)
from google.cloud.operation import Operation
from google.cloud.operation import _compute_type_url
from google.cloud.operation import _register_type_url
from google.cloud.operation import register_type_url


_CLUSTER_NAME_RE = re.compile(r'^projects/(?P<project>[^/]+)/'
Expand All @@ -36,7 +36,7 @@

_UPDATE_CLUSTER_METADATA_URL = _compute_type_url(
messages_v2_pb2.UpdateClusterMetadata)
_register_type_url(
register_type_url(
_UPDATE_CLUSTER_METADATA_URL, messages_v2_pb2.UpdateClusterMetadata)


Expand Down Expand Up @@ -218,7 +218,7 @@ def create(self):

operation = Operation.from_pb(operation_pb, client)
operation.target = self
operation.metadata['request_type'] = 'CreateCluster'
operation.caller_metadata['request_type'] = 'CreateCluster'
return operation

def update(self):
Expand Down Expand Up @@ -249,7 +249,7 @@ def update(self):

operation = Operation.from_pb(operation_pb, client)
operation.target = self
operation.metadata['request_type'] = 'UpdateCluster'
operation.caller_metadata['request_type'] = 'UpdateCluster'
return operation

def delete(self):
Expand Down
8 changes: 5 additions & 3 deletions bigtable/google/cloud/bigtable/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from google.cloud.bigtable.table import Table
from google.cloud.operation import Operation
from google.cloud.operation import _compute_type_url
from google.cloud.operation import _register_type_url
from google.cloud.operation import register_type_url


_EXISTING_INSTANCE_LOCATION_ID = 'see-existing-cluster'
Expand All @@ -38,8 +38,10 @@

_CREATE_INSTANCE_METADATA_URL = _compute_type_url(
messages_v2_pb2.CreateInstanceMetadata)
_register_type_url(
register_type_url(
_CREATE_INSTANCE_METADATA_URL, messages_v2_pb2.CreateInstanceMetadata)
_INSTANCE_METADATA_URL = _compute_type_url(data_v2_pb2.Instance)
register_type_url(_INSTANCE_METADATA_URL, data_v2_pb2.Instance)


def _prepare_create_request(instance):
Expand Down Expand Up @@ -237,7 +239,7 @@ def create(self):

operation = Operation.from_pb(operation_pb, self._client)
operation.target = self
operation.metadata['request_type'] = 'CreateInstance'
operation.caller_metadata['request_type'] = 'CreateInstance'
return operation

def update(self):
Expand Down
12 changes: 7 additions & 5 deletions bigtable/unit_tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,9 @@ def test_create(self):
self.assertEqual(result.name, OP_NAME)
self.assertIs(result.target, cluster)
self.assertIs(result.client, client)
self.assertIsNone(result.pb_metadata)
self.assertEqual(result.metadata, {'request_type': 'CreateCluster'})
self.assertIsNone(result.metadata)
self.assertEqual(result.caller_metadata,
{'request_type': 'CreateCluster'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down Expand Up @@ -323,10 +324,11 @@ def test_update(self):
self.assertEqual(result.name, OP_NAME)
self.assertIs(result.target, cluster)
self.assertIs(result.client, client)
self.assertIsInstance(result.pb_metadata,
self.assertIsInstance(result.metadata,
messages_v2_pb2.UpdateClusterMetadata)
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
self.assertEqual(result.metadata, {'request_type': 'UpdateCluster'})
self.assertEqual(result.metadata.request_time, NOW_PB)
self.assertEqual(result.caller_metadata,
{'request_type': 'UpdateCluster'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down
7 changes: 4 additions & 3 deletions bigtable/unit_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,11 @@ def test_create(self):
self.assertEqual(result.name, self.OP_NAME)
self.assertIs(result.target, instance)
self.assertIs(result.client, client)
self.assertIsInstance(result.pb_metadata,
self.assertIsInstance(result.metadata,
messages_v2_pb2.CreateInstanceMetadata)
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
self.assertEqual(result.metadata, {'request_type': 'CreateInstance'})
self.assertEqual(result.metadata.request_time, NOW_PB)
self.assertEqual(result.caller_metadata,
{'request_type': 'CreateInstance'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down
127 changes: 99 additions & 28 deletions core/google/cloud/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _compute_type_url(klass, prefix=_GOOGLE_APIS_PREFIX):
return '%s/%s' % (prefix, name)


def _register_type_url(type_url, klass):
def register_type_url(type_url, klass):
"""Register a klass as the factory for a given type URL.
:type type_url: str
Expand All @@ -57,55 +57,102 @@ def _register_type_url(type_url, klass):
_TYPE_URL_MAP[type_url] = klass


def _from_any(any_pb):
"""Convert an ``Any`` protobuf into the actual class.
Uses the type URL to do the conversion.
.. note::
This assumes that the type URL is already registered.
:type any_pb: :class:`google.protobuf.any_pb2.Any`
:param any_pb: An any object to be converted.
:rtype: object
:returns: The instance (of the correct type) stored in the any
instance.
"""
klass = _TYPE_URL_MAP[any_pb.type_url]
return klass.FromString(any_pb.value)


class Operation(object):
"""Representation of a Google API Long-Running Operation.
.. _protobuf: https://github.com/googleapis/googleapis/blob/\
050400df0fdb16f63b63e9dee53819044bffc857/\
google/longrunning/operations.proto#L80
.. _service: https://github.com/googleapis/googleapis/blob/\
050400df0fdb16f63b63e9dee53819044bffc857/\
google/longrunning/operations.proto#L38
.. _JSON: https://cloud.google.com/speech/reference/rest/\
v1beta1/operations#Operation
This wraps an operation `protobuf`_ object and attempts to
interact with the long-running operations `service`_ (specific
to a given API). (Some services also offer a `JSON`_
API that maps the same underlying data type.)
:type name: str
:param name: The fully-qualified path naming the operation.
:type client: object: must provide ``_operations_stub`` accessor.
:param client: The client used to poll for the status of the operation.
:type pb_metadata: object
:param pb_metadata: Instance of protobuf metadata class
:type kw: dict
:param kw: caller-assigned metadata about the operation
:type caller_metadata: dict
:param caller_metadata: caller-assigned metadata about the operation
"""

target = None
"""Instance assocated with the operations: callers may set."""

def __init__(self, name, client, pb_metadata=None, **kw):
response = None
"""Response returned from completed operation.
Only one of this and :attr:`error` can be populated.
"""

error = None
"""Error that resulted from a failed (complete) operation.
Only one of this and :attr:`response` can be populated.
"""

metadata = None
"""Metadata about the current operation (as a protobuf).
Code that uses operations must register the metadata types (via
:func:`register_type_url`) to ensure that the metadata fields can be
converted into the correct types.
"""

def __init__(self, name, client, **caller_metadata):
self.name = name
self.client = client
self.pb_metadata = pb_metadata
self.metadata = kw.copy()
self.caller_metadata = caller_metadata.copy()
self._complete = False

@classmethod
def from_pb(cls, op_pb, client, **kw):
def from_pb(cls, operation_pb, client, **caller_metadata):
"""Factory: construct an instance from a protobuf.
:type op_pb: :class:`google.longrunning.operations_pb2.Operation`
:param op_pb: Protobuf to be parsed.
:type operation_pb:
:class:`~google.longrunning.operations_pb2.Operation`
:param operation_pb: Protobuf to be parsed.
:type client: object: must provide ``_operations_stub`` accessor.
:param client: The client used to poll for the status of the operation.
:type kw: dict
:param kw: caller-assigned metadata about the operation
:type caller_metadata: dict
:param caller_metadata: caller-assigned metadata about the operation
:rtype: :class:`Operation`
:returns: new instance, with attributes based on the protobuf.
"""
pb_metadata = None
if op_pb.metadata.type_url:
type_url = op_pb.metadata.type_url
md_klass = _TYPE_URL_MAP.get(type_url)
if md_klass:
pb_metadata = md_klass.FromString(op_pb.metadata.value)
return cls(op_pb.name, client, pb_metadata, **kw)
result = cls(operation_pb.name, client, **caller_metadata)
result._update_state(operation_pb)
return result

@property
def complete(self):
Expand All @@ -116,22 +163,46 @@ def complete(self):
"""
return self._complete

def _get_operation_rpc(self):
"""Polls the status of the current operation.
:rtype: :class:`~google.longrunning.operations_pb2.Operation`
:returns: The latest status of the current operation.
"""
request_pb = operations_pb2.GetOperationRequest(name=self.name)
return self.client._operations_stub.GetOperation(request_pb)

def _update_state(self, operation_pb):
"""Update the state of the current object based on operation.
:type operation_pb:
:class:`~google.longrunning.operations_pb2.Operation`
:param operation_pb: Protobuf to be parsed.
"""
if operation_pb.done:
self._complete = True

if operation_pb.HasField('metadata'):
self.metadata = _from_any(operation_pb.metadata)

result_type = operation_pb.WhichOneof('result')
if result_type == 'error':
self.error = operation_pb.error
elif result_type == 'response':
self.response = _from_any(operation_pb.response)

def poll(self):
"""Check if the operation has finished.
:rtype: bool
:returns: A boolean indicating if the current operation has completed.
:raises: :class:`ValueError <exceptions.ValueError>` if the operation
:raises: :class:`~exceptions.ValueError` if the operation
has already completed.
"""
if self.complete:
raise ValueError('The operation has completed.')

request_pb = operations_pb2.GetOperationRequest(name=self.name)
# We expect a `google.longrunning.operations_pb2.Operation`.
operation_pb = self.client._operations_stub.GetOperation(request_pb)

if operation_pb.done:
self._complete = True
operation_pb = self._get_operation_rpc()
self._update_state(operation_pb)

return self.complete
Loading

0 comments on commit 5e26112

Please sign in to comment.