From 9f8072e62b2a564c49f5790d9be34e0b7e688770 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 10 Nov 2023 12:44:09 -0500 Subject: [PATCH 01/21] Support mixed FileOperations per source in SortedBucketSource --- .../extensions/smb/BucketMetadataUtil.java | 10 +-- .../extensions/smb/SortedBucketSource.java | 77 +++++++++++++------ .../smb/BucketMetadataUtilTest.java | 12 +-- 3 files changed, 65 insertions(+), 34 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java index 2483468cb4..d2ef3d1ef7 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; @@ -69,7 +68,7 @@ int leastNumBuckets() { this.batchSize = batchSize; } - private Map> fetchMetadata(List directories) { + private Map> fetchMetadata(List directories) { final int total = directories.size(); final Map> metadata = new ConcurrentHashMap<>(); int start = 0; @@ -77,7 +76,6 @@ int leastNumBuckets() { directories.stream() .skip(start) .limit(batchSize) - .map(dir -> FileSystems.matchNewResource(dir, true)) .parallel() .forEach(dir -> metadata.put(dir, BucketMetadata.get(dir))); start += batchSize; @@ -86,7 +84,7 @@ int leastNumBuckets() { } private SourceMetadata getSourceMetadata( - List directories, + List directories, String filenameSuffix, BiFunction, BucketMetadata, Boolean> compatibilityCompareFn) { @@ -115,13 +113,13 @@ private SourceMetadata getSourceMetadata( } public SourceMetadata getPrimaryKeyedSourceMetadata( - List directories, String filenameSuffix) { + List directories, String filenameSuffix) { return getSourceMetadata( directories, filenameSuffix, BucketMetadata::isPartitionCompatibleForPrimaryKey); } public SourceMetadata getPrimaryAndSecondaryKeyedSourceMetadata( - List directories, String filenameSuffix) { + List directories, String filenameSuffix) { return getSourceMetadata( directories, filenameSuffix, diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 1db6e727b9..7941029714 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -28,9 +28,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -42,15 +44,14 @@ import java.util.stream.IntStream; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.smb.BucketMetadataUtil.SourceMetadata; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdCoder; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; @@ -62,6 +63,7 @@ import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,10 +195,7 @@ public Coder> getOutputCoder() { keyTypeCoder(), CoGbkResult.CoGbkResultCoder.of( coGbkResultSchema(), - UnionCoder.of( - sources.stream() - .map(i -> i.fileOperations.getCoder()) - .collect(Collectors.toList())))); + UnionCoder.of(sources.stream().map(i -> i.getCoder()).collect(Collectors.toList())))); } @Override @@ -365,7 +364,8 @@ public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) sourceMetadata = BucketMetadataUtil.get() - .getPrimaryKeyedSourceMetadata(inputDirectories, filenameSuffix); + .getPrimaryKeyedSourceMetadata( + new ArrayList<>(fileOperations.keySet()), filenameSuffix); return sourceMetadata; } } @@ -390,7 +390,8 @@ public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) sourceMetadata = BucketMetadataUtil.get() - .getPrimaryAndSecondaryKeyedSourceMetadata(inputDirectories, filenameSuffix); + .getPrimaryAndSecondaryKeyedSourceMetadata( + new ArrayList<>(fileOperations.keySet()), filenameSuffix); return sourceMetadata; } } @@ -406,8 +407,7 @@ public abstract static class BucketedInput implements Serializable { protected TupleTag tupleTag; protected String filenameSuffix; - protected FileOperations fileOperations; - protected List inputDirectories; + protected Map> fileOperations; protected Predicate predicate; protected Keying keying; // lazy, internal checks depend on what kind of iteration is requested @@ -434,11 +434,36 @@ public BucketedInput( String filenameSuffix, FileOperations fileOperations, Predicate predicate) { + this( + keying, + tupleTag, + filenameSuffix, + inputDirectories.stream() + .collect( + Collectors.toMap( + dir -> FileSystems.matchNewResource(dir, true), dir -> fileOperations)), + predicate); + } + + public BucketedInput( + Keying keying, + TupleTag tupleTag, + String filenameSuffix, + Map> fileOperations, + Predicate predicate) { + final Set> coderTypes = + fileOperations.values().stream() + .map(f -> f.getCoder().getEncodedTypeDescriptor()) + .collect(Collectors.toSet()); + + Preconditions.checkArgument( + coderTypes.size() == 1, + "All FileOperations Coders must use the same encoding type; found: " + coderTypes); + this.keying = keying; this.tupleTag = tupleTag; this.filenameSuffix = filenameSuffix; this.fileOperations = fileOperations; - this.inputDirectories = inputDirectories; this.predicate = predicate; } @@ -453,7 +478,7 @@ public Predicate getPredicate() { } public Coder getCoder() { - return fileOperations.getCoder(); + return fileOperations.entrySet().iterator().next().getValue().getCoder(); } static CoGbkResultSchema schemaOf(List> sources) { @@ -461,12 +486,10 @@ static CoGbkResultSchema schemaOf(List> sources) { sources.stream().map(BucketedInput::getTupleTag).collect(Collectors.toList())); } - private static List sampleDirectory(String directory, String filepattern) { + private static List sampleDirectory(ResourceId directory, String filepattern) { try { - final ResourceId resourceId = FileSystems.matchNewResource(directory, true); - return FileSystems.match( - resourceId.resolve(filepattern, StandardResolveOptions.RESOLVE_FILE).toString()) + directory.resolve(filepattern, StandardResolveOptions.RESOLVE_FILE).toString()) .metadata(); } catch (FileNotFoundException e) { return Collections.emptyList(); @@ -476,7 +499,8 @@ private static List sampleDirectory(String directory, String filepatte } long getOrSampleByteSize() { - return inputDirectories + return fileOperations + .keySet() .parallelStream() .mapToLong( dir -> { @@ -548,7 +572,7 @@ public KeyGroupIterator createIterator( try { Iterator> iterator = Iterators.transform( - fileOperations.iterator(file), v -> KV.of(keyFn.apply(v), v)); + fileOperations.get(dir).iterator(file), v -> KV.of(keyFn.apply(v), v)); Iterator> out = (bufferSize > 0) ? new BufferedIterator<>(iterator, bufferSize) : iterator; iterators.add(out); @@ -563,6 +587,7 @@ public KeyGroupIterator createIterator( @Override public String toString() { + List inputDirectories = new ArrayList<>(fileOperations.keySet()); return String.format( "BucketedInput[tupleTag=%s, inputDirectories=[%s]]", tupleTag.getId(), @@ -578,9 +603,12 @@ public String toString() { @SuppressWarnings("unchecked") private void writeObject(ObjectOutputStream outStream) throws IOException { SerializableCoder.of(TupleTag.class).encode(tupleTag, outStream); - ListCoder.of(StringUtf8Coder.of()).encode(inputDirectories, outStream); + outStream.writeInt(fileOperations.size()); + for (Map.Entry> entry : fileOperations.entrySet()) { + ResourceIdCoder.of().encode(entry.getKey(), outStream); + outStream.writeObject(entry.getValue()); + } outStream.writeUTF(filenameSuffix); - outStream.writeObject(fileOperations); outStream.writeObject(predicate); outStream.writeObject(keying); outStream.flush(); @@ -589,9 +617,14 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { @SuppressWarnings("unchecked") private void readObject(ObjectInputStream inStream) throws ClassNotFoundException, IOException { this.tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); - this.inputDirectories = ListCoder.of(StringUtf8Coder.of()).decode(inStream); + final int numDirectories = inStream.readInt(); + this.fileOperations = new HashMap<>(); + for (int i = 0; i < numDirectories; i++) { + fileOperations.put( + ResourceIdCoder.of().decode(inStream), (FileOperations) inStream.readObject()); + } + System.out.println("Read " + numDirectories + " dirs into " + fileOperations); this.filenameSuffix = inStream.readUTF(); - this.fileOperations = (FileOperations) inStream.readObject(); this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java index 43d08ee09d..bdeddd2f54 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java @@ -72,8 +72,8 @@ public void testIncompatibleMetadata() throws Exception { private void testIncompatibleMetadata(List metadataList, int badIdx) throws Exception { - final List directories = new ArrayList<>(); - final List goodDirectories = new ArrayList<>(); + final List directories = new ArrayList<>(); + final List goodDirectories = new ArrayList<>(); // all but one metadata are source-compatible, the one at badIdx is incompatible for (int i = 0; i < metadataList.size(); i++) { @@ -85,9 +85,9 @@ private void testIncompatibleMetadata(List metadataList, int "application/json")); BucketMetadata.to(metadataList.get(i), outputStream); - directories.add(dest.getAbsolutePath()); + directories.add(LocalResources.fromString(dest.getAbsolutePath(), true)); if (i != badIdx) { - goodDirectories.add(dest.getAbsolutePath()); + goodDirectories.add(LocalResources.fromString(dest.getAbsolutePath(), true)); } } @@ -122,13 +122,13 @@ public void testMissingMetadata() throws Exception { private void testMissingMetadata(List> metadataList) throws Exception { - final List directories = new ArrayList<>(); + final List directories = new ArrayList<>(); // all but one metadata are compatible ResourceId missingMetadataDir = null; for (int i = 0; i < metadataList.size(); i++) { final File dest = folder.newFolder(String.valueOf(i)); - directories.add(dest.getAbsolutePath()); + directories.add(LocalResources.fromString(dest.getAbsolutePath(), true)); if (!metadataList.get(i).isPresent()) { missingMetadataDir = LocalResources.fromFile(dest, true); From 52bd353b7a97c777a7aca0056a2b6b060c96df7e Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 10 Nov 2023 12:45:32 -0500 Subject: [PATCH 02/21] cleanup --- .../org/apache/beam/sdk/extensions/smb/SortedBucketSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 7941029714..f1ba8eda3c 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -623,7 +623,6 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio fileOperations.put( ResourceIdCoder.of().decode(inStream), (FileOperations) inStream.readObject()); } - System.out.println("Read " + numDirectories + " dirs into " + fileOperations); this.filenameSuffix = inStream.readUTF(); this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); From b072035cd957ea98a402424f94e2ee677c6f0e02 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 10 Nov 2023 13:56:51 -0500 Subject: [PATCH 03/21] Include filenameSuffix with fileOperations --- .../extensions/smb/BucketMetadataUtil.java | 23 ++-- .../extensions/smb/SortedBucketSource.java | 97 +++++++++------- .../smb/BucketMetadataUtilTest.java | 25 ++++- .../smb/SortedBucketSourceTest.java | 105 +++++++++++++++--- 4 files changed, 183 insertions(+), 67 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java index d2ef3d1ef7..9b09c53139 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java @@ -24,6 +24,7 @@ import java.util.function.BiFunction; import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; @@ -68,7 +69,7 @@ int leastNumBuckets() { this.batchSize = batchSize; } - private Map> fetchMetadata(List directories) { + private Map> fetchMetadata(Set directories) { final int total = directories.size(); final Map> metadata = new ConcurrentHashMap<>(); int start = 0; @@ -84,11 +85,11 @@ int leastNumBuckets() { } private SourceMetadata getSourceMetadata( - List directories, - String filenameSuffix, + Map>> directories, BiFunction, BucketMetadata, Boolean> compatibilityCompareFn) { - final Map> bucketMetadatas = fetchMetadata(directories); + final Map> bucketMetadatas = + fetchMetadata(directories.keySet()); Preconditions.checkState(!bucketMetadatas.isEmpty(), "Failed to find metadata"); Map> mapping = new HashMap<>(); @@ -105,7 +106,8 @@ private SourceMetadata getSourceMetadata( metadata, first.getValue()); final FileAssignment fileAssignment = - new SMBFilenamePolicy(dir, metadata.getFilenamePrefix(), filenameSuffix) + new SMBFilenamePolicy( + dir, metadata.getFilenamePrefix(), directories.get(dir).getKey()) .forDestination(); mapping.put(dir, new SourceMetadataValue<>(metadata, fileAssignment)); }); @@ -113,16 +115,13 @@ private SourceMetadata getSourceMetadata( } public SourceMetadata getPrimaryKeyedSourceMetadata( - List directories, String filenameSuffix) { - return getSourceMetadata( - directories, filenameSuffix, BucketMetadata::isPartitionCompatibleForPrimaryKey); + Map>> directories) { + return getSourceMetadata(directories, BucketMetadata::isPartitionCompatibleForPrimaryKey); } public SourceMetadata getPrimaryAndSecondaryKeyedSourceMetadata( - List directories, String filenameSuffix) { + Map>> directories) { return getSourceMetadata( - directories, - filenameSuffix, - BucketMetadata::isPartitionCompatibleForPrimaryAndSecondaryKey); + directories, BucketMetadata::isPartitionCompatibleForPrimaryAndSecondaryKey); } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index f1ba8eda3c..f8aba2adcc 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.smb; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Functions; import com.google.common.base.Preconditions; import java.io.FileNotFoundException; import java.io.IOException; @@ -357,15 +358,25 @@ public PrimaryKeyedBucketedInput( String filenameSuffix, FileOperations fileOperations, Predicate predicate) { - super(Keying.PRIMARY, tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate); + this( + tupleTag, + inputDirectories.stream() + .collect( + Collectors.toMap( + Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))), + predicate); + } + + public PrimaryKeyedBucketedInput( + TupleTag tupleTag, + Map>> directories, + Predicate predicate) { + super(Keying.PRIMARY, tupleTag, directories, predicate); } public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) - sourceMetadata = - BucketMetadataUtil.get() - .getPrimaryKeyedSourceMetadata( - new ArrayList<>(fileOperations.keySet()), filenameSuffix); + sourceMetadata = BucketMetadataUtil.get().getPrimaryKeyedSourceMetadata(directories); return sourceMetadata; } } @@ -377,21 +388,26 @@ public PrimaryAndSecondaryKeyedBucktedInput( String filenameSuffix, FileOperations fileOperations, Predicate predicate) { - super( - Keying.PRIMARY_AND_SECONDARY, + this( tupleTag, - inputDirectories, - filenameSuffix, - fileOperations, + inputDirectories.stream() + .collect( + Collectors.toMap( + Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))), predicate); } + public PrimaryAndSecondaryKeyedBucktedInput( + TupleTag tupleTag, + Map>> directories, + Predicate predicate) { + super(Keying.PRIMARY_AND_SECONDARY, tupleTag, directories, predicate); + } + public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) sourceMetadata = - BucketMetadataUtil.get() - .getPrimaryAndSecondaryKeyedSourceMetadata( - new ArrayList<>(fileOperations.keySet()), filenameSuffix); + BucketMetadataUtil.get().getPrimaryAndSecondaryKeyedSourceMetadata(directories); return sourceMetadata; } } @@ -406,8 +422,7 @@ public abstract static class BucketedInput implements Serializable { private static final Pattern BUCKET_PATTERN = Pattern.compile("(\\d+)-of-(\\d+)"); protected TupleTag tupleTag; - protected String filenameSuffix; - protected Map> fileOperations; + protected Map>> directories; protected Predicate predicate; protected Keying keying; // lazy, internal checks depend on what kind of iteration is requested @@ -437,23 +452,21 @@ public BucketedInput( this( keying, tupleTag, - filenameSuffix, inputDirectories.stream() .collect( Collectors.toMap( - dir -> FileSystems.matchNewResource(dir, true), dir -> fileOperations)), + Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))), predicate); } public BucketedInput( Keying keying, TupleTag tupleTag, - String filenameSuffix, - Map> fileOperations, + Map>> directories, Predicate predicate) { final Set> coderTypes = - fileOperations.values().stream() - .map(f -> f.getCoder().getEncodedTypeDescriptor()) + directories.values().stream() + .map(f -> f.getValue().getCoder().getEncodedTypeDescriptor()) .collect(Collectors.toSet()); Preconditions.checkArgument( @@ -462,8 +475,11 @@ public BucketedInput( this.keying = keying; this.tupleTag = tupleTag; - this.filenameSuffix = filenameSuffix; - this.fileOperations = fileOperations; + this.directories = + directories.entrySet().stream() + .collect( + Collectors.toMap( + e -> FileSystems.matchNewResource(e.getKey(), true), Map.Entry::getValue)); this.predicate = predicate; } @@ -478,7 +494,7 @@ public Predicate getPredicate() { } public Coder getCoder() { - return fileOperations.entrySet().iterator().next().getValue().getCoder(); + return directories.entrySet().iterator().next().getValue().getValue().getCoder(); } static CoGbkResultSchema schemaOf(List> sources) { @@ -499,18 +515,20 @@ private static List sampleDirectory(ResourceId directory, String filep } long getOrSampleByteSize() { - return fileOperations - .keySet() + return directories + .entrySet() .parallelStream() .mapToLong( - dir -> { + entry -> { // Take at most 10 buckets from the directory to sample // Check for single-shard filenames template first, then multi-shard List sampledFiles = - sampleDirectory(dir, "*-0000?-of-?????" + filenameSuffix); + sampleDirectory(entry.getKey(), "*-0000?-of-?????" + entry.getValue().getKey()); if (sampledFiles.isEmpty()) { sampledFiles = - sampleDirectory(dir, "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix); + sampleDirectory( + entry.getKey(), + "*-0000?-of-*-shard-00000-of-?????" + entry.getValue().getKey()); } int numBuckets = 0; @@ -532,7 +550,8 @@ long getOrSampleByteSize() { sampledBytes += metadata.sizeBytes(); } if (numBuckets == 0) { - throw new IllegalArgumentException("Directory " + dir + " has no bucket files"); + throw new IllegalArgumentException( + "Directory " + entry.getKey() + " has no bucket files"); } if (seenBuckets.size() < numBuckets) { return (long) (sampledBytes * (numBuckets / (seenBuckets.size() * 1.0))); @@ -572,7 +591,8 @@ public KeyGroupIterator createIterator( try { Iterator> iterator = Iterators.transform( - fileOperations.get(dir).iterator(file), v -> KV.of(keyFn.apply(v), v)); + directories.get(dir).getValue().iterator(file), + v -> KV.of(keyFn.apply(v), v)); Iterator> out = (bufferSize > 0) ? new BufferedIterator<>(iterator, bufferSize) : iterator; iterators.add(out); @@ -587,7 +607,7 @@ public KeyGroupIterator createIterator( @Override public String toString() { - List inputDirectories = new ArrayList<>(fileOperations.keySet()); + List inputDirectories = new ArrayList<>(directories.keySet()); return String.format( "BucketedInput[tupleTag=%s, inputDirectories=[%s]]", tupleTag.getId(), @@ -603,12 +623,11 @@ public String toString() { @SuppressWarnings("unchecked") private void writeObject(ObjectOutputStream outStream) throws IOException { SerializableCoder.of(TupleTag.class).encode(tupleTag, outStream); - outStream.writeInt(fileOperations.size()); - for (Map.Entry> entry : fileOperations.entrySet()) { + outStream.writeInt(directories.size()); + for (Map.Entry>> entry : directories.entrySet()) { ResourceIdCoder.of().encode(entry.getKey(), outStream); outStream.writeObject(entry.getValue()); } - outStream.writeUTF(filenameSuffix); outStream.writeObject(predicate); outStream.writeObject(keying); outStream.flush(); @@ -618,12 +637,12 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { private void readObject(ObjectInputStream inStream) throws ClassNotFoundException, IOException { this.tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); final int numDirectories = inStream.readInt(); - this.fileOperations = new HashMap<>(); + this.directories = new HashMap<>(); for (int i = 0; i < numDirectories; i++) { - fileOperations.put( - ResourceIdCoder.of().decode(inStream), (FileOperations) inStream.readObject()); + directories.put( + ResourceIdCoder.of().decode(inStream), + (KV>) inStream.readObject()); } - this.filenameSuffix = inStream.readUTF(); this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java index bdeddd2f54..1d2404002e 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtilTest.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.LocalResources; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.values.KV; +import org.apache.curator.shaded.com.google.common.base.Functions; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -92,10 +94,21 @@ private void testIncompatibleMetadata(List metadataList, int } final SourceMetadata sourceMetadata = - util.getPrimaryKeyedSourceMetadata(goodDirectories, ".txt"); + util.getPrimaryKeyedSourceMetadata( + goodDirectories.stream() + .collect( + Collectors.toMap( + Functions.identity(), dir -> KV.of(".txt", new TestFileOperations())))); Assert.assertEquals(goodDirectories.size(), sourceMetadata.mapping.size()); Assert.assertThrows( - IllegalStateException.class, () -> util.getPrimaryKeyedSourceMetadata(directories, ".txt")); + IllegalStateException.class, + () -> + util.getPrimaryKeyedSourceMetadata( + directories.stream() + .collect( + Collectors.toMap( + Functions.identity(), + dir -> KV.of(".txt", new TestFileOperations()))))); folder.delete(); } @@ -147,7 +160,13 @@ private void testMissingMetadata(List> metadataList Assert.assertThrows( "Could not find SMB metadata for source directory " + missingMetadataDir, RuntimeException.class, - () -> util.getPrimaryKeyedSourceMetadata(directories, ".txt")); + () -> + util.getPrimaryKeyedSourceMetadata( + directories.stream() + .collect( + Collectors.toMap( + Functions.identity(), + dir -> KV.of(".txt", new TestFileOperations()))))); folder.delete(); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index 746509341b..4c6673b26f 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -26,15 +26,7 @@ import java.io.File; import java.io.OutputStream; import java.nio.channels.Channels; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -692,6 +684,85 @@ public void testUniqueTupleTagIdSecondaryKey() { Assert.assertThrows(IllegalArgumentException.class, () -> sb.and(read2)); } + @Test + public void testMixedInputTypesInSource() throws Exception { + final AvroGeneratedUser avroRecord = + AvroGeneratedUser.newBuilder() + .setName("foo") + .setFavoriteColor("avro-color") + .setFavoriteNumber(1) + .build(); + final AvroGeneratedUser parquetRecord = + AvroGeneratedUser.newBuilder() + .setName("foo") + .setFavoriteColor("parquet-color") + .setFavoriteNumber(2) + .build(); + + write( + lhsPolicy.forDestination(), + new AvroBucketMetadata( + 1, + 1, + String.class, + "name", + null, + null, + BucketMetadata.HashType.MURMUR3_32, + "bucket-", + AvroGeneratedUser.class), + ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(avroRecord)), + AvroFileOperations.of(AvroGeneratedUser.class)); + + write( + rhsPolicy.forDestination(), + new ParquetBucketMetadata( + 1, + 1, + String.class, + "name", + null, + null, + BucketMetadata.HashType.MURMUR3_32, + "bucket-", + AvroGeneratedUser.class), + ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(parquetRecord)), + ParquetAvroFileOperations.of(AvroGeneratedUser.class)); + + final PCollection> output = + pipeline.apply( + Read.from( + new SortedBucketPrimaryKeyedSource<>( + String.class, + ImmutableList.of( + new PrimaryKeyedBucketedInput<>( + new TupleTag(), + ImmutableMap.of( + TestUtils.fromFolder(lhsFolder).toString(), + KV.of(".avro", AvroFileOperations.of(AvroGeneratedUser.class)), + TestUtils.fromFolder(rhsFolder).toString(), + KV.of( + ".parquet", + ParquetAvroFileOperations.of(AvroGeneratedUser.class))), + null)), + TargetParallelism.max(), + "metrics-key"))); + + PAssert.thatMap(output) + .satisfies( + m -> { + Assert.assertEquals(1, m.keySet().size()); + Assert.assertEquals( + ImmutableList.of(avroRecord, parquetRecord), + m.entrySet() + .iterator() + .next() + .getValue() + .getAll(new TupleTag())); + return null; + }); + } + @SuppressWarnings("unchecked") private static List> splitAndSort( SortedBucketSource source, int desiredByteSize) throws Exception { @@ -1094,6 +1165,15 @@ private static void write( BucketMetadata metadata, Map> input) throws Exception { + write(fileAssignment, metadata, input, new TestFileOperations()); + } + + private static void write( + FileAssignment fileAssignment, + BucketMetadata metadata, + Map> input, + FileOperations fileOperations) + throws Exception { // Write bucket metadata BucketMetadata.to( metadata, @@ -1101,11 +1181,10 @@ private static void write( FileSystems.create(fileAssignment.forMetadata(), "application/json"))); // Write bucket files - final TestFileOperations fileOperations = new TestFileOperations(); - for (Map.Entry> entry : input.entrySet()) { - Writer writer = + for (Map.Entry> entry : input.entrySet()) { + Writer writer = fileOperations.createWriter(fileAssignment.forBucket(entry.getKey(), metadata)); - for (String s : entry.getValue()) { + for (V s : entry.getValue()) { writer.write(s); } writer.close(); From 87cf83981d38131c41843fce161809a8b98e3710 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 10 Nov 2023 14:35:29 -0500 Subject: [PATCH 04/21] fix Coder compat check; fix test --- .../extensions/smb/AvroBucketMetadata.java | 48 +++++++++++++------ .../extensions/smb/ParquetBucketMetadata.java | 48 +++++++++++++------ .../extensions/smb/SortedBucketSource.java | 11 +---- .../smb/SortedBucketSourceTest.java | 34 ++++++++----- 4 files changed, 90 insertions(+), 51 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java index 9902262efd..7da69a7791 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java @@ -40,11 +40,11 @@ */ public class AvroBucketMetadata extends BucketMetadata { - @JsonProperty private final String keyField; + @JsonProperty final String keyField; @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - private final String keyFieldSecondary; + final String keyFieldSecondary; @JsonIgnore private final AtomicReference keyPath = new AtomicReference<>(); @JsonIgnore private final AtomicReference keyPathSecondary = new AtomicReference<>(); @@ -178,25 +178,45 @@ public void populateDisplayData(Builder builder) { @Override public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - AvroBucketMetadata that = (AvroBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + if (o == null) return false; + if (o instanceof AvroBucketMetadata) { + AvroBucketMetadata that = (AvroBucketMetadata) o; + return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + } else if (o instanceof ParquetBucketMetadata) { + ParquetBucketMetadata that = (ParquetBucketMetadata) o; + return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + } else { + return false; + } } @Override public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - AvroBucketMetadata that = (AvroBucketMetadata) o; + if (o == null) return false; + String keyField; + String keyFieldSecondary; + if (o instanceof AvroBucketMetadata) { + AvroBucketMetadata that = (AvroBucketMetadata) o; + keyField = that.keyField; + keyFieldSecondary = that.keyFieldSecondary; + } else if (o instanceof ParquetBucketMetadata) { + ParquetBucketMetadata that = (ParquetBucketMetadata) o; + keyField = that.keyField; + keyFieldSecondary = that.keyFieldSecondary; + } else { + return false; + } + boolean allSecondaryPresent = getKeyClassSecondary() != null - && that.getKeyClassSecondary() != null - && keyFieldSecondary != null - && that.keyFieldSecondary != null; + && o.getKeyClassSecondary() != null + && this.keyFieldSecondary != null + && keyFieldSecondary != null; // you messed up if (!allSecondaryPresent) return false; - return getKeyClass() == that.getKeyClass() - && getKeyClassSecondary() == that.getKeyClassSecondary() - && keyField.equals(that.keyField) - && keyFieldSecondary.equals(that.keyFieldSecondary); + return getKeyClass() == o.getKeyClass() + && getKeyClassSecondary() == o.getKeyClassSecondary() + && this.keyField.equals(keyField) + && this.keyFieldSecondary.equals(keyFieldSecondary); } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java index 00e34cb0ce..44b1812a5d 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java @@ -39,11 +39,11 @@ public class ParquetBucketMetadata extends BucketMetadata { - @JsonProperty private final String keyField; + @JsonProperty final String keyField; @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - private final String keyFieldSecondary; + final String keyFieldSecondary; @JsonIgnore private final AtomicReference> keyGettersPrimary = new AtomicReference<>(); @@ -190,26 +190,46 @@ public void populateDisplayData(DisplayData.Builder builder) { @Override public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - ParquetBucketMetadata that = (ParquetBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + if (o == null) return false; + if (o instanceof AvroBucketMetadata) { + AvroBucketMetadata that = (AvroBucketMetadata) o; + return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + } else if (o instanceof ParquetBucketMetadata) { + ParquetBucketMetadata that = (ParquetBucketMetadata) o; + return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + } else { + return false; + } } @Override public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - ParquetBucketMetadata that = (ParquetBucketMetadata) o; + if (o == null) return false; + String keyField; + String keyFieldSecondary; + if (o instanceof AvroBucketMetadata) { + AvroBucketMetadata that = (AvroBucketMetadata) o; + keyField = that.keyField; + keyFieldSecondary = that.keyFieldSecondary; + } else if (o instanceof ParquetBucketMetadata) { + ParquetBucketMetadata that = (ParquetBucketMetadata) o; + keyField = that.keyField; + keyFieldSecondary = that.keyFieldSecondary; + } else { + return false; + } + boolean allSecondaryPresent = getKeyClassSecondary() != null - && that.getKeyClassSecondary() != null - && keyFieldSecondary != null - && that.keyFieldSecondary != null; + && o.getKeyClassSecondary() != null + && this.keyFieldSecondary != null + && keyFieldSecondary != null; // you messed up if (!allSecondaryPresent) return false; - return getKeyClass() == that.getKeyClass() - && getKeyClassSecondary() == that.getKeyClassSecondary() - && keyField.equals(that.keyField) - && keyFieldSecondary.equals(that.keyFieldSecondary); + return getKeyClass() == o.getKeyClass() + && getKeyClassSecondary() == o.getKeyClassSecondary() + && this.keyField.equals(keyField) + && this.keyFieldSecondary.equals(keyFieldSecondary); } @Override diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index f8aba2adcc..cbacb1fa33 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -464,15 +463,6 @@ public BucketedInput( TupleTag tupleTag, Map>> directories, Predicate predicate) { - final Set> coderTypes = - directories.values().stream() - .map(f -> f.getValue().getCoder().getEncodedTypeDescriptor()) - .collect(Collectors.toSet()); - - Preconditions.checkArgument( - coderTypes.size() == 1, - "All FileOperations Coders must use the same encoding type; found: " + coderTypes); - this.keying = keying; this.tupleTag = tupleTag; this.directories = @@ -494,6 +484,7 @@ public Predicate getPredicate() { } public Coder getCoder() { + // Todo what if two input directories use a different Coder for the same type? return directories.entrySet().iterator().next().getValue().getValue().getCoder(); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index 4c6673b26f..fb234c16e4 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.extensions.smb.SortedBucketSource.PrimaryKeyedBucketedInput; import static org.apache.beam.sdk.extensions.smb.TestUtils.fromFolder; +import com.google.common.collect.ImmutableSet; import java.io.File; import java.io.OutputStream; import java.nio.channels.Channels; @@ -686,6 +687,15 @@ public void testUniqueTupleTagIdSecondaryKey() { @Test public void testMixedInputTypesInSource() throws Exception { + final TemporaryFolder avroDir = new TemporaryFolder(); + avroDir.create(); + final TemporaryFolder parquetDir = new TemporaryFolder(); + parquetDir.create(); + final SMBFilenamePolicy avroPolicy = + new SMBFilenamePolicy(fromFolder(avroDir), "bucket", ".avro"); + final SMBFilenamePolicy parquetPolicy = + new SMBFilenamePolicy(fromFolder(parquetDir), "bucket", ".parquet"); + final AvroGeneratedUser avroRecord = AvroGeneratedUser.newBuilder() .setName("foo") @@ -700,7 +710,7 @@ public void testMixedInputTypesInSource() throws Exception { .build(); write( - lhsPolicy.forDestination(), + avroPolicy.forDestination(), new AvroBucketMetadata( 1, 1, @@ -709,13 +719,13 @@ public void testMixedInputTypesInSource() throws Exception { null, null, BucketMetadata.HashType.MURMUR3_32, - "bucket-", + "bucket", AvroGeneratedUser.class), ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(avroRecord)), AvroFileOperations.of(AvroGeneratedUser.class)); write( - rhsPolicy.forDestination(), + parquetPolicy.forDestination(), new ParquetBucketMetadata( 1, 1, @@ -724,7 +734,7 @@ public void testMixedInputTypesInSource() throws Exception { null, null, BucketMetadata.HashType.MURMUR3_32, - "bucket-", + "bucket", AvroGeneratedUser.class), ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(parquetRecord)), ParquetAvroFileOperations.of(AvroGeneratedUser.class)); @@ -736,11 +746,11 @@ public void testMixedInputTypesInSource() throws Exception { String.class, ImmutableList.of( new PrimaryKeyedBucketedInput<>( - new TupleTag(), + new TupleTag<>("source-tag"), ImmutableMap.of( - TestUtils.fromFolder(lhsFolder).toString(), + TestUtils.fromFolder(avroDir).toString(), KV.of(".avro", AvroFileOperations.of(AvroGeneratedUser.class)), - TestUtils.fromFolder(rhsFolder).toString(), + TestUtils.fromFolder(parquetDir).toString(), KV.of( ".parquet", ParquetAvroFileOperations.of(AvroGeneratedUser.class))), @@ -753,14 +763,12 @@ public void testMixedInputTypesInSource() throws Exception { m -> { Assert.assertEquals(1, m.keySet().size()); Assert.assertEquals( - ImmutableList.of(avroRecord, parquetRecord), - m.entrySet() - .iterator() - .next() - .getValue() - .getAll(new TupleTag())); + ImmutableSet.of(avroRecord, parquetRecord), + Sets.newHashSet(m.get("foo").getAll("source-tag"))); return null; }); + + pipeline.run(); } @SuppressWarnings("unchecked") From 1be2bbea2c41691162bf26e303ef600005781eab Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 13 Dec 2023 16:09:05 -0500 Subject: [PATCH 05/21] Refactor partition-compatibility check to use hashed value --- .../extensions/smb/AvroBucketMetadata.java | 66 ++++++------------- .../sdk/extensions/smb/BucketMetadata.java | 37 ++++++++++- .../extensions/smb/JsonBucketMetadata.java | 30 ++------- .../extensions/smb/ParquetBucketMetadata.java | 52 ++++----------- .../smb/TensorFlowBucketMetadata.java | 23 ++----- .../extensions/smb/BucketMetadataTest.java | 8 +-- .../smb/SortedBucketSourceTest.java | 63 +++++++++++++----- .../extensions/smb/TestBucketMetadata.java | 22 +++---- .../smb/TestBucketMetadataWithSecondary.java | 24 +++---- 9 files changed, 147 insertions(+), 178 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java index 7da69a7791..e71dbee0b7 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java @@ -26,6 +26,8 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.avro.Schema; @@ -34,17 +36,18 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; /** * {@link org.apache.beam.sdk.extensions.smb.BucketMetadata} for Avro {@link IndexedRecord} records. */ public class AvroBucketMetadata extends BucketMetadata { - @JsonProperty final String keyField; + @JsonProperty private final String keyField; @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - final String keyFieldSecondary; + private final String keyFieldSecondary; @JsonIgnore private final AtomicReference keyPath = new AtomicReference<>(); @JsonIgnore private final AtomicReference keyPathSecondary = new AtomicReference<>(); @@ -131,6 +134,21 @@ public Map, Coder> coderOverrides() { return AvroUtils.coderOverrides(); } + @Override + public int hashPrimaryKeyMetadata() { + return Objects.hash(keyField, getKeyClass()); + } + + @Override + public int hashSecondaryKeyMetadata() { + return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); + } + + @Override + public Set> compatibleMetadataTypes() { + return ImmutableSet.of(ParquetBucketMetadata.class); + } + @Override public K1 extractKeyPrimary(V value) { int[] path = keyPath.get(); @@ -175,48 +193,4 @@ public void populateDisplayData(Builder builder) { if (keyFieldSecondary != null) builder.add(DisplayData.item("keyFieldSecondary", keyFieldSecondary)); } - - @Override - public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) { - if (o == null) return false; - if (o instanceof AvroBucketMetadata) { - AvroBucketMetadata that = (AvroBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); - } else if (o instanceof ParquetBucketMetadata) { - ParquetBucketMetadata that = (ParquetBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); - } else { - return false; - } - } - - @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) { - if (o == null) return false; - String keyField; - String keyFieldSecondary; - if (o instanceof AvroBucketMetadata) { - AvroBucketMetadata that = (AvroBucketMetadata) o; - keyField = that.keyField; - keyFieldSecondary = that.keyFieldSecondary; - } else if (o instanceof ParquetBucketMetadata) { - ParquetBucketMetadata that = (ParquetBucketMetadata) o; - keyField = that.keyField; - keyFieldSecondary = that.keyFieldSecondary; - } else { - return false; - } - - boolean allSecondaryPresent = - getKeyClassSecondary() != null - && o.getKeyClassSecondary() != null - && this.keyFieldSecondary != null - && keyFieldSecondary != null; - // you messed up - if (!allSecondaryPresent) return false; - return getKeyClass() == o.getKeyClass() - && getKeyClassSecondary() == o.getKeyClassSecondary() - && this.keyField.equals(keyField) - && this.keyFieldSecondary.equals(keyFieldSecondary); - } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java index af957921ee..9749bfb768 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java @@ -290,14 +290,47 @@ byte[] encodeKeyBytes(K key, Coder coder) { } // Checks for complete equality between BucketMetadatas originating from the same BucketedInput - public abstract boolean isPartitionCompatibleForPrimaryKey(BucketMetadata other); + public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata other) { + return isIntraPartitionCompatibleWith(other, false); + } + + public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata other) { + return isIntraPartitionCompatibleWith(other, true); + } - public abstract boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata other); + private boolean isIntraPartitionCompatibleWith( + MetadataT other, boolean checkSecondaryKeys) { + if (other == null) { + return false; + } + final Class otherClass = other.getClass(); + final Set> compatibleTypes = compatibleMetadataTypes(); + + if (compatibleTypes.isEmpty() && other.getClass() != this.getClass()) { + return false; + } else if (this.getKeyClass() != other.getKeyClass() + && !(compatibleTypes.contains(otherClass) + && (other.compatibleMetadataTypes().contains(this.getClass())))) { + return false; + } + + return (this.hashPrimaryKeyMetadata() == other.hashPrimaryKeyMetadata() + && (!checkSecondaryKeys + || this.hashSecondaryKeyMetadata() == other.hashSecondaryKeyMetadata())); + } + + public Set> compatibleMetadataTypes() { + return new HashSet<>(); + } public abstract K1 extractKeyPrimary(V value); public abstract K2 extractKeySecondary(V value); + public abstract int hashPrimaryKeyMetadata(); + + public abstract int hashSecondaryKeyMetadata(); + public SortedBucketIO.ComparableKeyBytes primaryComparableKeyBytes(V value) { return new SortedBucketIO.ComparableKeyBytes(getKeyBytesPrimary(value), null); } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java index 396d115111..4b41a3bf6a 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java @@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.services.bigquery.model.TableRow; -import java.util.Arrays; +import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -148,32 +148,12 @@ public void populateDisplayData(Builder builder) { } @Override - public boolean isPartitionCompatibleForPrimaryKey(final BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - JsonBucketMetadata that = (JsonBucketMetadata) o; - return getKeyClass() == that.getKeyClass() - && keyField.equals(that.keyField) - && Arrays.equals(keyPath, that.keyPath); + public int hashPrimaryKeyMetadata() { + return Objects.hash(keyField, getKeyClass()); } @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(final BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - JsonBucketMetadata that = (JsonBucketMetadata) o; - boolean allSecondaryPresent = - getKeyClassSecondary() != null - && that.getKeyClassSecondary() != null - && keyFieldSecondary != null - && that.keyFieldSecondary != null - && keyPathSecondary != null - && that.keyPathSecondary != null; - // you messed up - if (!allSecondaryPresent) return false; - return getKeyClass() == that.getKeyClass() - && getKeyClassSecondary() == that.getKeyClassSecondary() - && keyField.equals(that.keyField) - && keyFieldSecondary.equals(that.keyFieldSecondary) - && Arrays.equals(keyPath, that.keyPath) - && Arrays.equals(keyPathSecondary, that.keyPathSecondary); + public int hashSecondaryKeyMetadata() { + return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java index 44b1812a5d..2c828efb02 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java @@ -27,6 +27,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.Nullable; @@ -36,14 +38,15 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; public class ParquetBucketMetadata extends BucketMetadata { - @JsonProperty final String keyField; + @JsonProperty private final String keyField; @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - final String keyFieldSecondary; + private final String keyFieldSecondary; @JsonIgnore private final AtomicReference> keyGettersPrimary = new AtomicReference<>(); @@ -189,47 +192,18 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) { - if (o == null) return false; - if (o instanceof AvroBucketMetadata) { - AvroBucketMetadata that = (AvroBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); - } else if (o instanceof ParquetBucketMetadata) { - ParquetBucketMetadata that = (ParquetBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); - } else { - return false; - } + public int hashPrimaryKeyMetadata() { + return Objects.hash(keyField, getKeyClass()); } @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) { - if (o == null) return false; - String keyField; - String keyFieldSecondary; - if (o instanceof AvroBucketMetadata) { - AvroBucketMetadata that = (AvroBucketMetadata) o; - keyField = that.keyField; - keyFieldSecondary = that.keyFieldSecondary; - } else if (o instanceof ParquetBucketMetadata) { - ParquetBucketMetadata that = (ParquetBucketMetadata) o; - keyField = that.keyField; - keyFieldSecondary = that.keyFieldSecondary; - } else { - return false; - } + public int hashSecondaryKeyMetadata() { + return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); + } - boolean allSecondaryPresent = - getKeyClassSecondary() != null - && o.getKeyClassSecondary() != null - && this.keyFieldSecondary != null - && keyFieldSecondary != null; - // you messed up - if (!allSecondaryPresent) return false; - return getKeyClass() == o.getKeyClass() - && getKeyClassSecondary() == o.getKeyClassSecondary() - && this.keyField.equals(keyField) - && this.keyFieldSecondary.equals(keyFieldSecondary); + @Override + public Set> compatibleMetadataTypes() { + return ImmutableSet.of(AvroBucketMetadata.class); } @Override diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java index c1be631a12..0611b4288d 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.protobuf.ByteString; +import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; @@ -158,26 +159,12 @@ public void populateDisplayData(Builder builder) { } @Override - public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - TensorFlowBucketMetadata that = (TensorFlowBucketMetadata) o; - return getKeyClass() == that.getKeyClass() && keyField.equals(that.keyField); + public int hashPrimaryKeyMetadata() { + return Objects.hash(keyField, getKeyClass()); } @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - TensorFlowBucketMetadata that = (TensorFlowBucketMetadata) o; - boolean allSecondaryPresent = - getKeyClassSecondary() != null - && that.getKeyClassSecondary() != null - && keyFieldSecondary != null - && that.keyFieldSecondary != null; - // you messed up - if (!allSecondaryPresent) return false; - return getKeyClass() == that.getKeyClass() - && getKeyClassSecondary() == that.getKeyClassSecondary() - && keyField.equals(that.keyField) - && keyFieldSecondary.equals(that.keyFieldSecondary); + public int hashSecondaryKeyMetadata() { + return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java index 5f0d5bd153..0396c4b26e 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java @@ -75,13 +75,13 @@ public Void extractKeySecondary(Object value) { } @Override - public boolean isPartitionCompatibleForPrimaryKey(BucketMetadata other) { - return false; + public int hashPrimaryKeyMetadata() { + return -1; } @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(BucketMetadata other) { - return false; + public int hashSecondaryKeyMetadata() { + return -1; } }); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index fb234c16e4..60eb8dc7dc 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -27,7 +27,15 @@ import java.io.File; import java.io.OutputStream; import java.nio.channels.Channels; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -37,6 +45,9 @@ import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; import org.apache.beam.sdk.extensions.smb.FileOperations.Writer; import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment; @@ -53,6 +64,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; +import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -739,28 +752,42 @@ public void testMixedInputTypesInSource() throws Exception { ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(parquetRecord)), ParquetAvroFileOperations.of(AvroGeneratedUser.class)); + final TupleTag tupleTag = new TupleTag<>("source-tag"); final PCollection> output = - pipeline.apply( - Read.from( - new SortedBucketPrimaryKeyedSource<>( - String.class, - ImmutableList.of( - new PrimaryKeyedBucketedInput<>( - new TupleTag<>("source-tag"), - ImmutableMap.of( - TestUtils.fromFolder(avroDir).toString(), - KV.of(".avro", AvroFileOperations.of(AvroGeneratedUser.class)), - TestUtils.fromFolder(parquetDir).toString(), - KV.of( - ".parquet", - ParquetAvroFileOperations.of(AvroGeneratedUser.class))), - null)), - TargetParallelism.max(), - "metrics-key"))); + pipeline + .apply( + Read.from( + new SortedBucketPrimaryKeyedSource<>( + String.class, + ImmutableList.of( + new PrimaryKeyedBucketedInput<>( + tupleTag, + ImmutableMap.of( + TestUtils.fromFolder(avroDir).toString(), + KV.of( + ".avro", + AvroFileOperations.of(AvroGeneratedUser.class)), + TestUtils.fromFolder(parquetDir).toString(), + KV.of( + ".parquet", + ParquetAvroFileOperations.of(AvroGeneratedUser.class))), + null)), + TargetParallelism.max(), + "metrics-key"))) + .setCoder( + KvCoder.of( + StringUtf8Coder.of(), + CoGbkResult.CoGbkResultCoder.of( + CoGbkResultSchema.of(ImmutableList.of(tupleTag)), + // Set reflect coder to map Utf8s to Strings + UnionCoder.of( + ImmutableList.of(AvroCoder.reflect(AvroGeneratedUser.class)))))); PAssert.thatMap(output) .satisfies( m -> { + Set actual = Sets.newHashSet(m.get("foo").getAll("source-tag")); + Set expected = ImmutableSet.of(avroRecord, parquetRecord); Assert.assertEquals(1, m.keySet().size()); Assert.assertEquals( ImmutableSet.of(avroRecord, parquetRecord), diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java index 544df9fbbf..6dc01e0edb 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java @@ -84,18 +84,6 @@ public int hashCode() { return Objects.hash(keyIndex, getNumBuckets(), getNumShards(), getHashType()); } - @Override - public boolean isPartitionCompatibleForPrimaryKey(final BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - TestBucketMetadata that = (TestBucketMetadata) o; - return keyIndex.equals(that.keyIndex); - } - - @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(final BucketMetadata other) { - throw new IllegalArgumentException(); - } - @Override public String extractKeyPrimary(final String value) { try { @@ -109,4 +97,14 @@ public String extractKeyPrimary(final String value) { public Void extractKeySecondary(final String value) { throw new IllegalArgumentException(); } + + @Override + public int hashPrimaryKeyMetadata() { + return Objects.hash(getClass(), keyIndex); + } + + @Override + public int hashSecondaryKeyMetadata() { + throw new IllegalArgumentException(); + } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java index 794c5784c5..8a4d02b4d6 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java @@ -78,20 +78,6 @@ public int hashCode() { return Objects.hash(keyIndex, getNumBuckets(), getNumShards(), getHashType()); } - @Override - public boolean isPartitionCompatibleForPrimaryKey(final BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - TestBucketMetadataWithSecondary that = (TestBucketMetadataWithSecondary) o; - return keyIndex.equals(that.keyIndex); - } - - @Override - public boolean isPartitionCompatibleForPrimaryAndSecondaryKey(final BucketMetadata o) { - if (o == null || getClass() != o.getClass()) return false; - TestBucketMetadataWithSecondary that = (TestBucketMetadataWithSecondary) o; - return keyIndex.equals(that.keyIndex) && keyIndexSecondary.equals(that.keyIndexSecondary); - } - @Override public String extractKeyPrimary(final String value) { try { @@ -109,4 +95,14 @@ public String extractKeySecondary(final String value) { return null; } } + + @Override + public int hashPrimaryKeyMetadata() { + return Objects.hash(keyIndex); + } + + @Override + public int hashSecondaryKeyMetadata() { + return Objects.hash(keyIndexSecondary); + } } From 95f75f94fcb797a651e5ef522a34cdb9bd3d6505 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 13 Dec 2023 16:13:31 -0500 Subject: [PATCH 06/21] cleanup --- .../apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index 60eb8dc7dc..a86f65d5af 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -786,8 +786,6 @@ public void testMixedInputTypesInSource() throws Exception { PAssert.thatMap(output) .satisfies( m -> { - Set actual = Sets.newHashSet(m.get("foo").getAll("source-tag")); - Set expected = ImmutableSet.of(avroRecord, parquetRecord); Assert.assertEquals(1, m.keySet().size()); Assert.assertEquals( ImmutableSet.of(avroRecord, parquetRecord), From 563fb43e2e00c09fb06518136a1e9b8533053f07 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 13 Dec 2023 16:33:03 -0500 Subject: [PATCH 07/21] Less wasteful ptransform serialization --- .../extensions/smb/SortedBucketSource.java | 57 +++++++++++++++---- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index cbacb1fa33..6259eada37 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -45,6 +45,7 @@ import java.util.stream.IntStream; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.extensions.smb.BucketMetadataUtil.SourceMetadata; import org.apache.beam.sdk.io.BoundedSource; @@ -614,28 +615,60 @@ public String toString() { @SuppressWarnings("unchecked") private void writeObject(ObjectOutputStream outStream) throws IOException { SerializableCoder.of(TupleTag.class).encode(tupleTag, outStream); - outStream.writeInt(directories.size()); - for (Map.Entry>> entry : directories.entrySet()) { - ResourceIdCoder.of().encode(entry.getKey(), outStream); - outStream.writeObject(entry.getValue()); - } outStream.writeObject(predicate); outStream.writeObject(keying); + + long numDistinctFileSuffixes = + directories.values().stream().map(KV::getKey).distinct().count(); + long numDistinctFileOperations = + directories.values().stream().map(kv -> kv.getValue().getClass()).distinct().count(); + + // If all partitions use the same file operations type, don't keep re-encoding it + if (numDistinctFileSuffixes == 1 && numDistinctFileOperations == 1) { + outStream.writeBoolean(true); + + ListCoder.of(ResourceIdCoder.of()).encode(new ArrayList<>(directories.keySet()), outStream); + KV> singleton = directories.values().iterator().next(); + outStream.writeUTF(singleton.getKey()); + outStream.writeObject(singleton.getValue()); + } else { + outStream.writeBoolean(false); + outStream.writeInt(directories.size()); + + for (Map.Entry>> entry : directories.entrySet()) { + ResourceIdCoder.of().encode(entry.getKey(), outStream); + outStream.writeUTF(entry.getValue().getKey()); + outStream.writeObject(entry.getValue().getValue()); + } + } outStream.flush(); } @SuppressWarnings("unchecked") private void readObject(ObjectInputStream inStream) throws ClassNotFoundException, IOException { this.tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); - final int numDirectories = inStream.readInt(); - this.directories = new HashMap<>(); - for (int i = 0; i < numDirectories; i++) { - directories.put( - ResourceIdCoder.of().decode(inStream), - (KV>) inStream.readObject()); - } this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); + + final boolean partitionsHaveSameFileType = inStream.readBoolean(); + if (partitionsHaveSameFileType) { + final List dirs = ListCoder.of(ResourceIdCoder.of()).decode(inStream); + final String filenameSuffix = inStream.readUTF(); + final FileOperations fileOperations = (FileOperations) inStream.readObject(); + this.directories = + dirs.stream() + .collect( + Collectors.toMap( + Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))); + } else { + final int numDirectories = inStream.readInt(); + this.directories = new HashMap<>(); + for (int i = 0; i < numDirectories; i++) { + directories.put( + ResourceIdCoder.of().decode(inStream), + KV.of(inStream.readUTF(), (FileOperations) inStream.readObject())); + } + } } } From 00f41dad8fffe0d58d07de64f605ab4633015222 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 08:58:20 -0500 Subject: [PATCH 08/21] ParquetAvroFileOperations getCoder should match AvroFileOperations getCoder --- .../smb/ParquetAvroFileOperations.java | 17 +++++-- .../extensions/smb/SortedBucketSource.java | 1 - .../smb/SortedBucketSourceTest.java | 50 +++++++------------ 3 files changed, 29 insertions(+), 39 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java index d7f92cc272..78a4471a72 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java @@ -52,6 +52,8 @@ */ public class ParquetAvroFileOperations extends FileOperations { static final CompressionCodecName DEFAULT_COMPRESSION = CompressionCodecName.ZSTD; + + private final Class recordClass; private final SerializableSchemaSupplier schemaSupplier; private final CompressionCodecName compression; private final SerializableConfiguration conf; @@ -59,11 +61,13 @@ public class ParquetAvroFileOperations extends FileOperations { private ParquetAvroFileOperations( Schema schema, + Class recordClass, CompressionCodecName compression, Configuration conf, FilterPredicate predicate) { super(Compression.UNCOMPRESSED, MimeTypes.BINARY); this.schemaSupplier = new SerializableSchemaSupplier(schema); + this.recordClass = recordClass; this.compression = compression; this.conf = new SerializableConfiguration(conf); this.predicate = predicate; @@ -80,7 +84,7 @@ public static ParquetAvroFileOperations of( public static ParquetAvroFileOperations of( Schema schema, CompressionCodecName compression, Configuration conf) { - return new ParquetAvroFileOperations<>(schema, compression, conf, null); + return new ParquetAvroFileOperations<>(schema, null, compression, conf, null); } public static ParquetAvroFileOperations of( @@ -90,7 +94,7 @@ public static ParquetAvroFileOperations of( public static ParquetAvroFileOperations of( Schema schema, FilterPredicate predicate, Configuration conf) { - return new ParquetAvroFileOperations<>(schema, DEFAULT_COMPRESSION, conf, predicate); + return new ParquetAvroFileOperations<>(schema, null, DEFAULT_COMPRESSION, conf, predicate); } public static ParquetAvroFileOperations of(Class recordClass) { @@ -106,7 +110,7 @@ public static ParquetAvroFileOperations of( Class recordClass, CompressionCodecName compression, Configuration conf) { // Use reflection to get SR schema final Schema schema = new ReflectData(recordClass.getClassLoader()).getSchema(recordClass); - return new ParquetAvroFileOperations<>(schema, compression, conf, null); + return new ParquetAvroFileOperations<>(schema, recordClass, compression, conf, null); } public static ParquetAvroFileOperations of( @@ -118,7 +122,8 @@ public static ParquetAvroFileOperations of( Class recordClass, FilterPredicate predicate, Configuration conf) { // Use reflection to get SR schema final Schema schema = new ReflectData(recordClass.getClassLoader()).getSchema(recordClass); - return new ParquetAvroFileOperations<>(schema, DEFAULT_COMPRESSION, conf, predicate); + return new ParquetAvroFileOperations<>( + schema, recordClass, DEFAULT_COMPRESSION, conf, predicate); } @Override @@ -141,7 +146,9 @@ protected FileIO.Sink createSink() { @SuppressWarnings("unchecked") @Override public Coder getCoder() { - return (AvroCoder) AvroCoder.of(getSchema()); + return recordClass == null + ? (AvroCoder) AvroCoder.of(getSchema()) + : AvroCoder.of(recordClass, true); } Schema getSchema() { diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 6259eada37..1c51d40b76 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -485,7 +485,6 @@ public Predicate getPredicate() { } public Coder getCoder() { - // Todo what if two input directories use a different Coder for the same type? return directories.entrySet().iterator().next().getValue().getValue().getCoder(); } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index a86f65d5af..8d0f02c0f7 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -45,9 +45,6 @@ import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; import org.apache.beam.sdk.extensions.smb.FileOperations.Writer; import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment; @@ -64,8 +61,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; -import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -754,34 +749,23 @@ public void testMixedInputTypesInSource() throws Exception { final TupleTag tupleTag = new TupleTag<>("source-tag"); final PCollection> output = - pipeline - .apply( - Read.from( - new SortedBucketPrimaryKeyedSource<>( - String.class, - ImmutableList.of( - new PrimaryKeyedBucketedInput<>( - tupleTag, - ImmutableMap.of( - TestUtils.fromFolder(avroDir).toString(), - KV.of( - ".avro", - AvroFileOperations.of(AvroGeneratedUser.class)), - TestUtils.fromFolder(parquetDir).toString(), - KV.of( - ".parquet", - ParquetAvroFileOperations.of(AvroGeneratedUser.class))), - null)), - TargetParallelism.max(), - "metrics-key"))) - .setCoder( - KvCoder.of( - StringUtf8Coder.of(), - CoGbkResult.CoGbkResultCoder.of( - CoGbkResultSchema.of(ImmutableList.of(tupleTag)), - // Set reflect coder to map Utf8s to Strings - UnionCoder.of( - ImmutableList.of(AvroCoder.reflect(AvroGeneratedUser.class)))))); + pipeline.apply( + Read.from( + new SortedBucketPrimaryKeyedSource<>( + String.class, + ImmutableList.of( + new PrimaryKeyedBucketedInput<>( + tupleTag, + ImmutableMap.of( + TestUtils.fromFolder(avroDir).toString(), + KV.of(".avro", AvroFileOperations.of(AvroGeneratedUser.class)), + TestUtils.fromFolder(parquetDir).toString(), + KV.of( + ".parquet", + ParquetAvroFileOperations.of(AvroGeneratedUser.class))), + null)), + TargetParallelism.max(), + "metrics-key"))); PAssert.thatMap(output) .satisfies( From d62b24f3aa934c149cb4144c5778149d741cb12e Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 09:28:05 -0500 Subject: [PATCH 09/21] Test specific + generic reads --- .../smb/SortedBucketSourceTest.java | 62 ++++++++++++++----- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java index 8d0f02c0f7..908aeb2db3 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/SortedBucketSourceTest.java @@ -44,6 +44,8 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.IndexedRecord; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; import org.apache.beam.sdk.extensions.smb.FileOperations.Writer; @@ -60,6 +62,8 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -694,7 +698,29 @@ public void testUniqueTupleTagIdSecondaryKey() { } @Test - public void testMixedInputTypesInSource() throws Exception { + public void testMixedSpecificAvroInputTypesInSource() throws Exception { + testBucketedInputOfPartitions( + AvroFileOperations.of(AvroGeneratedUser.class), + SerializableFunctions.identity(), + ParquetAvroFileOperations.of(AvroGeneratedUser.class), + SerializableFunctions.identity()); + } + + @Test + public void testMixedGenericAvroInputTypesInSource() throws Exception { + testBucketedInputOfPartitions( + AvroFileOperations.of(AvroGeneratedUser.getClassSchema()), + SortedBucketSourceTest::toAvroUserGenericRecord, + ParquetAvroFileOperations.of(AvroGeneratedUser.getClassSchema()), + SortedBucketSourceTest::toAvroUserGenericRecord); + } + + private void testBucketedInputOfPartitions( + FileOperations avroFileOp, + SerializableFunction avroToT, + FileOperations parquetFileOp, + SerializableFunction parquetToT) + throws Exception { final TemporaryFolder avroDir = new TemporaryFolder(); avroDir.create(); final TemporaryFolder parquetDir = new TemporaryFolder(); @@ -719,7 +745,7 @@ public void testMixedInputTypesInSource() throws Exception { write( avroPolicy.forDestination(), - new AvroBucketMetadata( + new AvroBucketMetadata( 1, 1, String.class, @@ -728,13 +754,13 @@ public void testMixedInputTypesInSource() throws Exception { null, BucketMetadata.HashType.MURMUR3_32, "bucket", - AvroGeneratedUser.class), - ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(avroRecord)), - AvroFileOperations.of(AvroGeneratedUser.class)); + (Class) AvroGeneratedUser.class), + ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of((RecordT) avroRecord)), + avroFileOp); write( parquetPolicy.forDestination(), - new ParquetBucketMetadata( + new ParquetBucketMetadata( 1, 1, String.class, @@ -743,11 +769,11 @@ public void testMixedInputTypesInSource() throws Exception { null, BucketMetadata.HashType.MURMUR3_32, "bucket", - AvroGeneratedUser.class), - ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of(parquetRecord)), - ParquetAvroFileOperations.of(AvroGeneratedUser.class)); + (Class) AvroGeneratedUser.class), + ImmutableMap.of(BucketShardId.of(0, 0), ImmutableList.of((RecordT) parquetRecord)), + parquetFileOp); - final TupleTag tupleTag = new TupleTag<>("source-tag"); + final TupleTag tupleTag = new TupleTag<>("source-tag"); final PCollection> output = pipeline.apply( Read.from( @@ -758,11 +784,9 @@ public void testMixedInputTypesInSource() throws Exception { tupleTag, ImmutableMap.of( TestUtils.fromFolder(avroDir).toString(), - KV.of(".avro", AvroFileOperations.of(AvroGeneratedUser.class)), + KV.of(".avro", avroFileOp), TestUtils.fromFolder(parquetDir).toString(), - KV.of( - ".parquet", - ParquetAvroFileOperations.of(AvroGeneratedUser.class))), + KV.of(".parquet", parquetFileOp)), null)), TargetParallelism.max(), "metrics-key"))); @@ -772,7 +796,7 @@ public void testMixedInputTypesInSource() throws Exception { m -> { Assert.assertEquals(1, m.keySet().size()); Assert.assertEquals( - ImmutableSet.of(avroRecord, parquetRecord), + ImmutableSet.of(avroToT.apply(avroRecord), avroToT.apply(parquetRecord)), Sets.newHashSet(m.get("foo").getAll("source-tag"))); return null; }); @@ -1264,4 +1288,12 @@ static void verifyMetrics( Assert.assertEquals(expectedDistributions, actualDistributions); } + + private static GenericRecord toAvroUserGenericRecord(AvroGeneratedUser user) { + return new GenericRecordBuilder(AvroGeneratedUser.getClassSchema()) + .set("name", user.getName()) + .set("favorite_number", user.getFavoriteNumber()) + .set("favorite_color", user.getFavoriteColor()) + .build(); + } } From a45419c230e44be964d69babc88a79b3d154cfbc Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 12:15:19 -0500 Subject: [PATCH 10/21] fetchMetadata must use a List for deterministic batching --- .../apache/beam/sdk/extensions/smb/BucketMetadataUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java index 9b09c53139..7a22bf22b6 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadataUtil.java @@ -69,7 +69,7 @@ int leastNumBuckets() { this.batchSize = batchSize; } - private Map> fetchMetadata(Set directories) { + private Map> fetchMetadata(List directories) { final int total = directories.size(); final Map> metadata = new ConcurrentHashMap<>(); int start = 0; @@ -89,7 +89,7 @@ private SourceMetadata getSourceMetadata( BiFunction, BucketMetadata, Boolean> compatibilityCompareFn) { final Map> bucketMetadatas = - fetchMetadata(directories.keySet()); + fetchMetadata(new ArrayList<>(directories.keySet())); Preconditions.checkState(!bucketMetadatas.isEmpty(), "Failed to find metadata"); Map> mapping = new HashMap<>(); From 28259813223dcd7448bfc0e66c717fb4549f29aa Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 13:01:09 -0500 Subject: [PATCH 11/21] Make SortedBucketIO.Read extendable for multi-format --- .../beam/sdk/extensions/smb/AvroSortedBucketIO.java | 6 ++++++ .../beam/sdk/extensions/smb/JsonSortedBucketIO.java | 5 +++++ .../sdk/extensions/smb/ParquetAvroSortedBucketIO.java | 6 ++++++ .../apache/beam/sdk/extensions/smb/SortedBucketIO.java | 6 ------ .../beam/sdk/extensions/smb/SortedBucketSource.java | 10 ++++++++++ .../beam/sdk/extensions/smb/TensorFlowBucketIO.java | 5 +++++ .../sdk/extensions/smb/ParquetTypeSortedBucketIO.scala | 5 +++-- 7 files changed, 35 insertions(+), 8 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java index d013f6f215..a35c704128 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing Avro sorted-bucket files. */ public class AvroSortedBucketIO { @@ -191,6 +192,11 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + + abstract String getFilenameSuffix(); + @Nullable abstract Schema getSchema(); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java index c001d8670c..443faa1496 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing BigQuery {@link TableRow} JSON sorted-bucket files. */ public class JsonSortedBucketIO { @@ -107,6 +108,10 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + + abstract String getFilenameSuffix(); abstract Compression getCompression(); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java index e4243d5968..688e686d94 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -199,6 +200,11 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + + abstract String getFilenameSuffix(); + @Nullable abstract Schema getSchema(); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java index 419db493ae..d1bbb584db 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java @@ -493,12 +493,6 @@ public abstract static class TransformOutput implements Serializable /** Represents a single sorted-bucket source written using {@link SortedBucketSink}. */ public abstract static class Read implements Serializable { - - @Nullable - abstract ImmutableList getInputDirectories(); - - abstract String getFilenameSuffix(); - public abstract TupleTag getTupleTag(); protected abstract BucketedInput toBucketedInput(SortedBucketSource.Keying keying); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 1c51d40b76..bbd1924cce 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -442,6 +442,16 @@ public static BucketedInput of( tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate); } + public static BucketedInput of( + Keying keying, + TupleTag tupleTag, + Map>> directories, + Predicate predicate) { + if (keying == Keying.PRIMARY) + return new PrimaryKeyedBucketedInput<>(tupleTag, directories, predicate); + return new PrimaryAndSecondaryKeyedBucktedInput<>(tupleTag, directories, predicate); + } + public BucketedInput( Keying keying, TupleTag tupleTag, diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java index bd509f74da..38e5188951 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.tensorflow.proto.example.Example; /** @@ -124,6 +125,10 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + + abstract String getFilenameSuffix(); abstract Compression getCompression(); diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala index 3063a62bb4..42f8b28a00 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala @@ -81,9 +81,10 @@ object ParquetTypeSortedBucketIO { def withConfiguration(configuration: Configuration): Read[T] = this.copy(configuration = configuration) - override def getInputDirectories: ImmutableList[String] = + def getInputDirectories: ImmutableList[String] = ImmutableList.copyOf(inputDirectories.asJava: java.lang.Iterable[String]) - override def getFilenameSuffix: String = filenameSuffix + def getFilenameSuffix: String = filenameSuffix + override def getTupleTag: TupleTag[T] = tupleTag override protected def toBucketedInput( From a4e2193d888053fb071cb7c11c3360ce817c9bb9 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 13:10:15 -0500 Subject: [PATCH 12/21] Fixup SortedBucketIO.Read signature --- .../apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java | 4 ---- .../apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java | 4 ---- .../beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java | 3 --- .../org/apache/beam/sdk/extensions/smb/SortedBucketIO.java | 3 +++ .../apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java | 4 ---- .../beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala | 2 +- 6 files changed, 4 insertions(+), 16 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java index a35c704128..e6265e6e3f 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing Avro sorted-bucket files. */ public class AvroSortedBucketIO { @@ -192,9 +191,6 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { - @Nullable - abstract ImmutableList getInputDirectories(); - abstract String getFilenameSuffix(); @Nullable diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java index 443faa1496..3a09223b95 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing BigQuery {@link TableRow} JSON sorted-bucket files. */ public class JsonSortedBucketIO { @@ -108,9 +107,6 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { - @Nullable - abstract ImmutableList getInputDirectories(); - abstract String getFilenameSuffix(); abstract Compression getCompression(); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java index 688e686d94..f215e0d1e6 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -200,8 +199,6 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { - @Nullable - abstract ImmutableList getInputDirectories(); abstract String getFilenameSuffix(); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java index d1bbb584db..b8dbdb0a6f 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java @@ -493,6 +493,9 @@ public abstract static class TransformOutput implements Serializable /** Represents a single sorted-bucket source written using {@link SortedBucketSink}. */ public abstract static class Read implements Serializable { + @Nullable + abstract ImmutableList getInputDirectories(); + public abstract TupleTag getTupleTag(); protected abstract BucketedInput toBucketedInput(SortedBucketSource.Keying keying); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java index 38e5188951..f89d897a23 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.tensorflow.proto.example.Example; /** @@ -125,9 +124,6 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { - @Nullable - abstract ImmutableList getInputDirectories(); - abstract String getFilenameSuffix(); abstract Compression getCompression(); diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala index 42f8b28a00..8f996fc765 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala @@ -81,7 +81,7 @@ object ParquetTypeSortedBucketIO { def withConfiguration(configuration: Configuration): Read[T] = this.copy(configuration = configuration) - def getInputDirectories: ImmutableList[String] = + override def getInputDirectories: ImmutableList[String] = ImmutableList.copyOf(inputDirectories.asJava: java.lang.Iterable[String]) def getFilenameSuffix: String = filenameSuffix From 1925d7efa243cbc4e4eeb81b28f41a73f2e0cfa5 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Dec 2023 14:19:22 -0500 Subject: [PATCH 13/21] Make getInputDirectories() public for extensions --- .../java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java index b8dbdb0a6f..4458ae23da 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java @@ -494,7 +494,7 @@ public abstract static class TransformOutput implements Serializable /** Represents a single sorted-bucket source written using {@link SortedBucketSink}. */ public abstract static class Read implements Serializable { @Nullable - abstract ImmutableList getInputDirectories(); + public abstract ImmutableList getInputDirectories(); public abstract TupleTag getTupleTag(); From 875ae0eea06629980bf02a47bd22d7fe18304fbf Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 18 Dec 2023 09:14:29 -0500 Subject: [PATCH 14/21] PR comments --- .../beam/sdk/extensions/smb/AvroBucketMetadata.java | 4 ++-- .../apache/beam/sdk/extensions/smb/BucketMetadata.java | 4 ++-- .../beam/sdk/extensions/smb/JsonBucketMetadata.java | 4 ++-- .../beam/sdk/extensions/smb/ParquetBucketMetadata.java | 4 ++-- .../beam/sdk/extensions/smb/SortedBucketSource.java | 10 ++++++---- .../sdk/extensions/smb/TensorFlowBucketMetadata.java | 4 ++-- .../beam/sdk/extensions/smb/BucketMetadataTest.java | 4 ++-- .../beam/sdk/extensions/smb/TestBucketMetadata.java | 4 ++-- .../smb/TestBucketMetadataWithSecondary.java | 4 ++-- 9 files changed, 22 insertions(+), 20 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java index e71dbee0b7..f568422942 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java @@ -135,12 +135,12 @@ public Map, Coder> coderOverrides() { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java index 9749bfb768..10299f22df 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java @@ -327,9 +327,9 @@ public Set> compatibleMetadataTypes() { public abstract K2 extractKeySecondary(V value); - public abstract int hashPrimaryKeyMetadata(); + abstract int hashPrimaryKeyMetadata(); - public abstract int hashSecondaryKeyMetadata(); + abstract int hashSecondaryKeyMetadata(); public SortedBucketIO.ComparableKeyBytes primaryComparableKeyBytes(V value) { return new SortedBucketIO.ComparableKeyBytes(getKeyBytesPrimary(value), null); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java index 4b41a3bf6a..d986cf09be 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java @@ -148,12 +148,12 @@ public void populateDisplayData(Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java index 2c828efb02..dbe2aeeeb9 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java @@ -192,12 +192,12 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index bbd1924cce..2db687bb06 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -495,7 +495,9 @@ public Predicate getPredicate() { } public Coder getCoder() { - return directories.entrySet().iterator().next().getValue().getValue().getCoder(); + final KV> sampledSource = + directories.entrySet().iterator().next().getValue(); + return sampledSource.getValue().getCoder(); } static CoGbkResultSchema schemaOf(List> sources) { @@ -523,13 +525,13 @@ long getOrSampleByteSize() { entry -> { // Take at most 10 buckets from the directory to sample // Check for single-shard filenames template first, then multi-shard + final String filenameSuffix = entry.getValue().getKey(); List sampledFiles = - sampleDirectory(entry.getKey(), "*-0000?-of-?????" + entry.getValue().getKey()); + sampleDirectory(entry.getKey(), "*-0000?-of-?????" + filenameSuffix); if (sampledFiles.isEmpty()) { sampledFiles = sampleDirectory( - entry.getKey(), - "*-0000?-of-*-shard-00000-of-?????" + entry.getValue().getKey()); + entry.getKey(), "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix); } int numBuckets = 0; diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java index 0611b4288d..8b5255bb59 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java @@ -159,12 +159,12 @@ public void populateDisplayData(Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java index 0396c4b26e..0e34b19f8e 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java @@ -75,12 +75,12 @@ public Void extractKeySecondary(Object value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return -1; } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return -1; } }); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java index 6dc01e0edb..106c98c16b 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java @@ -99,12 +99,12 @@ public Void extractKeySecondary(final String value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(getClass(), keyIndex); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { throw new IllegalArgumentException(); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java index 8a4d02b4d6..d2a0bb9eb0 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java @@ -97,12 +97,12 @@ public String extractKeySecondary(final String value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyIndex); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyIndexSecondary); } } From ab1ec152b592585d642277feb18b3e4e65797bf7 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 18 Dec 2023 11:31:26 -0500 Subject: [PATCH 15/21] Encode BucketedInput as indexed map of FileOperations/FileSuffixes --- .../sdk/extensions/smb/FileOperations.java | 26 ++++++ .../extensions/smb/SortedBucketSource.java | 85 ++++++++++--------- 2 files changed, 69 insertions(+), 42 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java index d65d555462..3da0c88a58 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java @@ -18,6 +18,10 @@ package org.apache.beam.sdk.extensions.smb; import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.io.Serializable; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; @@ -32,6 +36,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; @@ -249,4 +255,24 @@ private ReadableFile toReadableFile(ResourceId resourceId) { throw new RuntimeException(String.format("Exception opening bucket file %s", resourceId), e); } } + + static class FileOperationsCoder extends CustomCoder> { + @Override + public void encode(FileOperations value, OutputStream outStream) + throws CoderException, IOException { + final ObjectOutputStream oos = new ObjectOutputStream(outStream); + oos.writeObject(value); + oos.flush(); + } + + @Override + public FileOperations decode(InputStream inStream) throws CoderException, IOException { + final ObjectInputStream ois = new ObjectInputStream(inStream); + try { + return (FileOperations) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new CoderException(e); + } + } + } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 2db687bb06..cde10926c5 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -45,8 +45,10 @@ import java.util.stream.IntStream; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.smb.BucketMetadataUtil.SourceMetadata; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; @@ -629,30 +631,32 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { outStream.writeObject(predicate); outStream.writeObject(keying); - long numDistinctFileSuffixes = - directories.values().stream().map(KV::getKey).distinct().count(); - long numDistinctFileOperations = - directories.values().stream().map(kv -> kv.getValue().getClass()).distinct().count(); - - // If all partitions use the same file operations type, don't keep re-encoding it - if (numDistinctFileSuffixes == 1 && numDistinctFileOperations == 1) { - outStream.writeBoolean(true); - - ListCoder.of(ResourceIdCoder.of()).encode(new ArrayList<>(directories.keySet()), outStream); - KV> singleton = directories.values().iterator().next(); - outStream.writeUTF(singleton.getKey()); - outStream.writeObject(singleton.getValue()); - } else { - outStream.writeBoolean(false); - outStream.writeInt(directories.size()); - - for (Map.Entry>> entry : directories.entrySet()) { - ResourceIdCoder.of().encode(entry.getKey(), outStream); - outStream.writeUTF(entry.getValue().getKey()); - outStream.writeObject(entry.getValue().getValue()); + // Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of + // large BucketedInputs + final Map, Integer> fileOperationsMetadata = new HashMap<>(); + final Map>> fileOperationsEncoding = new HashMap<>(); + final Map directoriesEncoding = new HashMap<>(); + int i = 0; + + for (Map.Entry>> entry : directories.entrySet()) { + final KV> fileOps = entry.getValue(); + final KV metadataKey = + KV.of(fileOps.getKey(), fileOps.getValue().getClass().getName()); + if (!fileOperationsMetadata.containsKey(metadataKey)) { + fileOperationsMetadata.put(metadataKey, i); + fileOperationsEncoding.put(i, KV.of(fileOps.getKey(), fileOps.getValue())); + i++; } + directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); } - outStream.flush(); + + final Coder> fileOperationsCoder = + new FileOperations.FileOperationsCoder<>(); + + MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder)) + .encode(fileOperationsEncoding, outStream); + + MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).encode(directoriesEncoding, outStream); } @SuppressWarnings("unchecked") @@ -661,25 +665,22 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); - final boolean partitionsHaveSameFileType = inStream.readBoolean(); - if (partitionsHaveSameFileType) { - final List dirs = ListCoder.of(ResourceIdCoder.of()).decode(inStream); - final String filenameSuffix = inStream.readUTF(); - final FileOperations fileOperations = (FileOperations) inStream.readObject(); - this.directories = - dirs.stream() - .collect( - Collectors.toMap( - Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))); - } else { - final int numDirectories = inStream.readInt(); - this.directories = new HashMap<>(); - for (int i = 0; i < numDirectories; i++) { - directories.put( - ResourceIdCoder.of().decode(inStream), - KV.of(inStream.readUTF(), (FileOperations) inStream.readObject())); - } - } + final Coder> fileOperationsCoder = + new FileOperations.FileOperationsCoder<>(); + + final Map>> fileOperationsEncoding = + MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder)) + .decode(inStream); + + final Map directoriesEncoding = + MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).decode(inStream); + + this.directories = + directoriesEncoding.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + dirAndIndex -> fileOperationsEncoding.get(dirAndIndex.getValue()))); } } From 25dbe46aac07a8df1515664429a9c6606e116544 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 19 Dec 2023 08:44:32 -0500 Subject: [PATCH 16/21] Add BucketedInput#getInputs to unlock SMB taps --- .../extensions/smb/AvroSortedBucketIO.java | 6 ++++- .../extensions/smb/JsonSortedBucketIO.java | 6 ++++- .../smb/ParquetAvroSortedBucketIO.java | 5 +++- .../sdk/extensions/smb/SortedBucketIO.java | 5 +--- .../extensions/smb/SortedBucketSource.java | 26 ++++++++++--------- .../extensions/smb/TensorFlowBucketIO.java | 6 ++++- .../smb/ParquetTypeSortedBucketIO.scala | 4 +-- .../extensions/smb/SortedBucketIOUtil.scala | 11 +++++++- 8 files changed, 46 insertions(+), 23 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java index e6265e6e3f..5ae1cfa572 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing Avro sorted-bucket files. */ public class AvroSortedBucketIO { @@ -191,6 +192,9 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + abstract String getFilenameSuffix(); @Nullable @@ -244,7 +248,7 @@ public Read withPredicate(Predicate predicate) { } @Override - protected SortedBucketSource.BucketedInput toBucketedInput( + public SortedBucketSource.BucketedInput toBucketedInput( final SortedBucketSource.Keying keying) { @SuppressWarnings("unchecked") final AvroFileOperations fileOperations = diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java index 3a09223b95..7d825704ef 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** API for reading and writing BigQuery {@link TableRow} JSON sorted-bucket files. */ public class JsonSortedBucketIO { @@ -107,6 +108,9 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + abstract String getFilenameSuffix(); abstract Compression getCompression(); @@ -153,7 +157,7 @@ public Read withPredicate(Predicate predicate) { } @Override - protected BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { + public BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { return BucketedInput.of( keying, getTupleTag(), diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java index f215e0d1e6..11dbe5ea65 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -199,6 +200,8 @@ public static TransformOutput tran /** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); abstract String getFilenameSuffix(); @@ -272,7 +275,7 @@ public Read withConfiguration(Configuration configuration) { } @Override - protected BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { + public BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { final Schema schema = getRecordClass() == null ? getSchema() diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java index 4458ae23da..258a8da0cf 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java @@ -493,12 +493,9 @@ public abstract static class TransformOutput implements Serializable /** Represents a single sorted-bucket source written using {@link SortedBucketSink}. */ public abstract static class Read implements Serializable { - @Nullable - public abstract ImmutableList getInputDirectories(); - public abstract TupleTag getTupleTag(); - protected abstract BucketedInput toBucketedInput(SortedBucketSource.Keying keying); + public abstract BucketedInput toBucketedInput(SortedBucketSource.Keying keying); } @FunctionalInterface diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index cde10926c5..05cb011157 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -378,7 +378,7 @@ public PrimaryKeyedBucketedInput( public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) - sourceMetadata = BucketMetadataUtil.get().getPrimaryKeyedSourceMetadata(directories); + sourceMetadata = BucketMetadataUtil.get().getPrimaryKeyedSourceMetadata(inputs); return sourceMetadata; } } @@ -408,8 +408,7 @@ public PrimaryAndSecondaryKeyedBucktedInput( public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) - sourceMetadata = - BucketMetadataUtil.get().getPrimaryAndSecondaryKeyedSourceMetadata(directories); + sourceMetadata = BucketMetadataUtil.get().getPrimaryAndSecondaryKeyedSourceMetadata(inputs); return sourceMetadata; } } @@ -424,7 +423,7 @@ public abstract static class BucketedInput implements Serializable { private static final Pattern BUCKET_PATTERN = Pattern.compile("(\\d+)-of-(\\d+)"); protected TupleTag tupleTag; - protected Map>> directories; + protected Map>> inputs; protected Predicate predicate; protected Keying keying; // lazy, internal checks depend on what kind of iteration is requested @@ -478,7 +477,7 @@ public BucketedInput( Predicate predicate) { this.keying = keying; this.tupleTag = tupleTag; - this.directories = + this.inputs = directories.entrySet().stream() .collect( Collectors.toMap( @@ -496,9 +495,13 @@ public Predicate getPredicate() { return predicate; } + public Map>> getInputs() { + return inputs; + } + public Coder getCoder() { final KV> sampledSource = - directories.entrySet().iterator().next().getValue(); + inputs.entrySet().iterator().next().getValue(); return sampledSource.getValue().getCoder(); } @@ -520,7 +523,7 @@ private static List sampleDirectory(ResourceId directory, String filep } long getOrSampleByteSize() { - return directories + return inputs .entrySet() .parallelStream() .mapToLong( @@ -596,8 +599,7 @@ public KeyGroupIterator createIterator( try { Iterator> iterator = Iterators.transform( - directories.get(dir).getValue().iterator(file), - v -> KV.of(keyFn.apply(v), v)); + inputs.get(dir).getValue().iterator(file), v -> KV.of(keyFn.apply(v), v)); Iterator> out = (bufferSize > 0) ? new BufferedIterator<>(iterator, bufferSize) : iterator; iterators.add(out); @@ -612,7 +614,7 @@ public KeyGroupIterator createIterator( @Override public String toString() { - List inputDirectories = new ArrayList<>(directories.keySet()); + List inputDirectories = new ArrayList<>(inputs.keySet()); return String.format( "BucketedInput[tupleTag=%s, inputDirectories=[%s]]", tupleTag.getId(), @@ -638,7 +640,7 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { final Map directoriesEncoding = new HashMap<>(); int i = 0; - for (Map.Entry>> entry : directories.entrySet()) { + for (Map.Entry>> entry : inputs.entrySet()) { final KV> fileOps = entry.getValue(); final KV metadataKey = KV.of(fileOps.getKey(), fileOps.getValue().getClass().getName()); @@ -675,7 +677,7 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio final Map directoriesEncoding = MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).decode(inStream); - this.directories = + this.inputs = directoriesEncoding.entrySet().stream() .collect( Collectors.toMap( diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java index f89d897a23..25e2042b1b 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketIO.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.tensorflow.proto.example.Example; /** @@ -124,6 +125,9 @@ public static TransformOutput transformOutput( */ @AutoValue public abstract static class Read extends SortedBucketIO.Read { + @Nullable + abstract ImmutableList getInputDirectories(); + abstract String getFilenameSuffix(); abstract Compression getCompression(); @@ -166,7 +170,7 @@ public Read withPredicate(Predicate predicate) { } @Override - protected BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { + public BucketedInput toBucketedInput(final SortedBucketSource.Keying keying) { return BucketedInput.of( keying, getTupleTag(), diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala index 8f996fc765..bc9b512db7 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala @@ -81,13 +81,13 @@ object ParquetTypeSortedBucketIO { def withConfiguration(configuration: Configuration): Read[T] = this.copy(configuration = configuration) - override def getInputDirectories: ImmutableList[String] = + def getInputDirectories: ImmutableList[String] = ImmutableList.copyOf(inputDirectories.asJava: java.lang.Iterable[String]) def getFilenameSuffix: String = filenameSuffix override def getTupleTag: TupleTag[T] = tupleTag - override protected def toBucketedInput( + override def toBucketedInput( keying: SortedBucketSource.Keying ): SortedBucketSource.BucketedInput[T] = { val fileOperations = ParquetTypeFileOperations[T](filterPredicate, configuration) diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/SortedBucketIOUtil.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/SortedBucketIOUtil.scala index e60337b282..e11fddfa6f 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/SortedBucketIOUtil.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/SortedBucketIOUtil.scala @@ -23,7 +23,16 @@ import scala.jdk.CollectionConverters._ object SortedBucketIOUtil { def testId(read: beam.SortedBucketIO.Read[_]): String = - scio.SortedBucketIO.testId(read.getInputDirectories.asScala.toSeq: _*) + scio.SortedBucketIO.testId( + read + .toBucketedInput(SortedBucketSource.Keying.PRIMARY) + .getInputs + .asScala + .toSeq + .map { case (rId, _) => + s"${rId.getCurrentDirectory}${Option(rId.getFilename).getOrElse("")}" + }: _* + ) def testId(write: beam.SortedBucketIO.Write[_, _, _]): String = scio.SortedBucketIO.testId(write.getOutputDirectory.toString) From 035066483291dd6fc5f3eaf2c3834f823fa62ec0 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 19 Dec 2023 11:15:40 -0500 Subject: [PATCH 17/21] Use AvroCoder.reflect --- .../beam/sdk/extensions/smb/ParquetAvroFileOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java index 78a4471a72..3642a3090a 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java @@ -148,7 +148,7 @@ protected FileIO.Sink createSink() { public Coder getCoder() { return recordClass == null ? (AvroCoder) AvroCoder.of(getSchema()) - : AvroCoder.of(recordClass, true); + : AvroCoder.reflect(recordClass); } Schema getSchema() { From ae11f9ab635947555fa6fadad9853bf6581274d8 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 19 Dec 2023 11:17:38 -0500 Subject: [PATCH 18/21] Use local variable --- .../apache/beam/sdk/extensions/smb/SortedBucketSource.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 05cb011157..86270fb220 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -531,12 +531,13 @@ long getOrSampleByteSize() { // Take at most 10 buckets from the directory to sample // Check for single-shard filenames template first, then multi-shard final String filenameSuffix = entry.getValue().getKey(); + final ResourceId directory = entry.getKey(); List sampledFiles = - sampleDirectory(entry.getKey(), "*-0000?-of-?????" + filenameSuffix); + sampleDirectory(directory, "*-0000?-of-?????" + filenameSuffix); if (sampledFiles.isEmpty()) { sampledFiles = sampleDirectory( - entry.getKey(), "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix); + directory, "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix); } int numBuckets = 0; @@ -559,7 +560,7 @@ long getOrSampleByteSize() { } if (numBuckets == 0) { throw new IllegalArgumentException( - "Directory " + entry.getKey() + " has no bucket files"); + "Directory " + directory + " has no bucket files"); } if (seenBuckets.size() < numBuckets) { return (long) (sampledBytes * (numBuckets / (seenBuckets.size() * 1.0))); From 8f39bd16c0b78417da4d2de84e540f5ee09713cd Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 5 Jan 2024 08:59:05 -0500 Subject: [PATCH 19/21] Use SerializableCoder --- .../sdk/extensions/smb/FileOperations.java | 26 ------------------- .../extensions/smb/SortedBucketSource.java | 23 +++++++++------- 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java index 3da0c88a58..d65d555462 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java @@ -18,10 +18,6 @@ package org.apache.beam.sdk.extensions.smb; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; import java.io.Serializable; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; @@ -36,8 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; @@ -255,24 +249,4 @@ private ReadableFile toReadableFile(ResourceId resourceId) { throw new RuntimeException(String.format("Exception opening bucket file %s", resourceId), e); } } - - static class FileOperationsCoder extends CustomCoder> { - @Override - public void encode(FileOperations value, OutputStream outStream) - throws CoderException, IOException { - final ObjectOutputStream oos = new ObjectOutputStream(outStream); - oos.writeObject(value); - oos.flush(); - } - - @Override - public FileOperations decode(InputStream inStream) throws CoderException, IOException { - final ObjectInputStream ois = new ObjectInputStream(inStream); - try { - return (FileOperations) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new CoderException(e); - } - } - } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 86270fb220..13bcf3e4ab 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -637,7 +637,7 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { // Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of // large BucketedInputs final Map, Integer> fileOperationsMetadata = new HashMap<>(); - final Map>> fileOperationsEncoding = new HashMap<>(); + final Map> fileOperationsEncoding = new HashMap<>(); final Map directoriesEncoding = new HashMap<>(); int i = 0; @@ -653,8 +653,8 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); } - final Coder> fileOperationsCoder = - new FileOperations.FileOperationsCoder<>(); + final SerializableCoder fileOperationsCoder = + SerializableCoder.of(FileOperations.class); MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder)) .encode(fileOperationsEncoding, outStream); @@ -668,11 +668,10 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio this.predicate = (Predicate) inStream.readObject(); this.keying = (Keying) inStream.readObject(); - final Coder> fileOperationsCoder = - new FileOperations.FileOperationsCoder<>(); - - final Map>> fileOperationsEncoding = - MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder)) + final Map> fileOperationsEncoding = + MapCoder.of( + VarIntCoder.of(), + KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))) .decode(inStream); final Map directoriesEncoding = @@ -683,7 +682,13 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio .collect( Collectors.toMap( Map.Entry::getKey, - dirAndIndex -> fileOperationsEncoding.get(dirAndIndex.getValue()))); + dirAndIndex -> { + final String dir = + fileOperationsEncoding.get(dirAndIndex.getValue()).getKey(); + final FileOperations fileOps = + fileOperationsEncoding.get(dirAndIndex.getValue()).getValue(); + return KV.of(dir, fileOps); + })); } } From 4271462ccfb8dd95e847bc98072f3b990d4dea77 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 5 Jan 2024 09:04:11 -0500 Subject: [PATCH 20/21] cleanup --- .../apache/beam/sdk/extensions/smb/SortedBucketSource.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 13bcf3e4ab..28e3d95e75 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -653,10 +653,9 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); } - final SerializableCoder fileOperationsCoder = - SerializableCoder.of(FileOperations.class); - - MapCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), fileOperationsCoder)) + MapCoder.of( + VarIntCoder.of(), + KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))) .encode(fileOperationsEncoding, outStream); MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).encode(directoriesEncoding, outStream); From 29a6295358ca71bc268d6e99928241ac87d945e5 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 5 Jan 2024 09:08:52 -0500 Subject: [PATCH 21/21] move coders to static variables --- .../extensions/smb/SortedBucketSource.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 28e3d95e75..1a209c8964 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -429,6 +429,15 @@ public abstract static class BucketedInput implements Serializable { // lazy, internal checks depend on what kind of iteration is requested protected transient SourceMetadata sourceMetadata = null; // lazy + // Used to efficiently serialize BucketedInput instances + private static Coder> directoriesEncodingCoder = + MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()); + + private static Coder>> fileOperationsEncodingCoder = + MapCoder.of( + VarIntCoder.of(), + KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))); + public static BucketedInput of( Keying keying, TupleTag tupleTag, @@ -653,12 +662,8 @@ private void writeObject(ObjectOutputStream outStream) throws IOException { directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); } - MapCoder.of( - VarIntCoder.of(), - KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))) - .encode(fileOperationsEncoding, outStream); - - MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).encode(directoriesEncoding, outStream); + fileOperationsEncodingCoder.encode(fileOperationsEncoding, outStream); + directoriesEncodingCoder.encode(directoriesEncoding, outStream); } @SuppressWarnings("unchecked") @@ -668,13 +673,9 @@ private void readObject(ObjectInputStream inStream) throws ClassNotFoundExceptio this.keying = (Keying) inStream.readObject(); final Map> fileOperationsEncoding = - MapCoder.of( - VarIntCoder.of(), - KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))) - .decode(inStream); - + fileOperationsEncodingCoder.decode(inStream); final Map directoriesEncoding = - MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()).decode(inStream); + directoriesEncodingCoder.decode(inStream); this.inputs = directoriesEncoding.entrySet().stream()