From 3323edaf2895ec81e8d84586088daf90927ebae6 Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Fri, 23 Feb 2024 09:51:52 -0500
Subject: [PATCH] Simplify BucketedInput serialization (#5270)

---
 .../extensions/smb/AvroFileOperations.java    |  20 ---
 .../sdk/extensions/smb/FileOperations.java    |  13 --
 .../smb/ParquetAvroFileOperations.java        |  35 -----
 .../extensions/smb/SortedBucketSource.java    | 125 +++++++-----------
 .../smb/ParquetTypeFileOperations.scala       |  20 ---
 .../smb/AvroFileOperationsTest.java           |   8 --
 .../smb/JsonFileOperationsTest.java           |   3 -
 .../smb/ParquetAvroFileOperationsTest.java    |  42 ------
 .../smb/TensorFlowFileOperationsTest.java     |   3 -
 .../smb/ParquetTypeFileOperationsTest.scala   |  26 ----
 10 files changed, 46 insertions(+), 249 deletions(-)

diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
index f4cbb5cf21..f12831ca5d 100644
--- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
@@ -22,7 +22,6 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Map;
-import java.util.Objects;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileStream;
@@ -111,25 +110,6 @@ Schema getSchema() {
     return schemaSupplier.get();
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(getSchema(), codec.getCodec(), metadata, datumFactory);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof AvroFileOperations)) {
-      return false;
-    }
-    final AvroFileOperations that = (AvroFileOperations) obj;
-    return that.getSchema().equals(this.getSchema())
-        && that.codec.getCodec().toString().equals(this.codec.getCodec().toString())
-        && ((that.metadata == null && this.metadata == null)
-            || (this.metadata.equals(that.metadata)))
-        && that.datumFactory.equals(this.datumFactory);
-  }
-
   private static class SerializableSchemaString implements Serializable {
     private final String schema;
 
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java
index 1596d44a12..d65d555462 100644
--- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java
@@ -28,7 +28,6 @@
 import java.nio.file.StandardOpenOption;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -142,18 +141,6 @@ public void populateDisplayData(Builder builder) {
     builder.add(DisplayData.item("mimeType", mimeType));
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass().getName(), compression, mimeType);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return obj.getClass() == getClass()
-        && this.compression == ((FileOperations) obj).compression
-        && this.mimeType.equals(((FileOperations) obj).mimeType);
-  }
-
   /** Per-element file reader.
*/ public abstract static class Reader implements Serializable { private transient Supplier cleanupFn = null; diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java index 6de73c5833..9ee198f428 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java @@ -20,11 +20,7 @@ import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -129,37 +125,6 @@ Schema getSchema() { return schemaSupplier.get(); } - @Override - public int hashCode() { - return Objects.hash( - getSchema(), - compression, - conf.get(), - projectionSupplier != null ? projectionSupplier.get() : null, - predicate); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ParquetAvroFileOperations)) { - return false; - } - final ParquetAvroFileOperations that = (ParquetAvroFileOperations) obj; - - return that.getSchema().equals(this.getSchema()) - && that.compression.name().equals(this.compression.name()) - && ((that.projectionSupplier == null && this.projectionSupplier == null) - || (this.projectionSupplier.get().equals(that.projectionSupplier.get()))) - && ((that.predicate == null && this.predicate == null) - || (that.predicate.equals(this.predicate))) - && StreamSupport.stream(that.conf.get().spliterator(), false) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) - .equals( - StreamSupport.stream(this.conf.get().spliterator(), false) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); - } - //////////////////////////////////////// // Reader //////////////////////////////////////// diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index 1fc2526367..78f88e11b4 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -22,8 +22,6 @@ import com.google.common.base.Preconditions; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.text.DecimalFormat; import java.util.ArrayList; @@ -45,17 +43,12 @@ import java.util.stream.IntStream; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.smb.BucketMetadataUtil.SourceMetadata; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; 
-import org.apache.beam.sdk.io.fs.ResourceIdCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -373,7 +366,7 @@ public PrimaryKeyedBucketedInput( public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) - sourceMetadata = BucketMetadataUtil.get().getPrimaryKeyedSourceMetadata(inputs); + sourceMetadata = BucketMetadataUtil.get().getPrimaryKeyedSourceMetadata(getInputs()); return sourceMetadata; } } @@ -403,7 +396,8 @@ public PrimaryAndSecondaryKeyedBucktedInput( public SourceMetadata getSourceMetadata() { if (sourceMetadata == null) - sourceMetadata = BucketMetadataUtil.get().getPrimaryAndSecondaryKeyedSourceMetadata(inputs); + sourceMetadata = + BucketMetadataUtil.get().getPrimaryAndSecondaryKeyedSourceMetadata(getInputs()); return sourceMetadata; } } @@ -416,22 +410,15 @@ public SourceMetadata getSourceMetadata() { */ public abstract static class BucketedInput implements Serializable { private static final Pattern BUCKET_PATTERN = Pattern.compile("(\\d+)-of-(\\d+)"); - protected TupleTag tupleTag; - protected Map>> inputs; protected Predicate predicate; protected Keying keying; // lazy, internal checks depend on what kind of iteration is requested protected transient SourceMetadata sourceMetadata = null; // lazy - // Used to efficiently serialize BucketedInput instances - private static Coder> directoriesEncodingCoder = - MapCoder.of(ResourceIdCoder.of(), VarIntCoder.of()); - - private static Coder>> fileOperationsEncodingCoder = - MapCoder.of( - VarIntCoder.of(), - KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(FileOperations.class))); + private transient Map>> inputs; + private final Map>> fileOperationsEncoding; + private final Map directoriesEncoding; public static BucketedInput of( Keying keying, @@ -486,6 +473,26 @@ public BucketedInput( .collect( Collectors.toMap( e -> FileSystems.matchNewResource(e.getKey(), true), Map.Entry::getValue)); + + // Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of + // large BucketedInputs + final Map, Integer> fileOperationsMetadata = new HashMap<>(); + fileOperationsEncoding = new HashMap<>(); + directoriesEncoding = new HashMap<>(); + + int i = 0; + for (Map.Entry>> entry : inputs.entrySet()) { + final KV> fileOps = entry.getValue(); + final KV metadataKey = + KV.of(fileOps.getKey(), fileOps.getValue().getClass().getName()); + if (!fileOperationsMetadata.containsKey(metadataKey)) { + fileOperationsMetadata.put(metadataKey, i); + fileOperationsEncoding.put(i, KV.of(fileOps.getKey(), fileOps.getValue())); + i++; + } + directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); + } + this.predicate = predicate; } @@ -499,13 +506,28 @@ public Predicate getPredicate() { return predicate; } + @SuppressWarnings("unchecked") public Map>> getInputs() { + if (inputs == null) { + this.inputs = + directoriesEncoding.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + dirAndIndex -> { + final String dir = + fileOperationsEncoding.get(dirAndIndex.getValue()).getKey(); + final FileOperations fileOps = + fileOperationsEncoding.get(dirAndIndex.getValue()).getValue(); + return KV.of(dir, fileOps); + })); + } return inputs; } public Coder getCoder() { final KV> sampledSource = - inputs.entrySet().iterator().next().getValue(); + getInputs().entrySet().iterator().next().getValue(); return sampledSource.getValue().getCoder(); } @@ -527,7 +549,7 @@ 
private static List sampleDirectory(ResourceId directory, String filep } long getOrSampleByteSize() { - return inputs + return getInputs() .entrySet() .parallelStream() .mapToLong( @@ -604,7 +626,8 @@ public KeyGroupIterator createIterator( try { Iterator> iterator = Iterators.transform( - inputs.get(dir).getValue().iterator(file), v -> KV.of(keyFn.apply(v), v)); + getInputs().get(dir).getValue().iterator(file), + v -> KV.of(keyFn.apply(v), v)); Iterator> out = (bufferSize > 0) ? new BufferedIterator<>(iterator, bufferSize) : iterator; iterators.add(out); @@ -619,7 +642,7 @@ public KeyGroupIterator createIterator( @Override public String toString() { - List inputDirectories = new ArrayList<>(inputs.keySet()); + List inputDirectories = new ArrayList<>(getInputs().keySet()); return String.format( "BucketedInput[tupleTag=%s, inputDirectories=[%s]]", tupleTag.getId(), @@ -629,62 +652,6 @@ public String toString() { + inputDirectories.get(inputDirectories.size() - 1) : inputDirectories); } - - // Not all instance members can be natively serialized, so override writeObject/readObject - // using Coders for each type - @SuppressWarnings("unchecked") - private void writeObject(ObjectOutputStream outStream) throws IOException { - SerializableCoder.of(TupleTag.class).encode(tupleTag, outStream); - outStream.writeObject(predicate); - outStream.writeObject(keying); - - // Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of - // large BucketedInputs - final Map, Integer> fileOperationsMetadata = new HashMap<>(); - final Map> fileOperationsEncoding = new HashMap<>(); - final Map directoriesEncoding = new HashMap<>(); - int i = 0; - - for (Map.Entry>> entry : inputs.entrySet()) { - final KV> fileOps = entry.getValue(); - final KV metadataKey = - KV.of(fileOps.getKey(), fileOps.getValue().getClass().getName()); - if (!fileOperationsMetadata.containsKey(metadataKey)) { - fileOperationsMetadata.put(metadataKey, i); - fileOperationsEncoding.put(i, KV.of(fileOps.getKey(), fileOps.getValue())); - i++; - } - directoriesEncoding.put(entry.getKey(), fileOperationsMetadata.get(metadataKey)); - } - - fileOperationsEncodingCoder.encode(fileOperationsEncoding, outStream); - directoriesEncodingCoder.encode(directoriesEncoding, outStream); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream inStream) throws ClassNotFoundException, IOException { - this.tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); - this.predicate = (Predicate) inStream.readObject(); - this.keying = (Keying) inStream.readObject(); - - final Map> fileOperationsEncoding = - fileOperationsEncodingCoder.decode(inStream); - final Map directoriesEncoding = - directoriesEncodingCoder.decode(inStream); - - this.inputs = - directoriesEncoding.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - dirAndIndex -> { - final String dir = - fileOperationsEncoding.get(dirAndIndex.getValue()).getKey(); - final FileOperations fileOps = - fileOperationsEncoding.get(dirAndIndex.getValue()).getValue(); - return KV.of(dir, fileOps); - })); - } } /** diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala index 9a1e4c1456..ee99146679 100644 --- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala +++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala 
@@ -30,10 +30,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter} import org.apache.parquet.hadoop.metadata.CompressionCodecName -import scala.jdk.CollectionConverters._ - import java.nio.channels.{ReadableByteChannel, WritableByteChannel} -import java.util.Objects object ParquetTypeFileOperations { @@ -94,23 +91,6 @@ case class ParquetTypeFileOperations[T]( override protected def createSink(): FileIO.Sink[T] = ParquetTypeSink(compression, conf) override def getCoder: BCoder[T] = CoderMaterializer.beamWithDefault(coder) - - override def hashCode(): Int = Objects.hash(compression.name(), conf.get(), predicate) - - override def equals(obj: Any): Boolean = obj match { - case ParquetTypeFileOperations(compressionThat, confThat, predicateThat) => - this.compression.name() == compressionThat.name() && this.predicate == predicateThat && - conf - .get() - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toMap - .equals( - confThat.get().iterator().asScala.map(e => (e.getKey, e.getValue)).toMap - ) - case _ => false - } } private case class ParquetTypeReader[T]( diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java index 4c124cea94..ec2678ced3 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.hamcrest.MatcherAssert; import org.junit.Assert; @@ -73,9 +72,6 @@ public void testGenericRecord() throws Exception { AvroFileOperations.of(GenericRecordDatumFactory$.INSTANCE, USER_SCHEMA) .withCodec(CodecFactory.snappyCodec()) .withMetadata(TEST_METADATA); - - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output).resolve("file.avro", StandardResolveOptions.RESOLVE_FILE); @@ -89,7 +85,6 @@ public void testGenericRecord() throws Exception { .build()) .collect(Collectors.toList()); final FileOperations.Writer writer = fileOperations.createWriter(file); - for (GenericRecord record : records) { writer.write(record); } @@ -110,9 +105,6 @@ public void testSpecificRecord() throws Exception { AvroFileOperations.of(new SpecificRecordDatumFactory<>(AvroGeneratedUser.class), schema) .withCodec(CodecFactory.snappyCodec()) .withMetadata(TEST_METADATA); - - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output).resolve("file.avro", StandardResolveOptions.RESOLVE_FILE); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java index 6b31a90445..d58618a3a1 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/JsonFileOperationsTest.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.SerializableUtils; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Rule; @@ -53,8 +52,6 @@ public void testCompression() throws Exception { private void test(Compression compression) throws Exception { final JsonFileOperations fileOperations = JsonFileOperations.of(compression); - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output).resolve("file.json", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java index 21b10c01cf..cbf1bb1f2f 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; @@ -94,8 +93,6 @@ public void testGenericRecord() throws Exception { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(USER_SCHEMA); - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final List actual = new ArrayList<>(); fileOperations.iterator(file).forEachRemaining(actual::add); @@ -106,9 +103,6 @@ public void testGenericRecord() throws Exception { public void testSpecificRecord() throws Exception { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(AvroGeneratedUser.class); - - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output) .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); @@ -141,7 +135,6 @@ public void testLogicalTypes() throws Exception { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(TestLogicalTypes.class).withConfiguration(conf); - final ResourceId file = fromFolder(output) .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); @@ -186,8 +179,6 @@ public void testGenericProjection() throws Exception { .withCompression(CompressionCodecName.ZSTD) .withProjection(projection); - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final List expected = USER_RECORDS.stream() .map(r -> new GenericRecordBuilder(USER_SCHEMA).set("name", r.get("name")).build()) @@ -210,8 +201,6 @@ public void testSpecificRecordWithProjection() throws Exception { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(AvroGeneratedUser.class).withProjection(projection); - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output) .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); @@ -301,8 +290,6 @@ public void testPredicate() throws Exception { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(USER_SCHEMA).withFilterPredicate(predicate); - 
Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final List expected = USER_RECORDS.stream().filter(r -> (int) r.get("age") <= 5).collect(Collectors.toList()); final List actual = new ArrayList<>(); @@ -327,35 +314,6 @@ public void testDisplayData() { MatcherAssert.assertThat(displayData, hasDisplayItem("schema", USER_SCHEMA.getFullName())); } - @Test - public void testConfigurationEquality() { - final Configuration configuration1 = new Configuration(); - configuration1.set("foo", "bar"); - - final ParquetAvroFileOperations fileOperations1 = - ParquetAvroFileOperations.of(USER_SCHEMA).withConfiguration(configuration1); - - // Copy of configuration with same keys - final Configuration configuration2 = new Configuration(); - configuration2.set("foo", "bar"); - - final ParquetAvroFileOperations fileOperations2 = - ParquetAvroFileOperations.of(USER_SCHEMA).withConfiguration(configuration2); - - // Assert that configuration equality check fails - Assert.assertEquals(fileOperations1, SerializableUtils.ensureSerializable(fileOperations2)); - - // Copy of configuration with different keys - final Configuration configuration3 = new Configuration(); - configuration3.set("bar", "baz"); - - final ParquetAvroFileOperations fileOperations3 = - ParquetAvroFileOperations.of(USER_SCHEMA).withConfiguration(configuration3); - - // Assert that configuration equality check fails - Assert.assertNotEquals(fileOperations1, SerializableUtils.ensureSerializable(fileOperations3)); - } - private void writeFile(ResourceId file) throws IOException { final ParquetAvroFileOperations fileOperations = ParquetAvroFileOperations.of(USER_SCHEMA).withCompression(CompressionCodecName.ZSTD); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java index 30c0968514..3427401e2f 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TensorFlowFileOperationsTest.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.SerializableUtils; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Rule; @@ -59,8 +58,6 @@ public void testCompressed() throws Exception { private void test(Compression compression) throws Exception { final TensorFlowFileOperations fileOperations = TensorFlowFileOperations.of(compression); - Assert.assertEquals(fileOperations, SerializableUtils.ensureSerializable(fileOperations)); - final ResourceId file = fromFolder(output) .resolve("file.tfrecord", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); diff --git a/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperationsTest.scala b/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperationsTest.scala index e63df27eb2..4f54634eb8 100644 --- a/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperationsTest.scala +++ b/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperationsTest.scala @@ -23,8 +23,6 @@ import com.spotify.scio.CoreSysProps import org.apache.beam.sdk.io.LocalResources import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions import org.apache.beam.sdk.io.fs.ResourceId 
-import org.apache.beam.sdk.util.SerializableUtils -import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.scalatest.flatspec.AnyFlatSpec @@ -52,8 +50,6 @@ class ParquetTypeFileOperationsTest extends AnyFlatSpec with Matchers { writeFile(file) val fileOps = ParquetTypeFileOperations[User]() - SerializableUtils.ensureSerializable(fileOps) should equal(fileOps) - val actual = fileOps.iterator(file).asScala.toSeq actual shouldBe users @@ -68,7 +64,6 @@ class ParquetTypeFileOperationsTest extends AnyFlatSpec with Matchers { writeFile(file) val fileOps = ParquetTypeFileOperations[Username]() - SerializableUtils.ensureSerializable(fileOps) should equal(fileOps) val actual = fileOps.iterator(file).asScala.toSeq actual shouldBe users.map(u => Username(u.name)) @@ -84,33 +79,12 @@ class ParquetTypeFileOperationsTest extends AnyFlatSpec with Matchers { val predicate = FilterApi.ltEq(FilterApi.intColumn("age"), java.lang.Integer.valueOf(5)) val fileOps = ParquetTypeFileOperations[User](predicate) - SerializableUtils.ensureSerializable(fileOps) should equal(fileOps) val actual = fileOps.iterator(file).asScala.toSeq actual shouldBe users.filter(_.age <= 5) tmpDir.delete() } - it should "compare Configuration values in equals() check" in { - val conf1 = new Configuration() - conf1.set("foo", "bar") - val fileOps1 = ParquetTypeFileOperations[User](CompressionCodecName.UNCOMPRESSED, conf1) - - val conf2 = new Configuration() - conf2.set("foo", "bar") - val fileOps2 = ParquetTypeFileOperations[User](CompressionCodecName.UNCOMPRESSED, conf2) - - // FileOperations with equal Configuration maps should be equal - SerializableUtils.ensureSerializable(fileOps2) should equal(fileOps1) - - val conf3 = new Configuration() - conf3.set("bar", "baz") - val fileOps3 = ParquetTypeFileOperations[User](CompressionCodecName.UNCOMPRESSED, conf3) - - // FileOperations with different Configuration maps should not be equal - SerializableUtils.ensureSerializable(fileOps3) shouldNot equal(fileOps1) - } - private def writeFile(file: ResourceId): Unit = { val fileOps = ParquetTypeFileOperations[User](CompressionCodecName.GZIP) val writer = fileOps.createWriter(file);
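
The heart of this patch is the encoding built in the BucketedInput constructor: distinct FileOperations/file-suffix pairs are assigned small integer indices once (fileOperationsEncoding), each input directory only stores the index it points at (directoriesEncoding), and the full directory-to-FileOperations map becomes a transient field rebuilt lazily in getInputs(). That removes the need for the custom writeObject/readObject methods, the Beam coders backing them, and the equals()/hashCode() overrides on FileOperations subclasses that existed mainly so serialization round-trips could be asserted in tests. The standalone Java sketch below illustrates the same scheme under assumed names (IndexedMap, fingerprint); it is an illustration only, not code from Scio or Beam, and the real BucketedInput additionally keys duplicates by the pair (file suffix, FileOperations class name) precisely because FileOperations no longer defines equals().

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Serialized form keeps each distinct value exactly once (valuesByIndex) plus one small
 * integer per key (indexByKey); the expanded key-to-value view is transient and rebuilt
 * lazily, mirroring how BucketedInput#getInputs() rebuilds `inputs` from
 * fileOperationsEncoding/directoriesEncoding after deserialization.
 */
public class IndexedMap<K extends Serializable, V extends Serializable> implements Serializable {

  private final Map<Integer, V> valuesByIndex = new HashMap<>();
  private final Map<K, Integer> indexByKey = new HashMap<>();

  // Not serialized; rebuilt on first access after deserialization.
  private transient Map<K, V> expanded;

  /**
   * @param source full key-to-value map to encode
   * @param fingerprint decides when two values count as duplicates; the patch uses
   *     (file suffix, FileOperations class name) since FileOperations no longer defines equals
   */
  public IndexedMap(Map<K, V> source, Function<V, ?> fingerprint) {
    final Map<Object, Integer> indexByFingerprint = new HashMap<>();
    int nextIndex = 0;
    for (Map.Entry<K, V> entry : source.entrySet()) {
      final Object fp = fingerprint.apply(entry.getValue());
      Integer index = indexByFingerprint.get(fp);
      if (index == null) {
        index = nextIndex++;
        indexByFingerprint.put(fp, index);
        valuesByIndex.put(index, entry.getValue());
      }
      indexByKey.put(entry.getKey(), index);
    }
  }

  public Map<K, V> get() {
    if (expanded == null) {
      final Map<K, V> out = new LinkedHashMap<>();
      indexByKey.forEach((key, index) -> out.put(key, valuesByIndex.get(index)));
      expanded = out;
    }
    return expanded;
  }
}

In BucketedInput's case the keys are input directories (potentially many per source) while the values are FileOperations instances that are usually identical across directories, so plain Java serialization of these two small maps writes each distinct FileOperations once instead of once per directory, which is what the old coder-based writeObject/readObject was hand-rolling.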