Skip to content

Commit

Permalink
Initial commit for GCS Batch Source plugin metadata feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvitiuk committed Apr 29, 2022
1 parent 873126b commit 323e375
Show file tree
Hide file tree
Showing 22 changed files with 951 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ public String getPathField() {
return null;
}

@Nullable
@Override
public String getLengthField() {
return null;
}

@Nullable
@Override
public String getModificationTimeField() {
return null;
}

@Override
public boolean useFilenameAsPath() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* Tests for ETLBatch.
*/
public class ETLMapReduceTestRun extends ETLBatchTestBase {
private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null);
private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null, null, null);

@Test
public void testInvalidTransformConfigFailsToDeploy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

Expand All @@ -51,11 +52,11 @@ public StructuredRecord transform(GenericRecord genericRecord, Schema structured
}

public StructuredRecord.Builder transform(GenericRecord genericRecord, Schema structuredSchema,
@Nullable String skipField) throws IOException {
@Nullable List<String> skipFields) throws IOException {
StructuredRecord.Builder builder = StructuredRecord.builder(structuredSchema);
for (Schema.Field field : structuredSchema.getFields()) {
String fieldName = field.getName();
if (!fieldName.equals(skipField)) {
if (!skipFields.contains(fieldName)) {
builder.set(fieldName, convertField(genericRecord.get(fieldName), field));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,66 +84,61 @@ public static class Conf extends PathTrackingConfig {
@Description(NAME_SCHEMA)
public String schema;

}

@Nullable
@Override
public Schema getSchema(FormatContext context) {
if (conf.containsMacro("schema")) {
return super.getSchema(context);
}
if (!Strings.isNullOrEmpty(conf.schema)) {
return super.getSchema(context);
}
String filePath = conf.getProperties().getProperties().getOrDefault("path", null);
if (filePath == null) {
return super.getSchema(context);
}
try {
return getDefaultSchema(context);
} catch (IOException e) {
throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
}
}

/**
* Extract schema from file
*
* @param context {@link FormatContext}
* @return {@link Schema}
* @throws IOException raised when error occurs during schema extraction
*/
public Schema getDefaultSchema(@Nullable FormatContext context) throws IOException {
String filePath = conf.getProperties().getProperties().getOrDefault("path", null);
SeekableInput seekableInput = null;
FileReader<GenericRecord> dataFileReader = null;
try {
Job job = JobUtils.createInstance();
Configuration hconf = job.getConfiguration();
// set entries here, before FileSystem is used
for (Map.Entry<String, String> entry : conf.getFileSystemProperties().entrySet()) {
hconf.set(entry.getKey(), entry.getValue());
@Nullable
@Override
public Schema getSchema() {
if (containsMacro("schema")) {
return super.getSchema();
}
Path file = conf.getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job);
DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
seekableInput = new FsInput(file, hconf);
dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
GenericRecord firstRecord;
if (!dataFileReader.hasNext()) {
return null;
if (!Strings.isNullOrEmpty(schema)) {
return super.getSchema();
}
firstRecord = dataFileReader.next();
return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
} catch (IOException e) {
context.getFailureCollector().addFailure("Schema parse error", e.getMessage());
} finally {
if (dataFileReader != null) {
dataFileReader.close();
String filePath = getProperties().getProperties().getOrDefault("path", null);
if (filePath == null) {
return super.getSchema();
}
if (seekableInput != null) {
seekableInput.close();
try {
return getDefaultSchema();
} catch (IOException e) {
throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
}
}

/**
* Extract schema from file
*
* @return {@link Schema}
* @throws IOException raised when error occurs during schema extraction
*/
public Schema getDefaultSchema() throws IOException {
String filePath = getProperties().getProperties().getOrDefault("path", null);
SeekableInput seekableInput = null;
FileReader<GenericRecord> dataFileReader = null;
try {
Job job = JobUtils.createInstance();
Configuration hconf = job.getConfiguration();
// set entries here, before FileSystem is used
for (Map.Entry<String, String> entry : getFileSystemProperties().entrySet()) {
hconf.set(entry.getKey(), entry.getValue());
}
Path file = getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job);
DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
seekableInput = new FsInput(file, hconf);
dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
GenericRecord firstRecord;
if (!dataFileReader.hasNext()) {
return null;
}
firstRecord = dataFileReader.next();
return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
} finally {
if (dataFileReader != null) {
dataFileReader.close();
}
if (seekableInput != null) {
seekableInput.close();
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.MetadataField;
import io.cdap.plugin.format.avro.AvroToStructuredTransformer;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -31,7 +32,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
Expand All @@ -46,7 +49,17 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade

RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
.createRecordReader(split, context);
return new AvroRecordReader(delegate, schema, pathField);
return new AvroRecordReader(delegate, schema, pathField, null);
}

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
FileSplit split, TaskAttemptContext context, @Nullable String pathField, Map<String, MetadataField> metadataFields,
@Nullable Schema schema) throws IOException, InterruptedException {

RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
.createRecordReader(split, context);
return new AvroRecordReader(delegate, schema, pathField, metadataFields);
}

/**
Expand All @@ -56,13 +69,15 @@ static class AvroRecordReader extends RecordReader<NullWritable, StructuredRecor
private final RecordReader<AvroKey<GenericRecord>, NullWritable> delegate;
private final AvroToStructuredTransformer recordTransformer;
private final String pathField;
private final Map<String, MetadataField> metadataFields;
private Schema schema;

AvroRecordReader(RecordReader<AvroKey<GenericRecord>, NullWritable> delegate, @Nullable Schema schema,
@Nullable String pathField) {
@Nullable String pathField, @Nullable Map<String, MetadataField> metadataFields) {
this.delegate = delegate;
this.schema = schema;
this.pathField = pathField;
this.metadataFields = metadataFields == null ? Collections.EMPTY_MAP : metadataFields;
this.recordTransformer = new AvroToStructuredTransformer();
}

Expand All @@ -87,18 +102,25 @@ public StructuredRecord.Builder getCurrentValue() throws IOException, Interrupte
// if schema is null, but we're still able to read, that means the file contains the schema information
// set the schema based on the schema of the record
if (schema == null) {
if (pathField == null) {
if (pathField == null && metadataFields.isEmpty()) {
schema = Schema.parseJson(genericRecord.getSchema().toString());
} else {
// if there is a path field, add the path as a field in the schema
Schema schemaWithoutPath = Schema.parseJson(genericRecord.getSchema().toString());
List<Schema.Field> fields = new ArrayList<>(schemaWithoutPath.getFields().size() + 1);
fields.addAll(schemaWithoutPath.getFields());
fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
if (pathField != null) {
fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
}
for (String fieldName : metadataFields.keySet()) {
fields.add(Schema.Field.of(fieldName, Schema.of(metadataFields.get(fieldName).getSchemaType())));
}
schema = Schema.recordOf(schemaWithoutPath.getRecordName(), fields);
}
}
return recordTransformer.transform(genericRecord, schema, pathField);
List<String> fieldsToExclude = new ArrayList<>(metadataFields.keySet());
fieldsToExclude.add(pathField);
return recordTransformer.transform(genericRecord, schema, fieldsToExclude);
}

@Override
Expand Down
Loading

0 comments on commit 323e375

Please sign in to comment.