Skip to content

Commit

Permalink
Merge 1ecbae4 into eb6a91a
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty authored May 3, 2023
2 parents eb6a91a + 1ecbae4 commit 6e6a868
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.values.SCollection
Expand Down Expand Up @@ -102,7 +102,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
)
val dynamicDestinations =
DynamicFileDestinations.constant(fp, SerializableFunctions.identity[T])
val job = Job.getInstance(Option(conf).getOrElse(new Configuration()))
val job = Job.getInstance(ParquetConfiguration.ofNullable(conf))
if (isLocalRunner) GcsConnectorUtil.setCredentials(job)

val sink = new ParquetAvroFileBasedSink[T](
Expand All @@ -119,7 +119,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val isAssignable = classOf[SpecificRecordBase].isAssignableFrom(cls)
val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
val conf = Option(params.conf).getOrElse(new Configuration())
val conf = ParquetConfiguration.ofNullable(params.conf)
if (
conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
writerSchema
Expand Down Expand Up @@ -185,7 +185,7 @@ object ParquetAvroIO {
if (isSpecific) ReflectData.get().getSchema(avroClass) else projection

def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
val jobConf = Option(conf).getOrElse(new Configuration())
val jobConf = ParquetConfiguration.ofNullable(conf)

if (
jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
Expand Down Expand Up @@ -331,7 +331,7 @@ case class ParquetAvroTap[A, T: ClassTag: Coder](
xs.iterator.flatMap { metadata =>
val reader = AvroParquetReader
.builder[A](BeamInputFile.of(metadata.resourceId()))
.withConf(Option(params.conf).getOrElse(new Configuration()))
.withConf(ParquetConfiguration.ofNullable(params.conf))
.build()
new Iterator[T] {
private var current: A = reader.read()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.avro.dynamic.syntax
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps.writeDynamic
import com.spotify.scio.io.{ClosedTap, EmptyTap}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro.{ParquetAvroIO, ParquetAvroSink}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
Expand Down Expand Up @@ -60,7 +61,7 @@ final class DynamicParquetAvroSCollectionOps[T](
new ParquetAvroSink[T](
writerSchema,
compression,
new SerializableConfiguration(Option(conf).getOrElse(new Configuration()))
new SerializableConfiguration(ParquetConfiguration.ofNullable(conf))
)
val write = writeDynamic(path, numShards, suffix, destinationFn, tempDirectory).via(sink)
self.applyInternal(write)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ package object parquet {
}

object ParquetConfiguration {
/** Builds a fresh [[Configuration]] with its no-argument constructor; each call returns a new instance. */
def empty(): Configuration = new Configuration()

def of(entries: (String, Any)*): Configuration = {
val conf = new Configuration()
val conf = empty()
entries.foreach { case (k, v) =>
v match {
case b: Boolean => conf.setBoolean(k, b)
Expand All @@ -67,5 +70,8 @@ package object parquet {
}
conf
}

/**
 * Null-safe accessor for a configuration reference that may be absent.
 *
 * @param conf configuration that may be `null`
 * @return `conf` itself (the same instance, not a copy) when it is non-null,
 *         otherwise a new configuration obtained from `empty()`
 */
private[parquet] def ofNullable(conf: Configuration): Configuration =
  Option(conf).fold(empty())(identity)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean}
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.testing.TestDataManager
Expand Down Expand Up @@ -55,7 +56,7 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
override val tapT: TapT.Aux[Example, Example] = TapOf[Example]

override protected def read(sc: ScioContext, params: ReadP): SCollection[Example] = {
val conf = Option(params.conf).getOrElse(new Configuration())
val conf = ParquetConfiguration.ofNullable(params.conf)
val useSplittableDoFn = conf.getBoolean(
ParquetReadConfiguration.UseSplittableDoFn,
ParquetReadConfiguration.UseSplittableDoFnDefault
Expand Down Expand Up @@ -169,7 +170,7 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {
)
val dynamicDestinations =
DynamicFileDestinations.constant(fp, SerializableFunctions.identity[Example])
val job = Job.getInstance(Option(conf).getOrElse(new Configuration()))
val job = Job.getInstance(ParquetConfiguration.ofNullable(conf))
if (isLocalRunner) GcsConnectorUtil.setCredentials(job)
val sink = new ParquetExampleFileBasedSink(
StaticValueProvider.of(tempDirectory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.tensorflow.dynamic.syntax
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps.writeDynamic
import com.spotify.scio.io.{ClosedTap, EmptyTap}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.tensorflow.ParquetExampleIO
import com.spotify.scio.parquet.tensorflow.dynamic.ParquetExampleSink
import com.spotify.scio.values.SCollection
Expand Down Expand Up @@ -57,7 +58,7 @@ final class DynamicParquetExampleSCollectionOps(
val sink = new ParquetExampleSink(
schema,
compression,
new SerializableConfiguration(Option(conf).getOrElse(new Configuration()))
new SerializableConfiguration(ParquetConfiguration.ofNullable(conf))
)
val write = writeDynamic(path, numShards, suffix, destinationFn, tempDirectory).via(sink)
self.applyInternal(write)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.values.SCollection
Expand Down Expand Up @@ -54,7 +54,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
private val tpe: ParquetType[T] = implicitly[ParquetType[T]]

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val conf = Option(params.conf).getOrElse(new Configuration())
val conf = ParquetConfiguration.ofNullable(params.conf)
val useSplittableDoFn = conf.getBoolean(
ParquetReadConfiguration.UseSplittableDoFn,
ParquetReadConfiguration.UseSplittableDoFnDefault
Expand Down Expand Up @@ -140,7 +140,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
)
val dynamicDestinations =
DynamicFileDestinations.constant(fp, SerializableFunctions.identity[T])
val job = Job.getInstance(Option(conf).getOrElse(new Configuration()))
val job = Job.getInstance(ParquetConfiguration.ofNullable(conf))
if (isLocalRunner) GcsConnectorUtil.setCredentials(job)
val sink = new ParquetTypeFileBasedSink[T](
StaticValueProvider.of(tempDirectory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.types.dynamic.syntax
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps.writeDynamic
import com.spotify.scio.io.{ClosedTap, EmptyTap}
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.types.{ParquetTypeIO, ParquetTypeSink}
import com.spotify.scio.values.SCollection
import magnolify.parquet.ParquetType
Expand Down Expand Up @@ -50,7 +51,7 @@ final class DynamicParquetTypeSCollectionOps[T](
} else {
val sink = new ParquetTypeSink[T](
compression,
new SerializableConfiguration(Option(conf).getOrElse(new Configuration()))
new SerializableConfiguration(ParquetConfiguration.ofNullable(conf))
)
val write = writeDynamic(path, numShards, suffix, destinationFn, tempDirectory).via(sink)
self.applyInternal(write)
Expand Down

0 comments on commit 6e6a868

Please sign in to comment.