Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job for extracting tables to CloudStorage #1068

Merged
merged 7 commits into from
Aug 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions gcloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from gcloud.bigquery.connection import Connection
from gcloud.bigquery.dataset import Dataset
from gcloud.bigquery.job import CopyJob
from gcloud.bigquery.job import ExtractTableToStorageJob
from gcloud.bigquery.job import LoadTableFromStorageJob


Expand Down Expand Up @@ -133,3 +134,22 @@ def copy_table(self, name, destination, *sources):
:returns: a new ``CopyJob`` instance
"""
return CopyJob(name, destination, sources, client=self)

def extract_table_to_storage(self, name, source, *destination_uris):
"""Construct a job for extracting a table into Cloud Storage files.

:type name: string
:param name: Name of the job.

:type source: :class:`gcloud.bigquery.table.Table`
:param source: table to be extracted.

:type destination_uris: sequence of string
:param destination_uris: URIs of CloudStorage file(s) into which
table data is to be extracted.

:rtype: :class:`gcloud.bigquery.job.ExtractTableToStorageJob`
:returns: a new ``ExtractTableToStorageJob`` instance

This comment was marked as spam.

This comment was marked as spam.

"""
return ExtractTableToStorageJob(name, source, destination_uris,
client=self)
205 changes: 203 additions & 2 deletions gcloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ def _populate_config_resource(self, configuration):
configuration['writeDisposition'] = self.write_disposition

def _build_resource(self):
"""Generate a resource for ``begin``."""
"""Generate a resource for :meth:`begin`."""
resource = {
'jobReference': {
'projectId': self.project,
Expand Down Expand Up @@ -911,7 +911,7 @@ def _populate_config_resource(self, configuration):
configuration['writeDisposition'] = self.write_disposition

def _build_resource(self):
"""Generate a resource for ``begin``."""
"""Generate a resource for :meth:`begin`."""

source_refs = [{
'projectId': table.project,
Expand Down Expand Up @@ -939,3 +939,204 @@ def _build_resource(self):
self._populate_config_resource(configuration)

return resource


class _ExtractConfiguration(object):

This comment was marked as spam.

"""User-settable configuration options for extract jobs."""
# None -> use server default.
_compression = None
_destination_format = None
_field_delimiter = None
_print_header = None


class Compression(_Enum):
"""Pseudo-enum for allowed values for ``compression`` properties.
"""
GZIP = 'GZIP'
NONE = 'NONE'
ALLOWED = (GZIP, NONE)


class DestinationFormat(_Enum):
"""Pseudo-enum for allowed values for ``destination_format`` properties.
"""
CSV = 'CSV'
NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
AVRO = 'AVRO'
ALLOWED = (CSV, NEWLINE_DELIMITED_JSON, AVRO)


class ExtractTableToStorageJob(_BaseJob):
"""Asynchronous job: extract data from a BQ table into Cloud Storage.

:type name: string
:param name: the name of the job

:type source: :class:`gcloud.bigquery.table.Table`
:param source: Table into which data is to be loaded.

:type destination_uris: list of string
:param destination_uris: URIs describing Cloud Storage blobs into which
extracted data will be written.

:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, name, source, destination_uris, client):
super(ExtractTableToStorageJob, self).__init__(name, client)
self.source = source
self.destination_uris = destination_uris
self._configuration = _ExtractConfiguration()

@property
def compression(self):
"""Compression to apply to destination blobs.

See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.compression

:rtype: string, or ``NoneType``
:returns: The value as set by the user, or None (the default).
"""
return self._configuration._compression

@compression.setter
def compression(self, value):
"""Update compression.

:type value: string
:param value: allowed value for :class:`Compression`.
"""
Compression.validate(value) # raises ValueError if invalie
self._configuration._compression = value

@compression.deleter
def compression(self):
"""Delete compression."""
del self._configuration._compression

@property
def destination_format(self):
"""Handling for missing destination table.

See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.destinationFormat

:rtype: string, or ``NoneType``
:returns: The value as set by the user, or None (the default).
"""
return self._configuration._destination_format

@destination_format.setter
def destination_format(self, value):
"""Update destination_format.

:type value: string
:param value: allowed value for :class:`DestinationFormat`.
"""
DestinationFormat.validate(value) # raises ValueError if invalid
self._configuration._destination_format = value

@destination_format.deleter
def destination_format(self):
"""Delete destination_format."""
del self._configuration._destination_format

@property
def field_delimiter(self):
"""Allow rows with missing trailing commas for optional fields.

See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.fieldDelimiter

:rtype: string, or ``NoneType``
:returns: The value as set by the user, or None (the default).
"""
return self._configuration._field_delimiter

@field_delimiter.setter
def field_delimiter(self, value):
"""Update field_delimiter.

:type value: string
:param value: new field delimiter

:raises: ValueError for invalid value types.
"""
if not isinstance(value, six.string_types):
raise ValueError("Pass a string")
self._configuration._field_delimiter = value

@field_delimiter.deleter
def field_delimiter(self):
"""Delete field_delimiter."""
del self._configuration._field_delimiter

@property
def print_header(self):
"""Write a header row into destination blobs.

See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.printHeader

:rtype: boolean, or ``NoneType``
:returns: The value as set by the user, or None (the default).
"""
return self._configuration._print_header

@print_header.setter
def print_header(self, value):
"""Update print_header.

:type value: boolean
:param value: new print_header

:raises: ValueError for invalid value types.
"""
if not isinstance(value, bool):
raise ValueError("Pass a boolean")
self._configuration._print_header = value

@print_header.deleter
def print_header(self):
"""Delete print_header."""
del self._configuration._print_header

def _populate_config_resource(self, configuration):
"""Helper for _build_resource: copy config properties to resource"""
if self.compression is not None:
configuration['compression'] = self.compression
if self.destination_format is not None:
configuration['destinationFormat'] = self.destination_format
if self.field_delimiter is not None:
configuration['fieldDelimiter'] = self.field_delimiter
if self.print_header is not None:
configuration['printHeader'] = self.print_header

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""

source_ref = {
'projectId': self.source.project,
'datasetId': self.source.dataset_name,
'tableId': self.source.name,
}

resource = {
'jobReference': {
'projectId': self.project,
'jobId': self.name,
},
'configuration': {
'extract': {
'sourceTable': source_ref,
'destinationUris': self.destination_uris,
},
},
}
configuration = resource['configuration']['extract']
self._populate_config_resource(configuration)

return resource
19 changes: 19 additions & 0 deletions gcloud/bigquery/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ def test_copy_table(self):
self.assertEqual(list(job.sources), [source])
self.assertTrue(job.destination is destination)

def test_extract_table_to_storage(self):
from gcloud.bigquery.job import ExtractTableToStorageJob
PROJECT = 'PROJECT'
JOB = 'job_name'
DATASET = 'dataset_name'
SOURCE = 'source_table'
DESTINATION = 'gs://bucket_name/object_name'
creds = _Credentials()
http = object()
client = self._makeOne(project=PROJECT, credentials=creds, http=http)
dataset = client.dataset(DATASET)
source = dataset.table(SOURCE)
job = client.extract_table_to_storage(JOB, source, DESTINATION)
self.assertTrue(isinstance(job, ExtractTableToStorageJob))
self.assertTrue(job._client is client)
self.assertEqual(job.name, JOB)
self.assertEqual(job.source, source)
self.assertEqual(list(job.destination_uris), [DESTINATION])


class _Credentials(object):

Expand Down
Loading