diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java index 42674829d1..e94b6f1c3a 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java @@ -34,6 +34,7 @@ import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory; import org.apache.beam.sdk.extensions.avro.io.AvroIO; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; @@ -113,17 +114,9 @@ protected FileIO.Sink createSink() { // https://github.com/spotify/scio/issues/2649 // force GenericDatumWriter instead of ReflectDatumWriter ? (AvroIO.Sink) - AvroIO.sinkViaGenericRecords( - getSchema(), - new AvroIO.RecordFormatter() { - @Override - public GenericRecord formatRecord(ValueT element, Schema schema) { - return (GenericRecord) element; - } - }) - .withCodec(codec.getCodec()) + AvroIO.sink(getSchema()) + .withDatumWriterFactory(AvroDatumFactory.generic()) : AvroIO.sink(recordClass) - .withCodec(codec.getCodec()) .withDatumWriterFactory( (writer) -> { // same as SpecificRecordDatumFactory in scio-avro @@ -137,7 +130,7 @@ public GenericRecord formatRecord(ValueT element, Schema schema) { return sink.withMetadata(metadata); } - return sink; + return sink.withCodec(codec.getCodec()); } @SuppressWarnings("unchecked") @@ -210,7 +203,7 @@ public void prepareRead(ReadableByteChannel channel) throws IOException { // same as SpecificRecordDatumFactory in scio-avro ReflectData data = new ReflectData(recordClass.getClassLoader()); org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils.addLogicalTypeConversions(data); - datumReader = new ReflectDatumReader<>(data); + datumReader = new ReflectDatumReader<>(schema, schema, data); } reader = new DataFileStream<>(Channels.newInputStream(channel), datumReader);