
Support virtual bucketing on $path for hive unbucketed table #12099

Status: Closed · 1 commit

Conversation

jessesleeping (Contributor, PR author):
For unbucketed tables, providing an option to virtually bucket them
gives us the ability to have addressable splits. This will be useful
when we introduce partial failure recovery in the future. Users can
control this behavior with the hive connector session property
"virtual_bucket_count".

@@ -68,6 +70,11 @@

private HiveBucketing() {}

public static int getVirtualBucket(int bucketCount, Path path)
{
    return (hashBytes(0, wrappedBuffer(path.toString().getBytes())) & Integer.MAX_VALUE) % bucketCount;
}
Contributor:
Never use getBytes() since that uses the platform character set. Instead, use getBytes(UTF_8). However, Slices has a shortcut method utf8Slice() that takes a String.

But taking a step back, why not just do path.hashCode()?
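To make the charset point concrete, here is a self-contained sketch of a virtual-bucket computation. The real code hashes a Slice with Airlift's Murmur3-based hashBytes; a simple FNV-1a hash stands in for it here (the class name, the FNV stand-in, and the example path are all invented for illustration), but the key line is the same either way: hash path.getBytes(UTF_8), never the bare getBytes(), which silently depends on the platform default charset.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

public class VirtualBucketSketch
{
    // Hypothetical stand-in for HiveBucketing.getVirtualBucket: FNV-1a replaces
    // Airlift's Murmur3 so the sketch has no dependencies. Explicit UTF_8 makes
    // the bucket assignment identical on every platform.
    public static int getVirtualBucket(int bucketCount, String path)
    {
        int hash = 0x811c9dc5;
        for (byte b : path.getBytes(UTF_8)) {
            hash = (hash ^ (b & 0xff)) * 0x01000193;
        }
        return (hash & Integer.MAX_VALUE) % bucketCount;
    }

    public static void main(String[] args)
    {
        // Deterministic and always in [0, bucketCount)
        System.out.println(getVirtualBucket(16, "hdfs://warehouse/orders/part-00000"));
    }
}
```

The `& Integer.MAX_VALUE` clears the sign bit so the modulo never yields a negative bucket number.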

jessesleeping (Contributor, author):
By using the same hash function as Hive bucketing, a virtually bucketed table will have the same behavior (e.g. hash distribution) as a table bucketed on a single VARCHAR column. I am not sure what the actual benefit of keeping them aligned is, but I think it's good to be consistent if the extra cost is not significant.

electrum (Contributor), Dec 19, 2018:
That seems reasonable. Please add a comment explaining

// this is equivalent to bucketing the table on a VARCHAR column containing $path

Contributor:
Also update to use utf8Slice()

// List all files in the partition and assign virtual bucket number to each of them
List<InternalHiveSplit> splitList = new ArrayList<>();
try {
    new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, FAIL).forEachRemaining(file -> splitFactory.createInternalHiveSplit(file, getVirtualBucket(bucketCount, file.getPath()))
Contributor:
This wrapping is hard to read since the ifPresent() is part of the lambda above. I'd write it like

new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, FAIL).forEachRemaining(file ->
    splitFactory.createInternalHiveSplit(file, getVirtualBucket(bucketCount, file.getPath()))
            .ifPresent(splitList::add));

electrum (Contributor), Dec 18, 2018:
Or write this as a stream

return stream(new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, FAIL))
        .map(file -> splitFactory.createInternalHiveSplit(file, getVirtualBucket(bucketCount, file.getPath())))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(toImmutableList());

Using stream() from com.google.common.collect.Streams
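For readers without Guava on the classpath, Streams.stream(Iterator) can be approximated with the JDK's StreamSupport. The sketch below shows the same filter-present-then-unwrap pipeline; the file names and the createSplit helper (standing in for splitFactory.createInternalHiveSplit returning Optional) are invented for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class StreamOverIterator
{
    // Invented stand-in for splitFactory.createInternalHiveSplit:
    // returns empty for files that yield no split (here, empty names)
    static Optional<String> createSplit(String file)
    {
        return file.isEmpty() ? Optional.empty() : Optional.of("split:" + file);
    }

    public static void main(String[] args)
    {
        Iterator<String> files = List.of("part-0", "", "part-1").iterator();
        // Guava's Streams.stream(iterator) is roughly this StreamSupport call:
        List<String> splits = StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(files, 0), false)
                .map(StreamOverIterator::createSplit)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        System.out.println(splits); // [split:part-0, split:part-1]
    }
}
```

On Java 9+ the last two stages can also be fused into `.flatMap(Optional::stream)`.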

@@ -429,15 +456,15 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu

// convert files to internal splits
List<InternalHiveSplit> splitList = new ArrayList<>();
for (int bucketNumber = 0; bucketNumber < Math.max(readBucketCount, partitionBucketCount); bucketNumber++) {
Contributor:
This static import change seems unrelated. It's fine to do, but please put it in a separate commit.

.ifPresent(splitList::add));
}
catch (NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
electrum (Contributor), Dec 18, 2018:
This comment seems incorrect, since Hive doesn't have this virtual bucket behavior. This should probably be NOT_SUPPORTED or a new HIVE_SUBDIRECTORY_NOT_SUPPORTED with a message like

"Hive table '%s' partition '%s' contains sub-directory, which is not supported for virtual bucketing"

Contributor:
Actually, I don't see why we need to fail here, since sub-directories will work fine with virtual bucketing. We should use createInternalHiveSplitIterator() which will have the normal behavior for directory walking.

jessesleeping (Contributor, author):
Yeah, we should let the file iterator recursively walk down the partition directory with the nested directory policy set to RECURSE instead of FAIL. Will fix it.

@@ -352,6 +355,11 @@ private void invokeNoMoreSplitsIfNecessary()

// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
    if (isVirtuallyBucketed(tableBucketInfo.get().getBucketColumns())) {
        // For virtual bucket, bucket conversion must not be present because there is no physical partition bucket count
        checkState(!bucketConversion.isPresent());
Contributor:
Add a message here so we know what happened if this ever throws

sopel39 (Contributor) commented Dec 19, 2018:
How does this relate to #11876?

What is the advantage of bucketing by $path, given that it has a very limited use case within queries that could utilize bucket-by-bucket execution?

jessesleeping (Contributor, author):
@sopel39 I didn't know about #11876 before, but I think they are not really targeting the same problem.

This PR is to provide a way for Hive unbucketed tables to have addressable splits. For bucketed tables, a bucket can be considered an execution unit as well as a failure recovery unit (although we don't have partial recovery yet). For unbucketed tables, we need a similar way to group splits and identify those groups when needed. Any fixed mapping would do the work, but "$path" seems more natural for Hive (bucketing is essentially based on the order of file paths under the partition directory). We don't expect any query to benefit from virtual bucketing from a performance or resource usage perspective.

I think #11876 is aiming at providing grouped execution across partitions and reducing memory footprint.

sopel39 (Contributor) commented Dec 20, 2018:
For bucketed tables, a bucket can be considered an execution unit as well as a failure recovery unit (although we don't have partial recovery yet). For unbucketed tables, we need a similar way to group splits and identify those groups when needed.

So the idea is to use $path virtual bucketing to provide a failure-recovery unit?
But in order to really utilize such a virtual bucket, queries would have to aggregate or join on the $path column. If that is not the case, then the query will just execute as a non-bucketed query and the execution unit would be the entire table. Please correct me if I'm wrong.

wenleix (Contributor) commented Dec 20, 2018:
@sopel39: You are right, virtual buckets don't help failure recovery with joins/aggregations.

It serves two purposes:

  • It makes a ScanFilterProject-only query over an unbucketed table failure recoverable -- each "bucket" corresponds to a set of files.
  • It opens opportunities for recoverable shuffle; think about how MapReduce works. Of course, there are other alternatives to recoverable shuffle, such as the existing in-memory shuffle plus checkpointing.

In general, we just need some deterministic way of partitioning the input table. Computing a hash over $path is one way.
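The determinism requirement can be illustrated with a toy grouping. String.hashCode stands in for the connector's real hash function, and the paths and class names are invented: the point is only that re-running the assignment over the same file list always reproduces the same buckets, so a failed bucket can be retried in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicPartitioning
{
    // String.hashCode is a stand-in for the real hash; any deterministic
    // function of the path works for this purpose
    static int bucketOf(String path, int bucketCount)
    {
        return (path.hashCode() & Integer.MAX_VALUE) % bucketCount;
    }

    static Map<Integer, List<String>> assign(List<String> paths, int bucketCount)
    {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String path : paths) {
            buckets.computeIfAbsent(bucketOf(path, bucketCount), b -> new ArrayList<>()).add(path);
        }
        return buckets;
    }

    public static void main(String[] args)
    {
        List<String> paths = List.of("part-00000", "part-00001", "part-00002");
        // Two runs over the same input produce identical bucket assignments
        System.out.println(assign(paths, 4).equals(assign(paths, 4))); // true
    }
}
```

Nothing here requires the table to have been written bucketed; the grouping is imposed purely at split-enumeration time, which is what makes it "virtual".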

sopel39 (Contributor) commented Dec 27, 2018:

It makes a ScanFilterProject-only query over an unbucketed table failure recoverable -- each "bucket" corresponds to a set of files.

Is there such a mechanism at the moment?
I don't think PlanFragmenter.GroupedExecutionTagger will trigger for a $path bucket, will it?
Why can't one just restart split processing if no data was produced? Do we need the $path virtual bucket for that?

shixuan-fan (Contributor):
Superseded by #13008.
