Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated output formats to output BigDecimal fields as numeric values instead of a byte representation. #1640

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.format.io;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this in this package so I had access to the underlaying JsonWriter instance.


import com.google.gson.stream.JsonWriter;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.common.io.Encoder;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/**
* Custom Datum Writer for Structured Records, this class writes BigDecimal values as numbers instead of byte arrays in
* the JSON output.
*/
public class BigDecimalAwareJsonStructuredRecordDatumWriter extends JsonStructuredRecordDatumWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which cases do we need the original represenatation? Also, I am wondering why is it a byte array given io.cdap.cdap.format.io.JsonStructuredRecordDatumWriter#encode(io.cdap.cdap.common.io.Encoder, io.cdap.cdap.api.data.schema.Schema, java.lang.Object) handles DECIMAL?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we pass the value directly from the StructuredRecord. So decimals are simply a byte array internally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, why this logic does not go into JsonStructuredRecordDatumWriter itself?

@Override
protected void encode(Encoder encoder, Schema schema, Object value) throws IOException {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
Schema.LogicalType logicalType = nonNullableSchema.getLogicalType();

if (value != null && logicalType == Schema.LogicalType.DECIMAL) {
BigDecimal bdValue = fromObject(value, nonNullableSchema);
getJsonWriter(encoder).value(bdValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a hack. Why can't we do it without it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the code as implemented in the platform doesn't have logic to handle BigDecimals directly. Only primitive CDAP types are handled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io.cdap.cdap.format.io.JsonStructuredRecordDatumWriter#encode(io.cdap.cdap.common.io.Encoder, io.cdap.cdap.api.data.schema.Schema, java.lang.Object) has Decimal support. If it does not work, we should fix it over there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it's cdap-formats it's shipped in plugin, so we can have a new plugin released with a cdap-formats fix without new CDAP release

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class only handles Decimals if the logicalTypeAsString flag is set (which we only do for preview). It also writes the value as a String which would wrap the output in quotes

https://github.com/cdapio/cdap/blob/develop/cdap-formats/src/main/java/io/cdap/cdap/format/io/JsonStructuredRecordDatumWriter.java#L135

return;
}

super.encode(encoder, schema, value);
}

/**
* Extract a BigDecimal value from a supplied object
* @param value object to convert into BigDecimal
* @param logicalTypeSchema logical type schema for this field
* @return value converted ingo a BigDecimal.
*/
protected BigDecimal fromObject(Object value, Schema logicalTypeSchema) {
// Return BigDecimal as is
if (value instanceof BigDecimal) {
return (BigDecimal) value;
}

// Handle the value as a byte buffer
int scale = logicalTypeSchema.getScale();
if (value instanceof ByteBuffer) {
return new BigDecimal(new BigInteger(Bytes.toBytes((ByteBuffer) value)), scale);
}

// Handle the BigDecimal value
try {
return new BigDecimal(new BigInteger((byte[]) value), scale);
} catch (ClassCastException e) {
throw new ClassCastException(String.format("Field '%s' is expected to be a decimal, but is a %s.",
logicalTypeSchema.getDisplayName(),
value.getClass().getSimpleName()));
}
}

private JsonWriter getJsonWriter(Encoder encoder) {
// Type already checked in the encode method, hence assuming the casting is fine.
return ((JsonEncoder) encoder).getJsonWriter();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.format.io;

import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.io.Writer;
import java.math.BigDecimal;

/**
* JsonWriter instance which handles writing BigDecimal fields.
*/
public class BigDecimalAwareJsonWriter extends JsonWriter {
public BigDecimalAwareJsonWriter(Writer out) {
super(out);
}

@Override
public JsonWriter value(Number value) throws IOException {
if (value == null) {
return this.nullValue();
}

// Wrap BigDecimal fields in a wrapper which handles the conversion to String.
if (value instanceof BigDecimal) {
value = new BigDecimalWrapper((BigDecimal) value);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wrapper is used because the original JsonWriter class uses toString() to generate the representation of the Number instance.

Becase of internal private methods, I can't simply implement this logic directly, so I'm using the wrapper class defined below as a workaround.

}

super.value(value);
return this;
}

/**
* Wrapper used to ensure that BigDecimals are generated as plain strings.
*/
private static class BigDecimalWrapper extends Number {
BigDecimal wrapped;

protected BigDecimalWrapper(BigDecimal wrapped) {
this.wrapped = wrapped;
}

@Override
public String toString() {
return wrapped.toPlainString();
}

@Override
public int intValue() {
return wrapped.intValue();
}

@Override
public long longValue() {
return wrapped.longValue();
}

@Override
public float floatValue() {
return wrapped.floatValue();
}

@Override
public double doubleValue() {
return wrapped.doubleValue();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format;

import com.google.gson.stream.JsonWriter;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.format.io.BigDecimalAwareJsonStructuredRecordDatumWriter;
import io.cdap.cdap.format.io.BigDecimalAwareJsonWriter;
import io.cdap.cdap.format.io.JsonEncoder;

import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.util.stream.Collectors;

/**
* Structured record converted that handles writing decimal files as numbers in the output (instead of int arrays).
*/
public class BigDecimalAwareStructuredRecordStringConverter {
private static final BigDecimalAwareJsonStructuredRecordDatumWriter JSON_DATUM_WRITER =
new BigDecimalAwareJsonStructuredRecordDatumWriter();

/**
* Converts a {@link StructuredRecord} to a json string.
*/
public static String toJsonString(StructuredRecord record) throws IOException {
StringWriter strWriter = new StringWriter();
try (JsonWriter writer = new BigDecimalAwareJsonWriter(strWriter)) {
JSON_DATUM_WRITER.encode(record, new JsonEncoder(writer));
return strWriter.toString();
}
}

/**
* Converts a {@link StructuredRecord} to a delimited string.
*/
public static String toDelimitedString(final StructuredRecord record, String delimiter) {
return record.getSchema().getFields().stream()
.map(f -> mapField(record, f))
.collect(Collectors.joining(delimiter));
}

/**
* Get the string representation for a given record field. BigDecimals are printed as plain strings.
* @param record record to process
* @param field field to extract
* @return String representing the value for this field.
*/
private static String mapField(StructuredRecord record, Schema.Field field) {
String fieldName = field.getName();
Object value = record.get(fieldName);

// Return null value as empty string.
if (value == null) {
return "";
tivv marked this conversation as resolved.
Show resolved Hide resolved
}

// Get the field schema.
Schema fieldSchema = field.getSchema();
if (fieldSchema.isNullable()) {
fieldSchema = fieldSchema.getNonNullable();
}

// Write decimal values as decimal strings.
if (fieldSchema.getLogicalType() != null && fieldSchema.getLogicalType() == Schema.LogicalType.DECIMAL) {
BigDecimal decimalValue = record.getDecimal(fieldName);

// Throw exception if the field is expected tu be decimal, but it could not be processed as such.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: tu

if (decimalValue == null) {
throw new IllegalArgumentException("Invalid schema for field " + fieldName + ". Decimal was expected.");
}
return decimalValue.toPlainString();
}

return value.toString();
}

private BigDecimalAwareStructuredRecordStringConverter() {
//inaccessible constructor for static class
}
}
Loading