Skip to content

Commit

Permalink
NIFI-9259 address review comments and rename geohash format to keep consistency with EL functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mikayla-yang committed Nov 28, 2021
1 parent 250bd44 commit 59c41df
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@
<artifactId>nifi-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.hsr</groupId>
<artifactId>geohash</artifactId>
Expand All @@ -49,16 +43,23 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
Expand All @@ -69,10 +70,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
<scope>compile</scope>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
Expand Down Expand Up @@ -67,14 +69,18 @@
@Tags({"geo", "geohash", "record"})
@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "The MIME type indicated by the record writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
})
public class GeohashRecord extends AbstractProcessor {

public enum ProcessingMode {
ENCODE, DECODE
}

public enum GeohashFormat {
BASE_32, BINARY_STRING, LONG
BASE32, BINARY, LONG
}

public enum RoutingStrategy {
Expand All @@ -96,10 +102,10 @@ public enum RoutingStrategy {
.name("routing-strategy")
.displayName("Routing Strategy")
.description("Specifies how to route records after encoding or decoding has been performed. "
+ "Skip_Unenriched will route a flowfile to success if any of its records is enriched, otherwise it will be sent to failure. "
+ "Split_Enriched_Unenriched will separate the records that have been enriched from those have not and send them to success, while unenriched records will be sent to failure; "
+ "SKIP_UNENRICHED will route a flowfile to success if any of its records is enriched; otherwise, it will be sent to failure. "
+ "SPLIT will separate the records that have been enriched from those that have not and send them to success, while unenriched records will be sent to failure; "
+ "and the original flowfile will be sent to the original relationship. "
+ "Require_All_Enriched will route a flowfile to success only if all of its records are enriched, otherwise it will be sent to failure")
+ "REQUIRE_ALL_ENRICHED will route a flowfile to success only if all of its records are enriched; otherwise, it will be sent to failure")
.required(true)
.allowableValues(RoutingStrategy.values())
.defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
Expand All @@ -124,8 +130,8 @@ public enum RoutingStrategy {
public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
.name("latitude-record-path")
.displayName("Latitude Record Path")
.description("In the encode mode, this property specifies the record path to retrieve the latitude value; "
+ "in the decode mode, this property specifies the record path to put the latitude value")
.description("In the ENCODE mode, this property specifies the record path to retrieve the latitude value; "
+ "in the DECODE mode, this property specifies the record path to put the latitude value")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
Expand All @@ -134,8 +140,18 @@ public enum RoutingStrategy {
public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
.name("longitude-record-path")
.displayName("Longitude Record Path")
.description("In the encode mode, this property specifies the record path to retrieve the longitude value; "
+ "in the decode mode, this property specifies the record path to put the longitude value")
.description("In the ENCODE mode, this property specifies the record path to retrieve the longitude value; "
+ "in the DECODE mode, this property specifies the record path to put the longitude value")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
.name("geohash-record-path")
.displayName("Geohash Record Path")
.description("In the ENCODE mode, this property specifies the record path to put the geohash value; "
+ "in the DECODE mode, this property specifies the record path to retrieve the geohash value")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
Expand All @@ -144,11 +160,11 @@ public enum RoutingStrategy {
public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
.name("geohash-format")
.displayName("Geohash Format")
.description("In the encode mode, this property specifies the desired format for encoding geohash; "
+ "in the decode mode, this property specifies the format of geohash provided")
.description("In the ENCODE mode, this property specifies the desired format for encoding geohash; "
+ "in the DECODE mode, this property specifies the format of geohash provided")
.required(true)
.allowableValues(GeohashFormat.values())
.defaultValue(GeohashFormat.BASE_32.name())
.defaultValue(GeohashFormat.BASE32.name())
.build();

public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
Expand All @@ -161,16 +177,6 @@ public enum RoutingStrategy {
.dependsOn(MODE, ProcessingMode.ENCODE.name())
.build();

public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
.name("geohash-record-path")
.displayName("Geohash Record Path")
.description("In the encode mode, this property specifies the record path to put the geohash value; "
+ "in the decode mode, this property specifies the record path to retrieve the geohash value")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Records that are successfully encoded or decoded will be routed to success")
Expand All @@ -183,7 +189,7 @@ public enum RoutingStrategy {

public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("With the Split_Enriched_Unenriched strategy, the original input flowfile will be sent to this relationship regardless of whether it was enriched or not.")
.description("With the SPLIT strategy, the original input flowfile will be sent to this relationship regardless of whether it was enriched or not.")
.build();

private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
Expand Down Expand Up @@ -238,17 +244,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
final RoutingStrategy routingStrategy = RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
final String format = context.getProperty(GEOHASH_FORMAT).getValue();
final GeohashFormat format = GeohashFormat.valueOf(context.getProperty(GEOHASH_FORMAT).getValue());

FlowFile output = session.create(input);
FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? session.create(input) : null;

try (InputStream is = session.read(input);
RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
OutputStream os = session.write(output);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
OutputStream osNotFound = routingStrategy == RoutingStrategy.SPLIT ? session.write(notFound) : null;
RecordSetWriter notFoundWriter = routingStrategy == RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null) {
try (final InputStream is = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
final OutputStream os = session.write(output);
final OutputStream osNotFound = routingStrategy == RoutingStrategy.SPLIT ? session.write(notFound) : null) {

final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
final RecordSetWriter notFoundWriter = routingStrategy == RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null;

Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
Expand All @@ -259,8 +266,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

Record record;

//The overall relationship used by the Require_All_Enriched and Skip_Unenriched routing strategies to transfer Flowfiles.
//For the Split_Enriched_Unenriched strategy, the transfer of Flowfiles does not rely on this overall relationship. Instead, each individual record will be sent to success or failure.
//The overall relationship used by the REQUIRE_ALL_ENRICHED and SKIP_UNENRICHED routing strategies to transfer Flowfiles.
//For the SPLIT strategy, the transfer of Flowfiles does not rely on this overall relationship. Instead, each individual record will be sent to success or failure.
Relationship targetRelationship = routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE;

writer.beginRecordSet();
Expand All @@ -272,7 +279,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
int foundCount = 0;
int notFoundCount = 0;

int level = context.getProperty(GEOHASH_LEVEL).asInteger();
int level = context.getProperty(GEOHASH_LEVEL).evaluateAttributeExpressions(input).asInteger();
final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
Expand All @@ -294,21 +301,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) {
if (!updated) { //If the routing strategy is Require_All_Enriched and there exists a record that is not updated, the entire flowfile should be route to REL_FAILURE
if (!updated) { //If the routing strategy is REQUIRE_ALL_ENRICHED and there exists a record that is not updated, the entire flowfile should be routed to REL_FAILURE
targetRelationship = REL_FAILURE;
}
} else {
if (updated) {
//If the routing strategy is Skip_Unenriched and there exists a record that is updated, the entire flowfile should be route to REL_SUCCESS
//If the routing strategy is Split_Enriched_Unenriched and there exists a record that is updated, this record should be route to REL_SUCCESS
//If the routing strategy is SKIP_UNENRICHED and there exists a record that is updated, the entire flowfile should be routed to REL_SUCCESS
//If the routing strategy is SPLIT and there exists a record that is updated, this record should be routed to REL_SUCCESS
targetRelationship = REL_SUCCESS;
}
}

if (routingStrategy != RoutingStrategy.SPLIT || updated) {
writer.write(record);
foundCount++;
} else { //if the routing strategy is Split_Enriched_Unenriched and the record is not updated
} else { //if the routing strategy is SPLIT and the record is not updated
notFoundWriter.write(record);
notFoundCount++;
}
Expand All @@ -324,13 +331,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
notFoundWriter.close();
}

is.close();
os.close();

if (osNotFound != null) {
osNotFound.close();
}

output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType(), writeResult));
if (routingStrategy != RoutingStrategy.SPLIT) {
session.transfer(output, targetRelationship);
Expand All @@ -352,7 +352,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}


private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, String format, int level) {
private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, GeohashFormat format, int level) {
RecordPathResult latitudeResult = latitudePath.evaluate(record);
RecordPathResult longitudeResult = longitudePath.evaluate(record);
Optional<FieldValue> latitudeField = latitudeResult.getSelectedFields().findFirst();
Expand All @@ -375,8 +375,8 @@ private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePa
double realLongValue = Double.parseDouble(longitudeVal.toString());
GeoHash gh = GeoHash.withCharacterPrecision(realLatValue, realLongValue, level);

switch (GeohashFormat.valueOf(format)) {
case BINARY_STRING:
switch (format) {
case BINARY:
return gh.toBinaryString();
case LONG:
return gh.longValue();
Expand All @@ -385,7 +385,7 @@ private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePa
}
}

private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, String format) {
private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, GeohashFormat format) {
RecordPathResult geohashResult = geohashPath.evaluate(record);
Optional<FieldValue> geohashField = geohashResult.getSelectedFields().findFirst();

Expand All @@ -402,8 +402,8 @@ private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record rec
String geohashString = geohashVal.toString();
GeoHash decodedHash;

switch (GeohashFormat.valueOf(format)) {
case BINARY_STRING:
switch (format) {
case BINARY:
decodedHash = GeoHash.fromBinaryString(geohashString);
break;
case LONG:
Expand Down
Loading

0 comments on commit 59c41df

Please sign in to comment.