Skip to content

Commit

Permalink
NIFI-9259 Rebase and transfer original flowfiles to original relation…
Browse files Browse the repository at this point in the history
…ship for every routing strategy
  • Loading branch information
mikayla-yang committed Nov 29, 2021
1 parent 59c41df commit c63b7e2
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 17 deletions.
2 changes: 1 addition & 1 deletion nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geohash-nar</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geohash-bundle</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-geohash-nar</artifactId>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geohash-processors</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geohash-bundle</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-geohash-processors</artifactId>
Expand All @@ -33,7 +33,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ch.hsr</groupId>
Expand All @@ -48,7 +48,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand All @@ -60,19 +60,19 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType(), writeResult));
if (routingStrategy != RoutingStrategy.SPLIT) {
session.transfer(output, targetRelationship);
session.remove(input);
} else {
if (notFoundCount > 0) {
notFound = session.putAllAttributes(notFound, buildAttributes(notFoundCount, writer.getMimeType(), notFoundWriterResult));
session.transfer(notFound, REL_FAILURE);
} else {
session.remove(notFound);
}
session.transfer(output, REL_SUCCESS);
session.transfer(input, REL_ORIGINAL);
if (foundCount > 0) {
session.transfer(output, REL_SUCCESS);
} else {
session.remove(output);
}
}
session.transfer(input, REL_ORIGINAL);

} catch (Exception ex) {
getLogger().error("Failed to {} due to {}", encode ? "encode" : "decode", ex.getLocalizedMessage(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testEncodeSendToSuccessDefault() throws Exception {
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString());
runner.assertValid();

assertTransfers("/record_encode.json", 0, 1, 0);
assertTransfers("/record_encode.json", 0, 1, 1);

MockFlowFile ff = runner.getFlowFilesForRelationship(GeohashRecord.REL_SUCCESS).get(0);
byte[] raw = runner.getContentAsByteArray(ff);
Expand All @@ -120,7 +120,7 @@ public void testEncodeSendToFailureDefault() {
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.SKIP_UNENRICHED.toString());
runner.assertValid();

assertTransfers("/record_decode.json", 1, 0, 0);
assertTransfers("/record_decode.json", 1, 0, 1);
}

@Test
Expand All @@ -138,7 +138,7 @@ public void testRequireAllEnrichedSendToFailure() {
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString());
runner.assertValid();

assertTransfers("/record_encode.json", 1, 0, 0);
assertTransfers("/record_encode.json", 1, 0, 1);
}

@Test
Expand All @@ -147,6 +147,6 @@ public void testRequireAllEnrichedSendToSuccess() {
runner.setProperty(GeohashRecord.ROUTING_STRATEGY, GeohashRecord.RoutingStrategy.REQUIRE_ALL_ENRICHED.toString());
runner.assertValid();

assertTransfers("/record_decode.json", 0, 1, 0);
assertTransfers("/record_decode.json", 0, 1, 1);
}
}
2 changes: 1 addition & 1 deletion nifi-nar-bundles/nifi-geohash-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.15.0-SNAPSHOT</version>
<version>1.16.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-geohash-bundle</artifactId>
Expand Down

0 comments on commit c63b7e2

Please sign in to comment.