From 3cb395c4b1ed16050f898ce5f948da1f83789b98 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Thu, 13 Aug 2015 16:16:59 -0400
Subject: [PATCH 01/10] Factor out common job test scaffolding.

---
 gcloud/bigquery/test_job.py | 108 ++++++++++++++++++++++--------------
 1 file changed, 67 insertions(+), 41 deletions(-)

diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index 56a517d73d42..55d170072acf 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -14,18 +14,13 @@
 import unittest2
 
 
-class TestLoadTableFromStorageJob(unittest2.TestCase):
+class _Base(object):
     PROJECT = 'project'
     SOURCE1 = 'http://example.com/source1.csv'
     DS_NAME = 'dataset_name'
     TABLE_NAME = 'table_name'
     JOB_NAME = 'job_name'
 
-    def _getTargetClass(self):
-        from gcloud.bigquery.job import LoadTableFromStorageJob
-        return LoadTableFromStorageJob
-
     def _makeOne(self, *args, **kw):
         return self._getTargetClass()(*args, **kw)
 
@@ -40,21 +35,17 @@ def _setUpConstants(self):
         self.JOB_ID = '%s:%s' % (self.PROJECT, self.JOB_NAME)
         self.RESOURCE_URL = 'http://example.com/path/to/resource'
         self.USER_EMAIL = 'phred@example.com'
-        self.INPUT_FILES = 2
-        self.INPUT_BYTES = 12345
-        self.OUTPUT_BYTES = 23456
-        self.OUTPUT_ROWS = 345
 
     def _makeResource(self, started=False, ended=False):
         self._setUpConstants()
         resource = {
             'configuration': {
-                'load': {
+                self.JOB_TYPE: {
                 },
             },
             'statistics': {
                 'creationTime': self.WHEN_TS * 1000,
-                'load': {
+                self.JOB_TYPE: {
                 }
             },
             'etag': self.ETAG,
@@ -72,43 +63,90 @@ def _makeResource(self, started=False, ended=False):
         if ended:
             resource['statistics']['endTime'] = (self.WHEN_TS + 1000) * 1000
-            resource['statistics']['load']['inputFiles'] = self.INPUT_FILES
-            resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES
-            resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES
-            resource['statistics']['load']['outputRows'] = self.OUTPUT_ROWS
 
         return resource
 
+    def _verifyInitialReadonlyProperties(self, job):
+        # root elements of resource
+        self.assertEqual(job.etag, None)
+        self.assertEqual(job.job_id, None)
+        self.assertEqual(job.self_link, None)
+        self.assertEqual(job.user_email, None)
+
+        # derived from resource['statistics']
+        self.assertEqual(job.created, None)
+        self.assertEqual(job.started, None)
+        self.assertEqual(job.ended, None)
+
+        # derived from resource['status']
+        self.assertEqual(job.error_result, None)
+        self.assertEqual(job.errors, None)
+        self.assertEqual(job.state, None)
+
     def _verifyReadonlyResourceProperties(self, job, resource):
         from datetime import timedelta
 
         self.assertEqual(job.job_id, self.JOB_ID)
 
-        if 'creationTime' in resource.get('statistics', {}):
+        statistics = resource.get('statistics', {})
+
+        if 'creationTime' in statistics:
             self.assertEqual(job.created, self.WHEN)
         else:
             self.assertEqual(job.created, None)
-        if 'startTime' in resource.get('statistics', {}):
+
+        if 'startTime' in statistics:
             self.assertEqual(job.started, self.WHEN)
         else:
             self.assertEqual(job.started, None)
-        if 'endTime' in resource.get('statistics', {}):
+
+        if 'endTime' in statistics:
             self.assertEqual(job.ended, self.WHEN + timedelta(seconds=1000))
         else:
             self.assertEqual(job.ended, None)
+
         if 'etag' in resource:
             self.assertEqual(job.etag, self.ETAG)
         else:
             self.assertEqual(job.etag, None)
+
         if 'selfLink' in resource:
             self.assertEqual(job.self_link, self.RESOURCE_URL)
         else:
             self.assertEqual(job.self_link, None)
+
         if 'user_email' in resource:
             self.assertEqual(job.user_email, self.USER_EMAIL)
         else:
             self.assertEqual(job.user_email, None)
+
+class 
TestLoadTableFromStorageJob(unittest2.TestCase, _Base): + JOB_TYPE = 'load' + + def _getTargetClass(self): + from gcloud.bigquery.job import LoadTableFromStorageJob + return LoadTableFromStorageJob + + def _setUpConstants(self): + super(TestLoadTableFromStorageJob, self)._setUpConstants() + self.INPUT_FILES = 2 + self.INPUT_BYTES = 12345 + self.OUTPUT_BYTES = 23456 + self.OUTPUT_ROWS = 345 + + def _makeResource(self, started=False, ended=False): + resource = super(TestLoadTableFromStorageJob, self)._makeResource( + started, ended) + + if ended: + resource['statistics']['load']['inputFiles'] = self.INPUT_FILES + resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES + resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES + resource['statistics']['load']['outputRows'] = self.OUTPUT_ROWS + + return resource + def _verifyBooleanConfigProperties(self, job, config): if 'allowJaggedRows' in config: self.assertEqual(job.allow_jagged_rows, @@ -188,6 +226,16 @@ def test_ctor(self): job.path, '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) self.assertEqual(job.schema, []) + + self._verifyInitialReadonlyProperties(job) + + # derived from resource['statistics']['load'] + self.assertEqual(job.input_file_bytes, None) + self.assertEqual(job.input_files, None) + self.assertEqual(job.output_bytes, None) + self.assertEqual(job.output_rows, None) + + # set/read from resource['configuration']['load'] self.assertTrue(job.allow_jagged_rows is None) self.assertTrue(job.allow_quoted_newlines is None) self.assertTrue(job.create_disposition is None) @@ -200,28 +248,6 @@ def test_ctor(self): self.assertTrue(job.source_format is None) self.assertTrue(job.write_disposition is None) - # root elements of resource - self.assertEqual(job.etag, None) - self.assertEqual(job.job_id, None) - self.assertEqual(job.self_link, None) - self.assertEqual(job.user_email, None) - - # derived from resource['statistics'] - self.assertEqual(job.created, None) - self.assertEqual(job.started, None) - self.assertEqual(job.ended, None) - - # derived from resource['statistics']['load'] - self.assertEqual(job.input_file_bytes, None) - self.assertEqual(job.input_files, None) - self.assertEqual(job.output_bytes, None) - self.assertEqual(job.output_rows, None) - - # derived from resource['status'] - self.assertEqual(job.error_result, None) - self.assertEqual(job.errors, None) - self.assertEqual(job.state, None) - def test_ctor_w_schema(self): from gcloud.bigquery.table import SchemaField client = _Client(self.PROJECT) From 6a18db52890afa1b14df2f87b0875f5ab10afdd7 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 16:33:57 -0400 Subject: [PATCH 02/10] Factor out common job features. --- gcloud/bigquery/job.py | 174 +++++++++++++++++++++--------------- gcloud/bigquery/test_job.py | 15 ++-- 2 files changed, 112 insertions(+), 77 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index c48a3bc96cab..c3a8385ebfad 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -86,33 +86,20 @@ class _LoadConfiguration(object): _write_disposition = None -class LoadTableFromStorageJob(object): - """Asynchronous job for loading data into a BQ table from CloudStorage. +class _BaseJob(object): + """Base class for asynchronous jobs. :type name: string :param name: the name of the job - :type destination: :class:`gcloud.bigquery.table.Table` - :param destination: Table into which data is to be loaded. 
- - :type source_uris: sequence of string - :param source_uris: URIs of data files to be loaded. - :type client: :class:`gcloud.bigquery.client.Client` :param client: A client which holds credentials and project configuration for the dataset (which requires a project). - - :type schema: list of :class:`gcloud.bigquery.table.SchemaField` - :param schema: The job's schema """ - def __init__(self, name, destination, source_uris, client, schema=()): + def __init__(self, name, client): self.name = name - self.destination = destination self._client = client - self.source_uris = source_uris - self.schema = schema self._properties = {} - self._configuration = _LoadConfiguration() @property def project(self): @@ -132,29 +119,6 @@ def path(self): """ return '/projects/%s/jobs/%s' % (self.project, self.name) - @property - def schema(self): - """Table's schema. - - :rtype: list of :class:`SchemaField` - :returns: fields describing the schema - """ - return list(self._schema) - - @schema.setter - def schema(self, value): - """Update table's schema - - :type value: list of :class:`SchemaField` - :param value: fields describing the schema - - :raises: TypeError if 'value' is not a sequence, or ValueError if - any item in the sequence is not a SchemaField - """ - if not all(isinstance(field, SchemaField) for field in value): - raise ValueError('Schema items must be fields') - self._schema = tuple(value) - @property def etag(self): """ETag for the job resource. @@ -230,6 +194,105 @@ def ended(self): if millis is not None: return _datetime_from_microseconds(millis * 1000.0) + @property + def error_result(self): + """Error information about the job as a whole. + + :rtype: mapping, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('errorResult') + + @property + def errors(self): + """Information about individual errors generated by the job. + + :rtype: list of mappings, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('errors') + + @property + def state(self): + """Status of the job. + + :rtype: string, or ``NoneType`` + :returns: the state (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('state') + + +class _LoadConfiguration(object): + """User-settable configuration options for load jobs.""" + # None -> use server default. + _allow_jagged_rows = None + _allow_quoted_newlines = None + _create_disposition = None + _encoding = None + _field_delimiter = None + _ignore_unknown_values = None + _max_bad_records = None + _quote_character = None + _skip_leading_rows = None + _source_format = None + _write_disposition = None + + +class LoadTableFromStorageJob(_BaseJob): + """Asynchronous job for loading data into a BQ table from CloudStorage. + + :type name: string + :param name: the name of the job + + :type destination: :class:`gcloud.bigquery.table.Table` + :param destination: Table into which data is to be loaded. + + :type source_uris: sequence of string + :param source_uris: URIs of data files to be loaded. + + :type client: :class:`gcloud.bigquery.client.Client` + :param client: A client which holds credentials and project configuration + for the dataset (which requires a project). 
+ + :type schema: list of :class:`gcloud.bigquery.table.SchemaField` + :param schema: The job's schema + """ + def __init__(self, name, destination, source_uris, client, schema=()): + super(LoadTableFromStorageJob, self).__init__(name, client) + self.destination = destination + self.source_uris = source_uris + self.schema = schema + self._configuration = _LoadConfiguration() + + @property + def schema(self): + """Table's schema. + + :rtype: list of :class:`SchemaField` + :returns: fields describing the schema + """ + return list(self._schema) + + @schema.setter + def schema(self, value): + """Update table's schema + + :type value: list of :class:`SchemaField` + :param value: fields describing the schema + + :raises: TypeError if 'value' is not a sequence, or ValueError if + any item in the sequence is not a SchemaField + """ + if not all(isinstance(field, SchemaField) for field in value): + raise ValueError('Schema items must be fields') + self._schema = tuple(value) + @property def input_file_bytes(self): """Count of bytes loaded from source files. @@ -274,39 +337,6 @@ def output_rows(self): if statistics is not None: return int(statistics['load']['outputRows']) - @property - def error_result(self): - """Error information about the job as a whole. - - :rtype: mapping, or ``NoneType`` - :returns: the error information (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['errorResult'] - - @property - def errors(self): - """Information about individual errors generated by the job. - - :rtype: list of mappings, or ``NoneType`` - :returns: the error information (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['errors'] - - @property - def state(self): - """Status of the job. - - :rtype: string, or ``NoneType`` - :returns: the state (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['state'] - @property def allow_jagged_rows(self): """Allow rows with missing trailing commas for optional fields. diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 55d170072acf..8609ee9e5f30 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -319,11 +319,6 @@ def test_props_set_by_server(self): load_stats['outputBytes'] = 23456 load_stats['outputRows'] = 345 - status = job._properties['status'] = {} - status['errorResult'] = ERROR_RESULT - status['errors'] = [ERROR_RESULT] - status['state'] = 'STATE' - self.assertEqual(job.etag, 'ETAG') self.assertEqual(job.job_id, JOB_ID) self.assertEqual(job.self_link, URL) @@ -338,6 +333,16 @@ def test_props_set_by_server(self): self.assertEqual(job.output_bytes, 23456) self.assertEqual(job.output_rows, 345) + status = job._properties['status'] = {} + + self.assertEqual(job.error_result, None) + self.assertEqual(job.errors, None) + self.assertEqual(job.state, None) + + status['errorResult'] = ERROR_RESULT + status['errors'] = [ERROR_RESULT] + status['state'] = 'STATE' + self.assertEqual(job.error_result, ERROR_RESULT) self.assertEqual(job.errors, [ERROR_RESULT]) self.assertEqual(job.state, 'STATE') From ba46b2c82d376f1f1370b59087d40af401ad3c56 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 16:38:16 -0400 Subject: [PATCH 03/10] Add 'job.CopyJob' w/ appropriate config properties. 
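As a rough sketch of the intended usage (job and table names below are
illustrative, not taken from the tests; 'client', 'source', and 'dest' are
assumed to be an existing Client and two Table instances):

    from gcloud.bigquery.job import CopyJob

    job = CopyJob('copy-job', dest, [source], client)
    job.create_disposition = 'CREATE_IF_NEEDED'  # checked by CreateDisposition
    job.write_disposition = 'WRITE_TRUNCATE'     # checked by WriteDisposition

Assigning any value outside the pseudo-enums raises ValueError; deleting
either attribute falls back to the server-side default. The job is
configuration-only at this point: the API methods arrive in the next commit.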
See: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy
---
 gcloud/bigquery/job.py      | 79 +++++++++++++++++++++++++++++++++++++
 gcloud/bigquery/test_job.py | 57 ++++++++++++++++++++++++++
 2 files changed, 136 insertions(+)

diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py
index c3a8385ebfad..61bf1ea697bc 100644
--- a/gcloud/bigquery/job.py
+++ b/gcloud/bigquery/job.py
@@ -776,3 +776,82 @@ def cancel(self, client=None):
         api_response = client.connection.api_request(
             method='POST', path='%s/cancel' % self.path)
         self._set_properties(api_response)
+
+
+class _CopyConfiguration(object):
+    """User-settable configuration options for copy jobs."""
+    # None -> use server default.
+    _create_disposition = None
+    _write_disposition = None
+
+
+class CopyJob(_BaseJob):
+    """Asynchronous job: copy data into a BQ table from other tables.
+
+    :type name: string
+    :param name: the name of the job
+
+    :type destination: :class:`gcloud.bigquery.table.Table`
+    :param destination: Table into which data is to be copied.
+
+    :type sources: list of :class:`gcloud.bigquery.table.Table`
+    :param sources: Tables from which data is to be copied.
+
+    :type client: :class:`gcloud.bigquery.client.Client`
+    :param client: A client which holds credentials and project configuration
+                   for the dataset (which requires a project).
+    """
+    def __init__(self, name, destination, sources, client):
+        super(CopyJob, self).__init__(name, client)
+        self.destination = destination
+        self.sources = sources
+        self._configuration = _CopyConfiguration()
+
+    @property
+    def create_disposition(self):
+        """Handling for missing destination table.
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._create_disposition
+
+    @create_disposition.setter
+    def create_disposition(self, value):
+        """Update create_disposition.
+
+        :type value: boolean
+        :param value: new create_disposition: one of "CREATE_IF_NEEDED" or
+                      "CREATE_NEVER"
+        """
+        CreateDisposition.validate(value)    # raises ValueError if invalid
+        self._configuration._create_disposition = value
+
+    @create_disposition.deleter
+    def create_disposition(self):
+        """Delete create_disposition."""
+        del self._configuration._create_disposition
+
+    @property
+    def write_disposition(self):
+        """Handling for existing data in the destination table.
+
+        :rtype: boolean, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._write_disposition
+
+    @write_disposition.setter
+    def write_disposition(self, value):
+        """Update write_disposition.
+
+        :type value: string
+        :param value: allowed values for :class:`WriteDisposition`.
+ """ + WriteDisposition.validate(value) # raises ValueError if invalid + self._configuration._write_disposition = value + + @write_disposition.deleter + def write_disposition(self): + """Delete write_disposition.""" + del self._configuration._write_disposition diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 8609ee9e5f30..ed7ae0ef6d4b 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -734,6 +734,63 @@ def test_cancel_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) +class TestCopyJob(unittest2.TestCase, _Base): + JOB_TYPE = 'copy' + + def _getTargetClass(self): + from gcloud.bigquery.job import CopyJob + return CopyJob + + def test_ctor(self): + client = _Client(self.PROJECT) + source, destination = _Table(), _Table() + job = self._makeOne(self.JOB_NAME, destination, [source], client) + self.assertTrue(job.destination is destination) + self.assertEqual(job.sources, [source]) + self.assertTrue(job._client is client) + self.assertEqual( + job.path, + '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) + + self._verifyInitialReadonlyProperties(job) + + # set/read from resource['configuration']['copy'] + self.assertTrue(job.create_disposition is None) + self.assertTrue(job.write_disposition is None) + + def test_create_disposition_setter_bad_value(self): + client = _Client(self.PROJECT) + source, destination = _Table(), _Table() + job = self._makeOne(self.JOB_NAME, destination, [source], client) + with self.assertRaises(ValueError): + job.create_disposition = 'BOGUS' + + def test_create_disposition_setter_deleter(self): + client = _Client(self.PROJECT) + source, destination = _Table(), _Table() + job = self._makeOne(self.JOB_NAME, destination, [source], client) + job.create_disposition = 'CREATE_IF_NEEDED' + self.assertEqual(job.create_disposition, 'CREATE_IF_NEEDED') + del job.create_disposition + self.assertTrue(job.create_disposition is None) + + def test_write_disposition_setter_bad_value(self): + client = _Client(self.PROJECT) + source, destination = _Table(), _Table() + job = self._makeOne(self.JOB_NAME, destination, [source], client) + with self.assertRaises(ValueError): + job.write_disposition = 'BOGUS' + + def test_write_disposition_setter_deleter(self): + client = _Client(self.PROJECT) + source, destination = _Table(), _Table() + job = self._makeOne(self.JOB_NAME, destination, [source], client) + job.write_disposition = 'WRITE_TRUNCATE' + self.assertEqual(job.write_disposition, 'WRITE_TRUNCATE') + del job.write_disposition + self.assertTrue(job.write_disposition is None) + + class _Client(object): def __init__(self, project='project', connection=None): From 3ef5ed1c1446e0b0e6ef8775c99f60bf92344efd Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 17:00:26 -0400 Subject: [PATCH 04/10] Add API method support for CopyJob. Factor guts out from LoadFromStorageJob. --- gcloud/bigquery/job.py | 245 +++++++++++++++++++++--------------- gcloud/bigquery/test_job.py | 204 ++++++++++++++++++++++++++++-- 2 files changed, 342 insertions(+), 107 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 61bf1ea697bc..1866fe62d9bd 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -227,6 +227,112 @@ def state(self): if status is not None: return status.get('state') + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. 
If not passed, falls back to the + ``client`` stored on the current dataset. + + :rtype: :class:`gcloud.bigquery.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self._client + return client + + def _scrub_local_properties(self, cleaned): + """Helper: handle subclass properties in cleaned.""" + pass + + def _set_properties(self, api_response): + """Update properties from resource in body of ``api_response`` + + :type api_response: httplib2.Response + :param api_response: response returned from an API call + """ + cleaned = api_response.copy() + self._scrub_local_properties(cleaned) + + statistics = cleaned.get('statistics', {}) + if 'creationTime' in statistics: + statistics['creationTime'] = float(statistics['creationTime']) + if 'startTime' in statistics: + statistics['startTime'] = float(statistics['startTime']) + if 'endTime' in statistics: + statistics['endTime'] = float(statistics['endTime']) + + self._properties.clear() + self._properties.update(cleaned) + + def begin(self, client=None): + """API call: begin the job via a POST request + + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + path = '/projects/%s/jobs' % (self.project,) + api_response = client.connection.api_request( + method='POST', path=path, data=self._build_resource()) + self._set_properties(api_response) + + def exists(self, client=None): + """API call: test for the existence of the job via a GET request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/get + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + + try: + client.connection.api_request(method='GET', path=self.path, + query_params={'fields': 'id'}) + except NotFound: + return False + else: + return True + + def reload(self, client=None): + """API call: refresh job properties via a GET request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/get + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + + api_response = client.connection.api_request( + method='GET', path=self.path) + self._set_properties(api_response) + + def cancel(self, client=None): + """API call: cancel job via a POST request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + + api_response = client.connection.api_request( + method='POST', path='%s/cancel' % self.path) + self._set_properties(api_response) + class _LoadConfiguration(object): """User-settable configuration options for load jobs.""" @@ -622,20 +728,6 @@ def write_disposition(self): """Delete write_disposition.""" del self._configuration._write_disposition - def _require_client(self, client): - """Check client or verify over-ride. 
- - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - - :rtype: :class:`gcloud.bigquery.client.Client` - :returns: The client passed in or the currently bound client. - """ - if client is None: - client = self._client - return client - def _populate_config_resource(self, configuration): """Helper for _build_resource: copy config properties to resource""" if self.allow_jagged_rows is not None: @@ -688,95 +780,11 @@ def _build_resource(self): return resource - def _set_properties(self, api_response): - """Update properties from resource in body of ``api_response`` - - :type api_response: httplib2.Response - :param api_response: response returned from an API call - """ - self._properties.clear() - cleaned = api_response.copy() + def _scrub_local_properties(self, cleaned): + """Helper: handle subclass properties in cleaned.""" schema = cleaned.pop('schema', {'fields': ()}) self.schema = _parse_schema_resource(schema) - statistics = cleaned.get('statistics', {}) - if 'creationTime' in statistics: - statistics['creationTime'] = float(statistics['creationTime']) - if 'startTime' in statistics: - statistics['startTime'] = float(statistics['startTime']) - if 'endTime' in statistics: - statistics['endTime'] = float(statistics['endTime']) - - self._properties.update(cleaned) - - def begin(self, client=None): - """API call: begin the job via a POST request - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - """ - client = self._require_client(client) - path = '/projects/%s/jobs' % (self.project,) - api_response = client.connection.api_request( - method='POST', path=path, data=self._build_resource()) - self._set_properties(api_response) - - def exists(self, client=None): - """API call: test for the existence of the job via a GET request - - See - https://cloud.google.com/bigquery/docs/reference/v2/jobs/get - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - """ - client = self._require_client(client) - - try: - client.connection.api_request(method='GET', path=self.path, - query_params={'fields': 'id'}) - except NotFound: - return False - else: - return True - - def reload(self, client=None): - """API call: refresh job properties via a GET request - - See - https://cloud.google.com/bigquery/docs/reference/v2/jobs/get - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - """ - client = self._require_client(client) - - api_response = client.connection.api_request( - method='GET', path=self.path) - self._set_properties(api_response) - - def cancel(self, client=None): - """API call: cancel job via a POST request - - See - https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. 
- """ - client = self._require_client(client) - - api_response = client.connection.api_request( - method='POST', path='%s/cancel' % self.path) - self._set_properties(api_response) - class _CopyConfiguration(object): """User-settable configuration options for copy jobs.""" @@ -855,3 +863,40 @@ def write_disposition(self, value): def write_disposition(self): """Delete write_disposition.""" del self._configuration._write_disposition + + def _populate_config_resource(self, configuration): + + if self.create_disposition is not None: + configuration['createDisposition'] = self.create_disposition + if self.write_disposition is not None: + configuration['writeDisposition'] = self.write_disposition + + def _build_resource(self): + """Generate a resource for ``begin``.""" + + source_refs = [{ + 'projectId': table.project, + 'datasetId': table.dataset_name, + 'tableId': table.name, + } for table in self.sources] + + resource = { + 'jobReference': { + 'projectId': self.project, + 'jobId': self.name, + }, + 'configuration': { + 'copy': { + 'sourceTables': source_refs, + 'destinationTable': { + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, + }, + }, + }, + } + configuration = resource['configuration']['copy'] + self._populate_config_resource(configuration) + + return resource diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index ed7ae0ef6d4b..edf621ad5740 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -14,6 +14,7 @@ import unittest2 + class _Base(object): PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' @@ -736,14 +737,34 @@ def test_cancel_w_alternate_client(self): class TestCopyJob(unittest2.TestCase, _Base): JOB_TYPE = 'copy' + SOURCE_TABLE = 'source_table' + DESTINATION_TABLE = 'destination_table' def _getTargetClass(self): from gcloud.bigquery.job import CopyJob return CopyJob + def _verifyResourceProperties(self, job, resource): + self._verifyReadonlyResourceProperties(job, resource) + + config = resource.get('configuration', {}).get('copy') + + if 'createDisposition' in config: + self.assertEqual(job.create_disposition, + config['createDisposition']) + else: + self.assertTrue(job.create_disposition is None) + + if 'writeDisposition' in config: + self.assertEqual(job.write_disposition, + config['writeDisposition']) + else: + self.assertTrue(job.write_disposition is None) + def test_ctor(self): client = _Client(self.PROJECT) - source, destination = _Table(), _Table() + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) job = self._makeOne(self.JOB_NAME, destination, [source], client) self.assertTrue(job.destination is destination) self.assertEqual(job.sources, [source]) @@ -760,14 +781,16 @@ def test_ctor(self): def test_create_disposition_setter_bad_value(self): client = _Client(self.PROJECT) - source, destination = _Table(), _Table() + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) job = self._makeOne(self.JOB_NAME, destination, [source], client) with self.assertRaises(ValueError): job.create_disposition = 'BOGUS' def test_create_disposition_setter_deleter(self): client = _Client(self.PROJECT) - source, destination = _Table(), _Table() + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) job = self._makeOne(self.JOB_NAME, destination, [source], client) job.create_disposition = 'CREATE_IF_NEEDED' self.assertEqual(job.create_disposition, 'CREATE_IF_NEEDED') @@ 
-776,20 +799,185 @@ def test_create_disposition_setter_deleter(self): def test_write_disposition_setter_bad_value(self): client = _Client(self.PROJECT) - source, destination = _Table(), _Table() + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) job = self._makeOne(self.JOB_NAME, destination, [source], client) with self.assertRaises(ValueError): job.write_disposition = 'BOGUS' def test_write_disposition_setter_deleter(self): client = _Client(self.PROJECT) - source, destination = _Table(), _Table() + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) job = self._makeOne(self.JOB_NAME, destination, [source], client) job.write_disposition = 'WRITE_TRUNCATE' self.assertEqual(job.write_disposition, 'WRITE_TRUNCATE') del job.write_disposition self.assertTrue(job.write_disposition is None) + def test_begin_w_bound_client(self): + PATH = 'projects/%s/jobs' % self.PROJECT + RESOURCE = self._makeResource() + # Ensure None for missing server-set props + del RESOURCE['statistics']['creationTime'] + del RESOURCE['etag'] + del RESOURCE['selfLink'] + del RESOURCE['user_email'] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + job.begin() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'copy': { + 'sourceTables': [{ + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.SOURCE_TABLE + }], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.DESTINATION_TABLE, + }, + }, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_begin_w_alternate_client(self): + PATH = 'projects/%s/jobs' % self.PROJECT + RESOURCE = self._makeResource(ended=True) + COPY_CONFIGURATION = { + 'sourceTables': [{ + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.SOURCE_TABLE, + }], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.DESTINATION_TABLE, + }, + 'createDisposition': 'CREATE_NEVER', + 'writeDisposition': 'WRITE_TRUNCATE', + } + RESOURCE['configuration']['copy'] = COPY_CONFIGURATION + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + job.create_disposition = 'CREATE_NEVER' + job.write_disposition = 'WRITE_TRUNCATE' + + job.begin(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'copy': COPY_CONFIGURATION, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_exists_miss_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, 
self.JOB_NAME) + conn = _Connection() + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + self.assertFalse(job.exists()) + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + + def test_exists_hit_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection({}) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + self.assertTrue(job.exists(client=client2)) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + + def test_reload_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + job.reload() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + + def test_reload_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + job.reload(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + class _Client(object): @@ -800,11 +988,13 @@ def __init__(self, project='project', connection=None): class _Table(object): - def __init__(self): - pass + def __init__(self, name=None): + self._name = name @property def name(self): + if self._name is not None: + return self._name return TestLoadTableFromStorageJob.TABLE_NAME @property From 16dba9dcf3bc9b0c19cf49fb96a788d6dd5387e1 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 17:18:18 -0400 Subject: [PATCH 05/10] Add 'Client.copy_table' factory. 
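The factory only constructs the job; nothing is sent to the server until one
of the job's API methods is called. A minimal sketch (project, dataset, and
table names are placeholders; assumes ambient credentials):

    from gcloud.bigquery.client import Client

    client = Client(project='my-project')
    dataset = client.dataset('my_dataset')
    job = client.copy_table('copy-job', dataset.table('dest_table'),
                            dataset.table('source_table'))
    job.begin()   # POST /projects/my-project/jobs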
--- gcloud/bigquery/client.py | 18 ++++++++++++++++++ gcloud/bigquery/test_client.py | 20 ++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py index 25b5cdcd1567..748b60a4d48b 100644 --- a/gcloud/bigquery/client.py +++ b/gcloud/bigquery/client.py @@ -18,6 +18,7 @@ from gcloud.client import JSONClient from gcloud.bigquery.connection import Connection from gcloud.bigquery.dataset import Dataset +from gcloud.bigquery.job import CopyJob from gcloud.bigquery.job import LoadTableFromStorageJob @@ -115,3 +116,20 @@ def load_table_from_storage(self, name, destination, *source_uris): """ return LoadTableFromStorageJob(name, destination, source_uris, client=self) + + def copy_table(self, name, destination, *sources): + """Construct a job for copying one or more tables into another table. + + :type name: string + :param name: Name of the job. + + :type destination: :class:`gcloud.bigquery.table.Table` + :param destination: Table into which data is to be copied. + + :type sources: sequence of :class:`gcloud.bigquery.table.Table` + :param sources: tables to be copied. + + :rtype: :class:`gcloud.bigquery.job.CopyJob` + :returns: a new ``CopyJob`` instance + """ + return CopyJob(name, destination, sources, client=self) diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index 548814d7c744..653a964c9faf 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -147,6 +147,26 @@ def test_load_table_from_storage(self): self.assertEqual(list(job.source_uris), [SOURCE_URI]) self.assertTrue(job.destination is destination) + def test_copy_table(self): + from gcloud.bigquery.job import CopyJob + PROJECT = 'PROJECT' + JOB = 'job_name' + DATASET = 'dataset_name' + SOURCE = 'source_table' + DESTINATION = 'destination_table' + creds = _Credentials() + http = object() + client = self._makeOne(project=PROJECT, credentials=creds, http=http) + dataset = client.dataset(DATASET) + source = dataset.table(SOURCE) + destination = dataset.table(DESTINATION) + job = client.copy_table(JOB, destination, source) + self.assertTrue(isinstance(job, CopyJob)) + self.assertTrue(job._client is client) + self.assertEqual(job.name, JOB) + self.assertEqual(list(job.sources), [source]) + self.assertTrue(job.destination is destination) + class _Credentials(object): From 99cde2b9c69406968943fa7fe7547e434f5edf91 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 18:46:10 -0400 Subject: [PATCH 06/10] Fix merge copy-pasta. --- gcloud/bigquery/job.py | 110 ++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 63 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 1866fe62d9bd..afa58e6d7093 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -23,69 +23,6 @@ from gcloud.bigquery.table import _parse_schema_resource -class _Enum(object): - """Psedo-enumeration class. - - Subclasses must define ``ALLOWED`` as a class-level constant: it must - be a sequence of strings. - """ - @classmethod - def validate(cls, value): - """Check that ``value`` is one of the allowed values. - - :raises: ValueError if value is not allowed. - """ - if value not in cls.ALLOWED: - raise ValueError('Pass one of: %s' ', '.join(cls.ALLOWED)) - - -class CreateDisposition(_Enum): - """Pseudo-enum for allowed values for ``create_disposition`` properties. 
-    """
-    CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
-    CREATE_NEVER = 'CREATE_NEVER'
-    ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER)
-
-
-class Encoding(_Enum):
-    """Pseudo-enum for allowed values for ``encoding`` properties."""
-    UTF_8 = 'UTF-8'
-    ISO_8559_1 = 'ISO-8559-1'
-    ALLOWED = (UTF_8, ISO_8559_1)
-
-
-class SourceFormat(_Enum):
-    """Pseudo-enum for allowed values for ``source_format`` properties."""
-    CSV = 'CSV'
-    DATASTORE_BACKUP = 'DATASTORE_BACKUP'
-    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
-    ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON)
-
-
-class WriteDisposition(_Enum):
-    """Pseudo-enum for allowed values for ``write_disposition`` properties."""
-    WRITE_APPEND = 'WRITE_APPEND'
-    WRITE_TRUNCATE = 'WRITE_TRUNCATE'
-    WRITE_EMPTY = 'WRITE_EMPTY'
-    ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
-
-
-class _LoadConfiguration(object):
-    """User-settable configuration options for load jobs."""
-    # None -> use server default.
-    _allow_jagged_rows = None
-    _allow_quoted_newlines = None
-    _create_disposition = None
-    _encoding = None
-    _field_delimiter = None
-    _ignore_unknown_values = None
-    _max_bad_records = None
-    _quote_character = None
-    _skip_leading_rows = None
-    _source_format = None
-    _write_disposition = None
-
-
 class _BaseJob(object):
     """Base class for asynchronous jobs.
 
@@ -334,6 +271,53 @@ def cancel(self, client=None):
         self._set_properties(api_response)
 
 
+class _Enum(object):
+    """Pseudo-enumeration class.
+
+    Subclasses must define ``ALLOWED`` as a class-level constant: it must
+    be a sequence of strings.
+    """
+    @classmethod
+    def validate(cls, value):
+        """Check that ``value`` is one of the allowed values.
+
+        :raises: ValueError if value is not allowed.
+        """
+        if value not in cls.ALLOWED:
+            raise ValueError('Pass one of: %s' % ', '.join(cls.ALLOWED))
+
+
+class CreateDisposition(_Enum):
+    """Pseudo-enum for allowed values for ``create_disposition`` properties.
+    """
+    CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+    CREATE_NEVER = 'CREATE_NEVER'
+    ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER)
+
+
+class Encoding(_Enum):
+    """Pseudo-enum for allowed values for ``encoding`` properties."""
+    UTF_8 = 'UTF-8'
+    ISO_8859_1 = 'ISO-8859-1'
+    ALLOWED = (UTF_8, ISO_8859_1)
+
+
+class SourceFormat(_Enum):
+    """Pseudo-enum for allowed values for ``source_format`` properties."""
+    CSV = 'CSV'
+    DATASTORE_BACKUP = 'DATASTORE_BACKUP'
+    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
+    ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON)
+
+
+class WriteDisposition(_Enum):
+    """Pseudo-enum for allowed values for ``write_disposition`` properties."""
+    WRITE_APPEND = 'WRITE_APPEND'
+    WRITE_TRUNCATE = 'WRITE_TRUNCATE'
+    WRITE_EMPTY = 'WRITE_EMPTY'
+    ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
+
+
 class _LoadConfiguration(object):
     """User-settable configuration options for load jobs."""
     # None -> use server default.

From 37ae373fdd62b0d6129f00f024aaefd141ef9d46 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Mon, 17 Aug 2015 19:35:48 -0400
Subject: [PATCH 07/10] Use tuple for string formatting.
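'%s' % value unpacks ``value`` when it happens to be a tuple, whereas
'%s' % (value,) always formats exactly one item. A toy illustration of the
pitfall (not from the codebase):

    >>> '%s/cancel' % ('a', 'b')
    Traceback (most recent call last):
    TypeError: not all arguments converted during string formatting
    >>> '%s/cancel' % (('a', 'b'),)
    "('a', 'b')/cancel"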
Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1064#discussion_r37249070 --- gcloud/bigquery/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index afa58e6d7093..c3622b5484a5 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -267,7 +267,7 @@ def cancel(self, client=None): client = self._require_client(client) api_response = client.connection.api_request( - method='POST', path='%s/cancel' % self.path) + method='POST', path='%s/cancel' % (self.path,)) self._set_properties(api_response) From 7cdcab2380e2cbd8a98f1bd6fc7c6a7fd18700dc Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 19:37:05 -0400 Subject: [PATCH 08/10] Update 'create_disposition' docstring for clarity. --- gcloud/bigquery/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index c3622b5484a5..d5c4c65073a0 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -483,7 +483,7 @@ def allow_quoted_newlines(self): @property def create_disposition(self): - """Handling for missing destination table. + """Define how the back-end handles a missing destination table. :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). From c05c175aa2f5e2f364d5ac8864a4462c9f27d5cd Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 19:39:40 -0400 Subject: [PATCH 09/10] Docstring for 'CopyJob._populate_config_resource'. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1064/files#r37249582 --- gcloud/bigquery/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index d5c4c65073a0..bff56fb98114 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -849,7 +849,7 @@ def write_disposition(self): del self._configuration._write_disposition def _populate_config_resource(self, configuration): - + """Helper for _build_resource: copy config properties to resource""" if self.create_disposition is not None: configuration['createDisposition'] = self.create_disposition if self.write_disposition is not None: From 4c4c3849999f560480e60be34d6e91d94eac94a0 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 21:50:25 -0400 Subject: [PATCH 10/10] Link spec URLs for job configuration properties. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1064#discussion_r37250956 --- gcloud/bigquery/job.py | 69 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index bff56fb98114..b6abe62fc713 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -289,6 +289,10 @@ def validate(cls, value): class CreateDisposition(_Enum): """Pseudo-enum for allowed values for ``create_disposition`` properties. + + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition """ CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' CREATE_NEVER = 'CREATE_NEVER' @@ -296,14 +300,22 @@ class CreateDisposition(_Enum): class Encoding(_Enum): - """Pseudo-enum for allowed values for ``encoding`` properties.""" + """Pseudo-enum for allowed values for ``encoding`` properties. 
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.encoding
+    """
     UTF_8 = 'UTF-8'
     ISO_8859_1 = 'ISO-8859-1'
     ALLOWED = (UTF_8, ISO_8859_1)
 
 
 class SourceFormat(_Enum):
-    """Pseudo-enum for allowed values for ``source_format`` properties."""
+    """Pseudo-enum for allowed values for ``source_format`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat
+    """
     CSV = 'CSV'
     DATASTORE_BACKUP = 'DATASTORE_BACKUP'
@@ -311,7 +323,12 @@ class SourceFormat(_Enum):
 
 
 class WriteDisposition(_Enum):
-    """Pseudo-enum for allowed values for ``write_disposition`` properties."""
+    """Pseudo-enum for allowed values for ``write_disposition`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition
+    """
     WRITE_APPEND = 'WRITE_APPEND'
     WRITE_TRUNCATE = 'WRITE_TRUNCATE'
     WRITE_EMPTY = 'WRITE_EMPTY'
@@ -431,6 +448,9 @@ def output_rows(self):
     def allow_jagged_rows(self):
         """Allow rows with missing trailing commas for optional fields.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowJaggedRows
+
         :rtype: boolean, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -458,6 +478,9 @@ def allow_jagged_rows(self):
     def allow_quoted_newlines(self):
         """Allow rows with quoted newlines.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowQuotedNewlines
+
         :rtype: boolean, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -485,6 +508,9 @@ def allow_quoted_newlines(self):
     def create_disposition(self):
         """Define how the back-end handles a missing destination table.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition
+
         :rtype: string, or ``NoneType``
        :returns: The value as set by the user, or None (the default).
         """
@@ -509,6 +535,9 @@ def create_disposition(self):
     def encoding(self):
         """Encoding for source data.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.encoding
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -533,6 +562,9 @@ def encoding(self):
     def field_delimiter(self):
         """Separator for fields in the source data.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.fieldDelimiter
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -560,6 +592,9 @@ def field_delimiter(self):
     def ignore_unknown_values(self):
         """Ignore rows with extra columns beyond those specified by the schema.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.ignoreUnknownValues
+
         :rtype: boolean, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -587,6 +622,9 @@ def ignore_unknown_values(self):
     def max_bad_records(self):
         """Max number of bad records to be ignored.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.maxBadRecords
+
         :rtype: integer, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -614,6 +652,9 @@ def max_bad_records(self):
     def quote_character(self):
         """Character used to quote values.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.quote
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -641,6 +682,9 @@ def quote_character(self):
     def skip_leading_rows(self):
         """Count of leading rows to be skipped.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.skipLeadingRows
+
         :rtype: integer, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -668,6 +712,9 @@ def skip_leading_rows(self):
     def source_format(self):
         """Format of source data files.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -692,6 +739,9 @@ def source_format(self):
     def write_disposition(self):
         """Action to take if the destination table already exists.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -803,6 +853,9 @@ def __init__(self, name, destination, sources, client):
     def create_disposition(self):
         """Handling for missing destination table.
 
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition
+
         :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
@@ -812,9 +865,8 @@ def create_disposition(self):
     def create_disposition(self, value):
         """Update create_disposition.
 
-        :type value: boolean
-        :param value: new create_disposition: one of "CREATE_IF_NEEDED" or
-                      "CREATE_NEVER"
+        :type value: string
+        :param value: allowed values for :class:`CreateDisposition`
         """
         CreateDisposition.validate(value)    # raises ValueError if invalid
         self._configuration._create_disposition = value
@@ -828,7 +880,10 @@ def create_disposition(self):
     def write_disposition(self):
         """Handling for existing data in the destination table.
 
-        :rtype: boolean, or ``NoneType``
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition
+
+        :rtype: string, or ``NoneType``
         :returns: The value as set by the user, or None (the default).
         """
         return self._configuration._write_disposition
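After this series, load and copy jobs share the begin/exists/reload/cancel
lifecycle on _BaseJob. A rough end-to-end sketch of the resulting copy API
(identifiers are placeholders; assumes ambient credentials; error handling
omitted):

    from gcloud.bigquery.client import Client

    client = Client(project='my-project')
    dataset = client.dataset('my_dataset')
    job = client.copy_table('nightly-copy', dataset.table('backup_table'),
                            dataset.table('live_table'))
    job.write_disposition = 'WRITE_TRUNCATE'
    job.begin()        # insert the job resource
    job.reload()       # refresh properties from the server
    print(job.state)   # e.g. 'RUNNING' or 'DONE' once reported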