(scio-smb) Support mixed FileOperations per BucketedInput #5064

Merged · 21 commits · Jan 8, 2024

Commits
9f8072e  Support mixed FileOperations per source in SortedBucketSource (clairemcginty, Nov 10, 2023)
52bd353  cleanup (clairemcginty, Nov 10, 2023)
b072035  Include filenameSuffix with fileOperations (clairemcginty, Nov 10, 2023)
87cf839  fix Coder compat check; fix test (clairemcginty, Nov 10, 2023)
1be2bbe  Refactor partition-compatibility check to use hashed value (clairemcginty, Dec 13, 2023)
95f75f9  cleanup (clairemcginty, Dec 13, 2023)
563fb43  Less wasteful ptransform serialization (clairemcginty, Dec 13, 2023)
00f41da  ParquetAvroFileOperations getCoder should match AvroFileOperations ge… (clairemcginty, Dec 14, 2023)
d62b24f  Test specific + generic reads (clairemcginty, Dec 14, 2023)
a45419c  fetchMetadata must use a List for deterministic batching (clairemcginty, Dec 14, 2023)
2825981  Make SortedBucketIO.Read extendable for multi-format (clairemcginty, Dec 14, 2023)
a4e2193  Fixup SortedBucketIO.Read signature (clairemcginty, Dec 14, 2023)
1925d7e  Make getInputDirectories() public for extensions (clairemcginty, Dec 14, 2023)
875ae0e  PR comments (clairemcginty, Dec 18, 2023)
ab1ec15  Encode BucketedInput as indexed map of FileOperations/FileSuffixes (clairemcginty, Dec 18, 2023)
25dbe46  Add BucketedInput#getInputs to unlock SMB taps (clairemcginty, Dec 19, 2023)
0350664  Use AvroCoder.reflect (clairemcginty, Dec 19, 2023)
ae11f9a  Use local variable (clairemcginty, Dec 19, 2023)
8f39bd1  Use SerializableCoder (clairemcginty, Jan 5, 2024)
4271462  cleanup (clairemcginty, Jan 5, 2024)
29a6295  move coders to static variables (clairemcginty, Jan 5, 2024)
AvroBucketMetadata.java

@@ -26,6 +26,8 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
@@ -34,6 +36,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;

/**
* {@link org.apache.beam.sdk.extensions.smb.BucketMetadata} for Avro {@link IndexedRecord} records.
@@ -131,6 +134,21 @@ public Map<Class<?>, Coder<?>> coderOverrides() {
return AvroUtils.coderOverrides();
}

+ @Override
+ int hashPrimaryKeyMetadata() {
+ return Objects.hash(keyField, getKeyClass());
+ }
+
+ @Override
+ int hashSecondaryKeyMetadata() {
+ return Objects.hash(keyFieldSecondary, getKeyClassSecondary());
+ }
+
+ @Override
+ public Set<Class<? extends BucketMetadata>> compatibleMetadataTypes() {
+ return ImmutableSet.of(ParquetBucketMetadata.class);
+ }

@Override
public K1 extractKeyPrimary(V value) {
int[] path = keyPath.get();
@@ -175,28 +193,4 @@ public void populateDisplayData(Builder builder) {
if (keyFieldSecondary != null)
builder.add(DisplayData.item("keyFieldSecondary", keyFieldSecondary));
}

- @Override
- public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) {
- if (o == null || getClass() != o.getClass()) return false;
- AvroBucketMetadata<?, ?, ?> that = (AvroBucketMetadata<?, ?, ?>) o;
- return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField);
- }
-
- @Override
- public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) {
- if (o == null || getClass() != o.getClass()) return false;
- AvroBucketMetadata<?, ?, ?> that = (AvroBucketMetadata<?, ?, ?>) o;
- boolean allSecondaryPresent =
- getKeyClassSecondary() != null
- && that.getKeyClassSecondary() != null
- && keyFieldSecondary != null
- && that.keyFieldSecondary != null;
- // secondary keys must be configured on both sides to be comparable
- if (!allSecondaryPresent) return false;
- return getKeyClass() == that.getKeyClass()
- && getKeyClassSecondary() == that.getKeyClassSecondary()
- && keyField.equals(that.keyField)
- && keyFieldSecondary.equals(that.keyFieldSecondary);
- }
}
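The equals-style checks deleted above compared concrete classes, so an Avro source could never be partition-compatible with a Parquet one. The replacements hash only the key field name and key class, and declare, via compatibleMetadataTypes(), which foreign metadata classes may be compared at all. A tiny standalone illustration of why the hashes line up across formats (field names and key classes are invented):

import java.util.Objects;

public class KeyMetadataHashDemo {
  public static void main(String[] args) {
    // AvroBucketMetadata and ParquetBucketMetadata now hash the same two
    // ingredients, so equal key configurations hash equal across formats.
    int avroSide = Objects.hash("userId", String.class);
    int parquetSide = Objects.hash("userId", String.class);
    System.out.println(avroSide == parquetSide); // true

    // A different key field (or key class) breaks compatibility.
    int otherSide = Objects.hash("accountId", String.class);
    System.out.println(avroSide == otherSide); // false (hash collisions aside)
  }
}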
AvroSortedBucketIO.java

@@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
+ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** API for reading and writing Avro sorted-bucket files. */
public class AvroSortedBucketIO {
@@ -191,6 +192,11 @@ public static <K1, K2, T extends SpecificRecord> TransformOutput<K1, K2, T> tran
/** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */
@AutoValue
public abstract static class Read<T extends IndexedRecord> extends SortedBucketIO.Read<T> {
+ @Nullable
+ abstract ImmutableList<String> getInputDirectories();
+
+ abstract String getFilenameSuffix();

@Nullable
abstract Schema getSchema();

@@ -242,7 +248,7 @@ public Read<T> withPredicate(Predicate<T> predicate) {
}

@Override
- protected SortedBucketSource.BucketedInput<T> toBucketedInput(
+ public SortedBucketSource.BucketedInput<T> toBucketedInput(
final SortedBucketSource.Keying keying) {
@SuppressWarnings("unchecked")
final AvroFileOperations<T> fileOperations =
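With toBucketedInput now public and getInputDirectories()/getFilenameSuffix() owned by each format's Read, third-party formats can plug into the same machinery. A skeletal, hypothetical extension (TextSortedBucketIO is invented; only the member shapes mirror this diff):

import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.smb.SortedBucketIO;
import org.apache.beam.sdk.extensions.smb.SortedBucketSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

public class TextSortedBucketIO {
  public abstract static class Read extends SortedBucketIO.Read<String> {
    // The same surface the Avro/JSON/Parquet Reads now declare for themselves:
    @Nullable
    abstract ImmutableList<String> getInputDirectories();

    abstract String getFilenameSuffix();

    @Override
    public SortedBucketSource.BucketedInput<String> toBucketedInput(
        SortedBucketSource.Keying keying) {
      // A real implementation would wire directories + suffix + this format's
      // FileOperations into BucketedInput.of(...), as the built-in Reads do.
      throw new UnsupportedOperationException("illustrative skeleton only");
    }
  }
}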
BucketMetadata.java

@@ -290,14 +290,47 @@ <K> byte[] encodeKeyBytes(K key, Coder<K> coder) {
}

// Checks for complete equality between BucketMetadatas originating from the same BucketedInput
- public abstract boolean isPartitionCompatibleForPrimaryKey(BucketMetadata other);
+ public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata other) {
+ return isIntraPartitionCompatibleWith(other, false);
+ }
+
+ public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata other) {
+ return isIntraPartitionCompatibleWith(other, true);
+ }

- public abstract boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata other);
+ private <MetadataT extends BucketMetadata> boolean isIntraPartitionCompatibleWith(
+ MetadataT other, boolean checkSecondaryKeys) {
+ if (other == null) {
+ return false;
+ }
+ final Class<? extends BucketMetadata> otherClass = other.getClass();
+ final Set<Class<? extends BucketMetadata>> compatibleTypes = compatibleMetadataTypes();
+
+ if (compatibleTypes.isEmpty() && other.getClass() != this.getClass()) {
+ return false;
+ } else if (this.getKeyClass() != other.getKeyClass()
+ && !(compatibleTypes.contains(otherClass)
+ && (other.compatibleMetadataTypes().contains(this.getClass())))) {
+ return false;
+ }
+
+ return (this.hashPrimaryKeyMetadata() == other.hashPrimaryKeyMetadata()
+ && (!checkSecondaryKeys
+ || this.hashSecondaryKeyMetadata() == other.hashSecondaryKeyMetadata()));
+ }
+
+ public Set<Class<? extends BucketMetadata>> compatibleMetadataTypes() {
+ return new HashSet<>();
+ }

public abstract K1 extractKeyPrimary(V value);

public abstract K2 extractKeySecondary(V value);

+ abstract int hashPrimaryKeyMetadata();
+
+ abstract int hashSecondaryKeyMetadata();
Review comment from @farzad-sedghi (Contributor), Dec 19, 2023:
The secondary-key implementation we have today (lexicographic) is one way of implementing it; in some cases, like analytics, it is less efficient/common than something like z-ordering. Might be nice to think a bit more about a more future-proof API, e.g. supporting N keys?


public SortedBucketIO.ComparableKeyBytes primaryComparableKeyBytes(V value) {
return new SortedBucketIO.ComparableKeyBytes(getKeyBytesPrimary(value), null);
}
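The conditional above packs three ordered rules into one expression. A condensed, dependency-free restatement for reading along (method and parameter names here are mine, not the PR's):

import java.util.Set;

public class CompatibilityRules {
  // Rule 1: with no declared cross-compatible types, the concrete metadata
  //         classes must match exactly.
  // Rule 2: differing key classes are tolerated only when both sides declare
  //         each other compatible (the declaration must be symmetric).
  // Rule 3: the hashed key metadata has the final word; secondary-key hashes
  //         are consulted only when the caller asked for them.
  static boolean compatible(
      Class<?> thisType, Class<?> otherType,
      Class<?> thisKeyClass, Class<?> otherKeyClass,
      Set<Class<?>> thisCompatible, Set<Class<?>> otherCompatible,
      int thisPrimaryHash, int otherPrimaryHash,
      int thisSecondaryHash, int otherSecondaryHash,
      boolean checkSecondaryKeys) {
    if (thisCompatible.isEmpty() && otherType != thisType) return false;
    if (thisKeyClass != otherKeyClass
        && !(thisCompatible.contains(otherType) && otherCompatible.contains(thisType))) {
      return false;
    }
    return thisPrimaryHash == otherPrimaryHash
        && (!checkSecondaryKeys || thisSecondaryHash == otherSecondaryHash);
  }
}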
BucketMetadataUtil.java

@@ -23,8 +23,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment;
- import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
+ import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

@@ -69,15 +69,14 @@ int leastNumBuckets() {
this.batchSize = batchSize;
}

- private <V> Map<ResourceId, BucketMetadata<?, ?, V>> fetchMetadata(List<String> directories) {
+ private <V> Map<ResourceId, BucketMetadata<?, ?, V>> fetchMetadata(List<ResourceId> directories) {
final int total = directories.size();
final Map<ResourceId, BucketMetadata<?, ?, V>> metadata = new ConcurrentHashMap<>();
int start = 0;
while (start < total) {
directories.stream()
.skip(start)
.limit(batchSize)
- .map(dir -> FileSystems.matchNewResource(dir, true))
.parallel()
.forEach(dir -> metadata.put(dir, BucketMetadata.get(dir)));
start += batchSize;
@@ -86,11 +85,11 @@ int leastNumBuckets() {
}
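fetchMetadata slices its input with skip()/limit(), which only yields stable, non-overlapping batches when the source has a defined encounter order; that is why getSourceMetadata below materializes directories.keySet() into an ArrayList first, and why the commit log insists on a List. A standalone model of the batching loop (names and sizes are invented):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchedFetchDemo {
  public static void main(String[] args) {
    // Stand-ins for the resolved bucket directories.
    List<String> dirs =
        IntStream.range(0, 10).mapToObj(i -> "dir-" + i).collect(Collectors.toList());

    int batchSize = 4;
    int start = 0;
    while (start < dirs.size()) {
      // skip()/limit() on a List slice by position, so the batches are the
      // stable, non-overlapping windows [0,4), [4,8), [8,10).
      List<String> batch =
          dirs.stream().skip(start).limit(batchSize).collect(Collectors.toList());
      System.out.println(batch);
      start += batchSize;
    }
  }
}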

private <V> SourceMetadata<V> getSourceMetadata(
- List<String> directories,
- String filenameSuffix,
+ Map<ResourceId, KV<String, FileOperations<V>>> directories,
Review comment (Contributor):
Nit: this is now a bit more than directories. It would probably be nice to rename it to something more precise, like directoryOperations.

BiFunction<BucketMetadata<?, ?, V>, BucketMetadata<?, ?, V>, Boolean>
compatibilityCompareFn) {
- final Map<ResourceId, BucketMetadata<?, ?, V>> bucketMetadatas = fetchMetadata(directories);
+ final Map<ResourceId, BucketMetadata<?, ?, V>> bucketMetadatas =
+ fetchMetadata(new ArrayList<>(directories.keySet()));
Preconditions.checkState(!bucketMetadatas.isEmpty(), "Failed to find metadata");

Map<ResourceId, SourceMetadataValue<V>> mapping = new HashMap<>();
@@ -107,24 +106,22 @@ private <V> SourceMetadata<V> getSourceMetadata(
metadata,
first.getValue());
final FileAssignment fileAssignment =
- new SMBFilenamePolicy(dir, metadata.getFilenamePrefix(), filenameSuffix)
+ new SMBFilenamePolicy(
+ dir, metadata.getFilenamePrefix(), directories.get(dir).getKey())
.forDestination();
mapping.put(dir, new SourceMetadataValue<>(metadata, fileAssignment));
});
return new SourceMetadata<>(mapping);
}

public <V> SourceMetadata<V> getPrimaryKeyedSourceMetadata(
- List<String> directories, String filenameSuffix) {
- return getSourceMetadata(
- directories, filenameSuffix, BucketMetadata::isPartitionCompatibleForPrimaryKey);
+ Map<ResourceId, KV<String, FileOperations<V>>> directories) {
+ return getSourceMetadata(directories, BucketMetadata::isPartitionCompatibleForPrimaryKey);
}

public <V> SourceMetadata<V> getPrimaryAndSecondaryKeyedSourceMetadata(
- List<String> directories, String filenameSuffix) {
+ Map<ResourceId, KV<String, FileOperations<V>>> directories) {
return getSourceMetadata(
- directories,
- filenameSuffix,
- BucketMetadata::isPartitionCompatibleForPrimaryAndSecondaryKey);
+ directories, BucketMetadata::isPartitionCompatibleForPrimaryAndSecondaryKey);
}
}
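Callers now hand this utility a per-directory map rather than a flat directory list plus a single shared suffix; that shape is what lets one source mix file formats. A hedged sketch of building such a map, where the AvroFileOperations and ParquetAvroFileOperations factory calls and the bucket paths are illustrative assumptions:

import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.smb.AvroFileOperations;
import org.apache.beam.sdk.extensions.smb.FileOperations;
import org.apache.beam.sdk.extensions.smb.ParquetAvroFileOperations;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

public class DirectoryOperationsExample {
  static Map<ResourceId, KV<String, FileOperations<GenericRecord>>> directories(Schema schema) {
    // Each directory carries its own filename suffix and FileOperations; the
    // utility then reads BucketMetadata per directory and cross-checks them.
    return ImmutableMap.of(
        FileSystems.matchNewResource("gs://my-bucket/users-avro", true),
        KV.<String, FileOperations<GenericRecord>>of(".avro", AvroFileOperations.of(schema)),
        FileSystems.matchNewResource("gs://my-bucket/users-parquet", true),
        KV.<String, FileOperations<GenericRecord>>of(
            ".parquet", ParquetAvroFileOperations.of(schema)));
  }
}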
JsonBucketMetadata.java

@@ -26,7 +26,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.bigquery.model.TableRow;
- import java.util.Arrays;
+ import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -148,32 +148,12 @@ public void populateDisplayData(Builder builder) {
}

@Override
- public boolean isPartitionCompatibleForPrimaryKey(final BucketMetadata o) {
- if (o == null || getClass() != o.getClass()) return false;
- JsonBucketMetadata<?, ?> that = (JsonBucketMetadata<?, ?>) o;
- return getKeyClass() == that.getKeyClass()
- && keyField.equals(that.keyField)
- && Arrays.equals(keyPath, that.keyPath);
+ int hashPrimaryKeyMetadata() {
+ return Objects.hash(keyField, getKeyClass());
}

@Override
- public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(final BucketMetadata o) {
- if (o == null || getClass() != o.getClass()) return false;
- JsonBucketMetadata<?, ?> that = (JsonBucketMetadata<?, ?>) o;
- boolean allSecondaryPresent =
- getKeyClassSecondary() != null
- && that.getKeyClassSecondary() != null
- && keyFieldSecondary != null
- && that.keyFieldSecondary != null
- && keyPathSecondary != null
- && that.keyPathSecondary != null;
- // secondary keys must be configured on both sides to be comparable
- if (!allSecondaryPresent) return false;
- return getKeyClass() == that.getKeyClass()
- && getKeyClassSecondary() == that.getKeyClassSecondary()
- && keyField.equals(that.keyField)
- && keyFieldSecondary.equals(that.keyFieldSecondary)
- && Arrays.equals(keyPath, that.keyPath)
- && Arrays.equals(keyPathSecondary, that.keyPathSecondary);
+ int hashSecondaryKeyMetadata() {
+ return Objects.hash(keyFieldSecondary, getKeyClassSecondary());
}
}
JsonSortedBucketIO.java

@@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
+ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** API for reading and writing BigQuery {@link TableRow} JSON sorted-bucket files. */
public class JsonSortedBucketIO {
@@ -107,6 +108,10 @@ public static <K1, K2> TransformOutput<K1, K2> transformOutput(
*/
@AutoValue
public abstract static class Read extends SortedBucketIO.Read<TableRow> {
+ @Nullable
+ abstract ImmutableList<String> getInputDirectories();
+
+ abstract String getFilenameSuffix();

abstract Compression getCompression();

@@ -152,7 +157,7 @@ public Read withPredicate(Predicate<TableRow> predicate) {
}

@Override
- protected BucketedInput<TableRow> toBucketedInput(final SortedBucketSource.Keying keying) {
+ public BucketedInput<TableRow> toBucketedInput(final SortedBucketSource.Keying keying) {
return BucketedInput.of(
keying,
getTupleTag(),
ParquetAvroFileOperations.java

@@ -52,18 +52,22 @@
*/
public class ParquetAvroFileOperations<ValueT> extends FileOperations<ValueT> {
static final CompressionCodecName DEFAULT_COMPRESSION = CompressionCodecName.ZSTD;

+ private final Class<ValueT> recordClass;
private final SerializableSchemaSupplier schemaSupplier;
private final CompressionCodecName compression;
private final SerializableConfiguration conf;
private final FilterPredicate predicate;

private ParquetAvroFileOperations(
Schema schema,
+ Class<ValueT> recordClass,
CompressionCodecName compression,
Configuration conf,
FilterPredicate predicate) {
super(Compression.UNCOMPRESSED, MimeTypes.BINARY);
this.schemaSupplier = new SerializableSchemaSupplier(schema);
+ this.recordClass = recordClass;
this.compression = compression;
this.conf = new SerializableConfiguration(conf);
this.predicate = predicate;
@@ -80,7 +84,7 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
Schema schema, CompressionCodecName compression, Configuration conf) {
- return new ParquetAvroFileOperations<>(schema, compression, conf, null);
+ return new ParquetAvroFileOperations<>(schema, null, compression, conf, null);
}

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
@@ -90,7 +94,7 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
Schema schema, FilterPredicate predicate, Configuration conf) {
- return new ParquetAvroFileOperations<>(schema, DEFAULT_COMPRESSION, conf, predicate);
+ return new ParquetAvroFileOperations<>(schema, null, DEFAULT_COMPRESSION, conf, predicate);
}

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(Class<V> recordClass) {
@@ -106,7 +110,7 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
Class<V> recordClass, CompressionCodecName compression, Configuration conf) {
// Use reflection to get SR schema
final Schema schema = new ReflectData(recordClass.getClassLoader()).getSchema(recordClass);
- return new ParquetAvroFileOperations<>(schema, compression, conf, null);
+ return new ParquetAvroFileOperations<>(schema, recordClass, compression, conf, null);
}

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
@@ -118,7 +122,8 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
Class<V> recordClass, FilterPredicate predicate, Configuration conf) {
// Use reflection to get SR schema
final Schema schema = new ReflectData(recordClass.getClassLoader()).getSchema(recordClass);
- return new ParquetAvroFileOperations<>(schema, DEFAULT_COMPRESSION, conf, predicate);
+ return new ParquetAvroFileOperations<>(
+ schema, recordClass, DEFAULT_COMPRESSION, conf, predicate);
}

@Override
@@ -141,7 +146,9 @@ protected FileIO.Sink<ValueT> createSink() {
@SuppressWarnings("unchecked")
@Override
public Coder<ValueT> getCoder() {
- return (AvroCoder<ValueT>) AvroCoder.of(getSchema());
+ return recordClass == null
+ ? (AvroCoder<ValueT>) AvroCoder.of(getSchema())
+ : AvroCoder.reflect(recordClass);
}

Schema getSchema() {
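This closes the loop on the commit "ParquetAvroFileOperations getCoder should match AvroFileOperations ge…": when the operations were constructed from a record class, the coder must decode back to that class rather than to GenericRecord. A standalone contrast (MyRecord is invented; the AvroCoder import path depends on the Beam version in use):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; // older Beam: org.apache.beam.sdk.coders.AvroCoder

public class CoderChoiceDemo {
  // Hypothetical reflect-style record, standing in for a user's record class.
  static class MyRecord {
    String userId;
  }

  public static void main(String[] args) {
    Schema schema = ReflectData.get().getSchema(MyRecord.class);

    // Schema-only construction: elements decode to GenericRecord.
    Coder<GenericRecord> generic = AvroCoder.of(schema);

    // Class-based construction: elements decode back to MyRecord, matching
    // what class-constructed ParquetAvroFileOperations actually reads.
    Coder<MyRecord> reflected = AvroCoder.reflect(MyRecord.class);

    System.out.println(generic.getEncodedTypeDescriptor());   // GenericRecord
    System.out.println(reflected.getEncodedTypeDescriptor()); // MyRecord
  }
}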
ParquetAvroSortedBucketIO.java

@@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
+ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -199,6 +200,11 @@ public static <K1, K2, T extends SpecificRecord> TransformOutput<K1, K2, T> tran
/** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */
@AutoValue
public abstract static class Read<T extends IndexedRecord> extends SortedBucketIO.Read<T> {
+ @Nullable
+ abstract ImmutableList<String> getInputDirectories();
+
+ abstract String getFilenameSuffix();

@Nullable
abstract Schema getSchema();

Expand Down Expand Up @@ -269,7 +275,7 @@ public Read<T> withConfiguration(Configuration configuration) {
}

@Override
- protected BucketedInput<T> toBucketedInput(final SortedBucketSource.Keying keying) {
+ public BucketedInput<T> toBucketedInput(final SortedBucketSource.Keying keying) {
final Schema schema =
getRecordClass() == null
? getSchema()