Skip to content

Commit

Permalink
[Issue 3762][Schema] Fix the problem with parsing of an Avro schema r…
Browse files Browse the repository at this point in the history
…elated to shading in pulsar-client. (#6406)

Motivation
Avro schemas are quite important for proper data flow and it is a pity that the #3762 issue stayed untouched for so long. There were some workarounds on how to make Pulsar use an original avro schema, but in the end, it is pretty hard to run an enterprise solution on workarounds. With this PR I would like to find a solution to the problem caused by shading avro in pulsar-client. As it was discussed in the issue, there are two possible solutions for this problem:

Unshade the avro library in the pulsar-client library. (IMHO it seems like a proper solution for this problem, but it also brings a risk of unknown side-effects)
Use reflection to get original schemas from generated classes. (I went for this solution)
Could you please comment if this is a proper solution for the problem? I will add tests when my approach will be confirmed.

Modifications
First, we try to extract an original avro schema from the "$SCHEMA" field using reflection. If it doesn't work, the process falls back generation of the schema from POJO.

(cherry picked from commit dab14ac)
  • Loading branch information
vzhikserg authored and tuteng committed Apr 13, 2020
1 parent 81f4460 commit 337cdc5
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schema
try {
// Disable validation of default values for compatibility
validateDefaults.set(false);
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo)
: ReflectData.get().getSchema(pojo);
return extractAvroSchema(schemaDefinition, pojo);
} finally {
validateDefaults.set(savedValidateDefaults);
}
Expand All @@ -160,6 +159,15 @@ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schema
}
}

protected static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
try {
return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString());
} catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) {
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo)
: ReflectData.get().getSchema(pojo);
}
}

protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
final Parser parser = new Parser();
parser.setValidateDefaults(false);
Expand Down

0 comments on commit 337cdc5

Please sign in to comment.