From c6ecbd4e9b1432ef3553234be0a4e1c53875bbe1 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 7 Dec 2023 14:38:49 -0500 Subject: [PATCH 01/16] first pass of wiring error handling into write files and adding tests --- .../java/org/apache/beam/sdk/io/FileIO.java | 20 ++ .../org/apache/beam/sdk/io/WriteFiles.java | 199 +++++++++++++++--- .../apache/beam/sdk/io/WriteFilesTest.java | 126 +++++++++++ .../errorhandling/ErrorHandlingTestUtils.java | 26 +++ .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 2 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 20 +- 6 files changed, 338 insertions(+), 55 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 76fc1a70b78c5..d47e14571c834 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -61,6 +61,8 @@ import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -1016,6 +1018,10 @@ public static FileNaming relativeFileNaming( abstract boolean getNoSpilling(); + abstract @Nullable ErrorHandler getBadRecordErrorHandler(); + + abstract @Nullable SerializableFunction getBadRecordMatcher(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1062,6 +1068,12 @@ abstract Builder setSharding( abstract Builder setNoSpilling(boolean noSpilling); + abstract Builder setBadRecordErrorHandler( + @Nullable ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordMatcher( + @Nullable SerializableFunction badRecordMatcher); + abstract Write build(); } @@ -1288,6 +1300,11 @@ public Write withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. 
*/ + public Write withBadRecordErrorHandler(ErrorHandler errorHandler, SerializableFunction badRecordMatcher) { + return toBuilder().setBadRecordErrorHandler(errorHandler).setBadRecordMatcher(badRecordMatcher).build(); + } + @VisibleForTesting Contextful> resolveFileNamingFn() { if (getDynamic()) { @@ -1391,6 +1408,9 @@ public WriteFilesResult expand(PCollection input) { if (getNoSpilling()) { writeFiles = writeFiles.withNoSpilling(); } + if (getBadRecordErrorHandler() != null) { + writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler(),getBadRecordMatcher()); + } return input.apply(writeFiles); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 91d6082eede44..c33051bdd8dc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -49,6 +50,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupIntoBatches; @@ -57,11 +59,16 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -166,6 +173,9 @@ public static WriteFiles()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) + .setBadRecordMatcher((e) -> true) .build(); } @@ -189,6 +199,12 @@ public static WriteFiles getShardingFunction(); + public abstract ErrorHandler getBadRecordErrorHandler(); + + public abstract BadRecordRouter getBadRecordRouter(); + + public abstract SerializableFunction getBadRecordMatcher(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -215,6 +231,15 @@ abstract Builder setSideInputs( abstract Builder setShardingFunction( @Nullable ShardingFunction shardingFunction); + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordRouter( + BadRecordRouter badRecordRouter); + + abstract Builder setBadRecordMatcher( + SerializableFunction badRecordMatcher); + abstract WriteFiles build(); } @@ -330,6 
+355,10 @@ public WriteFiles withSkipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } + public WriteFiles withBadRecordErrorHandler(ErrorHandler errorHandler, SerializableFunction badRecordMatcher) { + return toBuilder().setBadRecordErrorHandler(errorHandler).setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER).setBadRecordMatcher(badRecordMatcher).build(); + } + @Override public void validate(PipelineOptions options) { getSink().validate(options); @@ -495,28 +524,33 @@ private WriteUnshardedBundlesToTempFiles( @Override public PCollection> expand(PCollection input) { + TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); + TupleTag, UserT>> unwrittenRecordsTag = new TupleTag<>("unwrittenRecords"); + Coder inputCoder = input.getCoder(); if (getMaxNumWritersPerBundle() < 0) { - return input + PCollectionTuple writeTuple = input .apply( "WritedUnshardedBundles", - ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder)) - .withSideInputs(getSideInputs())) - .setCoder(fileResultCoder); + ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder)) + .withSideInputs(getSideInputs()) + .withOutputTags(writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); + addErrorCollection(writeTuple); + return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); } - TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); - TupleTag, UserT>> unwrittenRecordsTag = - new TupleTag<>("unwrittenRecords"); + PCollectionTuple writeTuple = input.apply( "WriteUnshardedBundles", - ParDo.of(new WriteUnshardedTempFilesFn(unwrittenRecordsTag, destinationCoder)) + ParDo.of(new WriteUnshardedTempFilesFn(unwrittenRecordsTag, destinationCoder, inputCoder)) .withSideInputs(getSideInputs()) - .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag))); + .withOutputTags(writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); + addErrorCollection(writeTuple); + PCollection> writtenBundleFiles = writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in // finalize to stay consistent with what WriteWindowedBundles does. 
- PCollection> writtenSpilledFiles = + PCollectionTuple spilledWriteTuple = writeTuple .get(unwrittenRecordsTag) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) @@ -529,7 +563,11 @@ public PCollection> expand(PCollection input) { .apply("GroupUnwritten", GroupByKey.create()) .apply( "WriteUnwritten", - ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs())) + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + + addErrorCollection(spilledWriteTuple); + + PCollection> writtenSpilledFiles = spilledWriteTuple.get(writtenRecordsTag) .setCoder(fileResultCoder) .apply( "DropShardNum", @@ -556,6 +594,8 @@ private class WriteUnshardedTempFilesFn extends DoFn, UserT>> unwrittenRecordsTag; private final Coder destinationCoder; + private final Coder inputCoder; + // Initialized in startBundle() private @Nullable Map, Writer> writers; @@ -563,9 +603,11 @@ private class WriteUnshardedTempFilesFn extends DoFn, UserT>> unwrittenRecordsTag, - Coder destinationCoder) { + Coder destinationCoder, + Coder inputCoder) { this.unwrittenRecordsTag = unwrittenRecordsTag; this.destinationCoder = destinationCoder; + this.inputCoder = inputCoder; } @StartBundle @@ -575,7 +617,7 @@ public void startBundle(StartBundleContext c) { } @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(c); PaneInfo paneInfo = c.pane(); // If we are doing windowed writes, we need to ensure that we have separate files for @@ -583,7 +625,10 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except // destinations go to different writers. // In the case of unwindowed writes, the window and the pane will always be the same, and // the map will only have a single element. - DestinationT destination = getDynamicDestinations().getDestination(c.element()); + DestinationT destination = getDestinationWithErrorHandling(c.element(), outputReceiver, inputCoder); + if (destination == null) { + return; + } WriterKey key = new WriterKey<>(window, c.pane(), destination); Writer writer = writers.get(key); if (writer == null) { @@ -607,15 +652,18 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except } else { spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR; } - c.output( - unwrittenRecordsTag, + outputReceiver.get(unwrittenRecordsTag).output( KV.of( ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum), c.element())); return; } } - writeOrClose(writer, getDynamicDestinations().formatRecord(c.element())); + OutputT formattedRecord = formatRecordWithErrorHandling(c.element(),outputReceiver,inputCoder); + if (formattedRecord == null){ + return; + } + writeOrClose(writer, formattedRecord); } @FinishBundle @@ -701,6 +749,45 @@ private static int hashDestination( .asInt(); } + // Utility method to get the dynamic destination based on a record. 
If the operation fails, and is + // output to the bad record router, this returns null + private @Nullable DestinationT getDestinationWithErrorHandling(UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception{ + try { + return getDynamicDestinations().getDestination(input); + } catch (Exception e) { + if (getBadRecordMatcher().apply(e)){ + getBadRecordRouter().route(outputReceiver,input,inputCoder,e,"Unable to get dynamic destination for record"); + return null; + } else { + throw e; + } + } + } + + private void addErrorCollection(PCollectionTuple sourceTuple){ + getBadRecordErrorHandler() + .addErrorCollection( + sourceTuple + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(sourceTuple.getPipeline()))); + + } + + // Utility method to format a record based on the dynamic destination. If the operation fails, and + // is output to the bad record router, this returns null + private @Nullable OutputT formatRecordWithErrorHandling(UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception{ + try { + return getDynamicDestinations().formatRecord(input); + } catch (Exception e) { + if (getBadRecordMatcher().apply(e)){ + getBadRecordRouter().route(outputReceiver,input,inputCoder,e,"Unable to format record for Dynamic Destination"); + return null; + } else { + throw e; + } + } + } + private class WriteShardedBundlesToTempFiles extends PTransform, PCollection>> { private final Coder destinationCoder; @@ -728,17 +815,25 @@ public PCollection> expand(PCollection input) { ? new RandomShardingFunction(destinationCoder) : getShardingFunction(); - return input + TupleTag, UserT>> shardedRecords = new TupleTag<>("shardedRecords"); + TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); + + PCollectionTuple shardedFiles = input .apply( "ApplyShardingKey", - ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView)) - .withSideInputs(shardingSideInputs)) + ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView, input.getCoder())) + .withSideInputs(shardingSideInputs).withOutputTags(shardedRecords, TupleTagList.of(BAD_RECORD_TAG))); + addErrorCollection(shardedFiles); + + PCollectionTuple writtenFiles = shardedFiles.get(shardedRecords) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) .apply("GroupIntoShards", GroupByKey.create()) .apply( "WriteShardsIntoTempFiles", - ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs())) - .setCoder(fileResultCoder); + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + addErrorCollection(writtenFiles); + + return writtenFiles.get(writtenRecordsTag).setCoder(fileResultCoder); } } @@ -763,22 +858,30 @@ public PCollection>> expand(PCollection inp // // TODO(https://github.com/apache/beam/issues/20928): The implementation doesn't currently // work with merging windows. 
- PCollection, Iterable>> shardedInput = + TupleTag> shardTag = new TupleTag<>("shardTag"); + + PCollectionTuple shardedElements = input .apply( "KeyedByDestinationHash", ParDo.of( new DoFn>() { @ProcessElement - public void processElement(@Element UserT element, ProcessContext context) + public void processElement(@Element UserT element, ProcessContext context, MultiOutputReceiver outputReceiver) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(context); - DestinationT destination = - getDynamicDestinations().getDestination(context.element()); + DestinationT destination = getDestinationWithErrorHandling(context.element(), outputReceiver, input.getCoder()); + if (destination == null) { + return; + } context.output( KV.of(hashDestination(destination, destinationCoder), element)); } - })) + }).withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG))); + addErrorCollection(shardedElements); + + PCollection, Iterable>> shardedInput = + shardedElements.get(shardTag) .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder())) .apply( "ShardAndBatch", @@ -791,8 +894,9 @@ public void processElement(@Element UserT element, ProcessContext context) org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder()))); + TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); // Write grouped elements to temp files. - PCollection> tempFiles = + PCollectionTuple writtenFiles = shardedInput .apply( "AddDummyShard", @@ -816,7 +920,11 @@ public KV, Iterable> apply( ShardedKeyCoder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder()))) .apply( "WriteShardsIntoTempFiles", - ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs())) + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + + addErrorCollection(writtenFiles); + + PCollection> tempFiles = writtenFiles.get(writtenRecordsTag) .setCoder(fileResultCoder) .apply( "DropShardNum", @@ -903,15 +1011,19 @@ private class ApplyShardingFunctionFn extends DoFn private final ShardingFunction shardingFn; private final @Nullable PCollectionView numShardsView; + private final Coder inputCoder; + ApplyShardingFunctionFn( ShardingFunction shardingFn, - @Nullable PCollectionView numShardsView) { + @Nullable PCollectionView numShardsView, + Coder inputCoder) { this.numShardsView = numShardsView; this.shardingFn = shardingFn; + this.inputCoder = inputCoder; } @ProcessElement - public void processElement(ProcessContext context) throws Exception { + public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(context); final int shardCount; if (numShardsView != null) { @@ -927,7 +1039,10 @@ public void processElement(ProcessContext context) throws Exception { + " Got %s", shardCount); - DestinationT destination = getDynamicDestinations().getDestination(context.element()); + DestinationT destination = getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder); + if (destination == null) { + return; + } ShardedKey shardKey = shardingFn.assignShardKey(destination, context.element(), shardCount); context.output(KV.of(shardKey, context.element())); @@ -936,6 +1051,12 @@ public void processElement(ProcessContext context) throws Exception { private class WriteShardsIntoTempFilesFn extends DoFn, Iterable>, FileResult> { + + private final Coder 
inputCoder; + + public WriteShardsIntoTempFilesFn(Coder inputCoder) { + this.inputCoder = inputCoder; + } private transient List> closeFutures = new ArrayList<>(); private transient List>> deferredOutput = new ArrayList<>(); @@ -949,14 +1070,17 @@ private void readObject(java.io.ObjectInputStream in) } @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(c); // Since we key by a 32-bit hash of the destination, there might be multiple destinations // in this iterable. The number of destinations is generally very small (1000s or less), so // there will rarely be hash collisions. Map> writers = Maps.newHashMap(); for (UserT input : c.element().getValue()) { - DestinationT destination = getDynamicDestinations().getDestination(input); + DestinationT destination = getDestinationWithErrorHandling(input, outputReceiver, inputCoder); + if (destination == null) { + continue; + } Writer writer = writers.get(destination); if (writer == null) { String uuid = UUID.randomUUID().toString(); @@ -971,7 +1095,12 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except writer.open(uuid); writers.put(destination, writer); } - writeOrClose(writer, getDynamicDestinations().formatRecord(input)); + + OutputT formattedRecord = formatRecordWithErrorHandling(input,outputReceiver,inputCoder); + if (formattedRecord == null){ + return; + } + writeOrClose(writer, formattedRecord); } // Ensure that we clean-up any prior writers that were being closed as part of this bundle diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 39cb612f2d895..a46ad52550efb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesTestStream; @@ -78,6 +79,10 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -634,6 +639,127 @@ private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards) } } + + // Test FailingDynamicDestinations class. Expects user values to be string-encoded integers. 
+ // Throws exceptions when trying to format records or get destinations based on the mod + // of the element + static class FailingTestDestinations extends DynamicDestinations { + private ResourceId baseOutputDirectory; + + FailingTestDestinations(ResourceId baseOutputDirectory) { + this.baseOutputDirectory = baseOutputDirectory; + } + + @Override + public String formatRecord(String record) { + int value = Integer.valueOf(record); + if (value % 2 == 0) { + throw new RuntimeException("Failed To Format Record"); + } + return "record_" + record; + } + + @Override + public Integer getDestination(String element) { + int value = Integer.valueOf(element); + if (value % 3 == 0) { + throw new RuntimeException("Failed To Get Destination"); + } + return value % 5; + } + + @Override + public Integer getDefaultDestination() { + return 0; + } + + @Override + public FilenamePolicy getFilenamePolicy(Integer destination) { + return new PerWindowFiles( + baseOutputDirectory.resolve("file_" + destination, StandardResolveOptions.RESOLVE_FILE), + "simple"); + } + } + + @Test + @Category(NeedsRunner.class) + public void testFailingDynamicDestinationsBounded() throws Exception { + testFailingDynamicDestinationsHelper(true,false); + } + + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testFailingDynamicDestinationsUnbounded() throws Exception { + testFailingDynamicDestinationsHelper(false,false); + } + + @Test + @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) + public void testFailingDynamicDestinationsAutosharding() throws Exception { + testFailingDynamicDestinationsHelper(false,true); + } + + private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autosharding) + throws IOException { + FailingTestDestinations dynamicDestinations = new FailingTestDestinations(getBaseOutputDirectory()); + SimpleSink sink = + new SimpleSink<>(getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED); + + // Flag to validate that the pipeline options are passed to the Sink. + WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); + options.setTestFlag("test_value"); + Pipeline p = TestPipeline.create(options); + + final int numInputs = 100; + long expectedFailures = 0; + List inputs = Lists.newArrayList(); + for (int i = 0; i < numInputs; ++i) { + inputs.add(Integer.toString(i)); + if(i % 2 != 0 && i % 3 != 0){ + expectedFailures++; + } + } + // Prepare timestamps for the elements. + List timestamps = new ArrayList<>(); + for (long i = 0; i < inputs.size(); i++) { + timestamps.add(i + 1); + } + + BadRecordErrorHandler> errorHandler = p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + int numShards = autosharding ? 
0 : 2; + WriteFiles writeFiles = WriteFiles.to(sink).withNumShards(numShards).withBadRecordErrorHandler(errorHandler, (e) -> true); + errorHandler.close(); + + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures); + + PCollection input = p.apply(Create.timestamped(inputs, timestamps)); + WriteFilesResult res; + if (!bounded) { + input.setIsBoundedInternal(IsBounded.UNBOUNDED); + input = input.apply(Window.into(FixedWindows.of(Duration.standardDays(1)))); + res = input.apply(writeFiles.withWindowedWrites()); + } else { + res = input.apply(writeFiles); + } + res.getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>()); + p.run(); + + for (int i = 0; i < 5; ++i) { + ResourceId base = + getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE); + List expected = Lists.newArrayList(); + for (int j = i; j < numInputs; j += 5) { + expected.add("record_" + j); + } + checkFileContents( + base.toString(), + expected, + Optional.of(numShards), + bounded /* expectRemovedTempDirectory */); + } + + } + @Test public void testShardedDisplayData() { DynamicDestinations dynamicDestinations = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java new file mode 100644 index 0000000000000..ce01ec881d6f1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -0,0 +1,26 @@ +package org.apache.beam.sdk.transforms.errorhandling; + +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.CalendarWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +public class ErrorHandlingTestUtils { + public static class ErrorSinkTransform + extends PTransform, PCollection> { + + @Override + public @UnknownKeyFor @NonNull @Initialized PCollection expand( + PCollection input) { + return input + .apply("Window", Window.into(CalendarWindows.years(1))) + .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults()); + } + } + +} diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 5b976687f2c1f..ab6ac52e318d4 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.common.IOITHelper; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; -import org.apache.beam.sdk.io.kafka.KafkaIOTest.ErrorSinkTransform; import org.apache.beam.sdk.io.kafka.KafkaIOTest.FailingLongSerializer; import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFnTest.FailingDeserializer; import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource; @@ -77,6 +76,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; +import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.CalendarWindows; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index b0df82bcdc195..9b15b86051f5d 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -88,7 +88,6 @@ import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; @@ -97,15 +96,13 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.CalendarWindows; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; @@ -145,10 +142,7 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.hamcrest.collection.IsIterableWithSize; import org.joda.time.Duration; @@ -1472,18 +1466,6 @@ public void testSinkWithSerializationErrors() throws Exception { } } - public static class ErrorSinkTransform - extends PTransform, PCollection> { - - @Override - public @UnknownKeyFor @NonNull @Initialized PCollection expand( - PCollection input) { - return input - .apply("Window", Window.into(CalendarWindows.years(1))) - .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults()); - } - } - @Test public void testValuesSink() throws Exception { // similar to testSink(), but use values()' interface. 
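Patch 01 above wires bad-record handling end to end: FileIO.Write and WriteFiles gain withBadRecordErrorHandler(ErrorHandler, SerializableFunction), records that throw from getDestination() or formatRecord() are routed to BAD_RECORD_TAG instead of failing the bundle, and each resulting error collection is registered with the handler via addErrorCollection. A minimal usage sketch of that API, assuming the two-argument form as defined in this patch and the ErrorSinkTransform test utility it introduces; the class name, output path, and sample elements are illustrative only:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
import org.apache.beam.sdk.values.PCollection;

public class BadRecordFileWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Dead-letter consumer: ErrorSinkTransform counts the BadRecords it receives.
    BadRecordErrorHandler<PCollection<Long>> errorHandler =
        p.registerBadRecordErrorHandler(new ErrorSinkTransform());

    PCollection<String> lines = p.apply(Create.of("1", "2", "3"));

    // Any exception thrown while resolving a dynamic destination or formatting
    // a record is tested against the predicate and, on a match, routed to the
    // error handler instead of failing the bundle.
    lines.apply(
        FileIO.<String>write()
            .via(TextIO.sink())
            .to("/tmp/out")
            .withBadRecordErrorHandler(errorHandler, (e) -> true));

    // Close after all writes are applied, before running the pipeline.
    errorHandler.close();
    p.run().waitUntilFinish();
  }
}

Passing (e) -> true treats every exception as recoverable; a narrower matcher causes anything it does not match to be rethrown, as the utility methods added to WriteFiles do.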
From ab68b1ece19e6459c07c537a5a81921670fc6570 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 8 Dec 2023 12:19:25 -0500 Subject: [PATCH 02/16] fix error handling to solve constant filenaming policy returning a null destination --- .../java/org/apache/beam/sdk/io/FileIO.java | 18 +- .../org/apache/beam/sdk/io/WriteFiles.java | 203 ++++++++++++------ .../apache/beam/sdk/io/WriteFilesTest.java | 23 +- .../errorhandling/ErrorHandlingTestUtils.java | 18 +- 4 files changed, 180 insertions(+), 82 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index d47e14571c834..c48e13a7ade4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -1018,7 +1018,7 @@ public static FileNaming relativeFileNaming( abstract boolean getNoSpilling(); - abstract @Nullable ErrorHandler getBadRecordErrorHandler(); + abstract @Nullable ErrorHandler getBadRecordErrorHandler(); abstract @Nullable SerializableFunction getBadRecordMatcher(); @@ -1069,10 +1069,10 @@ abstract Builder setSharding( abstract Builder setNoSpilling(boolean noSpilling); abstract Builder setBadRecordErrorHandler( - @Nullable ErrorHandler badRecordErrorHandler); + @Nullable ErrorHandler badRecordErrorHandler); abstract Builder setBadRecordMatcher( - @Nullable SerializableFunction badRecordMatcher); + @Nullable SerializableFunction badRecordMatcher); abstract Write build(); } @@ -1301,8 +1301,13 @@ public Write withNoSpilling() { } /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ - public Write withBadRecordErrorHandler(ErrorHandler errorHandler, SerializableFunction badRecordMatcher) { - return toBuilder().setBadRecordErrorHandler(errorHandler).setBadRecordMatcher(badRecordMatcher).build(); + public Write withBadRecordErrorHandler( + ErrorHandler errorHandler, + SerializableFunction badRecordMatcher) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordMatcher(badRecordMatcher) + .build(); } @VisibleForTesting @@ -1409,7 +1414,8 @@ public WriteFilesResult expand(PCollection input) { writeFiles = writeFiles.withNoSpilling(); } if (getBadRecordErrorHandler() != null) { - writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler(),getBadRecordMatcher()); + writeFiles = + writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); } return input.apply(writeFiles); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index c33051bdd8dc3..6a133f9b8855b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -199,7 +199,7 @@ public static WriteFiles getShardingFunction(); - public abstract ErrorHandler getBadRecordErrorHandler(); + public abstract ErrorHandler getBadRecordErrorHandler(); public abstract BadRecordRouter getBadRecordRouter(); @@ -232,13 +232,13 @@ abstract Builder setShardingFunction( @Nullable ShardingFunction shardingFunction); abstract Builder setBadRecordErrorHandler( - ErrorHandler badRecordErrorHandler); + ErrorHandler badRecordErrorHandler); abstract Builder setBadRecordRouter( BadRecordRouter badRecordRouter); abstract Builder setBadRecordMatcher( - SerializableFunction badRecordMatcher); + SerializableFunction 
badRecordMatcher); abstract WriteFiles build(); } @@ -355,8 +355,14 @@ public WriteFiles withSkipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } - public WriteFiles withBadRecordErrorHandler(ErrorHandler errorHandler, SerializableFunction badRecordMatcher) { - return toBuilder().setBadRecordErrorHandler(errorHandler).setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER).setBadRecordMatcher(badRecordMatcher).build(); + public WriteFiles withBadRecordErrorHandler( + ErrorHandler errorHandler, + SerializableFunction badRecordMatcher) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .setBadRecordMatcher(badRecordMatcher) + .build(); } @Override @@ -525,15 +531,18 @@ private WriteUnshardedBundlesToTempFiles( @Override public PCollection> expand(PCollection input) { TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); - TupleTag, UserT>> unwrittenRecordsTag = new TupleTag<>("unwrittenRecords"); + TupleTag, UserT>> unwrittenRecordsTag = + new TupleTag<>("unwrittenRecords"); Coder inputCoder = input.getCoder(); if (getMaxNumWritersPerBundle() < 0) { - PCollectionTuple writeTuple = input - .apply( + PCollectionTuple writeTuple = + input.apply( "WritedUnshardedBundles", ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder)) .withSideInputs(getSideInputs()) - .withOutputTags(writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); + .withOutputTags( + writtenRecordsTag, + TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); addErrorCollection(writeTuple); return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); } @@ -541,9 +550,13 @@ public PCollection> expand(PCollection input) { PCollectionTuple writeTuple = input.apply( "WriteUnshardedBundles", - ParDo.of(new WriteUnshardedTempFilesFn(unwrittenRecordsTag, destinationCoder, inputCoder)) + ParDo.of( + new WriteUnshardedTempFilesFn( + unwrittenRecordsTag, destinationCoder, inputCoder)) .withSideInputs(getSideInputs()) - .withOutputTags(writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); + .withOutputTags( + writtenRecordsTag, + TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); addErrorCollection(writeTuple); PCollection> writtenBundleFiles = @@ -563,11 +576,15 @@ public PCollection> expand(PCollection input) { .apply("GroupUnwritten", GroupByKey.create()) .apply( "WriteUnwritten", - ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())) + .withSideInputs(getSideInputs()) + .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(spilledWriteTuple); - PCollection> writtenSpilledFiles = spilledWriteTuple.get(writtenRecordsTag) + PCollection> writtenSpilledFiles = + spilledWriteTuple + .get(writtenRecordsTag) .setCoder(fileResultCoder) .apply( "DropShardNum", @@ -617,7 +634,9 @@ public void startBundle(StartBundleContext c) { } @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) throws Exception { + public void processElement( + ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) + throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(c); PaneInfo paneInfo = c.pane(); // If we are doing 
windowed writes, we need to ensure that we have separate files for @@ -625,10 +644,12 @@ public void processElement(ProcessContext c, BoundedWindow window, MultiOutputRe // destinations go to different writers. // In the case of unwindowed writes, the window and the pane will always be the same, and // the map will only have a single element. - DestinationT destination = getDestinationWithErrorHandling(c.element(), outputReceiver, inputCoder); - if (destination == null) { + MaybeDestination maybeDestination = + getDestinationWithErrorHandling(c.element(), outputReceiver, inputCoder); + if (!maybeDestination.isValid) { return; } + DestinationT destination = maybeDestination.destination; WriterKey key = new WriterKey<>(window, c.pane(), destination); Writer writer = writers.get(key); if (writer == null) { @@ -652,15 +673,19 @@ public void processElement(ProcessContext c, BoundedWindow window, MultiOutputRe } else { spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR; } - outputReceiver.get(unwrittenRecordsTag).output( - KV.of( - ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum), - c.element())); + outputReceiver + .get(unwrittenRecordsTag) + .output( + KV.of( + ShardedKey.of( + hashDestination(destination, destinationCoder), spilledShardNum), + c.element())); return; } } - OutputT formattedRecord = formatRecordWithErrorHandling(c.element(),outputReceiver,inputCoder); - if (formattedRecord == null){ + OutputT formattedRecord = + formatRecordWithErrorHandling(c.element(), outputReceiver, inputCoder); + if (formattedRecord == null) { return; } writeOrClose(writer, formattedRecord); @@ -749,38 +774,62 @@ private static int hashDestination( .asInt(); } + private static class MaybeDestination { + final DestinationT destination; + final boolean isValid; + + MaybeDestination(DestinationT destination, boolean isValid) { + this.destination = destination; + this.isValid = true; + } + } // Utility method to get the dynamic destination based on a record. If the operation fails, and is - // output to the bad record router, this returns null - private @Nullable DestinationT getDestinationWithErrorHandling(UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception{ + // output to the bad record router, this returns null. Returns a MaybeDestination because some + // implementations of dynamic destinations return null, despite this being prohibited by the + // interface + private MaybeDestination getDestinationWithErrorHandling( + UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception { try { - return getDynamicDestinations().getDestination(input); + return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true); } catch (Exception e) { - if (getBadRecordMatcher().apply(e)){ - getBadRecordRouter().route(outputReceiver,input,inputCoder,e,"Unable to get dynamic destination for record"); - return null; + if (getBadRecordMatcher().apply(e)) { + getBadRecordRouter() + .route( + outputReceiver, + input, + inputCoder, + e, + "Unable to get dynamic destination for record"); + return new MaybeDestination<>(null, false); } else { throw e; } } } - private void addErrorCollection(PCollectionTuple sourceTuple){ + private void addErrorCollection(PCollectionTuple sourceTuple) { getBadRecordErrorHandler() .addErrorCollection( sourceTuple .get(BAD_RECORD_TAG) .setCoder(BadRecord.getCoder(sourceTuple.getPipeline()))); - } // Utility method to format a record based on the dynamic destination. 
If the operation fails, and // is output to the bad record router, this returns null - private @Nullable OutputT formatRecordWithErrorHandling(UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception{ + private @Nullable OutputT formatRecordWithErrorHandling( + UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception { try { return getDynamicDestinations().formatRecord(input); } catch (Exception e) { - if (getBadRecordMatcher().apply(e)){ - getBadRecordRouter().route(outputReceiver,input,inputCoder,e,"Unable to format record for Dynamic Destination"); + if (getBadRecordMatcher().apply(e)) { + getBadRecordRouter() + .route( + outputReceiver, + input, + inputCoder, + e, + "Unable to format record for Dynamic Destination"); return null; } else { throw e; @@ -818,19 +867,26 @@ public PCollection> expand(PCollection input) { TupleTag, UserT>> shardedRecords = new TupleTag<>("shardedRecords"); TupleTag> writtenRecordsTag = new TupleTag<>("writtenRecords"); - PCollectionTuple shardedFiles = input - .apply( + PCollectionTuple shardedFiles = + input.apply( "ApplyShardingKey", - ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView, input.getCoder())) - .withSideInputs(shardingSideInputs).withOutputTags(shardedRecords, TupleTagList.of(BAD_RECORD_TAG))); + ParDo.of( + new ApplyShardingFunctionFn( + shardingFunction, numShardsView, input.getCoder())) + .withSideInputs(shardingSideInputs) + .withOutputTags(shardedRecords, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(shardedFiles); - PCollectionTuple writtenFiles = shardedFiles.get(shardedRecords) - .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupIntoShards", GroupByKey.create()) - .apply( - "WriteShardsIntoTempFiles", - ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + PCollectionTuple writtenFiles = + shardedFiles + .get(shardedRecords) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) + .apply("GroupIntoShards", GroupByKey.create()) + .apply( + "WriteShardsIntoTempFiles", + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())) + .withSideInputs(getSideInputs()) + .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(writtenFiles); return writtenFiles.get(writtenRecordsTag).setCoder(fileResultCoder); @@ -858,30 +914,37 @@ public PCollection>> expand(PCollection inp // // TODO(https://github.com/apache/beam/issues/20928): The implementation doesn't currently // work with merging windows. 
- TupleTag> shardTag = new TupleTag<>("shardTag"); + TupleTag> shardTag = new TupleTag<>("shardTag"); PCollectionTuple shardedElements = - input - .apply( - "KeyedByDestinationHash", - ParDo.of( + input.apply( + "KeyedByDestinationHash", + ParDo.of( new DoFn>() { @ProcessElement - public void processElement(@Element UserT element, ProcessContext context, MultiOutputReceiver outputReceiver) + public void processElement( + @Element UserT element, + ProcessContext context, + MultiOutputReceiver outputReceiver) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(context); - DestinationT destination = getDestinationWithErrorHandling(context.element(), outputReceiver, input.getCoder()); - if (destination == null) { + MaybeDestination maybeDestination = + getDestinationWithErrorHandling( + context.element(), outputReceiver, input.getCoder()); + if (!maybeDestination.isValid) { return; } + DestinationT destination = maybeDestination.destination; context.output( KV.of(hashDestination(destination, destinationCoder), element)); } - }).withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG))); + }) + .withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(shardedElements); PCollection, Iterable>> shardedInput = - shardedElements.get(shardTag) + shardedElements + .get(shardTag) .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder())) .apply( "ShardAndBatch", @@ -920,11 +983,15 @@ public KV, Iterable> apply( ShardedKeyCoder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder()))) .apply( "WriteShardsIntoTempFiles", - ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())).withSideInputs(getSideInputs()).withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); + ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder())) + .withSideInputs(getSideInputs()) + .withOutputTags(writtenRecordsTag, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(writtenFiles); - PCollection> tempFiles = writtenFiles.get(writtenRecordsTag) + PCollection> tempFiles = + writtenFiles + .get(writtenRecordsTag) .setCoder(fileResultCoder) .apply( "DropShardNum", @@ -1023,7 +1090,8 @@ private class ApplyShardingFunctionFn extends DoFn } @ProcessElement - public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) throws Exception { + public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) + throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(context); final int shardCount; if (numShardsView != null) { @@ -1039,10 +1107,12 @@ public void processElement(ProcessContext context, MultiOutputReceiver outputRec + " Got %s", shardCount); - DestinationT destination = getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder); - if (destination == null) { + MaybeDestination maybeDestination = + getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder); + if (!maybeDestination.isValid) { return; } + DestinationT destination = maybeDestination.destination; ShardedKey shardKey = shardingFn.assignShardKey(destination, context.element(), shardCount); context.output(KV.of(shardKey, context.element())); @@ -1057,6 +1127,7 @@ private class WriteShardsIntoTempFilesFn public WriteShardsIntoTempFilesFn(Coder inputCoder) { this.inputCoder = inputCoder; } + private transient List> closeFutures = new ArrayList<>(); private transient List>> deferredOutput = new ArrayList<>(); @@ -1070,17 +1141,21 @@ private void readObject(java.io.ObjectInputStream in) } 
@ProcessElement - public void processElement(ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) throws Exception { + public void processElement( + ProcessContext c, BoundedWindow window, MultiOutputReceiver outputReceiver) + throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(c); // Since we key by a 32-bit hash of the destination, there might be multiple destinations // in this iterable. The number of destinations is generally very small (1000s or less), so // there will rarely be hash collisions. Map> writers = Maps.newHashMap(); for (UserT input : c.element().getValue()) { - DestinationT destination = getDestinationWithErrorHandling(input, outputReceiver, inputCoder); - if (destination == null) { - continue; + MaybeDestination maybeDestination = + getDestinationWithErrorHandling(input, outputReceiver, inputCoder); + if (!maybeDestination.isValid) { + return; } + DestinationT destination = maybeDestination.destination; Writer writer = writers.get(destination); if (writer == null) { String uuid = UUID.randomUUID().toString(); @@ -1096,8 +1171,8 @@ public void processElement(ProcessContext c, BoundedWindow window, MultiOutputRe writers.put(destination, writer); } - OutputT formattedRecord = formatRecordWithErrorHandling(input,outputReceiver,inputCoder); - if (formattedRecord == null){ + OutputT formattedRecord = formatRecordWithErrorHandling(input, outputReceiver, inputCoder); + if (formattedRecord == null) { return; } writeOrClose(writer, formattedRecord); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index a46ad52550efb..ff1330cb98b42 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -79,9 +79,7 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; -import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -639,7 +637,6 @@ private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards) } } - // Test FailingDynamicDestinations class. Expects user values to be string-encoded integers. 
// Throws exceptions when trying to format records or get destinations based on the mod // of the element @@ -684,24 +681,25 @@ public FilenamePolicy getFilenamePolicy(Integer destination) { @Test @Category(NeedsRunner.class) public void testFailingDynamicDestinationsBounded() throws Exception { - testFailingDynamicDestinationsHelper(true,false); + testFailingDynamicDestinationsHelper(true, false); } @Test @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) public void testFailingDynamicDestinationsUnbounded() throws Exception { - testFailingDynamicDestinationsHelper(false,false); + testFailingDynamicDestinationsHelper(false, false); } @Test @Category({NeedsRunner.class, UsesUnboundedPCollections.class}) public void testFailingDynamicDestinationsAutosharding() throws Exception { - testFailingDynamicDestinationsHelper(false,true); + testFailingDynamicDestinationsHelper(false, true); } private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autosharding) throws IOException { - FailingTestDestinations dynamicDestinations = new FailingTestDestinations(getBaseOutputDirectory()); + FailingTestDestinations dynamicDestinations = + new FailingTestDestinations(getBaseOutputDirectory()); SimpleSink sink = new SimpleSink<>(getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED); @@ -715,7 +713,7 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos List inputs = Lists.newArrayList(); for (int i = 0; i < numInputs; ++i) { inputs.add(Integer.toString(i)); - if(i % 2 != 0 && i % 3 != 0){ + if (i % 2 != 0 && i % 3 != 0) { expectedFailures++; } } @@ -725,9 +723,13 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos timestamps.add(i + 1); } - BadRecordErrorHandler> errorHandler = p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + BadRecordErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); int numShards = autosharding ? 0 : 2; - WriteFiles writeFiles = WriteFiles.to(sink).withNumShards(numShards).withBadRecordErrorHandler(errorHandler, (e) -> true); + WriteFiles writeFiles = + WriteFiles.to(sink) + .withNumShards(numShards) + .withBadRecordErrorHandler(errorHandler, (e) -> true); errorHandler.close(); PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures); @@ -757,7 +759,6 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos Optional.of(numShards), bounded /* expectRemovedTempDirectory */); } - } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java index ce01ec881d6f1..1de2891ccb861 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.transforms.errorhandling; import org.apache.beam.sdk.transforms.Combine; @@ -22,5 +39,4 @@ public static class ErrorSinkTransform .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults()); } } - } From 3d76ccddb34721d2df22db2b8e36a0b5ce7dd5d9 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 11 Dec 2023 12:11:19 -0500 Subject: [PATCH 03/16] fix tests, add a safety check to the error handler --- .../org/apache/beam/sdk/io/WriteFiles.java | 83 +++++++++++-------- .../errorhandling/ErrorHandler.java | 3 + .../apache/beam/sdk/io/WriteFilesTest.java | 22 +++-- .../errorhandling/ErrorHandlingTestUtils.java | 14 +++- 4 files changed, 75 insertions(+), 47 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 6a133f9b8855b..7c0ce22638b2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -544,6 +544,8 @@ public PCollection> expand(PCollection input) { writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); addErrorCollection(writeTuple); + writeTuple.get(unwrittenRecordsTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())); return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); } @@ -780,13 +782,12 @@ private static class MaybeDestination { MaybeDestination(DestinationT destination, boolean isValid) { this.destination = destination; - this.isValid = true; + this.isValid = isValid; } } - // Utility method to get the dynamic destination based on a record. If the operation fails, and is - // output to the bad record router, this returns null. Returns a MaybeDestination because some - // implementations of dynamic destinations return null, despite this being prohibited by the - // interface + // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination + // because some implementations of dynamic destinations return null, despite this being prohibited + // by the interface private MaybeDestination getDestinationWithErrorHandling( UserT input, MultiOutputReceiver outputReceiver, Coder inputCoder) throws Exception { try { @@ -807,14 +808,6 @@ private MaybeDestination getDestinationWithErrorHandling( } } - private void addErrorCollection(PCollectionTuple sourceTuple) { - getBadRecordErrorHandler() - .addErrorCollection( - sourceTuple - .get(BAD_RECORD_TAG) - .setCoder(BadRecord.getCoder(sourceTuple.getPipeline()))); - } - // Utility method to format a record based on the dynamic destination. 
If the operation fails, and // is output to the bad record router, this returns null private @Nullable OutputT formatRecordWithErrorHandling( @@ -837,6 +830,14 @@ private void addErrorCollection(PCollectionTuple sourceTuple) { } } + private void addErrorCollection(PCollectionTuple sourceTuple) { + getBadRecordErrorHandler() + .addErrorCollection( + sourceTuple + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(sourceTuple.getPipeline()))); + } + private class WriteShardedBundlesToTempFiles extends PTransform, PCollection>> { private final Coder destinationCoder; @@ -919,26 +920,7 @@ public PCollection>> expand(PCollection inp PCollectionTuple shardedElements = input.apply( "KeyedByDestinationHash", - ParDo.of( - new DoFn>() { - @ProcessElement - public void processElement( - @Element UserT element, - ProcessContext context, - MultiOutputReceiver outputReceiver) - throws Exception { - getDynamicDestinations().setSideInputAccessorFromProcessContext(context); - MaybeDestination maybeDestination = - getDestinationWithErrorHandling( - context.element(), outputReceiver, input.getCoder()); - if (!maybeDestination.isValid) { - return; - } - DestinationT destination = maybeDestination.destination; - context.output( - KV.of(hashDestination(destination, destinationCoder), element)); - } - }) + ParDo.of(new KeyByDestinationHash(input.getCoder(), destinationCoder)) .withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG))); addErrorCollection(shardedElements); @@ -1040,6 +1022,37 @@ public void processElement( } } + private class KeyByDestinationHash extends DoFn> { + + private final Coder inputCoder; + + private final Coder destinationCoder; + + public KeyByDestinationHash(Coder inputCoder, Coder destinationCoder) { + this.inputCoder = inputCoder; + this.destinationCoder = destinationCoder; + } + + @ProcessElement + public void processElement( + @Element UserT element, + ProcessContext context, + MultiOutputReceiver outputReceiver) + throws Exception { + getDynamicDestinations().setSideInputAccessorFromProcessContext(context); + MaybeDestination maybeDestination = + getDestinationWithErrorHandling( + context.element(), outputReceiver, inputCoder); + if (!maybeDestination.isValid) { + return; + } + DestinationT destination = maybeDestination.destination; + context.output( + KV.of(hashDestination(destination, destinationCoder), element)); + } + } + + private class RandomShardingFunction implements ShardingFunction { private final Coder destinationCoder; @@ -1153,7 +1166,7 @@ public void processElement( MaybeDestination maybeDestination = getDestinationWithErrorHandling(input, outputReceiver, inputCoder); if (!maybeDestination.isValid) { - return; + continue; } DestinationT destination = maybeDestination.destination; Writer writer = writers.get(destination); @@ -1173,7 +1186,7 @@ public void processElement( OutputT formattedRecord = formatRecordWithErrorHandling(input, outputReceiver, inputCoder); if (formattedRecord == null) { - return; + continue; } writeOrClose(writer, formattedRecord); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java index e02965b72022e..f93a409993b96 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java @@ -119,6 +119,9 @@ private void readObject(ObjectInputStream aInputStream) 
@Override public void addErrorCollection(PCollection errorCollection) { + if (isClosed()) { + throw new IllegalStateException("Error collections cannot be added after Error Handler is closed"); + } errorCollections.add(errorCollection); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index ff1330cb98b42..1b44e799a91ab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -650,7 +650,8 @@ static class FailingTestDestinations extends DynamicDestinations inputs = Lists.newArrayList(); for (int i = 0; i < numInputs; ++i) { inputs.add(Integer.toString(i)); - if (i % 2 != 0 && i % 3 != 0) { + if (i % 7 == 0 || i % 3 == 0) { expectedFailures++; } } @@ -730,9 +732,6 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos WriteFiles.to(sink) .withNumShards(numShards) .withBadRecordErrorHandler(errorHandler, (e) -> true); - errorHandler.close(); - - PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures); PCollection input = p.apply(Create.timestamped(inputs, timestamps)); WriteFilesResult res; @@ -743,6 +742,11 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos } else { res = input.apply(writeFiles); } + + errorHandler.close(); + + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures); + res.getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>()); p.run(); @@ -751,12 +755,14 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE); List expected = Lists.newArrayList(); for (int j = i; j < numInputs; j += 5) { - expected.add("record_" + j); + if (j % 3 != 0 && j % 7 != 0) { + expected.add("record_" + j); + } } checkFileContents( base.toString(), expected, - Optional.of(numShards), + Optional.fromNullable(autosharding ? 
null : numShards), bounded /* expectRemovedTempDirectory */); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java index 1de2891ccb861..41367765b9208 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -20,12 +20,14 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.CalendarWindows; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.joda.time.Duration; public class ErrorHandlingTestUtils { public static class ErrorSinkTransform @@ -34,9 +36,13 @@ public static class ErrorSinkTransform @Override public @UnknownKeyFor @NonNull @Initialized PCollection expand( PCollection input) { - return input - .apply("Window", Window.into(CalendarWindows.years(1))) - .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults()); + if (input.isBounded() == IsBounded.BOUNDED) { + return input.apply("Combine", Combine.globally(Count.combineFn())); + } else { + return input + .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1)))) + .apply("Combine", Combine.globally(Count.combineFn()).withoutDefaults()); + } } } } From abde5a340da2ae0c2157f99d3de2af923e1e2773 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 11 Dec 2023 12:34:33 -0500 Subject: [PATCH 04/16] spotless --- .../org/apache/beam/sdk/io/WriteFiles.java | 30 ++++++++----------- .../errorhandling/ErrorHandler.java | 3 +- .../apache/beam/sdk/io/WriteFilesTest.java | 4 +-- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 7c0ce22638b2a..b33b66c541a26 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -544,7 +544,8 @@ public PCollection> expand(PCollection input) { writtenRecordsTag, TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG)))); addErrorCollection(writeTuple); - writeTuple.get(unwrittenRecordsTag) + writeTuple + .get(unwrittenRecordsTag) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())); return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); } @@ -1034,25 +1035,20 @@ public KeyByDestinationHash(Coder inputCoder, Coder destina } @ProcessElement - public void processElement( - @Element UserT element, - ProcessContext context, - MultiOutputReceiver outputReceiver) - throws Exception { - getDynamicDestinations().setSideInputAccessorFromProcessContext(context); - MaybeDestination maybeDestination = - getDestinationWithErrorHandling( - context.element(), outputReceiver, inputCoder); - if (!maybeDestination.isValid) { - return; - } - DestinationT 
destination = maybeDestination.destination; - context.output( - KV.of(hashDestination(destination, destinationCoder), element)); + public void processElement( + @Element UserT element, ProcessContext context, MultiOutputReceiver outputReceiver) + throws Exception { + getDynamicDestinations().setSideInputAccessorFromProcessContext(context); + MaybeDestination maybeDestination = + getDestinationWithErrorHandling(context.element(), outputReceiver, inputCoder); + if (!maybeDestination.isValid) { + return; } + DestinationT destination = maybeDestination.destination; + context.output(KV.of(hashDestination(destination, destinationCoder), element)); + } } - private class RandomShardingFunction implements ShardingFunction { private final Coder destinationCoder; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java index f93a409993b96..cf040470d608b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java @@ -120,7 +120,8 @@ private void readObject(ObjectInputStream aInputStream) @Override public void addErrorCollection(PCollection errorCollection) { if (isClosed()) { - throw new IllegalStateException("Error collections cannot be added after Error Handler is closed"); + throw new IllegalStateException( + "Error collections cannot be added after Error Handler is closed"); } errorCollections.add(errorCollection); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 1b44e799a91ab..8437ac030910d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -650,7 +650,7 @@ static class FailingTestDestinations extends DynamicDestinations Date: Tue, 12 Dec 2023 11:49:34 -0500 Subject: [PATCH 05/16] add documentation --- .../java/org/apache/beam/sdk/io/FileIO.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index c48e13a7ade4d..2aef300f43d23 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -238,6 +238,29 @@ * destination-dependent: every window/pane for every destination will use the same number of shards * specified via {@link Write#withNumShards} or {@link Write#withSharding}. * + *

Handling Errors

+ * + *

When using dynamic destinations, or when using a formatting function to format a record for + * writing, it's possible for an individual record to be malformed, causing an exception. By default, + * these exceptions are propagated to the runner, and are usually retried, though this depends on + * the runner. Alternately, these errors can be routed to another {@link PTransform} by using + * {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The ErrorHandler + * is registered with the pipeline (see below), and the SerializableFunction lets you filter which + * exceptions should be sent to the error handler, and which should be handled by the runner. See + * {@link ErrorHandler} for more documentation. Of note, this error handling only handles errors + * related to specific records. It does not handle errors related to connectivity, authorization, + * etc. as those should be retried by the runner.

+ * + *
{@code
+ * PCollection<UserT> records = ...;
+ * PTransform<PCollection<BadRecord>, ?> alternateSink = ...;
+ * try (BadRecordErrorHandler handler = pipeline.registerBadRecordErrorHandler(alternateSink)) {
+ *    records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ *        .withBadRecordErrorHandler(handler, (exception) -> true));
+ * }
+ * }
+ * + * *

Writing custom types to sinks

* *

Normally, when writing a collection of a custom type using a {@link Sink} that takes a @@ -1300,6 +1323,12 @@ public Write withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + public Write withBadRecordErrorHandler( + ErrorHandler errorHandler) { + return withBadRecordErrorHandler(errorHandler, (e) -> true); + } + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ public Write withBadRecordErrorHandler( ErrorHandler errorHandler, From c61243d4153712b448c1cb049cac8336a7145d0e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 12 Dec 2023 15:30:58 -0500 Subject: [PATCH 06/16] add textio error handler pass-through --- .../java/org/apache/beam/sdk/io/FileIO.java | 19 +++++----- .../java/org/apache/beam/sdk/io/TextIO.java | 35 +++++++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 2aef300f43d23..88b0dcadf1b8d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -241,15 +241,15 @@ *

Handling Errors

* *

When using dynamic destinations, or when using a formatting function to format a record for - * writing, it's possible for an individual record to be malformed, causing an exception. By default, - * these exceptions are propagated to the runner, and are usually retried, though this depends on - * the runner. Alternately, these errors can be routed to another {@link PTransform} by using - * {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The ErrorHandler - * is registered with the pipeline (see below), and the SerializableFunction lets you filter which - * exceptions should be sent to the error handler, and which should be handled by the runner. See - * {@link ErrorHandler} for more documentation. Of note, this error handling only handles errors - * related to specific records. It does not handle errors related to connectivity, authorization, - * etc. as those should be retried by the runner.

+ * writing, it's possible for an individual record to be malformed, causing an exception. By + * default, these exceptions are propagated to the runner, and are usually retried, though this + * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by + * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The + * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you + * filter which exceptions should be sent to the error handler, and which should be handled by the + * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only + * handles errors related to specific records. It does not handle errors related to connectivity, + * authorization, etc. as those should be retried by the runner. * *
{@code
  * PCollection<UserT> records = ...;
@@ -260,7 +260,6 @@
  * }
  * }
* - * *

Writing custom types to sinks

* *

Normally, when writing a collection of a custom type using a {@link Sink} that takes a diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 2c7a4fc5d4f5c..fb3b746b87a7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -51,6 +51,8 @@ import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -176,6 +178,10 @@ * *

For backwards compatibility, {@link TextIO} also supports the legacy {@link * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}. + * + *

Error handling for records that are malformed can be handled by using {@link + * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in + * {@link FileIO} for details on usage */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -708,6 +714,10 @@ public abstract static class TypedWrite */ abstract WritableByteChannelFactory getWritableByteChannelFactory(); + abstract @Nullable ErrorHandler getBadRecordErrorHandler(); + + abstract @Nullable SerializableFunction getBadRecordMatcher(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -754,6 +764,12 @@ abstract Builder setNumShards( abstract Builder setWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory); + abstract Builder setBadRecordErrorHandler( + @Nullable ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordMatcher( + @Nullable SerializableFunction badRecordMatcher); + abstract TypedWrite build(); } @@ -993,6 +1009,22 @@ public TypedWrite withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + public TypedWrite withBadRecordErrorHandler( + ErrorHandler errorHandler) { + return withBadRecordErrorHandler(errorHandler, (e) -> true); + } + + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + public TypedWrite withBadRecordErrorHandler( + ErrorHandler errorHandler, + SerializableFunction badRecordMatcher) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordMatcher(badRecordMatcher) + .build(); + } + /** Don't write any output files if the PCollection is empty. */ public TypedWrite skipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); @@ -1083,6 +1115,9 @@ public WriteFilesResult expand(PCollection input) { if (getNoSpilling()) { write = write.withNoSpilling(); } + if (getBadRecordErrorHandler() != null) { + write = write.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); + } if (getSkipIfEmpty()) { write = write.withSkipIfEmpty(); } From de5bd4c39abb3e70958723e1d9f975b3cdf1716e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 18 Dec 2023 11:18:46 -0500 Subject: [PATCH 07/16] add avroio error handler pass-through --- .../beam/sdk/extensions/avro/io/AvroIO.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index a65db5a90bad1..d01cd8cb84bc8 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -69,6 +69,8 @@ import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -1427,6 +1429,10 @@ public abstract static class TypedWrite abstract AvroSink.@Nullable DatumWriterFactory getDatumWriterFactory(); + abstract @Nullable ErrorHandler 
getBadRecordErrorHandler(); + + abstract @Nullable SerializableFunction getBadRecordMatcher(); + /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -1489,6 +1495,12 @@ abstract Builder setDynamicDestinations( abstract Builder setDatumWriterFactory( AvroSink.DatumWriterFactory datumWriterFactory); + abstract Builder setBadRecordErrorHandler( + @Nullable ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordMatcher( + @Nullable SerializableFunction badRecordMatcher); + abstract TypedWrite build(); } @@ -1713,6 +1725,22 @@ public TypedWrite withMetadata(Map return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + public TypedWrite withBadRecordErrorHandler( + ErrorHandler errorHandler) { + return withBadRecordErrorHandler(errorHandler, (e) -> true); + } + + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + public TypedWrite withBadRecordErrorHandler( + ErrorHandler errorHandler, + SerializableFunction badRecordMatcher) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordMatcher(badRecordMatcher) + .build(); + } + DynamicAvroDestinations resolveDynamicDestinations() { DynamicAvroDestinations dynamicDestinations = getDynamicDestinations(); @@ -1782,6 +1810,9 @@ public WriteFilesResult expand(PCollection input) { if (getNoSpilling()) { write = write.withNoSpilling(); } + if (getBadRecordErrorHandler() != null && getBadRecordMatcher() != null) { + write = write.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); + } return input.apply("Write", write); } From 92948aa1da3b40def5173d2faed9ad41b1d33caa Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 18 Dec 2023 12:02:16 -0500 Subject: [PATCH 08/16] add documentation to avroio --- .../java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index d01cd8cb84bc8..3c69a4ae033a2 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -339,6 +339,10 @@ * events.apply("WriteAvros", AvroIO.writeCustomTypeToGenericRecords() * .to(new UserDynamicAvroDestinations(userToSchemaMap))); * } + * + *

Error handling for writing records that are malformed can be handled by using {@link + * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in + * {@link FileIO} for details on usage */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) From 54f163622ff0af56e5e4969438399292885809ab Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 19 Dec 2023 10:00:21 -0500 Subject: [PATCH 09/16] add documentation to WriteFiles --- .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index b33b66c541a26..5a56590fc8326 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -355,6 +355,15 @@ public WriteFiles withSkipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } + /** + * Configures a new {@link WriteFiles} with an ErrorHandler. For configuring an ErrorHandler, see + * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination is + * performed, and that operation fails, the exception is checked by the passed badRecordMatcher. + * If the matcher returns true, the exception is passed to the error handler. If the matcher + * returns false, the exception is rethrown to be handled by the runner. This is intended to + * handle any errors related to the data of a record, but not any connectivity or IO errors + * related to the literal writing of a record. + */ public WriteFiles withBadRecordErrorHandler( ErrorHandler errorHandler, SerializableFunction badRecordMatcher) { From 2c0356322ed2f18b76a72500a0a3dc99790db510 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 19 Dec 2023 15:46:50 -0500 Subject: [PATCH 10/16] remove function to check if the exception is bad, because that isn't portable --- .../java/org/apache/beam/sdk/io/FileIO.java | 20 +++----------- .../java/org/apache/beam/sdk/io/TextIO.java | 17 ++---------- .../org/apache/beam/sdk/io/WriteFiles.java | 26 +++---------------- .../apache/beam/sdk/io/WriteFilesTest.java | 2 +- .../beam/sdk/extensions/avro/io/AvroIO.java | 20 +++----------- 5 files changed, 14 insertions(+), 71 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 88b0dcadf1b8d..b08772f4673e5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -1042,8 +1042,6 @@ public static FileNaming relativeFileNaming( abstract @Nullable ErrorHandler getBadRecordErrorHandler(); - abstract @Nullable SerializableFunction getBadRecordMatcher(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -1093,9 +1091,6 @@ abstract Builder setSharding( abstract Builder setBadRecordErrorHandler( @Nullable ErrorHandler badRecordErrorHandler); - abstract Builder setBadRecordMatcher( - @Nullable SerializableFunction badRecordMatcher); - abstract Write build(); } @@ -1322,19 +1317,10 @@ public Write withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. 
*/ - public Write withBadRecordErrorHandler( - ErrorHandler errorHandler) { - return withBadRecordErrorHandler(errorHandler, (e) -> true); - } - - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ - public Write withBadRecordErrorHandler( - ErrorHandler errorHandler, - SerializableFunction badRecordMatcher) { + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */ + public Write withBadRecordErrorHandler(ErrorHandler errorHandler) { return toBuilder() .setBadRecordErrorHandler(errorHandler) - .setBadRecordMatcher(badRecordMatcher) .build(); } @@ -1443,7 +1429,7 @@ public WriteFilesResult expand(PCollection input) { } if (getBadRecordErrorHandler() != null) { writeFiles = - writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); + writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler()); } return input.apply(writeFiles); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index fb3b746b87a7b..8d8d8e4c8bb8d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -716,8 +716,6 @@ public abstract static class TypedWrite abstract @Nullable ErrorHandler getBadRecordErrorHandler(); - abstract @Nullable SerializableFunction getBadRecordMatcher(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -767,9 +765,6 @@ abstract Builder setWritableByteChannelFactory( abstract Builder setBadRecordErrorHandler( @Nullable ErrorHandler badRecordErrorHandler); - abstract Builder setBadRecordMatcher( - @Nullable SerializableFunction badRecordMatcher); - abstract TypedWrite build(); } @@ -1009,19 +1004,11 @@ public TypedWrite withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { - return withBadRecordErrorHandler(errorHandler, (e) -> true); - } - - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. 
*/ - public TypedWrite withBadRecordErrorHandler( - ErrorHandler errorHandler, - SerializableFunction badRecordMatcher) { return toBuilder() .setBadRecordErrorHandler(errorHandler) - .setBadRecordMatcher(badRecordMatcher) .build(); } @@ -1116,7 +1103,7 @@ public WriteFilesResult expand(PCollection input) { write = write.withNoSpilling(); } if (getBadRecordErrorHandler() != null) { - write = write.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); + write = write.withBadRecordErrorHandler(getBadRecordErrorHandler()); } if (getSkipIfEmpty()) { write = write.withSkipIfEmpty(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 5a56590fc8326..ac2f40f676549 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -175,7 +175,6 @@ public static WriteFiles()) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) - .setBadRecordMatcher((e) -> true) .build(); } @@ -203,8 +202,6 @@ public static WriteFiles getBadRecordMatcher(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -237,9 +234,6 @@ abstract Builder setBadRecordErrorHandler( abstract Builder setBadRecordRouter( BadRecordRouter badRecordRouter); - abstract Builder setBadRecordMatcher( - SerializableFunction badRecordMatcher); - abstract WriteFiles build(); } @@ -358,19 +352,15 @@ public WriteFiles withSkipIfEmpty() { /** * Configures a new {@link WriteFiles} with an ErrorHandler. For configuring an ErrorHandler, see * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination is - * performed, and that operation fails, the exception is checked by the passed badRecordMatcher. - * If the matcher returns true, the exception is passed to the error handler. If the matcher - * returns false, the exception is rethrown to be handled by the runner. This is intended to - * handle any errors related to the data of a record, but not any connectivity or IO errors - * related to the literal writing of a record. + * performed, and that operation fails, the exception is passed to the error handler. This is + * intended to handle any errors related to the data of a record, but not any connectivity or IO + * errors related to the literal writing of a record. 
*/ public WriteFiles withBadRecordErrorHandler( - ErrorHandler errorHandler, - SerializableFunction badRecordMatcher) { + ErrorHandler errorHandler) { return toBuilder() .setBadRecordErrorHandler(errorHandler) .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) - .setBadRecordMatcher(badRecordMatcher) .build(); } @@ -803,7 +793,6 @@ private MaybeDestination getDestinationWithErrorHandling( try { return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true); } catch (Exception e) { - if (getBadRecordMatcher().apply(e)) { getBadRecordRouter() .route( outputReceiver, @@ -812,9 +801,6 @@ private MaybeDestination getDestinationWithErrorHandling( e, "Unable to get dynamic destination for record"); return new MaybeDestination<>(null, false); - } else { - throw e; - } } } @@ -825,7 +811,6 @@ private MaybeDestination getDestinationWithErrorHandling( try { return getDynamicDestinations().formatRecord(input); } catch (Exception e) { - if (getBadRecordMatcher().apply(e)) { getBadRecordRouter() .route( outputReceiver, @@ -834,9 +819,6 @@ private MaybeDestination getDestinationWithErrorHandling( e, "Unable to format record for Dynamic Destination"); return null; - } else { - throw e; - } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 8437ac030910d..f1cc85ee391a9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -731,7 +731,7 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos WriteFiles writeFiles = WriteFiles.to(sink) .withNumShards(numShards) - .withBadRecordErrorHandler(errorHandler, (e) -> true); + .withBadRecordErrorHandler(errorHandler); PCollection input = p.apply(Create.timestamped(inputs, timestamps)); WriteFilesResult res; diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index 3c69a4ae033a2..ff2139ee191f5 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -1435,8 +1435,6 @@ public abstract static class TypedWrite abstract @Nullable ErrorHandler getBadRecordErrorHandler(); - abstract @Nullable SerializableFunction getBadRecordMatcher(); - /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -1502,9 +1500,6 @@ abstract Builder setDatumWriterFactory( abstract Builder setBadRecordErrorHandler( @Nullable ErrorHandler badRecordErrorHandler); - abstract Builder setBadRecordMatcher( - @Nullable SerializableFunction badRecordMatcher); - abstract TypedWrite build(); } @@ -1729,19 +1724,12 @@ public TypedWrite withMetadata(Map return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ - public TypedWrite withBadRecordErrorHandler( - ErrorHandler errorHandler) { - return withBadRecordErrorHandler(errorHandler, (e) -> true); - } - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. */ + /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. 
*/ public TypedWrite withBadRecordErrorHandler( - ErrorHandler errorHandler, - SerializableFunction badRecordMatcher) { + ErrorHandler errorHandler) { return toBuilder() .setBadRecordErrorHandler(errorHandler) - .setBadRecordMatcher(badRecordMatcher) .build(); } @@ -1814,8 +1802,8 @@ public WriteFilesResult expand(PCollection input) { if (getNoSpilling()) { write = write.withNoSpilling(); } - if (getBadRecordErrorHandler() != null && getBadRecordMatcher() != null) { - write = write.withBadRecordErrorHandler(getBadRecordErrorHandler(), getBadRecordMatcher()); + if (getBadRecordErrorHandler() != null) { + write = write.withBadRecordErrorHandler(getBadRecordErrorHandler()); } return input.apply("Write", write); } From a31602c095edb059d6ede4cca0b3b36211ea50cc Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 20 Dec 2023 11:59:11 -0500 Subject: [PATCH 11/16] spotless --- .../java/org/apache/beam/sdk/io/FileIO.java | 10 +++---- .../java/org/apache/beam/sdk/io/TextIO.java | 4 +-- .../org/apache/beam/sdk/io/WriteFiles.java | 29 ++++++++----------- .../apache/beam/sdk/io/WriteFilesTest.java | 4 +-- 4 files changed, 18 insertions(+), 29 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index b08772f4673e5..857ba081bb441 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -1318,10 +1318,9 @@ public Write withNoSpilling() { } /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */ - public Write withBadRecordErrorHandler(ErrorHandler errorHandler) { - return toBuilder() - .setBadRecordErrorHandler(errorHandler) - .build(); + public Write withBadRecordErrorHandler( + ErrorHandler errorHandler) { + return toBuilder().setBadRecordErrorHandler(errorHandler).build(); } @VisibleForTesting @@ -1428,8 +1427,7 @@ public WriteFilesResult expand(PCollection input) { writeFiles = writeFiles.withNoSpilling(); } if (getBadRecordErrorHandler() != null) { - writeFiles = - writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler()); + writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler()); } return input.apply(writeFiles); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 8d8d8e4c8bb8d..d73b1d34118c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -1007,9 +1007,7 @@ public TypedWrite withNoSpilling() { /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { - return toBuilder() - .setBadRecordErrorHandler(errorHandler) - .build(); + return toBuilder().setBadRecordErrorHandler(errorHandler).build(); } /** Don't write any output files if the PCollection is empty. 
*/ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index ac2f40f676549..a965c9d46358d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; @@ -793,14 +792,10 @@ private MaybeDestination getDestinationWithErrorHandling( try { return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true); } catch (Exception e) { - getBadRecordRouter() - .route( - outputReceiver, - input, - inputCoder, - e, - "Unable to get dynamic destination for record"); - return new MaybeDestination<>(null, false); + getBadRecordRouter() + .route( + outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record"); + return new MaybeDestination<>(null, false); } } @@ -811,14 +806,14 @@ private MaybeDestination getDestinationWithErrorHandling( try { return getDynamicDestinations().formatRecord(input); } catch (Exception e) { - getBadRecordRouter() - .route( - outputReceiver, - input, - inputCoder, - e, - "Unable to format record for Dynamic Destination"); - return null; + getBadRecordRouter() + .route( + outputReceiver, + input, + inputCoder, + e, + "Unable to format record for Dynamic Destination"); + return null; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index f1cc85ee391a9..2db20b92f27fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -729,9 +729,7 @@ private void testFailingDynamicDestinationsHelper(boolean bounded, boolean autos p.registerBadRecordErrorHandler(new ErrorSinkTransform()); int numShards = autosharding ? 0 : 2; WriteFiles writeFiles = - WriteFiles.to(sink) - .withNumShards(numShards) - .withBadRecordErrorHandler(errorHandler); + WriteFiles.to(sink).withNumShards(numShards).withBadRecordErrorHandler(errorHandler); PCollection input = p.apply(Create.timestamped(inputs, timestamps)); WriteFilesResult res; From c86f821e671b02a2d155ccacef31bfdd529c9905 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 20 Dec 2023 12:38:13 -0500 Subject: [PATCH 12/16] spotless --- .../java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index ff2139ee191f5..b5e349df876f1 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -1724,13 +1724,10 @@ public TypedWrite withMetadata(Map return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. 
*/ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { - return toBuilder() - .setBadRecordErrorHandler(errorHandler) - .build(); + return toBuilder().setBadRecordErrorHandler(errorHandler).build(); } DynamicAvroDestinations resolveDynamicDestinations() { From 215b8c5b96ae8f2bd96c10d5dd999293b9644d6f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 21 Dec 2023 10:41:27 -0500 Subject: [PATCH 13/16] clean up documentation --- .../src/main/java/org/apache/beam/sdk/io/FileIO.java | 12 +++++------- .../src/main/java/org/apache/beam/sdk/io/TextIO.java | 2 +- .../apache/beam/sdk/extensions/avro/io/AvroIO.java | 2 +- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 857ba081bb441..6722a87cc9472 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -244,19 +244,17 @@ * writing, it's possible for an individual record to be malformed, causing an exception. By * default, these exceptions are propagated to the runner, and are usually retried, though this * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by - * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The - * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you - * filter which exceptions should be sent to the error handler, and which should be handled by the - * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only - * handles errors related to specific records. It does not handle errors related to connectivity, - * authorization, etc. as those should be retried by the runner. + * using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The ErrorHandler is registered with + * the pipeline (see below). See {@link ErrorHandler} for more documentation. Of note, this error + * handling only handles errors related to specific records. It does not handle errors related to + * connectivity, authorization, etc. as those should be retried by the runner. * *

{@code
  * PCollection<UserT> records = ...;
  * PTransform<PCollection<BadRecord>, ?> alternateSink = ...;
  * try (BadRecordErrorHandler handler = pipeline.registerBadRecordErrorHandler(alternateSink)) {
  *    records.apply("Write", FileIO.writeDynamic().otherConfigs()
- *        .withBadRecordErrorHandler(handler, (exception) -> true));
+ *        .withBadRecordErrorHandler(handler));
  * }
  * }
* diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index d73b1d34118c9..4458c7f9e1fe9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -180,7 +180,7 @@ * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}. * *

Error handling for records that are malformed can be handled by using {@link - * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in + * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in * {@link FileIO} for details on usage */ @SuppressWarnings({ diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index b5e349df876f1..17b76981a14e2 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -341,7 +341,7 @@ * } * *

Error handling for writing records that are malformed can be handled by using {@link - * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in + * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in * {@link FileIO} for details on usage */ @SuppressWarnings({ From cf9e57cb68ec0186ed2656b07a08d5c5a551f393 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 27 Dec 2023 11:29:20 -0500 Subject: [PATCH 14/16] clean up documentation, remove unnecessary unwritten records tag --- .../java/org/apache/beam/sdk/io/FileIO.java | 21 ++++++++++++------- .../java/org/apache/beam/sdk/io/TextIO.java | 4 +++- .../org/apache/beam/sdk/io/WriteFiles.java | 11 ++-------- .../beam/sdk/extensions/avro/io/AvroIO.java | 4 +++- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 6722a87cc9472..96bf0e0793c23 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -242,12 +242,13 @@ * *

When using dynamic destinations, or when using a formatting function to format a record for * writing, it's possible for an individual record to be malformed, causing an exception. By - * default, these exceptions are propagated to the runner, and are usually retried, though this - * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by - * using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The ErrorHandler is registered with - * the pipeline (see below). See {@link ErrorHandler} for more documentation. Of note, this error - * handling only handles errors related to specific records. It does not handle errors related to - * connectivity, authorization, etc. as those should be retried by the runner. + * default, these exceptions are propagated to the runner causing the bundle to fail. These are + * usually retried, though this depends on the runner. Alternately, these errors can be routed to + * another {@link PTransform} by using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The + * ErrorHandler is registered with the pipeline (see below). See {@link ErrorHandler} for more + * documentation. Of note, this error handling only handles errors related to specific records. It + * does not handle errors related to connectivity, authorization, etc. as those should be retried by + * the runner. * *

{@code
  * PCollection<UserT> records = ...;
@@ -1315,7 +1316,13 @@ public Write withNoSpilling() {
       return toBuilder().setNoSpilling(true).build();
     }
 
-    /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */
+    /**
+     * Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
+     * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination is
+     * performed, and that operation fails, the exception is passed to the error handler. This is
+     * intended to handle any errors related to the data of a record, but not any connectivity or IO
+     * errors related to the literal writing of a record.
+     */
     public Write withBadRecordErrorHandler(
         ErrorHandler errorHandler) {
       return toBuilder().setBadRecordErrorHandler(errorHandler).build();
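
The javadoc added above pairs naturally with a complete wiring. A minimal sketch, assuming
the ErrorSinkTransform test utility from this series, String elements, and an illustrative
output path (none of these are mandated by the API):

    // Register a handler backed by a sink transform, attach it to the write,
    // and let try-with-resources close the handler before the pipeline runs.
    Pipeline p = Pipeline.create();
    PCollection<String> records = p.apply(Create.of("good", "bad"));
    try (BadRecordErrorHandler<PCollection<Long>> handler =
        p.registerBadRecordErrorHandler(new ErrorSinkTransform())) {
      records.apply(
          "WriteWithErrorHandling",
          FileIO.<String>write()
              .via(TextIO.sink())
              .to("/tmp/output") // illustrative path
              .withBadRecordErrorHandler(handler));
    }
    p.run();
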
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 4458c7f9e1fe9..dbadb7dd0d480 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1004,7 +1004,9 @@ public TypedWrite withNoSpilling() {
       return toBuilder().setNoSpilling(true).build();
     }
 
-    /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */
+    /**
+     * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage
+     */
     public TypedWrite withBadRecordErrorHandler(
         ErrorHandler errorHandler) {
       return toBuilder().setBadRecordErrorHandler(errorHandler).build();
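
For TextIO the usual per-record failure point is a user-supplied format function. A sketch of
routing those failures, where Event and toJson() are hypothetical stand-ins for a custom type:

    // A format function that can throw on malformed input; with the handler
    // installed, a failing record becomes a BadRecord instead of a failed bundle.
    try (BadRecordErrorHandler<PCollection<Long>> handler =
        p.registerBadRecordErrorHandler(new ErrorSinkTransform())) {
      events.apply(
          "WriteEvents",
          TextIO.<Event>writeCustomType()
              .withFormatFunction(event -> event.toJson()) // may throw
              .to("/tmp/events") // illustrative path
              .withBadRecordErrorHandler(handler));
    }
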
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index a965c9d46358d..934462799cafe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -349,11 +349,7 @@ public WriteFiles withSkipIfEmpty() {
   }
 
   /**
-   * Configures a new {@link WriteFiles} with an ErrorHandler. For configuring an ErrorHandler, see
-   * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination is
-   * performed, and that operation fails, the exception is passed to the error handler. This is
-   * intended to handle any errors related to the data of a record, but not any connectivity or IO
-   * errors related to the literal writing of a record.
+   * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage
    */
   public WriteFiles withBadRecordErrorHandler(
       ErrorHandler errorHandler) {
@@ -540,11 +536,8 @@ public PCollection> expand(PCollection input) {
                     .withSideInputs(getSideInputs())
                     .withOutputTags(
                         writtenRecordsTag,
-                        TupleTagList.of(ImmutableList.of(unwrittenRecordsTag, BAD_RECORD_TAG))));
+                        TupleTagList.of(ImmutableList.of(BAD_RECORD_TAG))));
         addErrorCollection(writeTuple);
-        writeTuple
-            .get(unwrittenRecordsTag)
-            .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()));
         return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       }
 
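The safety check added to ErrorHandler earlier in this series (no error collections may be
added after close) fixes the call order at use sites. A sketch following the shape of the
tests above, reusing their names (input, writeFiles, errorHandler, expectedFailures):

    // Apply every transform that uses the handler first, close the handler to
    // finalize its error output, then run the pipeline.
    WriteFilesResult<Integer> res = input.apply(writeFiles);
    errorHandler.close();
    PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(expectedFailures);
    p.run();
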
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
index 17b76981a14e2..1d138f85b724c 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
@@ -1724,7 +1724,9 @@ public TypedWrite withMetadata(Map
       return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
-    /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */
+    /**
+     * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage
+     */
     public TypedWrite withBadRecordErrorHandler(
         ErrorHandler errorHandler) {
       return toBuilder().setBadRecordErrorHandler(errorHandler).build();

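Any transform over PCollection<BadRecord> can back the registered handler. A minimal
bounded-input sink in the spirit of ErrorSinkTransform (the class name is illustrative,
not part of this series):

    public static class CountingErrorSink
        extends PTransform<PCollection<BadRecord>, PCollection<Long>> {
      @Override
      public PCollection<Long> expand(PCollection<BadRecord> input) {
        // For unbounded inputs this needs windowing first, as the
        // bounded/unbounded branch in ErrorSinkTransform demonstrates.
        return input.apply("CountBadRecords", Combine.globally(Count.combineFn()));
      }
    }
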
From c6283994e823141582679a1509c311b5536159aa Mon Sep 17 00:00:00 2001
From: johnjcasey 
Date: Wed, 27 Dec 2023 12:53:40 -0500
Subject: [PATCH 15/16] spotless

---
 .../core/src/main/java/org/apache/beam/sdk/io/FileIO.java | 4 ++--
 .../core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 8 +++-----
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java  | 7 ++-----
 .../org/apache/beam/sdk/extensions/avro/io/AvroIO.java    | 2 +-
 4 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 96bf0e0793c23..0bc9848772174 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -1318,8 +1318,8 @@ public Write withNoSpilling() {
 
     /**
      * Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
-     * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination is
-     * performed, and that operation fails, the exception is passed to the error handler. This is
+     * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination
+     * is performed, and that operation fails, the exception is passed to the error handler. This is
      * intended to handle any errors related to the data of a record, but not any connectivity or IO
      * errors related to the literal writing of a record.
      */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dbadb7dd0d480..96635a37fac1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -180,8 +180,8 @@
  * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
  *
  * 

Error handling for records that are malformed can be handled by using {@link - * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in - * {@link FileIO} for details on usage + * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for + * details on usage */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -1004,9 +1004,7 @@ public TypedWrite withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } - /** - * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage - */ + /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { return toBuilder().setBadRecordErrorHandler(errorHandler).build(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 934462799cafe..7359141c5b87e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -348,9 +348,7 @@ public WriteFiles withSkipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } - /** - * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage - */ + /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */ public WriteFiles withBadRecordErrorHandler( ErrorHandler errorHandler) { return toBuilder() @@ -535,8 +533,7 @@ public PCollection> expand(PCollection input) { ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder)) .withSideInputs(getSideInputs()) .withOutputTags( - writtenRecordsTag, - TupleTagList.of(ImmutableList.of(BAD_RECORD_TAG)))); + writtenRecordsTag, TupleTagList.of(ImmutableList.of(BAD_RECORD_TAG)))); addErrorCollection(writeTuple); return writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); } diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index 1d138f85b724c..48ac2a361dfe1 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -1725,7 +1725,7 @@ public TypedWrite withMetadata(Map } /** - * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage + * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { From 0177d5aff3728c86650f1e770aa57f27f88cd433 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 27 Dec 2023 13:07:25 -0500 Subject: [PATCH 16/16] spotless --- .../org/apache/beam/sdk/extensions/avro/io/AvroIO.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java index 48ac2a361dfe1..2e4939560ad16 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java @@ -341,8 +341,8 @@ * }

* *

Error handling for writing records that are malformed can be handled by using {@link - * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in - * {@link FileIO} for details on usage + * TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for + * details on usage */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -1724,9 +1724,7 @@ public TypedWrite withMetadata(Map return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - /** - * See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. - */ + /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */ public TypedWrite withBadRecordErrorHandler( ErrorHandler errorHandler) { return toBuilder().setBadRecordErrorHandler(errorHandler).build();