Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefix & suffix param for all IO APIs #4809

Merged
merged 30 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3822aa9
Add prefix param for all IO APIs
RustedBones May 17, 2023
29742ff
Make scalafix happy
RustedBones May 17, 2023
e422502
Allow null shardNameTemplate
RustedBones May 17, 2023
7aaf4f9
Allow to match files with suffix
RustedBones May 23, 2023
cfeb26b
Merge branch 'main' into streamline-io-prefix
RustedBones May 23, 2023
a4cab23
Apply format
RustedBones May 23, 2023
9e94e54
Remove unused nowarn
RustedBones May 23, 2023
8f76e96
Streamline file taps
RustedBones May 23, 2023
525992c
Use more stable shard pattern
RustedBones May 23, 2023
e6167cd
Scalafix
RustedBones May 24, 2023
4c90b5a
Streamline pattern parameter
RustedBones May 24, 2023
5f4ef78
Scalafix
RustedBones May 24, 2023
4c7abf3
Use file path in tap without suffix
RustedBones May 24, 2023
4e08ee5
Normalize all taps
RustedBones May 24, 2023
310adb4
Create missing filePattern in IOs read
RustedBones May 24, 2023
f1a78c8
WIP
RustedBones May 24, 2023
3edc05b
Fix taps tests
RustedBones May 24, 2023
f012168
Scalafix
RustedBones May 24, 2023
c11fb5e
Fix factorized TableRowJson params
RustedBones May 24, 2023
78749b8
Merge branch 'main' into streamline-io-prefix
RustedBones May 25, 2023
d269e45
Open IO default params
RustedBones May 31, 2023
03ddcbc
Change ReadParam from WriteParam constructor visibility
RustedBones May 31, 2023
33b198e
Fix compile issues
RustedBones May 31, 2023
d31e6ff
Overload avroFile API
RustedBones May 31, 2023
83e996b
Merge branch 'main' into streamline-io-prefix
RustedBones Jun 5, 2023
6ef6702
Merge remote-tracking branch 'origin/main' into streamline-io-prefix
RustedBones Jun 7, 2023
209e5e8
Standardize io param default values
RustedBones Jun 7, 2023
3a89081
Add checks for null paths
RustedBones Jun 7, 2023
28fca66
FIx typo and type param
RustedBones Jun 7, 2023
b501af1
Scalafix
RustedBones Jun 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class AvroTapIT extends AnyFlatSpec with Matchers {
it should "read avro file" in {
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create)

val tap = AvroTaps(Taps()).avroFile(
val tap = AvroTaps(Taps()).avroGenericFile(
"gs://data-integration-test-eu/avro-integration-test/folder-a/folder-b/shakespeare.avro",
schema = schema
)
Expand Down
162 changes: 93 additions & 69 deletions scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import scala.reflect.runtime.universe._
import scala.reflect.ClassTag

final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = ObjectFileIO.ReadParam
override type WriteP = ObjectFileIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]

Expand All @@ -51,7 +51,7 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val coder = CoderMaterializer.beamWithDefault(Coder[T])
sc.read(GenericRecordIO(path, AvroBytesUtil.schema))
sc.read(GenericRecordIO(path, AvroBytesUtil.schema))(params)
.parDo(new DoFn[GenericRecord, T] {
@ProcessElement
private[scio] def processElement(
Expand Down Expand Up @@ -81,20 +81,22 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
out.output(AvroBytesUtil.encode(elemCoder, element))
})
.write(GenericRecordIO(path, AvroBytesUtil.schema))(params)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] =
ObjectFileTap[T](ScioUtil.addPartSuffix(path))
ObjectFileTap[T](path, read)
}

object ObjectFileIO {
type ReadParam = AvroIO.ReadParam
val ReadParam = AvroIO.ReadParam
type WriteParam = AvroIO.WriteParam
val WriteParam = AvroIO.WriteParam
}

final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = ProtobufIO.ReadParam
override type WriteP = ProtobufIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]
private val protoCoder = Coder.protoMessageCoder[T]
Expand All @@ -106,7 +108,7 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
* block file format.
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.read(ObjectFileIO[T](path)(protoCoder))
sc.read(ObjectFileIO[T](path)(protoCoder))(params)

/**
* Save this SCollection as a Protobuf file.
Expand All @@ -120,10 +122,12 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
}

override def tap(read: ReadP): Tap[T] =
ObjectFileTap[T](ScioUtil.addPartSuffix(path))(protoCoder)
ObjectFileTap[T](path, read)(protoCoder)
}

object ProtobufIO {
type ReadParam = AvroIO.ReadParam
val ReadParam = AvroIO.ReadParam
type WriteParam = AvroIO.WriteParam
val WriteParam = AvroIO.WriteParam
}
Expand All @@ -138,19 +142,19 @@ sealed trait AvroIO[T] extends ScioIO[T] {
suffix: String,
codec: CodecFactory,
metadata: Map[String, AnyRef],
shardNameTemplate: String,
tempDirectory: ResourceId,
filenamePolicySupplier: FilenamePolicySupplier,
isWindowed: Boolean
prefix: String,
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId
): beam.AvroIO.Write[U] = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
path,
suffix,
shardNameTemplate,
tempDirectory,
filenamePolicySupplier,
isWindowed
)
filenamePolicySupplier = filenamePolicySupplier,
prefix = prefix,
shardNameTemplate = shardNameTemplate,
isWindowed = isWindowed
)(ScioUtil.strippedPath(path), suffix)
val transform = write
.to(fp)
.withTempDirectory(tempDirectory)
Expand All @@ -163,7 +167,7 @@ sealed trait AvroIO[T] extends ScioIO[T] {

final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: String)
extends AvroIO[T] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = AvroIO.WriteParam

override def testId: String = s"AvroIO($path)"
Expand All @@ -175,7 +179,10 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val coder = CoderMaterializer.beam(sc, Coder[T])
val cls = ScioUtil.classOf[T]
val t = beam.AvroIO.read(cls).from(path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.read(cls)
.from(filePattern)
sc
.applyTransform(t)
.setCoder(coder)
Expand All @@ -197,21 +204,22 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] =
SpecificRecordTap[T](ScioUtil.addPartSuffix(path))
SpecificRecordTap[T](path, read)
}

final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[GenericRecord] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = AvroIO.WriteParam

override def testId: String = s"AvroIO($path)"
Expand All @@ -222,9 +230,10 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[GenericRecord] = {
val coder = CoderMaterializer.beam(sc, Coder.avroGenericRecordCoder(schema))
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.readGenericRecords(schema)
.from(path)
.from(filePattern)
sc
.applyTransform(t)
.setCoder(coder)
Expand All @@ -246,17 +255,18 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[GenericRecord] =
GenericRecordTap(ScioUtil.addPartSuffix(path), schema)
GenericRecordTap(path, schema, read)
}

/**
Expand All @@ -269,7 +279,7 @@ final case class GenericRecordIO(path: String, schema: Schema) extends AvroIO[Ge
final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord => T)(implicit
coder: Coder[T]
) extends AvroIO[T] {
override type ReadP = Unit
override type ReadP = AvroIO.ReadParam
override type WriteP = Nothing // Output is not defined for Avro Generic Parse IO.

override def testId: String = s"AvroIO($path)"
Expand All @@ -278,10 +288,11 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>
* Get an SCollection[T] by applying the [[parseFn]] on
* [[org.apache.avro.generic.GenericRecord GenericRecord]] from an Avro file.
*/
override protected def read(sc: ScioContext, params: Unit): SCollection[T] = {
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO
.parseGenericRecords(Functions.serializableFn(parseFn))
.from(path)
.from(filePattern)
.withCoder(CoderMaterializer.beam(sc, coder))

sc.applyTransform(t)
Expand All @@ -290,33 +301,45 @@ final case class GenericRecordParseIO[T](path: String, parseFn: GenericRecord =>
/** Writes are undefined for [[GenericRecordParseIO]] since it is used only for reading. */
override protected def write(data: SCollection[T], params: Nothing): Tap[T] = ???

override def tap(read: Unit): Tap[T] =
GenericRecordParseTap[T](ScioUtil.addPartSuffix(path), parseFn)
override def tap(read: ReadP): Tap[T] =
GenericRecordParseTap[T](path, parseFn, read)
}

object AvroIO {
object WriteParam {
private[scio] val DefaultNumShards = 0
private[scio] val DefaultSuffix = ""
private[scio] val DefaultCodec: CodecFactory = CodecFactory.deflateCodec(6)
private[scio] val DefaultMetadata: Map[String, AnyRef] = Map.empty
private[scio] val DefaultShardNameTemplate: String = null
private[scio] val DefaultTempDirectory = null
private[scio] val DefaultFilenamePolicySupplier = null

private[scio] object ReadParam {
val DefaultSuffix: String = null

def apply(params: WriteParam): ReadParam = new ReadParam(
params.suffix
)
}

final case class ReadParam private (
suffix: String = ReadParam.DefaultSuffix
)

private[scio] object WriteParam {
val DefaultNumShards = 0
val DefaultSuffix = ".avro"
val DefaultCodec: CodecFactory = CodecFactory.deflateCodec(6)
val DefaultMetadata: Map[String, AnyRef] = Map.empty
val DefaultFilenamePolicySupplier = null
val DefaultPrefix = null
val DefaultShardNameTemplate: String = null
val DefaultTempDirectory = null
}

final case class WriteParam private (
numShards: Int = WriteParam.DefaultNumShards,
private val _suffix: String = WriteParam.DefaultSuffix,
suffix: String = WriteParam.DefaultSuffix,
codec: CodecFactory = WriteParam.DefaultCodec,
metadata: Map[String, AnyRef] = WriteParam.DefaultMetadata,
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier,
prefix: String = WriteParam.DefaultPrefix,
shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
tempDirectory: String = WriteParam.DefaultTempDirectory,
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier
RustedBones marked this conversation as resolved.
Show resolved Hide resolved
) {
// TODO this is kinda weird when compared with the other IOs?
val suffix: String = _suffix + ".avro"
}
tempDirectory: String = WriteParam.DefaultTempDirectory
)

@inline final def apply[T](id: String): AvroIO[T] =
new AvroIO[T] with TestIO[T] {
Expand All @@ -335,7 +358,7 @@ object AvroTyped {
}

final case class AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String) extends ScioIO[T] {
override type ReadP = Unit
override type ReadP = avro.AvroIO.ReadParam
override type WriteP = avro.AvroIO.WriteParam
final override val tapT: TapT.Aux[T, T] = TapOf[T]

Expand All @@ -346,19 +369,19 @@ object AvroTyped {
suffix: String,
codec: CodecFactory,
metadata: Map[String, AnyRef],
shardNameTemplate: String,
tempDirectory: ResourceId,
filenamePolicySupplier: FilenamePolicySupplier,
isWindowed: Boolean
prefix: String,
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId
) = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
path,
suffix,
shardNameTemplate,
tempDirectory,
filenamePolicySupplier,
isWindowed
)
filenamePolicySupplier = filenamePolicySupplier,
prefix = prefix,
shardNameTemplate = shardNameTemplate,
isWindowed = isWindowed
)(ScioUtil.strippedPath(path), suffix)
val transform = write
.to(fp)
.withTempDirectory(tempDirectory)
Expand All @@ -378,7 +401,8 @@ object AvroTyped {
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val avroT = AvroType[T]
val t = beam.AvroIO.readGenericRecords(avroT.schema).from(path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
val t = beam.AvroIO.readGenericRecords(avroT.schema).from(filePattern)
sc.applyTransform(t).map(avroT.fromGenericRecord)
}

Expand All @@ -395,19 +419,19 @@ object AvroTyped {
params.suffix,
params.codec,
params.metadata,
params.shardNameTemplate,
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
params.filenamePolicySupplier,
ScioUtil.isWindowed(data)
params.prefix,
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context)
)
)
tap(())
tap(avro.AvroIO.ReadParam(params))
}

override def tap(read: ReadP): Tap[T] = {
val avroT = AvroType[T]
GenericRecordTap(ScioUtil.addPartSuffix(path), avroT.schema)
.map(avroT.fromGenericRecord)
GenericRecordTap(path, avroT.schema, read).map(avroT.fromGenericRecord)
}
}
}
Loading