Skip to content

Commit

Permalink
BigQuery & Storage: use client http for resumable media (googleapis#3705)

Browse files Browse the repository at this point in the history

* BigQuery: Use client transport for resumable media
* Storage: Use client transport for resumable media
  • Loading branch information
Jon Wayne Parrott authored and landrito committed Aug 22, 2017
1 parent 63053c8 commit 1f945a0
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 138 deletions.
14 changes: 5 additions & 9 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import six

import google.auth.transport.requests
from google import resumable_media
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
Expand Down Expand Up @@ -823,8 +822,8 @@ def insert_data(self,

return errors

def _make_transport(self, client):
"""Make an authenticated transport with a client's credentials.
def _get_transport(self, client):
"""Return the client's transport.
:type client: :class:`~google.cloud.bigquery.client.Client`
:param client: The client to use.
Expand All @@ -834,10 +833,7 @@ def _make_transport(self, client):
:returns: The transport (with credentials) that will
make authenticated requests.
"""
# Create a ``requests`` transport with the client's credentials.
transport = google.auth.transport.requests.AuthorizedSession(
client._credentials)
return transport
return client._http

def _initiate_resumable_upload(self, client, stream,
metadata, num_retries):
Expand Down Expand Up @@ -865,7 +861,7 @@ def _initiate_resumable_upload(self, client, stream,
* The ``transport`` used to initiate the upload.
"""
chunk_size = _DEFAULT_CHUNKSIZE
transport = self._make_transport(client)
transport = self._get_transport(client)
headers = _get_upload_headers(client._connection.USER_AGENT)
upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project)
upload = ResumableUpload(upload_url, chunk_size, headers=headers)
Expand Down Expand Up @@ -941,7 +937,7 @@ def _do_multipart_upload(self, client, stream, metadata,
msg = _READ_LESS_THAN_SIZE.format(size, len(data))
raise ValueError(msg)

transport = self._make_transport(client)
transport = self._get_transport(client)
headers = _get_upload_headers(client._connection.USER_AGENT)

upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project)
Expand Down
65 changes: 30 additions & 35 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1561,14 +1561,14 @@ def _row_data(row):
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(req['data'], SENT)

@mock.patch('google.auth.transport.requests.AuthorizedSession')
def test__make_transport(self, session_factory):
client = mock.Mock(spec=[u'_credentials'])
def test__get_transport(self):
client = mock.Mock(spec=[u'_credentials', '_http'])
client._http = mock.sentinel.http
table = self._make_one(self.TABLE_NAME, None)
transport = table._make_transport(client)

self.assertIs(transport, session_factory.return_value)
session_factory.assert_called_once_with(client._credentials)
transport = table._get_transport(client)

self.assertIs(transport, mock.sentinel.http)

@staticmethod
def _mock_requests_response(status_code, headers, content=b''):
Expand Down Expand Up @@ -1600,8 +1600,7 @@ def _initiate_resumable_upload_helper(self, num_retries=None):
response_headers = {'location': resumable_url}
fake_transport = self._mock_transport(
http_client.OK, response_headers)
table._make_transport = mock.Mock(
return_value=fake_transport, spec=[])
client._http = fake_transport

# Create some mock arguments and call the method under test.
data = b'goodbye gudbi gootbee'
Expand Down Expand Up @@ -1640,7 +1639,6 @@ def _initiate_resumable_upload_helper(self, num_retries=None):
self.assertEqual(stream.tell(), 0)

# Check the mocks.
table._make_transport.assert_called_once_with(client)
request_headers = expected_headers.copy()
request_headers['x-upload-content-type'] = _GENERIC_CONTENT_TYPE
fake_transport.request.assert_called_once_with(
Expand Down Expand Up @@ -1668,7 +1666,7 @@ def _do_multipart_upload_success_helper(

# Create mocks to be checked for doing transport.
fake_transport = self._mock_transport(http_client.OK, {})
table._make_transport = mock.Mock(return_value=fake_transport, spec=[])
client._http = fake_transport

# Create some mock arguments.
data = b'Bzzzz-zap \x00\x01\xf4'
Expand All @@ -1682,7 +1680,6 @@ def _do_multipart_upload_success_helper(
# Check the mocks and the returned value.
self.assertIs(response, fake_transport.request.return_value)
self.assertEqual(stream.tell(), size)
table._make_transport.assert_called_once_with(client)
get_boundary.assert_called_once_with()

upload_url = (
Expand Down Expand Up @@ -1723,7 +1720,7 @@ class TestTableUpload(object):
# rather than `unittest`-style.

@staticmethod
def _make_table():
def _make_table(transport=None):
from google.cloud.bigquery import _http
from google.cloud.bigquery import client
from google.cloud.bigquery import dataset
Expand All @@ -1733,6 +1730,7 @@ def _make_table():
client = mock.create_autospec(client.Client, instance=True)
client._connection = connection
client._credentials = mock.sentinel.credentials
client._http = transport
client.project = 'project_id'

dataset = dataset.Dataset('test_dataset', client)
Expand Down Expand Up @@ -1955,57 +1953,54 @@ def _make_resumable_upload_responses(cls, size):
return [initial_response, data_response, final_response]

@staticmethod
def _make_transport_patch(table, responses=None):
"""Patch a table's _make_transport method to return given responses."""
def _make_transport(responses=None):
import google.auth.transport.requests

transport = mock.create_autospec(
google.auth.transport.requests.AuthorizedSession, instance=True)
transport.request.side_effect = responses
return mock.patch.object(
table, '_make_transport', return_value=transport, autospec=True)
return transport

def test__do_resumable_upload(self):
table = self._make_table()
file_obj = self._make_file_obj()
file_obj_len = len(file_obj.getvalue())
responses = self._make_resumable_upload_responses(file_obj_len)
transport = self._make_transport(
self._make_resumable_upload_responses(file_obj_len))
table = self._make_table(transport)

with self._make_transport_patch(table, responses) as transport:
result = table._do_resumable_upload(
table._dataset._client,
file_obj,
self.EXPECTED_CONFIGURATION,
None)
result = table._do_resumable_upload(
table._dataset._client,
file_obj,
self.EXPECTED_CONFIGURATION,
None)

content = result.content.decode('utf-8')
assert json.loads(content) == {'size': file_obj_len}

# Verify that configuration data was passed in with the initial
# request.
transport.return_value.request.assert_any_call(
transport.request.assert_any_call(
'POST',
mock.ANY,
data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'),
headers=mock.ANY)

def test__do_multipart_upload(self):
table = self._make_table()
transport = self._make_transport([self._make_response(http_client.OK)])
table = self._make_table(transport)
file_obj = self._make_file_obj()
file_obj_len = len(file_obj.getvalue())
responses = [self._make_response(http_client.OK)]

with self._make_transport_patch(table, responses) as transport:
table._do_multipart_upload(
table._dataset._client,
file_obj,
self.EXPECTED_CONFIGURATION,
file_obj_len,
None)
table._do_multipart_upload(
table._dataset._client,
file_obj,
self.EXPECTED_CONFIGURATION,
file_obj_len,
None)

# Verify that configuration data was passed in with the initial
# request.
request_args = transport.return_value.request.mock_calls[0][2]
request_args = transport.request.mock_calls[0][2]
request_data = request_args['data'].decode('utf-8')
request_headers = request_args['headers']

Expand Down
16 changes: 6 additions & 10 deletions storage/google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

from six.moves.urllib.parse import quote

import google.auth.transport.requests
from google import resumable_media
from google.resumable_media.requests import ChunkedDownload
from google.resumable_media.requests import Download
Expand Down Expand Up @@ -361,8 +360,8 @@ def delete(self, client=None):
"""
return self.bucket.delete_blob(self.name, client=client)

def _make_transport(self, client):
"""Make an authenticated transport with a client's credentials.
def _get_transport(self, client):
"""Return the client's transport.
:type client: :class:`~google.cloud.storage.client.Client`
:param client: (Optional) The client to use. If not passed, falls back
Expand All @@ -374,10 +373,7 @@ def _make_transport(self, client):
make authenticated requests.
"""
client = self._require_client(client)
# Create a ``requests`` transport with the client's credentials.
transport = google.auth.transport.requests.AuthorizedSession(
client._credentials)
return transport
return client._http

def _get_download_url(self):
"""Get the download URL for the current blob.
Expand Down Expand Up @@ -463,7 +459,7 @@ def download_to_file(self, file_obj, client=None):
"""
download_url = self._get_download_url()
headers = _get_encryption_headers(self._encryption_key)
transport = self._make_transport(client)
transport = self._get_transport(client)

try:
self._do_download(transport, file_obj, download_url, headers)
Expand Down Expand Up @@ -638,7 +634,7 @@ def _do_multipart_upload(self, client, stream, content_type,
msg = _READ_LESS_THAN_SIZE.format(size, len(data))
raise ValueError(msg)

transport = self._make_transport(client)
transport = self._get_transport(client)
info = self._get_upload_arguments(content_type)
headers, object_metadata, content_type = info

Expand Down Expand Up @@ -708,7 +704,7 @@ def _initiate_resumable_upload(self, client, stream, content_type,
if chunk_size is None:
chunk_size = self.chunk_size

transport = self._make_transport(client)
transport = self._get_transport(client)
info = self._get_upload_arguments(content_type)
headers, object_metadata, content_type = info
if extra_headers is not None:
Expand Down
Loading

0 comments on commit 1f945a0

Please sign in to comment.