Skip to content

Commit

Permalink
Pass avro reader schema for SMB operation (#5032)
Browse files (browse the repository at this point in the history)
  • Loading branch information
RustedBones authored Oct 17, 2023
1 parent 8553596 commit fb3c27f
Showing 1 changed file with 5 additions and 12 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
Expand Down Expand Up @@ -113,17 +114,9 @@ protected FileIO.Sink<ValueT> createSink() {
// https://github.com/spotify/scio/issues/2649
// force GenericDatumWriter instead of ReflectDatumWriter
? (AvroIO.Sink<ValueT>)
AvroIO.sinkViaGenericRecords(
getSchema(),
new AvroIO.RecordFormatter<ValueT>() {
@Override
public GenericRecord formatRecord(ValueT element, Schema schema) {
return (GenericRecord) element;
}
})
.withCodec(codec.getCodec())
AvroIO.<GenericRecord>sink(getSchema())
.withDatumWriterFactory(AvroDatumFactory.generic())
: AvroIO.sink(recordClass)
.withCodec(codec.getCodec())
.withDatumWriterFactory(
(writer) -> {
// same as SpecificRecordDatumFactory in scio-avro
Expand All @@ -137,7 +130,7 @@ public GenericRecord formatRecord(ValueT element, Schema schema) {
return sink.withMetadata(metadata);
}

return sink;
return sink.withCodec(codec.getCodec());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -210,7 +203,7 @@ public void prepareRead(ReadableByteChannel channel) throws IOException {
// same as SpecificRecordDatumFactory in scio-avro
ReflectData data = new ReflectData(recordClass.getClassLoader());
org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils.addLogicalTypeConversions(data);
datumReader = new ReflectDatumReader<>(data);
datumReader = new ReflectDatumReader<>(schema, schema, data);
}

reader = new DataFileStream<>(Channels.newInputStream(channel), datumReader);
Expand Down

0 comments on commit fb3c27f

Please sign in to comment.