From 062d5bb4613636e3b071326e1b0897d63e5d2ecb Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 13:21:29 -0400 Subject: [PATCH 01/16] Match order of CUT methods. --- gcloud/bigquery/test_client.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index 554314fa9515..c3e50778e879 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -34,18 +34,6 @@ def test_ctor(self): self.assertTrue(client.connection.credentials is creds) self.assertTrue(client.connection.http is http) - def test_dataset(self): - from gcloud.bigquery.dataset import Dataset - PROJECT = 'PROJECT' - DATASET = 'dataset_name' - creds = _Credentials() - http = object() - client = self._makeOne(project=PROJECT, credentials=creds, http=http) - dataset = client.dataset(DATASET) - self.assertTrue(isinstance(dataset, Dataset)) - self.assertEqual(dataset.name, DATASET) - self.assertTrue(dataset._client is client) - def test_list_datasets_defaults(self): from gcloud.bigquery.dataset import Dataset PROJECT = 'PROJECT' @@ -128,6 +116,18 @@ def test_list_datasets_explicit(self): self.assertEqual(req['query_params'], {'all': True, 'maxResults': 3, 'pageToken': TOKEN}) + def test_dataset(self): + from gcloud.bigquery.dataset import Dataset + PROJECT = 'PROJECT' + DATASET = 'dataset_name' + creds = _Credentials() + http = object() + client = self._makeOne(project=PROJECT, credentials=creds, http=http) + dataset = client.dataset(DATASET) + self.assertTrue(isinstance(dataset, Dataset)) + self.assertEqual(dataset.name, DATASET) + self.assertTrue(dataset._client is client) + class _Credentials(object): From 14fbb6c27467ddd4b06c53d77fe0771020fa5de9 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 15:58:51 -0400 Subject: [PATCH 02/16] Add 'bigquery.job.LoadFromStorageJob. Models job request to load a BQ table from file(s) in CloudStorage. --- gcloud/bigquery/job.py | 554 ++++++++++++++++++++++++++++++++++++ gcloud/bigquery/test_job.py | 356 +++++++++++++++++++++++ 2 files changed, 910 insertions(+) create mode 100644 gcloud/bigquery/job.py create mode 100644 gcloud/bigquery/test_job.py diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py new file mode 100644 index 000000000000..a5feb9f90f06 --- /dev/null +++ b/gcloud/bigquery/job.py @@ -0,0 +1,554 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Jobs.""" + +import six + +from gcloud.bigquery._helpers import _datetime_from_prop +from gcloud.bigquery.table import SchemaField + + +class _LoadConfiguration(object): + """User-settable configuration options for load jobs.""" + # None -> use server default. 
+    _allow_jagged_rows = None
+    _allow_quoted_newlines = None
+    _create_disposition = None
+    _encoding = None
+    _field_delimiter = None
+    _ignore_unknown_values = None
+    _max_bad_records = None
+    _quote_character = None
+    _skip_leading_rows = None
+    _source_format = None
+    _write_disposition = None
+
+
+class LoadFromStorageJob(object):
+    """Asynchronous job for loading data into a BQ table from CloudStorage.
+
+    :type name: string
+    :param name: the name of the job
+
+    :type destination: :class:`gcloud.bigquery.table.Table`
+    :param destination: Table into which data is to be loaded.
+
+    :type source_uris: sequence of string
+    :param source_uris: URIs of data files to be loaded.
+
+    :type client: :class:`gcloud.bigquery.client.Client`
+    :param client: A client which holds credentials and project configuration
+                   for the job (which requires a project).
+
+    :type schema: list of :class:`gcloud.bigquery.table.SchemaField`
+    :param schema: The job's schema
+    """
+    def __init__(self, name, destination, source_uris, client, schema=()):
+        self.name = name
+        self.destination = destination
+        self._client = client
+        self.source_uris = source_uris
+        self.schema = schema
+        self._properties = {}
+        self._configuration = _LoadConfiguration()
+
+    @property
+    def project(self):
+        """Project bound to the job.
+
+        :rtype: string
+        :returns: the project (derived from the client).
+        """
+        return self._client.project
+
+    @property
+    def path(self):
+        """URL path for the job's APIs.
+
+        :rtype: string
+        :returns: the path based on project and job name.
+        """
+        return '/projects/%s/jobs/%s' % (self.project, self.name)
+
+    @property
+    def schema(self):
+        """Schema for the destination table.
+
+        :rtype: list of :class:`SchemaField`
+        :returns: fields describing the schema
+        """
+        return list(self._schema)
+
+    @schema.setter
+    def schema(self, value):
+        """Update schema for the destination table.
+
+        :type value: list of :class:`SchemaField`
+        :param value: fields describing the schema
+
+        :raises: TypeError if 'value' is not a sequence, or ValueError if
+                 any item in the sequence is not a SchemaField
+        """
+        if not all(isinstance(field, SchemaField) for field in value):
+            raise ValueError('Schema items must be fields')
+        self._schema = tuple(value)
+
+    @property
+    def etag(self):
+        """ETag for the job resource.
+
+        :rtype: string, or ``NoneType``
+        :returns: the ETag (None until set from the server).
+        """
+        return self._properties.get('etag')
+
+    @property
+    def job_id(self):
+        """ID for the job resource.
+
+        :rtype: string, or ``NoneType``
+        :returns: the ID (None until set from the server).
+        """
+        return self._properties.get('id')
+
+    @property
+    def self_link(self):
+        """URL for the job resource.
+
+        :rtype: string, or ``NoneType``
+        :returns: the URL (None until set from the server).
+        """
+        return self._properties.get('selfLink')
+
+    @property
+    def user_email(self):
+        """E-mail address of user who submitted the job.
+
+        :rtype: string, or ``NoneType``
+        :returns: the e-mail address (None until set from the server).
+        """
+        return self._properties.get('user_email')
+
+    @property
+    def created(self):
+        """Datetime at which the job was created.
+
+        :rtype: ``datetime.datetime``, or ``NoneType``
+        :returns: the creation time (None until set from the server).
+        """
+        statistics = self._properties.get('statistics')
+        if statistics is not None:
+            return _datetime_from_prop(statistics.get('creationTime'))
+
+    @property
+    def started(self):
+        """Datetime at which the job was started.
+ + :rtype: ``datetime.datetime``, or ``NoneType`` + :returns: the start time (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return _datetime_from_prop(statistics.get('startTime')) + + @property + def ended(self): + """Datetime at which the job finished. + + :rtype: ``datetime.datetime``, or ``NoneType`` + :returns: the end time (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return _datetime_from_prop(statistics.get('endTime')) + + @property + def input_file_bytes(self): + """Count of bytes loaded from source files. + + :rtype: integer, or ``NoneType`` + :returns: the count (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return int(statistics['load']['inputFileBytes']) + + @property + def input_files(self): + """Count of source files. + + :rtype: integer, or ``NoneType`` + :returns: the count (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return int(statistics['load']['inputFiles']) + + @property + def output_bytes(self): + """Count of bytes saved to destination table. + + :rtype: integer, or ``NoneType`` + :returns: the count (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return int(statistics['load']['outputBytes']) + + @property + def output_rows(self): + """Count of rows saved to destination table. + + :rtype: integer, or ``NoneType`` + :returns: the count (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + return int(statistics['load']['outputRows']) + + @property + def error_result(self): + """Error information about the job as a whole. + + :rtype: mapping, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status['errorResult'] + + @property + def errors(self): + """Information about individual errors generated by the job. + + :rtype: list of mappings, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status['errors'] + + @property + def state(self): + """Status of the job. + + :rtype: string, or ``NoneType`` + :returns: the state (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status['state'] + + @property + def allow_jagged_rows(self): + """Allow rows with missing trailing commas for optional fields. + + :rtype: boolean, or ``NoneType`` + :returns: The value as set by the user, or None (the default). + """ + return self._configuration._allow_jagged_rows + + @allow_jagged_rows.setter + def allow_jagged_rows(self, value): + """Update allow_jagged_rows. + + :type value: boolean + :param value: new allow_jagged_rows + + :raises: ValueError for invalid value types. + """ + if not isinstance(value, bool): + raise ValueError("Pass a boolean") + self._configuration._allow_jagged_rows = value + + @allow_jagged_rows.deleter + def allow_jagged_rows(self): + """Delete allow_jagged_rows.""" + del self._configuration._allow_jagged_rows + + @property + def allow_quoted_newlines(self): + """Allow rows with quoted newlines. 
+
+        :rtype: boolean, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._allow_quoted_newlines
+
+    @allow_quoted_newlines.setter
+    def allow_quoted_newlines(self, value):
+        """Update allow_quoted_newlines.
+
+        :type value: boolean
+        :param value: new allow_quoted_newlines
+
+        :raises: ValueError for invalid value types.
+        """
+        if not isinstance(value, bool):
+            raise ValueError("Pass a boolean")
+        self._configuration._allow_quoted_newlines = value
+
+    @allow_quoted_newlines.deleter
+    def allow_quoted_newlines(self):
+        """Delete allow_quoted_newlines."""
+        del self._configuration._allow_quoted_newlines
+
+    @property
+    def create_disposition(self):
+        """Handling for missing destination table.
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._create_disposition
+
+    @create_disposition.setter
+    def create_disposition(self, value):
+        """Update create_disposition.
+
+        :type value: string
+        :param value: new create_disposition: one of "CREATE_IF_NEEDED" or
+                      "CREATE_NEVER"
+
+        :raises: ValueError for invalid value.
+        """
+        if value not in ('CREATE_IF_NEEDED', 'CREATE_NEVER'):
+            raise ValueError("Pass 'CREATE_IF_NEEDED' or 'CREATE_NEVER'")
+        self._configuration._create_disposition = value
+
+    @create_disposition.deleter
+    def create_disposition(self):
+        """Delete create_disposition."""
+        del self._configuration._create_disposition
+
+    @property
+    def encoding(self):
+        """Encoding for source data.
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._encoding
+
+    @encoding.setter
+    def encoding(self, value):
+        """Update encoding.
+
+        :type value: string
+        :param value: new encoding: one of 'UTF-8' or 'ISO-8859-1'.
+
+        :raises: ValueError for invalid value.
+        """
+        if value not in ('UTF-8', 'ISO-8859-1'):
+            raise ValueError("Pass 'UTF-8' or 'ISO-8859-1'")
+        self._configuration._encoding = value
+
+    @encoding.deleter
+    def encoding(self):
+        """Delete encoding."""
+        del self._configuration._encoding
+
+    @property
+    def field_delimiter(self):
+        """Separator for fields in the source data.
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._field_delimiter
+
+    @field_delimiter.setter
+    def field_delimiter(self, value):
+        """Update field_delimiter.
+
+        :type value: string
+        :param value: new field delimiter
+
+        :raises: ValueError for invalid value types.
+        """
+        if not isinstance(value, six.string_types):
+            raise ValueError("Pass a string")
+        self._configuration._field_delimiter = value
+
+    @field_delimiter.deleter
+    def field_delimiter(self):
+        """Delete field_delimiter."""
+        del self._configuration._field_delimiter
+
+    @property
+    def ignore_unknown_values(self):
+        """Ignore rows with extra columns beyond those specified by the schema.
+
+        :rtype: boolean, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._ignore_unknown_values
+
+    @ignore_unknown_values.setter
+    def ignore_unknown_values(self, value):
+        """Update ignore_unknown_values.
+
+        :type value: boolean
+        :param value: new ignore_unknown_values
+
+        :raises: ValueError for invalid value types.
+ """ + if not isinstance(value, bool): + raise ValueError("Pass a boolean") + self._configuration._ignore_unknown_values = value + + @ignore_unknown_values.deleter + def ignore_unknown_values(self): + """Delete ignore_unknown_values.""" + del self._configuration._ignore_unknown_values + + @property + def max_bad_records(self): + """Max number of bad records to be ignored. + + :rtype: integer, or ``NoneType`` + :returns: The value as set by the user, or None (the default). + """ + return self._configuration._max_bad_records + + @max_bad_records.setter + def max_bad_records(self, value): + """Update max_bad_records. + + :type value: integer + :param value: new max_bad_records + + :raises: ValueError for invalid value types. + """ + if not isinstance(value, six.integer_types): + raise ValueError("Pass an integer") + self._configuration._max_bad_records = value + + @max_bad_records.deleter + def max_bad_records(self): + """Delete max_bad_records.""" + del self._configuration._max_bad_records + + @property + def quote_character(self): + """Character used to quote values. + + :rtype: string, or ``NoneType`` + :returns: The value as set by the user, or None (the default). + """ + return self._configuration._quote_character + + @quote_character.setter + def quote_character(self, value): + """Update quote_character. + + :type value: string + :param value: new quote_character + + :raises: ValueError for invalid value types. + """ + if not isinstance(value, six.string_types): + raise ValueError("Pass a string") + self._configuration._quote_character = value + + @quote_character.deleter + def quote_character(self): + """Delete quote_character.""" + del self._configuration._quote_character + + @property + def skip_leading_rows(self): + """Count of leading rows to be skipped. + + :rtype: integer, or ``NoneType`` + :returns: The value as set by the user, or None (the default). + """ + return self._configuration._skip_leading_rows + + @skip_leading_rows.setter + def skip_leading_rows(self, value): + """Update skip_leading_rows. + + :type value: integer + :param value: new skip_leading_rows + + :raises: ValueError for invalid value types. + """ + if not isinstance(value, six.integer_types): + raise ValueError("Pass a boolean") + self._configuration._skip_leading_rows = value + + @skip_leading_rows.deleter + def skip_leading_rows(self): + """Delete skip_leading_rows.""" + del self._configuration._skip_leading_rows + + @property + def source_format(self): + """Format of source data files. + + :rtype: string, or ``NoneType`` + :returns: The value as set by the user, or None (the default). + """ + return self._configuration._source_format + + @source_format.setter + def source_format(self, value): + """Update source_format. + + :type value: string + :param value: new source_format: one of "CSV", "DATASTORE_BACKUP", + or "NEWLINE_DELIMITED_JSON" + + :raises: ValueError for invalid values. + """ + if value not in ('CSV', 'DATASTORE_BACKUP', 'NEWLINE_DELIMITED_JSON'): + raise ValueError( + "Pass 'CSV', 'DATASTORE_BACKUP' or 'NEWLINE_DELIMITED_JSON'") + self._configuration._source_format = value + + @source_format.deleter + def source_format(self): + """Delete source_format.""" + del self._configuration._source_format + + @property + def write_disposition(self): + """Allow rows with missing trailing commas for optional fields. + + :rtype: boolean, or ``NoneType`` + :returns: The value as set by the user, or None (the default). 
+ """ + return self._configuration._write_disposition + + @write_disposition.setter + def write_disposition(self, value): + """Update write_disposition. + + :type value: string + :param value: new write_disposition: one of "WRITE_APPEND", + "WRITE_TRUNCATE", or "WRITE_EMPTY" + + :raises: ValueError for invalid value types. + """ + if value not in ('WRITE_APPEND', 'WRITE_TRUNCATE', 'WRITE_EMPTY'): + raise ValueError( + "Pass 'WRITE_APPEND', 'WRITE_TRUNCATE' or 'WRITE_EMPTY'") + self._configuration._write_disposition = value + + @write_disposition.deleter + def write_disposition(self): + """Delete write_disposition.""" + del self._configuration._write_disposition diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py new file mode 100644 index 000000000000..668eb9e772a8 --- /dev/null +++ b/gcloud/bigquery/test_job.py @@ -0,0 +1,356 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestLoadFromStorageJob(unittest2.TestCase): + PROJECT = 'project' + SOURCE1 = 'http://example.com/source1.csv' + JOB_NAME = 'job_name' + + def _getTargetClass(self): + from gcloud.bigquery.job import LoadFromStorageJob + return LoadFromStorageJob + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + self.assertTrue(job.destination is table) + self.assertEqual(list(job.source_uris), [self.SOURCE1]) + self.assertTrue(job._client is client) + self.assertEqual( + job.path, + '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) + self.assertEqual(job.schema, []) + self.assertTrue(job.allow_jagged_rows is None) + self.assertTrue(job.allow_quoted_newlines is None) + self.assertTrue(job.create_disposition is None) + self.assertTrue(job.encoding is None) + self.assertTrue(job.field_delimiter is None) + self.assertTrue(job.ignore_unknown_values is None) + self.assertTrue(job.max_bad_records is None) + self.assertTrue(job.quote_character is None) + self.assertTrue(job.skip_leading_rows is None) + self.assertTrue(job.source_format is None) + self.assertTrue(job.write_disposition is None) + + # root elements of resource + self.assertEqual(job.etag, None) + self.assertEqual(job.job_id, None) + self.assertEqual(job.self_link, None) + self.assertEqual(job.user_email, None) + + # derived from resource['statistics'] + self.assertEqual(job.created, None) + self.assertEqual(job.started, None) + self.assertEqual(job.ended, None) + + # derived from resource['statistics']['load'] + self.assertEqual(job.input_file_bytes, None) + self.assertEqual(job.input_files, None) + self.assertEqual(job.output_bytes, None) + self.assertEqual(job.output_rows, None) + + # derived from resource['status'] + self.assertEqual(job.error_result, None) + self.assertEqual(job.errors, None) + self.assertEqual(job.state, None) + + def test_ctor_w_schema(self): + from gcloud.bigquery.table import 
SchemaField + client = _Client(self.PROJECT) + table = _Table() + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + age = SchemaField('age', 'INTEGER', mode='REQUIRED') + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client, + schema=[full_name, age]) + self.assertEqual(job.schema, [full_name, age]) + + def test_schema_setter_non_list(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(TypeError): + job.schema = object() + + def test_schema_setter_invalid_field(self): + from gcloud.bigquery.table import SchemaField + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + with self.assertRaises(ValueError): + job.schema = [full_name, object()] + + def test_schema_setter(self): + from gcloud.bigquery.table import SchemaField + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + age = SchemaField('age', 'INTEGER', mode='REQUIRED') + job.schema = [full_name, age] + self.assertEqual(job.schema, [full_name, age]) + + def test_props_set_by_server(self): + import datetime + from gcloud._helpers import UTC + from gcloud.bigquery._helpers import _millis + + CREATED = datetime.datetime(2015, 8, 11, 12, 13, 22, tzinfo=UTC) + STARTED = datetime.datetime(2015, 8, 11, 13, 47, 15, tzinfo=UTC) + ENDED = datetime.datetime(2015, 8, 11, 14, 47, 15, tzinfo=UTC) + JOB_ID = '%s:%s' % (self.PROJECT, self.JOB_NAME) + URL = 'http://example.com/projects/%s/jobs/%s' % ( + self.PROJECT, self.JOB_NAME) + EMAIL = 'phred@example.com' + ERROR_RESULT = {'debugInfo': 'DEBUG', + 'location': 'LOCATION', + 'message': 'MESSAGE', + 'reason': 'REASON'} + + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job._properties['etag'] = 'ETAG' + job._properties['id'] = JOB_ID + job._properties['selfLink'] = URL + job._properties['user_email'] = EMAIL + + statistics = job._properties['statistics'] = {} + statistics['creationTime'] = _millis(CREATED) + statistics['startTime'] = _millis(STARTED) + statistics['endTime'] = _millis(ENDED) + load_stats = statistics['load'] = {} + load_stats['inputFileBytes'] = 12345 + load_stats['inputFiles'] = 1 + load_stats['outputBytes'] = 23456 + load_stats['outputRows'] = 345 + + status = job._properties['status'] = {} + status['errorResult'] = ERROR_RESULT + status['errors'] = [ERROR_RESULT] + status['state'] = 'STATE' + + self.assertEqual(job.etag, 'ETAG') + self.assertEqual(job.job_id, JOB_ID) + self.assertEqual(job.self_link, URL) + self.assertEqual(job.user_email, EMAIL) + + self.assertEqual(job.created, CREATED) + self.assertEqual(job.started, STARTED) + self.assertEqual(job.ended, ENDED) + + self.assertEqual(job.input_file_bytes, 12345) + self.assertEqual(job.input_files, 1) + self.assertEqual(job.output_bytes, 23456) + self.assertEqual(job.output_rows, 345) + + self.assertEqual(job.error_result, ERROR_RESULT) + self.assertEqual(job.errors, [ERROR_RESULT]) + self.assertEqual(job.state, 'STATE') + + def test_allow_jagged_rows_setter_bad_value(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(ValueError): + job.allow_jagged_rows = object() + + 
def test_allow_jagged_rows_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.allow_jagged_rows = True
+        self.assertTrue(job.allow_jagged_rows)
+        del job.allow_jagged_rows
+        self.assertTrue(job.allow_jagged_rows is None)
+
+    def test_allow_quoted_newlines_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.allow_quoted_newlines = object()
+
+    def test_allow_quoted_newlines_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.allow_quoted_newlines = True
+        self.assertTrue(job.allow_quoted_newlines)
+        del job.allow_quoted_newlines
+        self.assertTrue(job.allow_quoted_newlines is None)
+
+    def test_create_disposition_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.create_disposition = 'BOGUS'
+
+    def test_create_disposition_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.create_disposition = 'CREATE_IF_NEEDED'
+        self.assertEqual(job.create_disposition, 'CREATE_IF_NEEDED')
+        del job.create_disposition
+        self.assertTrue(job.create_disposition is None)
+
+    def test_encoding_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.encoding = 'BOGUS'
+
+    def test_encoding_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.encoding = 'ISO-8859-1'
+        self.assertEqual(job.encoding, 'ISO-8859-1')
+        del job.encoding
+        self.assertTrue(job.encoding is None)
+
+    def test_field_delimiter_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.field_delimiter = object()
+
+    def test_field_delimiter_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.field_delimiter = '|'
+        self.assertEqual(job.field_delimiter, '|')
+        del job.field_delimiter
+        self.assertTrue(job.field_delimiter is None)
+
+    def test_ignore_unknown_values_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.ignore_unknown_values = object()
+
+    def test_ignore_unknown_values_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.ignore_unknown_values = True
+        self.assertTrue(job.ignore_unknown_values)
+        del job.ignore_unknown_values
+        self.assertTrue(job.ignore_unknown_values is None)
+
+    def test_max_bad_records_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+        with self.assertRaises(ValueError):
+            job.max_bad_records = object()
+
+    def test_max_bad_records_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        table = 
_Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job.max_bad_records = 100 + self.assertEqual(job.max_bad_records, 100) + del job.max_bad_records + self.assertTrue(job.max_bad_records is None) + + def test_quote_character_setter_bad_value(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(ValueError): + job.quote_character = object() + + def test_quote_character_setter_deleter(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job.quote_character = "'" + self.assertEqual(job.quote_character, "'") + del job.quote_character + self.assertTrue(job.quote_character is None) + + def test_skip_leading_rows_setter_bad_value(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(ValueError): + job.skip_leading_rows = object() + + def test_skip_leading_rows_setter_deleter(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job.skip_leading_rows = 2 + self.assertEqual(job.skip_leading_rows, 2) + del job.skip_leading_rows + self.assertTrue(job.skip_leading_rows is None) + + def test_source_format_setter_bad_value(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(ValueError): + job.source_format = 'BOGUS' + + def test_source_format_setter_deleter(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job.source_format = 'NEWLINE_DELIMITED_JSON' + self.assertEqual(job.source_format, 'NEWLINE_DELIMITED_JSON') + del job.source_format + self.assertTrue(job.source_format is None) + + def test_write_disposition_setter_bad_value(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + with self.assertRaises(ValueError): + job.write_disposition = 'BOGUS' + + def test_write_disposition_setter_deleter(self): + client = _Client(self.PROJECT) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + job.write_disposition = 'WRITE_TRUNCATE' + self.assertEqual(job.write_disposition, 'WRITE_TRUNCATE') + del job.write_disposition + self.assertTrue(job.write_disposition is None) + + +class _Client(object): + + def __init__(self, project='project', connection=None): + self.project = project + self.connection = connection + + +class _Table(object): + + def __init__(self): + pass From 7b31499eb650664d3f633051481e401f4c52c517 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 16:20:26 -0400 Subject: [PATCH 03/16] Add 'Client.load_from_storage' factory. Builds an instance of 'job.LoadFromStorageJob'. 
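A quick sketch of the factory in use (hedged: the dataset, table, and
source URI below are placeholders, and 'client' is assumed to be an
already-configured 'bigquery.Client'):

    destination = client.dataset('my_dataset').table('person_ages')
    job = client.load_from_storage(
        'load-job-1', destination, 'gs://my-bucket/person_ages.csv')

The job is only instantiated here; a later commit adds the API call
which actually starts it server-side.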
--- gcloud/bigquery/client.py | 18 ++++++++++++++++++ gcloud/bigquery/test_client.py | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py index 891e773a1f0b..0e08e39e563b 100644 --- a/gcloud/bigquery/client.py +++ b/gcloud/bigquery/client.py @@ -18,6 +18,7 @@ from gcloud.client import JSONClient from gcloud.bigquery.connection import Connection from gcloud.bigquery.dataset import Dataset +from gcloud.bigquery.job import LoadFromStorageJob class Client(JSONClient): @@ -96,3 +97,20 @@ def dataset(self, name): :returns: a new ``Dataset`` instance """ return Dataset(name, client=self) + + def load_from_storage(self, name, destination, *source_uris): + """Construct a job for loading data into a table from CloudStorage. + + :type name: string + :param name: Name of the job. + + :type destination: :class:`gcloud.bigquery.table.Table` + :param destination: Table into which data is to be loaded. + + :type source_uris: sequence of string + :param source_uris: URIs of data files to be loaded. + + :rtype: :class:`gcloud.bigquery.job.LoadFromStorageJob` + :returns: a new ``LoadFromStorageJob`` instance + """ + return LoadFromStorageJob(name, destination, source_uris, client=self) diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index c3e50778e879..7dbba0c103a8 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -128,6 +128,25 @@ def test_dataset(self): self.assertEqual(dataset.name, DATASET) self.assertTrue(dataset._client is client) + def test_load_from_storage(self): + from gcloud.bigquery.job import LoadFromStorageJob + PROJECT = 'PROJECT' + JOB = 'job_name' + DATASET = 'dataset_name' + DESTINATION = 'destination_table' + SOURCE_URI = 'http://example.com/source.csv' + creds = _Credentials() + http = object() + client = self._makeOne(project=PROJECT, credentials=creds, http=http) + dataset = client.dataset(DATASET) + destination = dataset.table(DESTINATION) + job = client.load_from_storage(JOB, destination, SOURCE_URI) + self.assertTrue(isinstance(job, LoadFromStorageJob)) + self.assertTrue(job._client is client) + self.assertEqual(job.name, JOB) + self.assertEqual(list(job.source_uris), [SOURCE_URI]) + self.assertTrue(job.destination is destination) + class _Credentials(object): From 0c3232c4bf547467ed9b47e140e7bc91b8a4ab5b Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 17:05:28 -0400 Subject: [PATCH 04/16] Factor out schema resource helpers for re-use. - 'table.Table._parse_schema_resource' -> 'table._parse_schema_resource' - 'table.Table._build_schema_resource' -> 'table._build_schema_resource' --- gcloud/bigquery/table.py | 92 +++++++------- gcloud/bigquery/test_table.py | 223 ++++++++++++++++++---------------- 2 files changed, 164 insertions(+), 151 deletions(-) diff --git a/gcloud/bigquery/table.py b/gcloud/bigquery/table.py index 6f1c09c2ad99..5f5134446e83 100644 --- a/gcloud/bigquery/table.py +++ b/gcloud/bigquery/table.py @@ -343,30 +343,6 @@ def _require_client(self, client): client = self._dataset._client return client - def _parse_schema_resource(self, info): - """Parse a resource fragment into a schema field. - - :type info: mapping - :param info: should contain a "fields" key to be parsed - - :rtype: list of :class:`SchemaField`, or ``NoneType`` - :returns: a list of parsed fields, or ``None`` if no "fields" key is - present in ``info``. 
- """ - if 'fields' not in info: - return None - - schema = [] - for r_field in info['fields']: - name = r_field['name'] - field_type = r_field['type'] - mode = r_field['mode'] - description = r_field.get('description') - sub_fields = self._parse_schema_resource(r_field) - schema.append( - SchemaField(name, field_type, mode, description, sub_fields)) - return schema - def _set_properties(self, api_response): """Update properties from resource in body of ``api_response`` @@ -376,7 +352,7 @@ def _set_properties(self, api_response): self._properties.clear() cleaned = api_response.copy() schema = cleaned.pop('schema', {'fields': ()}) - self.schema = self._parse_schema_resource(schema) + self.schema = _parse_schema_resource(schema) if 'creationTime' in cleaned: cleaned['creationTime'] = float(cleaned['creationTime']) if 'lastModifiedTime' in cleaned: @@ -385,22 +361,6 @@ def _set_properties(self, api_response): cleaned['expirationTime'] = float(cleaned['expirationTime']) self._properties.update(cleaned) - def _build_schema_resource(self, fields=None): - """Generate a resource fragment for table's schema.""" - if fields is None: - fields = self._schema - infos = [] - for field in fields: - info = {'name': field.name, - 'type': field.field_type, - 'mode': field.mode} - if field.description is not None: - info['description'] = field.description - if field.fields is not None: - info['fields'] = self._build_schema_resource(field.fields) - infos.append(info) - return infos - def _build_resource(self): """Generate a resource for ``create`` or ``update``.""" resource = { @@ -408,7 +368,7 @@ def _build_resource(self): 'projectId': self._dataset.project, 'datasetId': self._dataset.name, 'tableId': self.name}, - 'schema': {'fields': self._build_schema_resource()}, + 'schema': {'fields': _build_schema_resource(self._schema)}, } if self.description is not None: resource['description'] = self.description @@ -549,7 +509,7 @@ def patch(self, partial['schema'] = None else: partial['schema'] = { - 'fields': self._build_schema_resource(schema)} + 'fields': _build_schema_resource(schema)} api_response = client.connection.api_request( method='PATCH', path=self.path, data=partial) @@ -714,6 +674,52 @@ def insert_data(self, return errors +def _parse_schema_resource(info): + """Parse a resource fragment into a schema field. + + :type info: mapping + :param info: should contain a "fields" key to be parsed + + :rtype: list of :class:`SchemaField`, or ``NoneType`` + :returns: a list of parsed fields, or ``None`` if no "fields" key is + present in ``info``. + """ + if 'fields' not in info: + return None + + schema = [] + for r_field in info['fields']: + name = r_field['name'] + field_type = r_field['type'] + mode = r_field['mode'] + description = r_field.get('description') + sub_fields = _parse_schema_resource(r_field) + schema.append( + SchemaField(name, field_type, mode, description, sub_fields)) + return schema + + +def _build_schema_resource(fields): + """Generate a resource fragment for a schema. + + :type fields: sequence of :class:`SchemaField` + :param fields: schema to be dumped + + :rtype: mapping + :returns; a mapping describing the schema of the supplied fields. 
+ """ + infos = [] + for field in fields: + info = {'name': field.name, + 'type': field.field_type, + 'mode': field.mode} + if field.description is not None: + info['description'] = field.description + if field.fields is not None: + info['fields'] = _build_schema_resource(field.fields) + infos.append(info) + return infos + def _not_null(value, field): return value is not None or field.mode != 'NULLABLE' diff --git a/gcloud/bigquery/test_table.py b/gcloud/bigquery/test_table.py index 530f55a9ead0..0b5799002d0d 100644 --- a/gcloud/bigquery/test_table.py +++ b/gcloud/bigquery/test_table.py @@ -62,7 +62,22 @@ def test_ctor_subfields(self): self.assertEqual(field.fields[1].fields, None) -class TestTable(unittest2.TestCase): +class _SchemaBase(object): + + def _verify_field(self, field, r_field): + self.assertEqual(field.name, r_field['name']) + self.assertEqual(field.field_type, r_field['type']) + self.assertEqual(field.mode, r_field['mode']) + + def _verifySchema(self, schema, resource): + r_fields = resource['schema']['fields'] + self.assertEqual(len(schema), len(r_fields)) + + for field, r_field in zip(schema, r_fields): + self._verify_field(field, r_field) + + +class TestTable(unittest2.TestCase, _SchemaBase): PROJECT = 'project' DS_NAME = 'dataset-name' TABLE_NAME = 'table-name' @@ -109,18 +124,6 @@ def _makeResource(self): 'type': 'TABLE', } - def _verify_field(self, field, r_field): - self.assertEqual(field.name, r_field['name']) - self.assertEqual(field.field_type, r_field['type']) - self.assertEqual(field.mode, r_field['mode']) - - def _verifySchema(self, schema, resource): - r_fields = resource['schema']['fields'] - self.assertEqual(len(schema), len(r_fields)) - - for field, r_field in zip(schema, r_fields): - self._verify_field(field, r_field) - def _verifyReadonlyResourceProperties(self, table, resource): if 'creationTime' in resource: self.assertEqual(table.created, self.WHEN) @@ -388,101 +391,6 @@ def test_from_api_repr_w_properties(self): self.assertTrue(table._dataset._client is client) self._verifyResourceProperties(table, RESOURCE) - def test__parse_schema_resource_defaults(self): - client = _Client(self.PROJECT) - dataset = _Dataset(client) - table = self._makeOne(self.TABLE_NAME, dataset) - RESOURCE = self._makeResource() - schema = table._parse_schema_resource(RESOURCE['schema']) - self._verifySchema(schema, RESOURCE) - - def test__parse_schema_resource_subfields(self): - client = _Client(self.PROJECT) - dataset = _Dataset(client) - table = self._makeOne(self.TABLE_NAME, dataset) - RESOURCE = self._makeResource() - RESOURCE['schema']['fields'].append( - {'name': 'phone', - 'type': 'RECORD', - 'mode': 'REPEATABLE', - 'fields': [{'name': 'type', - 'type': 'STRING', - 'mode': 'REQUIRED'}, - {'name': 'number', - 'type': 'STRING', - 'mode': 'REQUIRED'}]}) - schema = table._parse_schema_resource(RESOURCE['schema']) - self._verifySchema(schema, RESOURCE) - - def test__build_schema_resource_defaults(self): - from gcloud.bigquery.table import SchemaField - client = _Client(self.PROJECT) - dataset = _Dataset(client) - full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - age = SchemaField('age', 'INTEGER', mode='REQUIRED') - table = self._makeOne(self.TABLE_NAME, dataset, - schema=[full_name, age]) - resource = table._build_schema_resource() - self.assertEqual(len(resource), 2) - self.assertEqual(resource[0], - {'name': 'full_name', - 'type': 'STRING', - 'mode': 'REQUIRED'}) - self.assertEqual(resource[1], - {'name': 'age', - 'type': 'INTEGER', - 'mode': 'REQUIRED'}) - - 
def test__build_schema_resource_w_description(self): - from gcloud.bigquery.table import SchemaField - client = _Client(self.PROJECT) - dataset = _Dataset(client) - DESCRIPTION = 'DESCRIPTION' - full_name = SchemaField('full_name', 'STRING', mode='REQUIRED', - description=DESCRIPTION) - age = SchemaField('age', 'INTEGER', mode='REQUIRED') - table = self._makeOne(self.TABLE_NAME, dataset, - schema=[full_name, age]) - resource = table._build_schema_resource() - self.assertEqual(len(resource), 2) - self.assertEqual(resource[0], - {'name': 'full_name', - 'type': 'STRING', - 'mode': 'REQUIRED', - 'description': DESCRIPTION}) - self.assertEqual(resource[1], - {'name': 'age', - 'type': 'INTEGER', - 'mode': 'REQUIRED'}) - - def test__build_schema_resource_w_subfields(self): - from gcloud.bigquery.table import SchemaField - client = _Client(self.PROJECT) - dataset = _Dataset(client) - full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - ph_type = SchemaField('type', 'STRING', 'REQUIRED') - ph_num = SchemaField('number', 'STRING', 'REQUIRED') - phone = SchemaField('phone', 'RECORD', mode='REPEATABLE', - fields=[ph_type, ph_num]) - table = self._makeOne(self.TABLE_NAME, dataset, - schema=[full_name, phone]) - resource = table._build_schema_resource() - self.assertEqual(len(resource), 2) - self.assertEqual(resource[0], - {'name': 'full_name', - 'type': 'STRING', - 'mode': 'REQUIRED'}) - self.assertEqual(resource[1], - {'name': 'phone', - 'type': 'RECORD', - 'mode': 'REPEATABLE', - 'fields': [{'name': 'type', - 'type': 'STRING', - 'mode': 'REQUIRED'}, - {'name': 'number', - 'type': 'STRING', - 'mode': 'REQUIRED'}]}) - def test_create_w_bound_client(self): from gcloud.bigquery.table import SchemaField PATH = 'projects/%s/datasets/%s/tables' % (self.PROJECT, self.DS_NAME) @@ -1325,6 +1233,105 @@ def _row_data(row): self.assertEqual(req['data'], SENT) +class Test_parse_schema_resource(unittest2.TestCase, _SchemaBase): + + def _callFUT(self, resource): + from gcloud.bigquery.table import _parse_schema_resource + return _parse_schema_resource(resource) + + def _makeResource(self): + return { + 'schema': {'fields': [ + {'name': 'full_name', 'type': 'STRING', 'mode': 'REQUIRED'}, + {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'}, + ]}, + } + + def test__parse_schema_resource_defaults(self): + RESOURCE = self._makeResource() + schema = self._callFUT(RESOURCE['schema']) + self._verifySchema(schema, RESOURCE) + + def test__parse_schema_resource_subfields(self): + RESOURCE = self._makeResource() + RESOURCE['schema']['fields'].append( + {'name': 'phone', + 'type': 'RECORD', + 'mode': 'REPEATABLE', + 'fields': [{'name': 'type', + 'type': 'STRING', + 'mode': 'REQUIRED'}, + {'name': 'number', + 'type': 'STRING', + 'mode': 'REQUIRED'}]}) + schema = self._callFUT(RESOURCE['schema']) + self._verifySchema(schema, RESOURCE) + + +class Test_build_schema_resource(unittest2.TestCase, _SchemaBase): + + def _callFUT(self, resource): + from gcloud.bigquery.table import _build_schema_resource + return _build_schema_resource(resource) + + def test_defaults(self): + from gcloud.bigquery.table import SchemaField + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + age = SchemaField('age', 'INTEGER', mode='REQUIRED') + resource = self._callFUT([full_name, age]) + self.assertEqual(len(resource), 2) + self.assertEqual(resource[0], + {'name': 'full_name', + 'type': 'STRING', + 'mode': 'REQUIRED'}) + self.assertEqual(resource[1], + {'name': 'age', + 'type': 'INTEGER', + 'mode': 'REQUIRED'}) + + def 
test_w_description(self): + from gcloud.bigquery.table import SchemaField + DESCRIPTION = 'DESCRIPTION' + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED', + description=DESCRIPTION) + age = SchemaField('age', 'INTEGER', mode='REQUIRED') + resource = self._callFUT([full_name, age]) + self.assertEqual(len(resource), 2) + self.assertEqual(resource[0], + {'name': 'full_name', + 'type': 'STRING', + 'mode': 'REQUIRED', + 'description': DESCRIPTION}) + self.assertEqual(resource[1], + {'name': 'age', + 'type': 'INTEGER', + 'mode': 'REQUIRED'}) + + def test_w_subfields(self): + from gcloud.bigquery.table import SchemaField + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + ph_type = SchemaField('type', 'STRING', 'REQUIRED') + ph_num = SchemaField('number', 'STRING', 'REQUIRED') + phone = SchemaField('phone', 'RECORD', mode='REPEATABLE', + fields=[ph_type, ph_num]) + resource = self._callFUT([full_name, phone]) + self.assertEqual(len(resource), 2) + self.assertEqual(resource[0], + {'name': 'full_name', + 'type': 'STRING', + 'mode': 'REQUIRED'}) + self.assertEqual(resource[1], + {'name': 'phone', + 'type': 'RECORD', + 'mode': 'REPEATABLE', + 'fields': [{'name': 'type', + 'type': 'STRING', + 'mode': 'REQUIRED'}, + {'name': 'number', + 'type': 'STRING', + 'mode': 'REQUIRED'}]}) + + class _Client(object): def __init__(self, project='project', connection=None): From 54607e8dd743d5ddbfe5eec0d865af28d7b09f7d Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 17:08:45 -0400 Subject: [PATCH 05/16] Surface 'Table.project'/'Table.dataset_name' properties FBO 'Job'. --- gcloud/bigquery/table.py | 18 ++++++++++++++++++ gcloud/bigquery/test_table.py | 2 ++ 2 files changed, 20 insertions(+) diff --git a/gcloud/bigquery/table.py b/gcloud/bigquery/table.py index 5f5134446e83..7a0aa42f980a 100644 --- a/gcloud/bigquery/table.py +++ b/gcloud/bigquery/table.py @@ -77,6 +77,24 @@ def __init__(self, name, dataset, schema=()): self._properties = {} self.schema = schema + @property + def project(self): + """Project bound to the table. + + :rtype: string + :returns: the project (derived from the dataset). + """ + return self._dataset.project + + @property + def dataset_name(self): + """Name of dataset containing the table. + + :rtype: string + :returns: the ID (derived from the dataset). + """ + return self._dataset.name + @property def path(self): """URL path for the table's APIs. diff --git a/gcloud/bigquery/test_table.py b/gcloud/bigquery/test_table.py index 0b5799002d0d..04f9524c232c 100644 --- a/gcloud/bigquery/test_table.py +++ b/gcloud/bigquery/test_table.py @@ -183,6 +183,8 @@ def test_ctor(self): table = self._makeOne(self.TABLE_NAME, dataset) self.assertEqual(table.name, self.TABLE_NAME) self.assertTrue(table._dataset is dataset) + self.assertEqual(table.project, self.PROJECT) + self.assertEqual(table.dataset_name, self.DS_NAME) self.assertEqual( table.path, '/projects/%s/datasets/%s/tables/%s' % ( From cd1fdba39e30db490347cd73e03f1df2519e869f Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 19:34:07 -0400 Subject: [PATCH 06/16] Add 'LoadFromStorageJob.begin' API method. 
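'begin()' issues the POST which starts the job server-side and then
updates the local copy from the response.  A hedged usage sketch,
reusing the placeholder 'client' and 'destination' names from the
'load_from_storage' commit above:

    job = client.load_from_storage(
        'load-job-1', destination, 'gs://my-bucket/person_ages.csv')
    job.source_format = 'CSV'
    job.skip_leading_rows = 1
    job.write_disposition = 'WRITE_TRUNCATE'
    job.begin()   # POST /projects/<project>/jobs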
--- gcloud/bigquery/job.py | 104 ++++++++++++++ gcloud/bigquery/test_job.py | 277 ++++++++++++++++++++++++++++++++++++ 2 files changed, 381 insertions(+) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index a5feb9f90f06..580ae82cfd2f 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -18,6 +18,8 @@ from gcloud.bigquery._helpers import _datetime_from_prop from gcloud.bigquery.table import SchemaField +from gcloud.bigquery.table import _build_schema_resource +from gcloud.bigquery.table import _parse_schema_resource class _LoadConfiguration(object): @@ -552,3 +554,105 @@ def write_disposition(self, value): def write_disposition(self): """Delete write_disposition.""" del self._configuration._write_disposition + + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + + :rtype: :class:`gcloud.bigquery.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self._client + return client + + def _build_resource(self): + """Generate a resource for ``begin``.""" + resource = { + 'jobReference': { + 'projectId': self.project, + 'jobId': self.name, + }, + 'configuration': { + 'sourceUris': self.source_uris, + 'destinationTable': { + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, + }, + 'load': {}, + }, + } + configuration = resource['configuration']['load'] + + if self.allow_jagged_rows is not None: + configuration['allowJaggedRows'] = self.allow_jagged_rows + if self.allow_quoted_newlines is not None: + configuration['allowQuotedNewlines'] = self.allow_quoted_newlines + if self.create_disposition is not None: + configuration['createDisposition'] = self.create_disposition + if self.encoding is not None: + configuration['encoding'] = self.encoding + if self.field_delimiter is not None: + configuration['fieldDelimiter'] = self.field_delimiter + if self.ignore_unknown_values is not None: + configuration['ignoreUnknownValues'] = self.ignore_unknown_values + if self.max_bad_records is not None: + configuration['maxBadRecords'] = self.max_bad_records + if self.quote_character is not None: + configuration['quote'] = self.quote_character + if self.skip_leading_rows is not None: + configuration['skipLeadingRows'] = self.skip_leading_rows + if self.source_format is not None: + configuration['sourceFormat'] = self.source_format + if self.write_disposition is not None: + configuration['writeDisposition'] = self.write_disposition + + if len(self.schema) > 0: + configuration['schema'] = { + 'fields': _build_schema_resource(self.schema)} + + if len(configuration) == 0: + del resource['configuration']['load'] + + return resource + + def _set_properties(self, api_response): + """Update properties from resource in body of ``api_response`` + + :type api_response: httplib2.Response + :param api_response: response returned from an API call + """ + self._properties.clear() + cleaned = api_response.copy() + schema = cleaned.pop('schema', {'fields': ()}) + self.schema = _parse_schema_resource(schema) + + statistics = cleaned.get('statistics', {}) + if 'creationTime' in statistics: + statistics['creationTime'] = float(statistics['creationTime']) + if 'startTime' in statistics: + statistics['startTime'] = float(statistics['startTime']) + if 'endTime' in 
statistics:
+                statistics['endTime'] = float(statistics['endTime'])
+
+        self._properties.update(cleaned)
+
+    def begin(self, client=None):
+        """API call: begin the job via a POST request
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use.  If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+        """
+        client = self._require_client(client)
+        path = '/projects/%s/jobs' % (self.project,)
+        api_response = client.connection.api_request(
+            method='POST', path=path, data=self._build_resource())
+        self._set_properties(api_response)
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index 668eb9e772a8..c5a7125acdcc 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -18,6 +18,8 @@ class TestLoadFromStorageJob(unittest2.TestCase):
     PROJECT = 'project'
     SOURCE1 = 'http://example.com/source1.csv'
+    DS_NAME = 'dataset_name'
+    TABLE_NAME = 'table_name'
     JOB_NAME = 'job_name'
 
     def _getTargetClass(self):
@@ -27,6 +29,146 @@ def _makeOne(self, *args, **kw):
     def _makeOne(self, *args, **kw):
         return self._getTargetClass()(*args, **kw)
 
+    def _setUpConstants(self):
+        import datetime
+        from gcloud._helpers import UTC
+
+        self.WHEN_TS = 1437767599.006
+        self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(
+            tzinfo=UTC)
+        self.ETAG = 'ETAG'
+        self.JOB_ID = '%s:%s' % (self.PROJECT, self.JOB_NAME)
+        self.RESOURCE_URL = 'http://example.com/path/to/resource'
+        self.USER_EMAIL = 'phred@example.com'
+        self.INPUT_FILES = 2
+        self.INPUT_BYTES = 12345
+        self.OUTPUT_BYTES = 23456
+        self.OUTPUT_ROWS = 345
+
+    def _makeResource(self, started=False, ended=False):
+        self._setUpConstants()
+        resource = {
+            'configuration': {
+                'load': {
+                },
+            },
+            'statistics': {
+                'creationTime': self.WHEN_TS * 1000,
+                'load': {
+                }
+            },
+            'etag': self.ETAG,
+            'id': self.JOB_ID,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'selfLink': self.RESOURCE_URL,
+            'user_email': self.USER_EMAIL,
+        }
+
+        if started or ended:
+            resource['statistics']['startTime'] = self.WHEN_TS * 1000
+
+        if ended:
+            resource['statistics']['endTime'] = (self.WHEN_TS + 1000) * 1000
+            resource['statistics']['load']['inputFiles'] = self.INPUT_FILES
+            resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES
+            resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES
+            resource['statistics']['load']['outputRows'] = self.OUTPUT_ROWS
+
+        return resource
+
+    def _verifyReadonlyResourceProperties(self, job, resource):
+        from datetime import timedelta
+
+        self.assertEqual(job.job_id, self.JOB_ID)
+
+        if 'creationTime' in resource.get('statistics', {}):
+            self.assertEqual(job.created, self.WHEN)
+        else:
+            self.assertEqual(job.created, None)
+        if 'startTime' in resource.get('statistics', {}):
+            self.assertEqual(job.started, self.WHEN)
+        else:
+            self.assertEqual(job.started, None)
+        if 'endTime' in resource.get('statistics', {}):
+            self.assertEqual(job.ended, self.WHEN + timedelta(seconds=1000))
+        else:
+            self.assertEqual(job.ended, None)
+        if 'etag' in resource:
+            self.assertEqual(job.etag, self.ETAG)
+        else:
+            self.assertEqual(job.etag, None)
+        if 'selfLink' in resource:
+            self.assertEqual(job.self_link, self.RESOURCE_URL)
+        else:
+            self.assertEqual(job.self_link, None)
+        if 'user_email' in resource:
+            self.assertEqual(job.user_email, self.USER_EMAIL)
+        else:
+            self.assertEqual(job.user_email, None)
+
+    def _verifyResourceProperties(self, job, resource):
+        self._verifyReadonlyResourceProperties(job, resource)
+
+        config = resource.get('configuration', {}).get('load')
+        if 'allowJaggedRows' in config:
+            self.assertEqual(job.allow_jagged_rows,
+                             config['allowJaggedRows'])
+        else:
+            self.assertTrue(job.allow_jagged_rows is None)
+        if 'allowQuotedNewlines' in config:
+            self.assertEqual(job.allow_quoted_newlines,
+                             config['allowQuotedNewlines'])
+        else:
+            self.assertTrue(job.allow_quoted_newlines is None)
+        if 'createDisposition' in config:
+            self.assertEqual(job.create_disposition,
+                             config['createDisposition'])
+        else:
+            self.assertTrue(job.create_disposition is None)
+        if 'encoding' in config:
+            self.assertEqual(job.encoding,
+                             config['encoding'])
+        else:
+            self.assertTrue(job.encoding is None)
+        if 'fieldDelimiter' in config:
+            self.assertEqual(job.field_delimiter,
+                             config['fieldDelimiter'])
+        else:
+            self.assertTrue(job.field_delimiter is None)
+        if 'ignoreUnknownValues' in config:
+            self.assertEqual(job.ignore_unknown_values,
+                             config['ignoreUnknownValues'])
+        else:
+            self.assertTrue(job.ignore_unknown_values is None)
+        if 'maxBadRecords' in config:
+            self.assertEqual(job.max_bad_records,
+                             config['maxBadRecords'])
+        else:
+            self.assertTrue(job.max_bad_records is None)
+        if 'quote' in config:
+            self.assertEqual(job.quote_character,
+                             config['quote'])
+        else:
+            self.assertTrue(job.quote_character is None)
+        if 'skipLeadingRows' in config:
+            self.assertEqual(job.skip_leading_rows,
+                             config['skipLeadingRows'])
+        else:
+            self.assertTrue(job.skip_leading_rows is None)
+        if 'sourceFormat' in config:
+            self.assertEqual(job.source_format,
+                             config['sourceFormat'])
+        else:
+            self.assertTrue(job.source_format is None)
+        if 'writeDisposition' in config:
+            self.assertEqual(job.write_disposition,
+                             config['writeDisposition'])
+        else:
+            self.assertTrue(job.write_disposition is None)
+
     def test_ctor(self):
         client = _Client(self.PROJECT)
         table = _Table()
@@ -342,6 +484,111 @@ def test_write_disposition_setter_deleter(self):
         del job.write_disposition
         self.assertTrue(job.write_disposition is None)
 
+    def test_begin_w_bound_client(self):
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        RESOURCE = self._makeResource()
+        # Ensure None for missing server-set props
+        del RESOURCE['statistics']['creationTime']
+        del RESOURCE['etag']
+        del RESOURCE['selfLink']
+        del RESOURCE['user_email']
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        table = _Table()
+        job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client)
+
+        job.begin()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'sourceUris': [self.SOURCE1],
+                'destinationTable': {
+                    'projectId': self.PROJECT,
+                    'datasetId': self.DS_NAME,
+                    'tableId': self.TABLE_NAME,
+                },
+            },
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_begin_w_alternate_client(self):
+        from gcloud.bigquery.table import SchemaField
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        RESOURCE = self._makeResource(ended=True)
+        LOAD_CONFIGURATION = {
+            'allowJaggedRows': True,
+            'allowQuotedNewlines': True,
+            'createDisposition': 'CREATE_NEVER',
+            'encoding': 'ISO-8859-1',
+            'fieldDelimiter': '|',
+            'ignoreUnknownValues': True,
+            'maxBadRecords': 100,
+            'quote': "'",
'skipLeadingRows': 1, + 'sourceFormat': 'CSV', + 'writeDisposition': 'WRITE_TRUNCATE', + 'schema': {'fields': [ + {'name': 'full_name', 'type': 'STRING', 'mode': 'REQUIRED'}, + {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'}, + ]} + } + RESOURCE['configuration']['load'] = LOAD_CONFIGURATION + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + table = _Table() + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + age = SchemaField('age', 'INTEGER', mode='REQUIRED') + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client1, + schema=[full_name, age]) + + job.allow_jagged_rows = True + job.allow_quoted_newlines = True + job.create_disposition = 'CREATE_NEVER' + job.encoding = 'ISO-8559-1' + job.field_delimiter = '|' + job.ignore_unknown_values = True + job.max_bad_records = 100 + job.quote_character = "'" + job.skip_leading_rows = 1 + job.source_format = 'CSV' + job.write_disposition = 'WRITE_TRUNCATE' + + job.begin(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'sourceUris': [self.SOURCE1], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.TABLE_NAME, + }, + 'load': LOAD_CONFIGURATION, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + class _Client(object): @@ -354,3 +601,33 @@ class _Table(object): def __init__(self): pass + + @property + def name(self): + return TestLoadFromStorageJob.TABLE_NAME + + @property + def project(self): + return TestLoadFromStorageJob.PROJECT + + @property + def dataset_name(self): + return TestLoadFromStorageJob.DS_NAME + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + from gcloud.exceptions import NotFound + self._requested.append(kw) + + try: + response, self._responses = self._responses[0], self._responses[1:] + except: # pragma: NO COVER temporary, until 'get()' w/ miss + raise NotFound('miss') + else: + return response From 66dd746022b5926a5508431f62de7d891ba13237 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 19:47:45 -0400 Subject: [PATCH 07/16] Add 'LoadFromStorageJob.exists' API method. 
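
Because the probe requests only the 'id' field, the existence check stays
cheap on the wire. For reference, a caller might combine it with 'begin'
roughly as follows (a sketch only, not part of this patch; the project,
dataset, table, and bucket names are placeholders):

    from gcloud.bigquery.client import Client
    from gcloud.bigquery.job import LoadFromStorageJob

    client = Client(project='my-project')         # placeholder project
    destination = client.dataset('my_dataset').table('my_table')
    job = LoadFromStorageJob(
        'my-load-job', destination, ['gs://my-bucket/data.csv'], client)

    if not job.exists():   # GET .../jobs/my-load-job?fields=id; 404 -> False
        job.begin()        # POST .../jobs with the built resource

Returning False on NotFound, rather than letting the exception escape, lets
callers treat existence as a plain boolean.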
--- gcloud/bigquery/job.py | 21 +++++++++++++++++++++ gcloud/bigquery/test_job.py | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 580ae82cfd2f..2962052c5ae2 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -16,6 +16,7 @@ import six +from gcloud.exceptions import NotFound from gcloud.bigquery._helpers import _datetime_from_prop from gcloud.bigquery.table import SchemaField from gcloud.bigquery.table import _build_schema_resource @@ -656,3 +657,23 @@ def begin(self, client=None): api_response = client.connection.api_request( method='POST', path=path, data=self._build_resource()) self._set_properties(api_response) + + def exists(self, client=None): + """API call: test for the existence of the job via a GET request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/get + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + + try: + client.connection.api_request(method='GET', path=self.path, + query_params={'fields': 'id'}) + except NotFound: + return False + else: + return True diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index c5a7125acdcc..13203279524e 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -589,6 +589,39 @@ def test_begin_w_alternate_client(self): self.assertEqual(req['data'], SENT) self._verifyResourceProperties(job, RESOURCE) + def test_exists_miss_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + conn = _Connection() + client = _Client(project=self.PROJECT, connection=conn) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + + self.assertFalse(job.exists()) + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + + def test_exists_hit_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection({}) + client2 = _Client(project=self.PROJECT, connection=conn2) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client1) + + self.assertTrue(job.exists(client=client2)) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + class _Client(object): @@ -627,7 +660,7 @@ def api_request(self, **kw): try: response, self._responses = self._responses[0], self._responses[1:] - except: # pragma: NO COVER temporary, until 'get()' w/ miss + except: raise NotFound('miss') else: return response From 4da9bb5734cdf1415604b6b943bd4070802d003b Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 11 Aug 2015 19:51:36 -0400 Subject: [PATCH 08/16] Add 'LoadFromStorageJob.reload' API method. 
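
'reload' refreshes every server-set field (state, statistics, etag) with a
single GET. A polling loop built on it might look like this (a sketch;
assumes a job begun as in the tests below, and the server-populated 'state'
property that the system test added later in this series relies on):

    import time

    job.begin()
    while job.state not in ('DONE', 'done'):   # 'state' is set by the server
        time.sleep(5)                          # modest pause between polls
        job.reload()                           # GET the job, reset properties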
--- gcloud/bigquery/job.py | 55 ++++++++++++++++--------- gcloud/bigquery/table.py | 1 + gcloud/bigquery/test_job.py | 81 ++++++++++++++++++++++++++++--------- 3 files changed, 100 insertions(+), 37 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 2962052c5ae2..1b1dc1cc2dce 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -570,24 +570,7 @@ def _require_client(self, client): client = self._client return client - def _build_resource(self): - """Generate a resource for ``begin``.""" - resource = { - 'jobReference': { - 'projectId': self.project, - 'jobId': self.name, - }, - 'configuration': { - 'sourceUris': self.source_uris, - 'destinationTable': { - 'projectId': self.destination.project, - 'datasetId': self.destination.dataset_name, - 'tableId': self.destination.name, - }, - 'load': {}, - }, - } - configuration = resource['configuration']['load'] + def _populate_config_resource(self, configuration): if self.allow_jagged_rows is not None: configuration['allowJaggedRows'] = self.allow_jagged_rows @@ -612,6 +595,26 @@ def _build_resource(self): if self.write_disposition is not None: configuration['writeDisposition'] = self.write_disposition + def _build_resource(self): + """Generate a resource for ``begin``.""" + resource = { + 'jobReference': { + 'projectId': self.project, + 'jobId': self.name, + }, + 'configuration': { + 'sourceUris': self.source_uris, + 'destinationTable': { + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, + }, + 'load': {}, + }, + } + configuration = resource['configuration']['load'] + self._populate_config_resource(configuration) + if len(self.schema) > 0: configuration['schema'] = { 'fields': _build_schema_resource(self.schema)} @@ -677,3 +680,19 @@ def exists(self, client=None): return False else: return True + + def reload(self, client=None): + """API call: refresh job properties via a GET request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/get + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. + """ + client = self._require_client(client) + + api_response = client.connection.api_request( + method='GET', path=self.path) + self._set_properties(api_response) diff --git a/gcloud/bigquery/table.py b/gcloud/bigquery/table.py index 7a0aa42f980a..838e2c8b128e 100644 --- a/gcloud/bigquery/table.py +++ b/gcloud/bigquery/table.py @@ -692,6 +692,7 @@ def insert_data(self, return errors + def _parse_schema_resource(info): """Parse a resource fragment into a schema field. 
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 13203279524e..befa788024b0 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -109,10 +109,7 @@ def _verifyReadonlyResourceProperties(self, job, resource): else: self.assertEqual(job.user_email, None) - def _verifyResourceProperties(self, job, resource): - self._verifyReadonlyResourceProperties(job, resource) - - config = resource.get('configuration', {}).get('load') + def _verifyBooleanConfigProperties(self, job, config): if 'allowJaggedRows' in config: self.assertEqual(job.allow_jagged_rows, config['allowJaggedRows']) @@ -123,6 +120,13 @@ def _verifyResourceProperties(self, job, resource): config['allowQuotedNewlines']) else: self.assertTrue(job.allow_quoted_newlines is None) + if 'ignoreUnknownValues' in config: + self.assertEqual(job.ignore_unknown_values, + config['ignoreUnknownValues']) + else: + self.assertTrue(job.ignore_unknown_values is None) + + def _verifyEnumConfigProperties(self, job, config): if 'createDisposition' in config: self.assertEqual(job.create_disposition, config['createDisposition']) @@ -133,16 +137,30 @@ def _verifyResourceProperties(self, job, resource): config['encoding']) else: self.assertTrue(job.encoding is None) + if 'sourceFormat' in config: + self.assertEqual(job.source_format, + config['sourceFormat']) + else: + self.assertTrue(job.source_format is None) + if 'writeDisposition' in config: + self.assertEqual(job.write_disposition, + config['writeDisposition']) + else: + self.assertTrue(job.write_disposition is None) + + def _verifyResourceProperties(self, job, resource): + self._verifyReadonlyResourceProperties(job, resource) + + config = resource.get('configuration', {}).get('load') + + self._verifyBooleanConfigProperties(job, config) + self._verifyEnumConfigProperties(job, config) + if 'fieldDelimiter' in config: self.assertEqual(job.field_delimiter, config['fieldDelimiter']) else: self.assertTrue(job.field_delimiter is None) - if 'ignoreUnknownValues' in config: - self.assertEqual(job.ignore_unknown_values, - config['ignoreUnknownValues']) - else: - self.assertTrue(job.ignore_unknown_values is None) if 'maxBadRecords' in config: self.assertEqual(job.max_bad_records, config['maxBadRecords']) @@ -158,16 +176,6 @@ def _verifyResourceProperties(self, job, resource): config['skipLeadingRows']) else: self.assertTrue(job.skip_leading_rows is None) - if 'sourceFormat' in config: - self.assertEqual(job.source_format, - config['sourceFormat']) - else: - self.assertTrue(job.source_format is None) - if 'writeDisposition' in config: - self.assertEqual(job.write_disposition, - config['writeDisposition']) - else: - self.assertTrue(job.write_disposition is None) def test_ctor(self): client = _Client(self.PROJECT) @@ -622,6 +630,41 @@ def test_exists_hit_w_alternate_client(self): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) + def test_reload_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + + job.reload() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + + def test_reload_w_alternate_client(self): + PATH = 
'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+ RESOURCE = self._makeResource()
+ conn1 = _Connection()
+ client1 = _Client(project=self.PROJECT, connection=conn1)
+ conn2 = _Connection(RESOURCE)
+ client2 = _Client(project=self.PROJECT, connection=conn2)
+ table = _Table()
+ job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client1)
+
+ job.reload(client=client2)
+
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % PATH)
+ self._verifyResourceProperties(job, RESOURCE)
+
 class _Client(object):

From 15d86633dec6b6c4400c5085fbf5ee6912a73c40 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Wed, 12 Aug 2015 15:38:29 -0400
Subject: [PATCH 09/16] Accommodate datetime centralization from #1051.

---
 gcloud/bigquery/job.py | 14 ++++++++++----
 gcloud/bigquery/test_job.py | 2 +-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py
index 1b1dc1cc2dce..353cad35bb16 100644
--- a/gcloud/bigquery/job.py
+++ b/gcloud/bigquery/job.py
@@ -17,7 +17,7 @@
 import six

 from gcloud.exceptions import NotFound
-from gcloud.bigquery._helpers import _datetime_from_prop
+from gcloud._helpers import _datetime_from_microseconds
 from gcloud.bigquery.table import SchemaField
 from gcloud.bigquery.table import _build_schema_resource
 from gcloud.bigquery.table import _parse_schema_resource
@@ -153,7 +153,9 @@ def created(self):
 """
 statistics = self._properties.get('statistics')
 if statistics is not None:
- return _datetime_from_prop(statistics.get('creationTime'))
+ millis = statistics.get('creationTime')
+ if millis is not None:
+ return _datetime_from_microseconds(millis * 1000.0)

 @property
 def started(self):
@@ -164,7 +166,9 @@ def started(self):
 """
 statistics = self._properties.get('statistics')
 if statistics is not None:
- return _datetime_from_prop(statistics.get('startTime'))
+ millis = statistics.get('startTime')
+ if millis is not None:
+ return _datetime_from_microseconds(millis * 1000.0)

 @property
 def ended(self):
@@ -175,7 +179,9 @@ def ended(self):
 """
 statistics = self._properties.get('statistics')
 if statistics is not None:
- return _datetime_from_prop(statistics.get('endTime'))
+ millis = statistics.get('endTime')
+ if millis is not None:
+ return _datetime_from_microseconds(millis * 1000.0)

 @property
 def input_file_bytes(self):
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index befa788024b0..051bdce61885 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -261,7 +261,7 @@ def test_schema_setter(self):
 def test_props_set_by_server(self):
 import datetime
 from gcloud._helpers import UTC
- from gcloud.bigquery._helpers import _millis
+ from gcloud._helpers import _millis

 CREATED = datetime.datetime(2015, 8, 11, 12, 13, 22, tzinfo=UTC)
 STARTED = datetime.datetime(2015, 8, 11, 13, 47, 15, tzinfo=UTC)

From e37ab0b9ab43c756a690348d73afd411860e6ed6 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Thu, 13 Aug 2015 13:44:13 -0400
Subject: [PATCH 10/16] Include 'sourceUris', 'destination' in load
 configuration.

Found while attempting to add a system test for the 'load from storage'
case.
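
After this change the payload built by '_build_resource' nests both keys
under 'configuration.load', matching the documented layout of the v2
jobs.insert API. Roughly (a sketch; all values are placeholders):

    # Shape of the request body 'begin' now sends:
    resource = {
        'jobReference': {'projectId': 'my-project', 'jobId': 'my-load-job'},
        'configuration': {
            'load': {
                'sourceUris': ['gs://my-bucket/data.csv'],
                'destinationTable': {
                    'projectId': 'my-project',
                    'datasetId': 'my_dataset',
                    'tableId': 'my_table',
                },
            },
        },
    }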
--- gcloud/bigquery/job.py | 16 +++++++--------- gcloud/bigquery/test_job.py | 24 +++++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 353cad35bb16..a309b8a0f8f1 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -609,13 +609,14 @@ def _build_resource(self): 'jobId': self.name, }, 'configuration': { - 'sourceUris': self.source_uris, - 'destinationTable': { - 'projectId': self.destination.project, - 'datasetId': self.destination.dataset_name, - 'tableId': self.destination.name, + 'load': { + 'sourceUris': self.source_uris, + 'destinationTable': { + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, + }, }, - 'load': {}, }, } configuration = resource['configuration']['load'] @@ -625,9 +626,6 @@ def _build_resource(self): configuration['schema'] = { 'fields': _build_schema_resource(self.schema)} - if len(configuration) == 0: - del resource['configuration']['load'] - return resource def _set_properties(self, api_response): diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 051bdce61885..397d55c77ad4 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -517,11 +517,13 @@ def test_begin_w_bound_client(self): 'jobId': self.JOB_NAME, }, 'configuration': { - 'sourceUris': [self.SOURCE1], - 'destinationTable': { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, + 'load': { + 'sourceUris': [self.SOURCE1], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.TABLE_NAME, + }, }, }, } @@ -533,6 +535,12 @@ def test_begin_w_alternate_client(self): PATH = 'projects/%s/jobs' % self.PROJECT RESOURCE = self._makeResource(ended=True) LOAD_CONFIGURATION = { + 'sourceUris': [self.SOURCE1], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.TABLE_NAME, + }, 'allowJaggedRows': True, 'allowQuotedNewlines': True, 'createDisposition': 'CREATE_NEVER', @@ -585,12 +593,6 @@ def test_begin_w_alternate_client(self): 'jobId': self.JOB_NAME, }, 'configuration': { - 'sourceUris': [self.SOURCE1], - 'destinationTable': { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, - }, 'load': LOAD_CONFIGURATION, }, } From f88dcc3649f0fc60a87ee959ac910da2360609a7 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 13:49:30 -0400 Subject: [PATCH 11/16] Add system test for 'load from storage' case. 
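
The test drives the full round trip: spool a CSV into Cloud Storage, load it
into a freshly created table, poll the job, then read the rows back. In
outline (condensed from the test itself; the literal identifiers appear in
the diff below):

    blob.upload_from_file(csv_file, rewind=True, content_type='text/csv')
    job = CLIENT.load_from_storage('job-name', table, 'gs://bucket/file.csv')
    job.skip_leading_rows = 1                  # skip the CSV header row
    job.source_format = 'CSV'
    job.begin()
    while job.state not in ('DONE', 'done'):   # bounded in the test (~90s)
        time.sleep(10)
        job.reload()
    rows, _, _ = table.fetch_data()            # (rows, total_count, token)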
--- system_tests/bigquery.py | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index 31fc8b6f3071..83edd97ae107 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -237,3 +237,69 @@ def test_load_table_then_dump_table(self): by_age = operator.itemgetter(1) self.assertEqual(sorted(rows, key=by_age), sorted(ROWS, key=by_age)) + + def test_load_table_from_storage_then_dump_table(self): + import csv + import tempfile + from gcloud.storage import Client as StorageClient + TIMESTAMP = 1000 * time.time() + BUCKET_NAME = 'bq_load_test_%d' % (TIMESTAMP,) + BLOB_NAME = 'person_ages.csv' + GS_URL = 'gs://%s/%s' % (BUCKET_NAME, BLOB_NAME) + ROWS = [ + ('Phred Phlyntstone', 32), + ('Bharney Rhubble', 33), + ('Wylma Phlyntstone', 29), + ('Bhettye Rhubble', 27), + ] + DATASET_NAME = 'system_tests' + TABLE_NAME = 'test_table' + + s_client = StorageClient() + + # In the **very** rare case the bucket name is reserved, this + # fails with a ConnectionError. + bucket = s_client.create_bucket(BUCKET_NAME) + self.to_delete.append(bucket) + + blob = bucket.blob(BLOB_NAME) + self.to_delete.insert(0, blob) + + with tempfile.TemporaryFile() as f: + writer = csv.writer(f) + writer.writerow(('Full Name', 'Age')) + writer.writerows(ROWS) + blob.upload_from_file(f, rewind=True, content_type='text/csv') + + dataset = CLIENT.dataset(DATASET_NAME) + dataset.create() + self.to_delete.append(dataset) + + full_name = bigquery.SchemaField('full_name', 'STRING', + mode='REQUIRED') + age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED') + table = dataset.table(TABLE_NAME, schema=[full_name, age]) + table.create() + self.to_delete.insert(0, table) + + job = CLIENT.load_from_storage( + 'bq_load_storage_test_%d' % (TIMESTAMP,), table, GS_URL) + job.create_disposition = 'CREATE_NEVER' + job.skip_leading_rows = 1 + job.source_format = 'CSV' + job.write_disposition = 'WRITE_EMPTY' + + job.begin() + + counter = 9 # Allow for 90 seconds of lag. + + while job.state not in ('DONE', 'done') and counter > 0: + counter -= 1 + job.reload() + if job.state not in ('DONE', 'done'): + time.sleep(10) + + rows, _, _ = table.fetch_data() + by_age = operator.itemgetter(1) + self.assertEqual(sorted(rows, key=by_age), + sorted(ROWS, key=by_age)) From 31026aa8faefccca724427dea189403f254699ce Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 17:48:02 -0400 Subject: [PATCH 12/16] Add docstring. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1056/files#r37027413 --- gcloud/bigquery/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index a309b8a0f8f1..a305ca0ce152 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -577,7 +577,7 @@ def _require_client(self, client): return client def _populate_config_resource(self, configuration): - + """Helper for _build_resource: copy config properties to resource""" if self.allow_jagged_rows is not None: configuration['allowJaggedRows'] = self.allow_jagged_rows if self.allow_quoted_newlines is not None: From 31c4b1be1e7a282c8dc8d78e40c3f24eb1c8b3ee Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 13 Aug 2015 17:50:12 -0400 Subject: [PATCH 13/16] Use a longer var name for temporary file. 
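
A related subtlety worth noting while touching this code: 'csv.writer' emits
text, while 'tempfile.TemporaryFile()' opens in binary mode ('w+b') by
default, which only lines up under Python 2's str semantics. A Python 3-safe
variant of the spool-and-upload step might look like this (a sketch under
that assumption, not part of this patch; 'blob' and 'ROWS' as in the test):

    import csv
    import tempfile

    with tempfile.TemporaryFile(mode='w+', newline='') as csv_file:
        writer = csv.writer(csv_file)           # needs a text-mode stream
        writer.writerow(('Full Name', 'Age'))
        writer.writerows(ROWS)
        csv_file.seek(0)
        blob.upload_from_string(
            csv_file.read().encode('utf-8'), content_type='text/csv')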
Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1056/files#r37028059 --- system_tests/bigquery.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index 83edd97ae107..41fcbe868c41 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -265,11 +265,12 @@ def test_load_table_from_storage_then_dump_table(self): blob = bucket.blob(BLOB_NAME) self.to_delete.insert(0, blob) - with tempfile.TemporaryFile() as f: - writer = csv.writer(f) + with tempfile.TemporaryFile() as csv_file: + writer = csv.writer(csv_file) writer.writerow(('Full Name', 'Age')) writer.writerows(ROWS) - blob.upload_from_file(f, rewind=True, content_type='text/csv') + blob.upload_from_file( + csv_file, rewind=True, content_type='text/csv') dataset = CLIENT.dataset(DATASET_NAME) dataset.create() From 6549219efbea64368d55cd5e69f1241f613b50d5 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 14 Aug 2015 11:49:58 -0400 Subject: [PATCH 14/16] Implement 'cancel' API request. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1056#discussion_r37029928 --- gcloud/bigquery/job.py | 16 ++++++++++++++++ gcloud/bigquery/test_job.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index a305ca0ce152..9bd58a8d48a2 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -700,3 +700,19 @@ def reload(self, client=None): api_response = client.connection.api_request( method='GET', path=self.path) self._set_properties(api_response) + + def cancel(self, client=None): + """API call: cancel job via a POST request + + See + https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. 
+ """ + client = self._require_client(client) + + api_response = client.connection.api_request( + method='POST', path='%s/cancel' % self.path) + self._set_properties(api_response) diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 397d55c77ad4..0a466bd9d0ee 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -667,6 +667,41 @@ def test_reload_w_alternate_client(self): self.assertEqual(req['path'], '/%s' % PATH) self._verifyResourceProperties(job, RESOURCE) + def test_cancel_w_bound_client(self): + PATH = 'projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client) + + job.cancel() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + + def test_cancel_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + table = _Table() + job = self._makeOne(self.JOB_NAME, table, [self.SOURCE1], client1) + + job.cancel(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + class _Client(object): From 9260ae95cf6c10910c4a180d2d65f126abecf570 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 16:08:54 -0400 Subject: [PATCH 15/16] Expose quasi-enum types for string values w/ choices. Users who want to avoid tripping over string literal mismatches can use class constants instead. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1056/files#r37029533 --- gcloud/bigquery/job.py | 82 ++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 9bd58a8d48a2..4ffbd34a717d 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -23,6 +23,53 @@ from gcloud.bigquery.table import _parse_schema_resource +class _Enum(object): + """Psedo-enumeration class. + + Subclasses must define ``ALLOWED`` as a class-level constant: it must + be a sequence of strings. + """ + @classmethod + def validate(cls, value): + """Check that ``value`` is one of the allowed values. + + :raises: ValueError if value is not allowed. + """ + if value not in cls.ALLOWED: + raise ValueError('Pass one of: %s' ', '.join(cls.ALLOWED)) + + +class CreateDisposition(_Enum): + """Pseudo-enum for allowed values for ``create_disposition`` properties. 
+ """ + CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' + CREATE_NEVER = 'CREATE_NEVER' + ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER) + + +class Encoding(_Enum): + """Pseudo-enum for allowed values for ``encoding`` properties.""" + UTF_8 = 'UTF-8' + ISO_8559_1 = 'ISO-8559-1' + ALLOWED = (UTF_8, ISO_8559_1) + + +class SourceFormat(_Enum): + """Pseudo-enum for allowed values for ``source_format`` properties.""" + CSV = 'CSV' + DATASTORE_BACKUP = 'DATASTORE_BACKUP' + NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' + ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON) + + +class WriteDisposition(_Enum): + """Pseudo-enum for allowed values for ``write_disposition`` properties.""" + WRITE_APPEND = 'WRITE_APPEND' + WRITE_TRUNCATE = 'WRITE_TRUNCATE' + WRITE_EMPTY = 'WRITE_EMPTY' + ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY) + + class _LoadConfiguration(object): """User-settable configuration options for load jobs.""" # None -> use server default. @@ -327,14 +374,10 @@ def create_disposition(self): def create_disposition(self, value): """Update create_disposition. - :type value: boolean - :param value: new create_disposition: one of "CREATE_IF_NEEDED" or - "CREATE_NEVER" - - :raises: ValueError for invalid value. + :type value: string + :param value: allowed values for :class:`CreateDisposition`. """ - if value not in ('CREATE_IF_NEEDED', 'CREATE_NEVER'): - raise ValueError("Pass 'CREATE_IF_NEEDED' or 'CREATE_NEVER'") + CreateDisposition.validate(value) # raises ValueError if invalid self._configuration._create_disposition = value @create_disposition.deleter @@ -356,12 +399,9 @@ def encoding(self, value): """Update encoding. :type value: string - :param value: new encoding: one of 'UTF-8' or 'ISO-8859-1'. - - :raises: ValueError for invalid value. + :param value: allowed values for :class:`Encoding`. """ - if value not in ('UTF-8', 'ISO-8559-1'): - raise ValueError("Pass 'UTF-8' or 'ISO-8559-1'") + Encoding.validate(value) # raises ValueError if invalid self._configuration._encoding = value @encoding.deleter @@ -518,14 +558,9 @@ def source_format(self, value): """Update source_format. :type value: string - :param value: new source_format: one of "CSV", "DATASTORE_BACKUP", - or "NEWLINE_DELIMITED_JSON" - - :raises: ValueError for invalid values. + :param value: valid values for :class:`SourceFormat`. """ - if value not in ('CSV', 'DATASTORE_BACKUP', 'NEWLINE_DELIMITED_JSON'): - raise ValueError( - "Pass 'CSV', 'DATASTORE_BACKUP' or 'NEWLINE_DELIMITED_JSON'") + SourceFormat.validate(value) # raises ValueError if invalid self._configuration._source_format = value @source_format.deleter @@ -547,14 +582,9 @@ def write_disposition(self, value): """Update write_disposition. :type value: string - :param value: new write_disposition: one of "WRITE_APPEND", - "WRITE_TRUNCATE", or "WRITE_EMPTY" - - :raises: ValueError for invalid value types. + :param value: valid values for :class:`WriteDisposition`. """ - if value not in ('WRITE_APPEND', 'WRITE_TRUNCATE', 'WRITE_EMPTY'): - raise ValueError( - "Pass 'WRITE_APPEND', 'WRITE_TRUNCATE' or 'WRITE_EMPTY'") + WriteDisposition.validate(value) # raises ValueError if invalid self._configuration._write_disposition = value @write_disposition.deleter From 3e10e521c76c4a787882ff551a22a005c27bb43c Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 17 Aug 2015 16:16:16 -0400 Subject: [PATCH 16/16] Make load job name / API more explicit: - 'Client.load_from_storage' -> 'Client.load_table_from_storage'. - 'job.LoadFromStorageJob' -> 'job.LoadTableFromStorageJob'. 
Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1056/files#r37120652 --- gcloud/bigquery/client.py | 11 ++++++----- gcloud/bigquery/job.py | 2 +- gcloud/bigquery/test_client.py | 8 ++++---- gcloud/bigquery/test_job.py | 12 ++++++------ 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py index 0e08e39e563b..25b5cdcd1567 100644 --- a/gcloud/bigquery/client.py +++ b/gcloud/bigquery/client.py @@ -18,7 +18,7 @@ from gcloud.client import JSONClient from gcloud.bigquery.connection import Connection from gcloud.bigquery.dataset import Dataset -from gcloud.bigquery.job import LoadFromStorageJob +from gcloud.bigquery.job import LoadTableFromStorageJob class Client(JSONClient): @@ -98,7 +98,7 @@ def dataset(self, name): """ return Dataset(name, client=self) - def load_from_storage(self, name, destination, *source_uris): + def load_table_from_storage(self, name, destination, *source_uris): """Construct a job for loading data into a table from CloudStorage. :type name: string @@ -110,7 +110,8 @@ def load_from_storage(self, name, destination, *source_uris): :type source_uris: sequence of string :param source_uris: URIs of data files to be loaded. - :rtype: :class:`gcloud.bigquery.job.LoadFromStorageJob` - :returns: a new ``LoadFromStorageJob`` instance + :rtype: :class:`gcloud.bigquery.job.LoadTableFromStorageJob` + :returns: a new ``LoadTableFromStorageJob`` instance """ - return LoadFromStorageJob(name, destination, source_uris, client=self) + return LoadTableFromStorageJob(name, destination, source_uris, + client=self) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index 4ffbd34a717d..c48a3bc96cab 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -86,7 +86,7 @@ class _LoadConfiguration(object): _write_disposition = None -class LoadFromStorageJob(object): +class LoadTableFromStorageJob(object): """Asynchronous job for loading data into a BQ table from CloudStorage. 
:type name: string
diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py
index 7dbba0c103a8..548814d7c744 100644
--- a/gcloud/bigquery/test_client.py
+++ b/gcloud/bigquery/test_client.py
@@ -128,8 +128,8 @@ def test_dataset(self):
 self.assertEqual(dataset.name, DATASET)
 self.assertTrue(dataset._client is client)

- def test_load_from_storage(self):
- from gcloud.bigquery.job import LoadFromStorageJob
+ def test_load_table_from_storage(self):
+ from gcloud.bigquery.job import LoadTableFromStorageJob
 PROJECT = 'PROJECT'
 JOB = 'job_name'
 DATASET = 'dataset_name'
@@ -140,8 +140,8 @@
 client = self._makeOne(project=PROJECT, credentials=creds, http=http)
 dataset = client.dataset(DATASET)
 destination = dataset.table(DESTINATION)
- job = client.load_from_storage(JOB, destination, SOURCE_URI)
- self.assertTrue(isinstance(job, LoadFromStorageJob))
+ job = client.load_table_from_storage(JOB, destination, SOURCE_URI)
+ self.assertTrue(isinstance(job, LoadTableFromStorageJob))
 self.assertTrue(job._client is client)
 self.assertEqual(job.name, JOB)
 self.assertEqual(list(job.source_uris), [SOURCE_URI])
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index 0a466bd9d0ee..56a517d73d42 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -15,7 +15,7 @@
 import unittest2

-class TestLoadFromStorageJob(unittest2.TestCase):
+class TestLoadTableFromStorageJob(unittest2.TestCase):
 PROJECT = 'project'
 SOURCE1 = 'http://example.com/source1.csv'
 DS_NAME = 'dataset_name'
@@ -23,8 +23,8 @@
 JOB_NAME = 'job_name'

 def _getTargetClass(self):
- from gcloud.bigquery.job import LoadFromStorageJob
- return LoadFromStorageJob
+ from gcloud.bigquery.job import LoadTableFromStorageJob
+ return LoadTableFromStorageJob

 def _makeOne(self, *args, **kw):
 return self._getTargetClass()(*args, **kw)
@@ -717,15 +717,15 @@ def __init__(self):
 @property
 def name(self):
- return TestLoadFromStorageJob.TABLE_NAME
+ return TestLoadTableFromStorageJob.TABLE_NAME

 @property
 def project(self):
- return TestLoadFromStorageJob.PROJECT
+ return TestLoadTableFromStorageJob.PROJECT

 @property
 def dataset_name(self):
- return TestLoadFromStorageJob.DS_NAME
+ return TestLoadTableFromStorageJob.DS_NAME

 class _Connection(object):
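
Taken together, the series leaves the client-side load API looking roughly
like this (a closing sketch under the final names; project, dataset, table,
and URI are placeholders, and error handling is omitted):

    from gcloud.bigquery.client import Client
    from gcloud.bigquery.job import SourceFormat, WriteDisposition
    from gcloud.bigquery.table import SchemaField

    client = Client(project='my-project')
    table = client.dataset('my_dataset').table('my_table', schema=[
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ])
    job = client.load_table_from_storage(
        'load-job-1', table, 'gs://my-bucket/data.csv')
    job.skip_leading_rows = 1
    job.source_format = SourceFormat.CSV
    job.write_disposition = WriteDisposition.WRITE_EMPTY

    job.begin()          # POST /projects/my-project/jobs
    job.exists()         # GET  .../jobs/load-job-1?fields=id
    job.reload()         # refresh state and statistics
    # job.cancel() would POST to .../jobs/load-job-1/cancel if needed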