From 97c5a5e3acf20a5d8591ef6bc8aefbf4bf02d820 Mon Sep 17 00:00:00 2001 From: Mikayla Yang Date: Mon, 29 Nov 2021 09:22:40 -0800 Subject: [PATCH] NIFI-9259 Simplify the routing strategy to provide 1)all or nothing and 2)allow individual record processing --- .../processors/geohash/GeohashRecord.java | 82 +++++++++---------- .../processors/geohash/GeohashRecordTest.java | 47 +++++------ 2 files changed, 59 insertions(+), 70 deletions(-) diff --git a/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java b/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java index c2bed961b04c..d3678b288537 100644 --- a/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java +++ b/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java @@ -83,12 +83,6 @@ public enum GeohashFormat { BASE32, BINARY, LONG } - public enum RoutingStrategy { - SKIP_UNENRICHED, - SPLIT, - REQUIRE_ALL_ENRICHED - } - public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() .name("mode") .displayName("Mode") @@ -98,17 +92,15 @@ public enum RoutingStrategy { .defaultValue(ProcessingMode.ENCODE.name()) .build(); - public static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder() - .name("routing-strategy") - .displayName("Routing Strategy") - .description("Specifies how to route records after encoding or decoding has been performed. " - + "SKIP_UNENRICHED will route a flowfile to success if any of its records is enriched; otherwise, it will be sent to failure. 
" - + "SPLIT will separate the records that have been enriched from those that have not and send them to success, while unenriched records will be sent to failure; " - + "and the original flowfile will be sent to the original relationship. " - + "REQUIRE_ALL_ENRICHED will route a flowfile to success only if all of its records are enriched; otherwise, it will be sent to failure") + public static final PropertyDescriptor SPLIT_FOUND_NOT_FOUND = new PropertyDescriptor.Builder() + .name("split-found-not-found") + .displayName("Separate Enriched From Not Enriched") + .description("Separate records that have been enriched from ones that have not. Default behavior is " + + "to send everything to the not found relationship if any of the records is not enriched.") + .allowableValues("true", "false") + .defaultValue("false") .required(true) - .allowableValues(RoutingStrategy.values()) - .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -177,19 +169,19 @@ public enum RoutingStrategy { .dependsOn(MODE, ProcessingMode.ENCODE.name()) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Records that are successfully encoded or decoded will be routed to success") + public static final Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("Flowfiles with geo-related data provided that are successfully encoded or decoded will be routed to found") .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Records that cannot be encoded or decoded will be routed to failure") + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("Flowfiles without geo-related data that cannot be encoded or decoded will be routed 
to not found") .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") - .description("With the SPLIT strategy, the original input flowfile will be sent to this relationship regardless of whether it was enriched or not.") + .description("The original input flowfile will be sent to this relationship regardless of whether it was enriched or not.") .build(); private static final List RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList( @@ -208,7 +200,7 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(MODE); descriptors.add(RECORD_READER); descriptors.add(RECORD_WRITER); - descriptors.add(ROUTING_STRATEGY); + descriptors.add(SPLIT_FOUND_NOT_FOUND); descriptors.add(LATITUDE_RECORD_PATH); descriptors.add(LONGITUDE_RECORD_PATH); descriptors.add(GEOHASH_RECORD_PATH); @@ -217,8 +209,8 @@ protected void init(final ProcessorInitializationContext context) { descriptors = Collections.unmodifiableList(descriptors); relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); + relationships.add(REL_FOUND); + relationships.add(REL_NOT_FOUND); relationships.add(REL_ORIGINAL); relationships = Collections.unmodifiableSet(relationships); } @@ -243,19 +235,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString()); - final RoutingStrategy routingStrategy = RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue()); + final boolean splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean(); final GeohashFormat format = 
GeohashFormat.valueOf(context.getProperty(GEOHASH_FORMAT).getValue()); FlowFile output = session.create(input); - FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? session.create(input) : null; + FlowFile notFound = splitOutput ? session.create(input) : null; try (final InputStream is = session.read(input); final RecordReader reader = readerFactory.createRecordReader(input, is, getLogger()); final OutputStream os = session.write(output); - final OutputStream osNotFound = routingStrategy == RoutingStrategy.SPLIT ? session.write(notFound) : null) { + final OutputStream osNotFound = splitOutput ? session.write(notFound) : null) { final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output); - final RecordSetWriter notFoundWriter = routingStrategy == RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null; + final RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null; Map paths = new HashMap<>(); for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) { @@ -266,9 +258,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session Record record; - //The overall relationship used by the REQUIRE_ALL_ENRICHED and SKIP_UNENRICHED routing strategies to transfer Flowfiles. - //For the SPLIT strategy, the transfer of Flowfiles does not rely on this overall relationship. Instead, each individual record will be sent to success or failure. - Relationship targetRelationship = routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE; + //The overall relationship used to transfer Flowfiles if SPLIT_FOUND_NOT_FOUND is set to false. + //if SPLIT_FOUND_NOT_FOUND is set to true, the transfer of Flowfiles does not rely on this overall relationship. 
Instead, each individual record will be sent to found or not found. + Relationship targetRelationship = REL_FOUND; writer.beginRecordSet(); @@ -300,22 +292,24 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } - if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) { - if (!updated) { //If the routing strategy is REQUIRE_ALL_ENRICHED and there exists a record that is not updated, the entire flowfile should be route to REL_FAILURE - targetRelationship = REL_FAILURE; + if (!splitOutput) { + if (!updated) { + //If SPLIT_FOUND_NOT_FOUND is set to false (which means all or nothing) and there exists a record + //that is not updated, the entire flowfile should be routed to REL_NOT_FOUND + targetRelationship = REL_NOT_FOUND; } } else { if (updated) { - //If the routing strategy is SKIP_UNENRICHED and there exists a record that is updated, the entire flowfile should be route to REL_SUCCESS - //If the routing strategy is SPLIT and there exists a record that is updated, this record should be route to REL_SUCCESS - targetRelationship = REL_SUCCESS; + //If SPLIT_FOUND_NOT_FOUND is set to true (which means individual record processing is allowed) and + //there exists a record that is updated, this record should be routed to REL_FOUND + targetRelationship = REL_FOUND; } } - if (routingStrategy != RoutingStrategy.SPLIT || updated) { + if (!splitOutput || updated) { writer.write(record); foundCount++; - } else { //if the routing strategy is SPLIT and the record is not updated + } else { //if SPLIT_FOUND_NOT_FOUND is set to true and the record is not updated notFoundWriter.write(record); notFoundCount++; } @@ -332,17 +326,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType(), writeResult)); - if (routingStrategy != RoutingStrategy.SPLIT) { + if (!splitOutput) { session.transfer(output, targetRelationship); } else { if 
(notFoundCount > 0) { notFound = session.putAllAttributes(notFound, buildAttributes(notFoundCount, writer.getMimeType(), notFoundWriterResult)); - session.transfer(notFound, REL_FAILURE); + session.transfer(notFound, REL_NOT_FOUND); } else { session.remove(notFound); } if (foundCount > 0) { - session.transfer(output, REL_SUCCESS); + session.transfer(output, REL_FOUND); } else { session.remove(output); } diff --git a/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java b/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java index fca353e176aa..212bbce94c7f 100644 --- a/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java +++ b/nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -81,72 +82,66 @@ public void setUp() throws InitializationException { runner.setProperty(GeohashRecord.GEOHASH_LEVEL, "12"); } - private void assertTransfers(String path, int failure, int success, int original) { + private void assertTransfers(String path, int notFound, int found, int original) { Map attrs = new HashMap<>(); attrs.put("schema.name", "record"); runner.enqueue(getClass().getResourceAsStream(path), attrs); runner.run(); - runner.assertTransferCount(GeohashRecord.REL_FAILURE, failure); - runner.assertTransferCount(GeohashRecord.REL_SUCCESS, success); + runner.assertTransferCount(GeohashRecord.REL_NOT_FOUND, notFound); + runner.assertTransferCount(GeohashRecord.REL_FOUND, found); runner.assertTransferCount(GeohashRecord.REL_ORIGINAL, original); } 
@Test - public void testEncodeSendToSuccessDefault() throws Exception { - runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString()); - runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString()); + public void testDecodeSendToFoundDefault() throws Exception { + runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.DECODE.toString()); + runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "false"); runner.assertValid(); - assertTransfers("/record_encode.json", 0, 1, 1); + assertTransfers("/record_decode.json", 0, 1, 1); - MockFlowFile ff = runner.getFlowFilesForRelationship(GeohashRecord.REL_SUCCESS).get(0); + MockFlowFile ff = runner.getFlowFilesForRelationship(GeohashRecord.REL_FOUND).get(0); byte[] raw = runner.getContentAsByteArray(ff); String content = new String(raw); ObjectMapper mapper = new ObjectMapper(); List> result = (List>) mapper.readValue(content, List.class); assertNotNull(result); - assertEquals(2, result.size()); + assertEquals(1, result.size()); Map element = result.get(0); - String geohash = (String)element.get("geohash"); - assertNotNull(geohash); + Double latitude = (Double) element.get("latitude"); + Double longitude = (Double) element.get("longitude"); + assertNotNull(latitude); + assertNotNull(longitude); } @Test - public void testEncodeSendToFailureDefault() { + public void testEncodeSendToNotFoundDefault() { runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString()); - runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString()); + runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "false"); runner.assertValid(); - assertTransfers("/record_decode.json", 1, 0, 1); + assertTransfers("/record_encode.json", 1, 0, 1); } @Test public void testSplit() { runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString()); - 
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SPLIT.toString()); + runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "true"); runner.assertValid(); assertTransfers("/record_encode.json", 1, 1, 1); } @Test - public void testRequireAllEnrichedSendToFailure() { + public void testSplitEmptyRemoved() { runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString()); - runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString()); + runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "true"); runner.assertValid(); - assertTransfers("/record_encode.json", 1, 0, 1); + assertTransfers("/record_decode.json", 1, 0, 1); } - @Test - public void testRequireAllEnrichedSendToSuccess() { - runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.DECODE.toString()); - runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString()); - runner.assertValid(); - - assertTransfers("/record_decode.json", 0, 1, 1); - } }