Skip to content

Commit

Permalink
Merge b501af1 into f3e737e
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Jun 7, 2023
2 parents f3e737e + b501af1 commit 0a4c41c
Show file tree
Hide file tree
Showing 73 changed files with 1,719 additions and 1,235 deletions.
159 changes: 91 additions & 68 deletions scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import scala.reflect.runtime.universe._
import scala.reflect.ClassTag

final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = ObjectFileIO.ReadParam
override type WriteP = ObjectFileIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]

Expand All @@ -51,7 +51,7 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val coder = CoderMaterializer.beamWithDefault(Coder[T])
sc.read(GenericRecordIO(path, AvroBytesUtil.schema))
sc.read(GenericRecordIO(path, AvroBytesUtil.schema))(params)
.parDo(new DoFn[GenericRecord, T] {
@ProcessElement
private[scio] def processElement(
Expand Down Expand Up @@ -81,20 +81,22 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
out.output(AvroBytesUtil.encode(elemCoder, element))
})
.write(GenericRecordIO(path, AvroBytesUtil.schema))(params)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] =
ObjectFileTap[T](ScioUtil.addPartSuffix(path))
ObjectFileTap[T](path, read)
}

/**
 * Companion of [[ObjectFileIO]].
 *
 * Object files are read and written through [[GenericRecordIO]], so the parameter types
 * (and their companions) are exposed here as plain aliases of the [[AvroIO]] ones.
 */
object ObjectFileIO {
  // Write-side parameters, shared with AvroIO.
  type WriteParam = AvroIO.WriteParam
  val WriteParam: AvroIO.WriteParam.type = AvroIO.WriteParam

  // Read-side parameters, shared with AvroIO.
  type ReadParam = AvroIO.ReadParam
  val ReadParam: AvroIO.ReadParam.type = AvroIO.ReadParam
}

final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = ProtobufIO.ReadParam
override type WriteP = ProtobufIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]
private val protoCoder = Coder.protoMessageCoder[T]
Expand All @@ -106,7 +108,7 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
* block file format.
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.read(ObjectFileIO[T](path)(protoCoder))
sc.read(ObjectFileIO[T](path)(protoCoder))(params)

/**
* Save this SCollection as a Protobuf file.
Expand All @@ -120,10 +122,12 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
}

override def tap(read: ReadP): Tap[T] =
ObjectFileTap[T](ScioUtil.addPartSuffix(path))(protoCoder)
ObjectFileTap[T](path, read)(protoCoder)
}

/**
 * Companion of [[ProtobufIO]].
 *
 * Protobuf reads are delegated to [[ObjectFileIO]]; the parameter types (and their
 * companions) are exposed here as plain aliases of the [[AvroIO]] ones.
 */
object ProtobufIO {
  // Write-side parameters, shared with AvroIO.
  type WriteParam = AvroIO.WriteParam
  val WriteParam: AvroIO.WriteParam.type = AvroIO.WriteParam

  // Read-side parameters, shared with AvroIO.
  type ReadParam = AvroIO.ReadParam
  val ReadParam: AvroIO.ReadParam.type = AvroIO.ReadParam
}
Expand All @@ -138,19 +142,19 @@ sealed trait AvroIO[T] extends ScioIO[T] {
suffix: String,
codec: CodecFactory,
metadata: Map[String, AnyRef],
shardNameTemplate: String,
tempDirectory: ResourceId,
filenamePolicySupplier: FilenamePolicySupplier,
isWindowed: Boolean
prefix: String,
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId
): beam.AvroIO.Write[U] = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
path,
suffix,
shardNameTemplate,
tempDirectory,
filenamePolicySupplier,
isWindowed
)
filenamePolicySupplier = filenamePolicySupplier,
prefix = prefix,
shardNameTemplate = shardNameTemplate,
isWindowed = isWindowed
)(ScioUtil.strippedPath(path), suffix)
val transform = write
.to(fp)
.withTempDirectory(tempDirectory)
Expand All @@ -163,7 +167,7 @@ sealed trait AvroIO[T] extends ScioIO[T] {

final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: String)
extends AvroIO[T] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = AvroIO.WriteParam

override def testId: String = s"AvroIO($path)"
Expand All @@ -175,7 +179,10 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val coder = CoderMaterializer.beam(sc, Coder[T])
val cls = ScioUtil.classOf[T]
val t = beam.AvroIO.read(cls).from(path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.read(cls)
.from(filePattern)
sc
.applyTransform(t)
.setCoder(coder)
Expand All @@ -197,21 +204,22 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] =
SpecificRecordTap[T](ScioUtil.addPartSuffix(path))
SpecificRecordTap[T](path, read)
}

final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[GenericRecord] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = AvroIO.WriteParam

override def testId: String = s"AvroIO($path)"
Expand All @@ -222,9 +230,10 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[GenericRecord] = {
val coder = CoderMaterializer.beam(sc, Coder.avroGenericRecordCoder(schema))
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.readGenericRecords(schema)
.from(path)
.from(filePattern)
sc
.applyTransform(t)
.setCoder(coder)
Expand All @@ -246,17 +255,18 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[GenericRecord] =
GenericRecordTap(ScioUtil.addPartSuffix(path), schema)
GenericRecordTap(path, schema, read)
}

/**
Expand All @@ -269,7 +279,7 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord => T)(implicit
coder: Coder[T]
) extends AvroIO[T] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = Nothing // Output is not defined for Avro Generic Parse IO.

override def testId: String = s"AvroIO($path)"
Expand All @@ -278,10 +288,11 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>
* Get an SCollection[T] by applying the [[parseFn]] on
* [[org.apache.avro.generic.GenericRecord GenericRecord]] from an Avro file.
*/
override protected def read(sc: ScioContext, params: Unit): SCollection[T] = {
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.parseGenericRecords(Functions.serializableFn(parseFn))
.from(path)
.from(filePattern)
.withCoder(CoderMaterializer.beam(sc, coder))

sc.applyTransform(t)
Expand All @@ -290,33 +301,44 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>
/** Writes are undefined for [[GenericRecordParseIO]] since it is used only for reading. */
override protected def write(data: SCollection[T], params: Nothing): Tap[T] = ???

override def tap(read: Unit): Tap[T] =
GenericRecordParseTap[T](ScioUtil.addPartSuffix(path), parseFn)
override def tap(read: ReadP): Tap[T] =
GenericRecordParseTap[T](path, parseFn, read)
}

object AvroIO {

object ReadParam {
val DefaultSuffix: String = null

private[scio] def apply(params: WriteParam): ReadParam =
new ReadParam(params.suffix)
}

final case class ReadParam private (
suffix: String = ReadParam.DefaultSuffix
)

object WriteParam {
private[scio] val DefaultNumShards = 0
private[scio] val DefaultSuffix = ""
private[scio] val DefaultCodec: CodecFactory = CodecFactory.deflateCodec(6)
private[scio] val DefaultMetadata: Map[String, AnyRef] = Map.empty
private[scio] val DefaultShardNameTemplate: String = null
private[scio] val DefaultTempDirectory = null
private[scio] val DefaultFilenamePolicySupplier = null
val DefaultNumShards: Int = 0
val DefaultSuffix: String = ".avro"
val DefaultCodec: CodecFactory = CodecFactory.deflateCodec(6)
val DefaultMetadata: Map[String, AnyRef] = Map.empty
val DefaultFilenamePolicySupplier: FilenamePolicySupplier = null
val DefaultPrefix: String = null
val DefaultShardNameTemplate: String = null
val DefaultTempDirectory: String = null
}

final case class WriteParam private (
numShards: Int = WriteParam.DefaultNumShards,
private val _suffix: String = WriteParam.DefaultSuffix,
suffix: String = WriteParam.DefaultSuffix,
codec: CodecFactory = WriteParam.DefaultCodec,
metadata: Map[String, AnyRef] = WriteParam.DefaultMetadata,
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier,
prefix: String = WriteParam.DefaultPrefix,
shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
tempDirectory: String = WriteParam.DefaultTempDirectory,
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier
) {
// TODO this is kinda weird when compared with the other IOs?
val suffix: String = _suffix + ".avro"
}
tempDirectory: String = WriteParam.DefaultTempDirectory
)

@inline final def apply[T](id: String): AvroIO[T] =
new AvroIO[T] with TestIO[T] {
Expand All @@ -335,7 +357,7 @@ object AvroTyped {
}

final case class AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = avro.AvroIO.ReadParam
override type WriteP = avro.AvroIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]

Expand All @@ -346,19 +368,19 @@ object AvroTyped {
suffix: String,
codec: CodecFactory,
metadata: Map[String, AnyRef],
shardNameTemplate: String,
tempDirectory: ResourceId,
filenamePolicySupplier: FilenamePolicySupplier,
isWindowed: Boolean
prefix: String,
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId
) = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
path,
suffix,
shardNameTemplate,
tempDirectory,
filenamePolicySupplier,
isWindowed
)
filenamePolicySupplier = filenamePolicySupplier,
prefix = prefix,
shardNameTemplate = shardNameTemplate,
isWindowed = isWindowed
)(ScioUtil.strippedPath(path), suffix)
val transform = write
.to(fp)
.withTempDirectory(tempDirectory)
Expand All @@ -378,7 +400,8 @@ object AvroTyped {
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val avroT = AvroType[T]
val t = beam.AvroIO.readGenericRecords(avroT.schema).from(path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO.readGenericRecords(avroT.schema).from(filePattern)
sc.applyTransform(t).map(avroT.fromGenericRecord)
}

Expand All @@ -395,19 +418,19 @@ object AvroTyped {
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(avro.AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] = {
val avroT = AvroType[T]
GenericRecordTap(ScioUtil.addPartSuffix(path), avroT.schema)
.map(avroT.fromGenericRecord)
GenericRecordTap(path, avroT.schema, read).map(avroT.fromGenericRecord)
}
}
}
Loading

0 comments on commit 0a4c41c

Please sign in to comment.