[BEAM-8399] Add --hdfs_full_urls option #10223

Merged: 3 commits, merged on Feb 14, 2020
1 change: 1 addition & 0 deletions CHANGES.md
@@ -25,6 +25,7 @@
* New highly anticipated feature Y added to JavaSDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).

### I/Os
* Python SDK: Adds support for standard HDFS URLs (with server name). ([#10223](https://github.com/apache/beam/pull/10223)).
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

### New Features / Improvements
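As a rough usage sketch (not part of the PR itself): with `hdfs_full_urls` enabled, file URLs may carry a server name. The host, port, and user values below are placeholders, and `join`/`split` are pure string operations, so no cluster is contacted.

```python
# Minimal sketch, assuming apache_beam is installed with the hdfs extra.
# Host/port/user values are placeholders, not part of this PR.
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

options = {
    'hdfs_host': 'namenode.example.com',  # placeholder host
    'hdfs_port': 9870,                    # placeholder WebHDFS port
    'hdfs_user': 'beam',                  # placeholder user
    'hdfs_full_urls': True,               # option added by this PR
}
fs = HadoopFileSystem(pipeline_options=options)

# With hdfs_full_urls=True, the server name is preserved through URL
# manipulation; join/split do not issue any RPCs.
print(fs.join('hdfs://namenode.example.com/data', 'part-00000'))
# -> hdfs://namenode.example.com/data/part-00000
print(fs.split('hdfs://namenode.example.com/data/part-00000'))
# -> ('hdfs://namenode.example.com/data', 'part-00000')
```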
85 changes: 54 additions & 31 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -44,6 +44,7 @@

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')
_COPY_BUFFER_SIZE = 2**16
_DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024

@@ -116,42 +117,58 @@ def __init__(self, pipeline_options):
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
self._full_urls = hdfs_options.hdfs_full_urls
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
self._full_urls = pipeline_options.get('hdfs_full_urls', False)

if hdfs_host is None:
raise ValueError('hdfs_host is not set')
if hdfs_port is None:
raise ValueError('hdfs_port is not set')
if hdfs_user is None:
raise ValueError('hdfs_user is not set')
if not isinstance(self._full_urls, bool):
raise ValueError(
'hdfs_full_urls should be bool, got: %s' % self._full_urls)
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
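As an aside (not part of this file's diff): the same settings can also come from command-line flags. The sketch below assumes `--hdfs_full_urls` is registered on `HadoopFileSystemOptions` as a boolean store_true flag (that change lives in `pipeline_options.py`, which is not shown in this excerpt); host and port values are placeholders.

```python
# Hedged sketch: reading the new flag through PipelineOptions.
from apache_beam.options.pipeline_options import (
    HadoopFileSystemOptions, PipelineOptions)

# Placeholder flag values; --hdfs_full_urls is assumed to default to False
# and flip to True when passed.
opts = PipelineOptions([
    '--hdfs_host=namenode.example.com',
    '--hdfs_port=9870',
    '--hdfs_user=beam',
    '--hdfs_full_urls',
])
hdfs_opts = opts.view_as(HadoopFileSystemOptions)
print(hdfs_opts.hdfs_full_urls)  # expected: True, under the assumption above
```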

@classmethod
def scheme(cls):
return 'hdfs'

@staticmethod
def _parse_url(url):
def _parse_url(self, url):
"""Verifies that url begins with hdfs:// prefix, strips it and adds a
leading /.

Raises:
ValueError if url doesn't begin with hdfs://.
Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.

Args:
url: A URL in the form hdfs://path/...
url: (str) A URL in the form hdfs://path/...
or in the form hdfs://server/path/...

Raises:
ValueError if the URL doesn't match the expected format.

Returns:
For an input of 'hdfs://path/...', will return '/path/...'.
(str, str) If using hdfs_full_urls, for an input of
'hdfs://server/path/...' will return (server, '/path/...').
Otherwise, for an input of 'hdfs://path/...', will return
('', '/path/...').
"""
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.group(1)
if not self._full_urls:
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return '', m.group(1)
else:
m = _FULL_URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.group(1), m.group(2) or '/'
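As a reviewer-style aside (not part of the diff): a minimal, self-contained sketch of how the two regexes above divide a URL; the example URL and server name are made up.

```python
# Same patterns as in hadoopfilesystem.py; illustrative URL only.
import re

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')

url = 'hdfs://nameservice1/user/beam/data.txt'

# Legacy mode (hdfs_full_urls=False): everything after 'hdfs:/' is the path.
print(_URL_RE.match(url).group(1))    # /nameservice1/user/beam/data.txt

# Full-URL mode (hdfs_full_urls=True): the first component is the server.
m = _FULL_URL_RE.match(url)
print(m.group(1), m.group(2) or '/')  # nameservice1 /user/beam/data.txt
```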

def join(self, base_url, *paths):
"""Join two or more pathname components.
@@ -164,19 +181,24 @@ def join(self, base_url, *paths):
Returns:
Full url after combining all the passed components.
"""
basepath = self._parse_url(base_url)
return _HDFS_PREFIX + self._join(basepath, *paths)
server, basepath = self._parse_url(base_url)
return _HDFS_PREFIX + self._join(server, basepath, *paths)

def _join(self, basepath, *paths):
return posixpath.join(basepath, *paths)
def _join(self, server, basepath, *paths):
res = posixpath.join(basepath, *paths)
if server:
server = '/' + server
return server + res

def split(self, url):
rel_path = self._parse_url(url)
server, rel_path = self._parse_url(url)
if server:
server = '/' + server
head, tail = posixpath.split(rel_path)
return _HDFS_PREFIX + head, tail
return _HDFS_PREFIX + server + head, tail

def mkdirs(self, url):
path = self._parse_url(url)
_, path = self._parse_url(url)
if self._exists(path):
raise BeamIOError('Path already exists: %s' % path)
return self._mkdirs(path)
@@ -189,10 +211,10 @@ def has_dirs(self):

def _list(self, url):
try:
path = self._parse_url(url)
server, path = self._parse_url(url)
for res in self._hdfs_client.list(path, status=True):
yield FileMetadata(
_HDFS_PREFIX + self._join(path, res[0]),
_HDFS_PREFIX + self._join(server, path, res[0]),
res[1][_FILE_STATUS_LENGTH])
except Exception as e: # pylint: disable=broad-except
raise BeamIOError('List operation failed', {url: e})
@@ -222,7 +244,7 @@ def create(
Returns:
A Python File-like object.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._create(path, mime_type, compression_type)

def _create(
@@ -246,7 +268,7 @@ def open(
Returns:
A Python File-like object.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._open(path, mime_type, compression_type)

def _open(
@@ -293,7 +315,7 @@ def _copy_path(source, destination):

for path, dirs, files in self._hdfs_client.walk(source):
for dir in dirs:
new_dir = self._join(destination, dir)
new_dir = self._join('', destination, dir)
if not self._exists(new_dir):
self._mkdirs(new_dir)

@@ -302,13 +324,14 @@ def _copy_path(source, destination):
rel_path = ''
for file in files:
_copy_file(
self._join(path, file), self._join(destination, rel_path, file))
self._join('', path, file),
self._join('', destination, rel_path, file))

exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
rel_source = self._parse_url(source)
rel_destination = self._parse_url(destination)
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
_copy_path(rel_source, rel_destination)
except Exception as e: # pylint: disable=broad-except
exceptions[(source, destination)] = e
@@ -320,8 +343,8 @@ def rename(self, source_file_names, destination_file_names):
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
rel_source = self._parse_url(source)
rel_destination = self._parse_url(destination)
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
try:
self._hdfs_client.rename(rel_source, rel_destination)
except hdfs.HdfsError as e:
@@ -344,7 +367,7 @@ def exists(self, url):
Returns:
True if url exists as a file or directory in HDFS.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._exists(path)

def _exists(self, path):
@@ -356,7 +379,7 @@ def _exists(self, path):
return self._hdfs_client.status(path, strict=False) is not None

def size(self, url):
path = self._parse_url(url)
_, path = self._parse_url(url)
status = self._hdfs_client.status(path, strict=False)
if status is None:
raise BeamIOError('File not found: %s' % url)
@@ -371,7 +394,7 @@ def checksum(self, url):
Returns:
String describing the checksum.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
file_checksum = self._hdfs_client.checksum(path)
return '%s-%d-%s' % (
file_checksum[_FILE_CHECKSUM_ALGORITHM],
@@ -383,7 +406,7 @@ def delete(self, urls):
exceptions = {}
for url in urls:
try:
path = self._parse_url(url)
_, path = self._parse_url(url)
self._hdfs_client.delete(path, recursive=True)
except Exception as e: # pylint: disable=broad-except
exceptions[url] = e
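A small, hypothetical sketch of the option-validation path added above, mirroring what `test_dict_options_full_urls` below exercises; the host, port, and user values are placeholders.

```python
# Hedged sketch: hdfs_full_urls must be a real boolean, not a truthy string.
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

bad_options = {
    'hdfs_host': 'localhost',   # placeholder
    'hdfs_port': 9870,          # placeholder
    'hdfs_user': 'beam',        # placeholder
    'hdfs_full_urls': 'yes',    # a string, not a bool -> rejected
}
try:
  HadoopFileSystem(pipeline_options=bad_options)
except ValueError as e:
  print('rejected:', e)

good_options = dict(bad_options, hdfs_full_urls=True)
fs = HadoopFileSystem(pipeline_options=good_options)
print(fs._full_urls)  # True
```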
88 changes: 75 additions & 13 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -31,6 +31,7 @@
# patches unittest.TestCase to be python3 compatible
import future.tests.base # pylint: disable=unused-import
from future.utils import itervalues
from parameterized import parameterized_class

from apache_beam.io import hadoopfilesystem as hdfs
from apache_beam.io.filesystem import BeamIOError
@@ -203,6 +204,7 @@ def checksum(self, path):
return f.get_file_checksum()


@parameterized_class(('full_urls', ), [(False, ), (True, )])
class HadoopFileSystemTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
@@ -220,7 +222,11 @@ def setUp(self):
hdfs_options.hdfs_user = ''

self.fs = hdfs.HadoopFileSystem(pipeline_options)
self.tmpdir = 'hdfs://test_dir'
self.fs._full_urls = self.full_urls
if self.full_urls:
self.tmpdir = 'hdfs://test_dir'
else:
self.tmpdir = 'hdfs://server/test_dir'

for filename in ['old_file1', 'old_file2']:
url = self.fs.join(self.tmpdir, filename)
@@ -230,22 +236,63 @@ def test_scheme(self):
self.assertEqual(self.fs.scheme(), 'hdfs')
self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs')

def test_parse_url(self):
cases = [
('hdfs://', ('', '/'), False),
('hdfs://', None, True),
('hdfs://a', ('', '/a'), False),
('hdfs://a', ('a', '/'), True),
('hdfs://a/', ('', '/a/'), False),
('hdfs://a/', ('a', '/'), True),
('hdfs://a/b', ('', '/a/b'), False),
('hdfs://a/b', ('a', '/b'), True),
('hdfs://a/b/', ('', '/a/b/'), False),
('hdfs://a/b/', ('a', '/b/'), True),
('hdfs:/a/b', None, False),
('hdfs:/a/b', None, True),
('invalid', None, False),
('invalid', None, True),
]
for url, expected, full_urls in cases:
if self.full_urls != full_urls:
continue
try:
result = self.fs._parse_url(url)
except ValueError:
self.assertIsNone(expected, msg=(url, expected, full_urls))
continue
self.assertEqual(expected, result, msg=(url, expected, full_urls))

def test_url_join(self):
self.assertEqual(
'hdfs://tmp/path/to/file',
self.fs.join('hdfs://tmp/path', 'to', 'file'))
self.assertEqual(
'hdfs://tmp/path/to/file', self.fs.join('hdfs://tmp/path', 'to/file'))
self.assertEqual('hdfs://tmp/path/', self.fs.join('hdfs://tmp/path/', ''))
self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo', '/bar'))
with self.assertRaises(ValueError):
self.fs.join('/no/scheme', 'file')

if not self.full_urls:
self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo', '/bar'))
self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo/', '/bar'))
with self.assertRaises(ValueError):
self.fs.join('/no/scheme', 'file')
else:
self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo', '/bar'))
self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo/', '/bar'))

def test_url_split(self):
self.assertEqual(('hdfs://tmp/path/to', 'file'),
self.fs.split('hdfs://tmp/path/to/file'))
self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
if not self.full_urls:
self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
self.assertEqual(('hdfs://tmp', 'a'), self.fs.split('hdfs://tmp/a'))
else:
self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp'))
self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp/'))
self.assertEqual(('hdfs://tmp/', 'a'), self.fs.split('hdfs://tmp/a'))

self.assertEqual(('hdfs://tmp/a', ''), self.fs.split('hdfs://tmp/a/'))
with self.assertRaisesRegex(ValueError, r'parse'):
self.fs.split('tmp')

@@ -329,7 +376,7 @@ def test_create_success(self):
url = self.fs.join(self.tmpdir, 'new_file')
handle = self.fs.create(url)
self.assertIsNotNone(handle)
url = self.fs._parse_url(url)
_, url = self.fs._parse_url(url)
expected_file = FakeFile(url, 'wb')
self.assertEqual(self._fake_hdfs.files[url], expected_file)

Expand All @@ -338,7 +385,7 @@ def test_create_write_read_compressed(self):

handle = self.fs.create(url)
self.assertIsNotNone(handle)
path = self.fs._parse_url(url)
_, path = self.fs._parse_url(url)
expected_file = FakeFile(path, 'wb')
self.assertEqual(self._fake_hdfs.files[path], expected_file)
data = b'abc' * 10
@@ -535,7 +582,7 @@ def test_delete_error(self):
url2 = self.fs.join(self.tmpdir, 'old_file1')

self.assertTrue(self.fs.exists(url2))
path1 = self.fs._parse_url(url1)
_, path1 = self.fs._parse_url(url1)
with self.assertRaisesRegex(BeamIOError,
r'^Delete operation failed .* %s' % path1):
self.fs.delete([url1, url2])
@@ -545,21 +592,21 @@
class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
"""Tests pipeline_options, in the form of a
RuntimeValueProvider.runtime_options object."""
def test_dict_options(self):
def setUp(self):
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (lambda *args, **kwargs: self._fake_hdfs)

def test_dict_options(self):
pipeline_options = {
'hdfs_host': '',
'hdfs_port': 0,
'hdfs_user': '',
}

self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertFalse(self.fs._full_urls)

def test_dict_options_missing(self):
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (lambda *args, **kwargs: self._fake_hdfs)

with self.assertRaisesRegex(ValueError, r'hdfs_host'):
self.fs = hdfs.HadoopFileSystem(
pipeline_options={
@@ -581,6 +628,21 @@ def test_dict_options_missing(self):
'hdfs_port': 0,
})

def test_dict_options_full_urls(self):
pipeline_options = {
'hdfs_host': '',
'hdfs_port': 0,
'hdfs_user': '',
'hdfs_full_urls': 'invalid',
}

with self.assertRaisesRegex(ValueError, r'hdfs_full_urls'):
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)

pipeline_options['hdfs_full_urls'] = True
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertTrue(self.fs._full_urls)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
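For readers unfamiliar with `parameterized_class`: below is a minimal, standalone illustration (class and test names are invented) of how the decorator used above duplicates a test class once per parameter tuple.

```python
# Hedged sketch of the parameterization pattern used by HadoopFileSystemTest.
import unittest

from parameterized import parameterized_class


@parameterized_class(('full_urls', ), [(False, ), (True, )])
class UrlModeTest(unittest.TestCase):
  def test_mode_is_bool(self):
    # The decorator generates one copy of this class per parameter tuple,
    # so this test runs once with full_urls=False and once with True.
    self.assertIsInstance(self.full_urls, bool)
```

Run under `unittest` or pytest, this should produce two generated test classes, one per parameter tuple, which is how every `HadoopFileSystemTest` case above gets exercised in both URL-parsing modes.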