Skip to content

Commit

Permalink
NIFI-9259 Simplify the routing strategy to provide 1) all-or-nothing and 2) individual record processing
Browse files Browse the repository at this point in the history
  • Loading branch information
mikayla-yang committed Nov 29, 2021
1 parent c63b7e2 commit 97c5a5e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ public enum GeohashFormat {
BASE32, BINARY, LONG
}

public enum RoutingStrategy {
SKIP_UNENRICHED,
SPLIT,
REQUIRE_ALL_ENRICHED
}

public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("mode")
.displayName("Mode")
Expand All @@ -98,17 +92,15 @@ public enum RoutingStrategy {
.defaultValue(ProcessingMode.ENCODE.name())
.build();

public static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
.name("routing-strategy")
.displayName("Routing Strategy")
.description("Specifies how to route records after encoding or decoding has been performed. "
+ "SKIP_UNENRICHED will route a flowfile to success if any of its records is enriched; otherwise, it will be sent to failure. "
+ "SPLIT will separate the records that have been enriched from those that have not and send them to success, while unenriched records will be sent to failure; "
+ "and the original flowfile will be sent to the original relationship. "
+ "REQUIRE_ALL_ENRICHED will route a flowfile to success only if all of its records are enriched; otherwise, it will be sent to failure")
public static final PropertyDescriptor SPLIT_FOUND_NOT_FOUND = new PropertyDescriptor.Builder()
.name("split-found-not-found")
.displayName("Separate Enriched From Not Enriched")
.description("Separate records that have been enriched from ones that have not. Default behavior is " +
"to send everything to the not found relationship if any of the records is not enriched.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.allowableValues(RoutingStrategy.values())
.defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -177,19 +169,19 @@ public enum RoutingStrategy {
.dependsOn(MODE, ProcessingMode.ENCODE.name())
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Records that are successfully encoded or decoded will be routed to success")
public static final Relationship REL_FOUND = new Relationship.Builder()
.name("found")
.description("Flowfiles with geo-related data provided that are successfully encoded or decoded will be routed to found")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Records that cannot be encoded or decoded will be routed to failure")
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not found")
.description("Flowfiles without geo-related data that cannot be encoded or decoded will be routed to not found")
.build();

public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("With the SPLIT strategy, the original input flowfile will be sent to this relationship regardless of whether it was enriched or not.")
.description("The original input flowfile will be sent to this relationship regardless of whether it was enriched or not.")
.build();

private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
Expand All @@ -208,7 +200,7 @@ protected void init(final ProcessorInitializationContext context) {
descriptors.add(MODE);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(ROUTING_STRATEGY);
descriptors.add(SPLIT_FOUND_NOT_FOUND);
descriptors.add(LATITUDE_RECORD_PATH);
descriptors.add(LONGITUDE_RECORD_PATH);
descriptors.add(GEOHASH_RECORD_PATH);
Expand All @@ -217,8 +209,8 @@ protected void init(final ProcessorInitializationContext context) {
descriptors = Collections.unmodifiableList(descriptors);

relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_FOUND);
relationships.add(REL_NOT_FOUND);
relationships.add(REL_ORIGINAL);
relationships = Collections.unmodifiableSet(relationships);
}
Expand All @@ -243,19 +235,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
final RoutingStrategy routingStrategy = RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
final boolean splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
final GeohashFormat format = GeohashFormat.valueOf(context.getProperty(GEOHASH_FORMAT).getValue());

FlowFile output = session.create(input);
FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? session.create(input) : null;
FlowFile notFound = splitOutput ? session.create(input) : null;

try (final InputStream is = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
final OutputStream os = session.write(output);
final OutputStream osNotFound = routingStrategy == RoutingStrategy.SPLIT ? session.write(notFound) : null) {
final OutputStream osNotFound = splitOutput ? session.write(notFound) : null) {

final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
final RecordSetWriter notFoundWriter = routingStrategy == RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null;
final RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), reader.getSchema(), osNotFound, notFound) : null;

Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
Expand All @@ -266,9 +258,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

Record record;

//The overall relationship used by the REQUIRE_ALL_ENRICHED and SKIP_UNENRICHED routing strategies to transfer Flowfiles.
//For the SPLIT strategy, the transfer of Flowfiles does not rely on this overall relationship. Instead, each individual record will be sent to success or failure.
Relationship targetRelationship = routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE;
//The overall relationship used to transfer FlowFiles when SPLIT_FOUND_NOT_FOUND is set to false.
//If SPLIT_FOUND_NOT_FOUND is set to true, the transfer of FlowFiles does not rely on this overall relationship. Instead, each individual record will be sent to found or not found.
Relationship targetRelationship = REL_FOUND;

writer.beginRecordSet();

Expand Down Expand Up @@ -300,22 +292,24 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) {
if (!updated) { //If the routing strategy is REQUIRE_ALL_ENRICHED and there exists a record that is not updated, the entire flowfile should be route to REL_FAILURE
targetRelationship = REL_FAILURE;
if (!splitOutput) {
if (!updated) {
//If SPLIT_FOUND_NOT_FOUND is set to false (which means all or nothing) and there exists a record
//that is not updated, the entire FlowFile should be routed to REL_NOT_FOUND
targetRelationship = REL_NOT_FOUND;
}
} else {
if (updated) {
//If the routing strategy is SKIP_UNENRICHED and there exists a record that is updated, the entire flowfile should be route to REL_SUCCESS
//If the routing strategy is SPLIT and there exists a record that is updated, this record should be route to REL_SUCCESS
targetRelationship = REL_SUCCESS;
//If SPLIT_FOUND_NOT_FOUND is set to true (which means individual record processing is allowed) and
//there exists a record that is updated, this record should be routed to REL_FOUND
targetRelationship = REL_FOUND;
}
}

if (routingStrategy != RoutingStrategy.SPLIT || updated) {
if (!splitOutput || updated) {
writer.write(record);
foundCount++;
} else { //if the routing strategy is SPLIT and the record is not updated
} else { //if SPLIT_FOUND_NOT_FOUND is set to true and the record is not updated
notFoundWriter.write(record);
notFoundCount++;
}
Expand All @@ -332,17 +326,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType(), writeResult));
if (routingStrategy != RoutingStrategy.SPLIT) {
if (!splitOutput) {
session.transfer(output, targetRelationship);
} else {
if (notFoundCount > 0) {
notFound = session.putAllAttributes(notFound, buildAttributes(notFoundCount, writer.getMimeType(), notFoundWriterResult));
session.transfer(notFound, REL_FAILURE);
session.transfer(notFound, REL_NOT_FOUND);
} else {
session.remove(notFound);
}
if (foundCount > 0) {
session.transfer(output, REL_SUCCESS);
session.transfer(output, REL_FOUND);
} else {
session.remove(output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand Down Expand Up @@ -81,72 +82,66 @@ public void setUp() throws InitializationException {
runner.setProperty(GeohashRecord.GEOHASH_LEVEL, "12");
}

private void assertTransfers(String path, int failure, int success, int original) {
private void assertTransfers(String path, int notFound, int found, int original) {
Map<String, String> attrs = new HashMap<>();
attrs.put("schema.name", "record");
runner.enqueue(getClass().getResourceAsStream(path), attrs);
runner.run();

runner.assertTransferCount(GeohashRecord.REL_FAILURE, failure);
runner.assertTransferCount(GeohashRecord.REL_SUCCESS, success);
runner.assertTransferCount(GeohashRecord.REL_NOT_FOUND, notFound);
runner.assertTransferCount(GeohashRecord.REL_FOUND, found);
runner.assertTransferCount(GeohashRecord.REL_ORIGINAL, original);
}

@Test
public void testEncodeSendToSuccessDefault() throws Exception {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString());
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString());
public void testDecodeSendToFoundDefault() throws Exception {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.DECODE.toString());
runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "false");
runner.assertValid();

assertTransfers("/record_encode.json", 0, 1, 1);
assertTransfers("/record_decode.json", 0, 1, 1);

MockFlowFile ff = runner.getFlowFilesForRelationship(GeohashRecord.REL_SUCCESS).get(0);
MockFlowFile ff = runner.getFlowFilesForRelationship(GeohashRecord.REL_FOUND).get(0);
byte[] raw = runner.getContentAsByteArray(ff);
String content = new String(raw);
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> result = (List<Map<String, Object>>) mapper.readValue(content, List.class);

assertNotNull(result);
assertEquals(2, result.size());
assertEquals(1, result.size());

Map<String, Object> element = result.get(0);
String geohash = (String)element.get("geohash");
assertNotNull(geohash);
Double latitude = (Double) element.get("latitude");
Double longitude = (Double) element.get("longitude");
assertNotNull(latitude);
assertNotNull(longitude);
}

@Test
public void testEncodeSendToFailureDefault() {
public void testEncodeSendToNotFoundDefault() {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString());
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString());
runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "false");
runner.assertValid();

assertTransfers("/record_decode.json", 1, 0, 1);
assertTransfers("/record_encode.json", 1, 0, 1);
}

@Test
public void testSplit() {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString());
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SPLIT.toString());
runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "true");
runner.assertValid();

assertTransfers("/record_encode.json", 1, 1, 1);
}

@Test
public void testRequireAllEnrichedSendToFailure() {
public void testSplitEmptyRemoved() {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.ENCODE.toString());
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString());
runner.setProperty(GeohashRecord.SPLIT_FOUND_NOT_FOUND, "true");
runner.assertValid();

assertTransfers("/record_encode.json", 1, 0, 1);
assertTransfers("/record_decode.json", 1, 0, 1);
}

@Test
public void testRequireAllEnrichedSendToSuccess() {
runner.setProperty(GeohashRecord.MODE, GeohashRecord.ProcessingMode.DECODE.toString());
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString());
runner.assertValid();

assertTransfers("/record_decode.json", 0, 1, 1);
}
}

0 comments on commit 97c5a5e

Please sign in to comment.