From 1066e6871391c83dc6a5a38e265f2a71108b6b29 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Sep 2024 16:27:47 -0400 Subject: [PATCH 1/2] (fix #5472) Set desiredBundleSizeBytes to Long.MaxValue in BinaryIO reads to prevent file-splitting --- .../scala/com/spotify/scio/io/BinaryIO.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala index dfde1661d9..f3565d7a62 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala @@ -34,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptions import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider import org.apache.beam.sdk.transforms.SerializableFunctions import org.apache.beam.sdk.util.MimeTypes +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions import org.apache.commons.compress.compressors.CompressorStreamFactory import org.typelevel.scalaccompat.annotation.unused @@ -56,7 +57,6 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] { override protected def read(sc: ScioContext, params: ReadP): SCollection[Array[Byte]] = { val filePattern = ScioUtil.filePattern(path, params.suffix) - val desiredBundleSizeBytes = 64 * 1024 * 1024L // 64 mb val coder = ByteArrayCoder.of() val srcFn = Functions.serializableFn { path: String => new BinaryIO.BinarySource(path, params.emptyMatchTreatment, params.reader) @@ -74,7 +74,8 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] { ) .applyTransform( "Read all via FileBasedSource", - new ReadAllViaFileBasedSource[Array[Byte]](desiredBundleSizeBytes, srcFn, coder) + // Setting desiredBundleSizeBytes to Long.MaxValue prevents Beam from trying to split files + new ReadAllViaFileBasedSource[Array[Byte]](Long.MaxValue, srcFn, coder) ) } @@ -326,6 +327,9 @@ object BinaryIO { false } } + + override def allowsDynamicSplitting(): Boolean = + false } } @@ -344,8 +348,14 @@ object BinaryIO { fileMetadata: Metadata, start: Long, end: Long - ): FileBasedSource[Array[Byte]] = + ): FileBasedSource[Array[Byte]] = { + Preconditions.checkArgument( + start == 0, + "Range with offset {} requested, but BinaryIO is unsplittable", + start + ) new BinarySingleFileSource(binaryFileReader, fileMetadata, start, end) + } override def createSingleFileReader( options: PipelineOptions From 47c068d50b23292dd4823750f7393f71b8d296df Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 10 Sep 2024 09:31:01 -0400 Subject: [PATCH 2/2] Use require over Preconditions --- scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala index f3565d7a62..7fadeb95d8 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala @@ -34,7 +34,6 @@ import org.apache.beam.sdk.options.PipelineOptions import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider import org.apache.beam.sdk.transforms.SerializableFunctions import org.apache.beam.sdk.util.MimeTypes -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions import org.apache.commons.compress.compressors.CompressorStreamFactory import org.typelevel.scalaccompat.annotation.unused @@ -349,10 +348,9 @@ object BinaryIO { start: Long, end: Long ): FileBasedSource[Array[Byte]] = { - Preconditions.checkArgument( + require( start == 0, - "Range with offset {} requested, but BinaryIO is unsplittable", - start + s"Range with offset $start requested, but BinaryIO is unsplittable" ) new BinarySingleFileSource(binaryFileReader, fileMetadata, start, end) }