Skip to content

Commit

Permalink
Better and earlier error for apache#30994.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Apr 18, 2024
1 parent a62dfa7 commit 43509a1
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,6 +71,8 @@ public static void main(String[] args) throws Exception {
"%s cannot be main() class with beam_fn_api enabled",
DataflowBatchWorkerHarness.class.getSimpleName());

CoderTranslation.verifyModelCodersRegistered();

JvmInitializers.runBeforeProcessing(pipelineOptions);
batchHarness.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -601,6 +602,8 @@ public static void main(String[] args) throws Exception {
"%s cannot be main() class with beam_fn_api enabled",
StreamingDataflowWorker.class.getSimpleName());

CoderTranslation.verifyModelCodersRegistered();

LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
Expand Down Expand Up @@ -524,6 +525,7 @@ public static <InputT extends PInput, OutputT extends POutput> OutputT applyTran
private final List<ErrorHandler<?, ?>> errorHandlers = new ArrayList<>();

private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
CoderTranslation.verifyModelCodersRegistered();
this.transforms = transforms;
this.defaultOptions = options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,20 @@ private static Coder<?> fromCustomCoder(RunnerApi.Coder protoCoder) throws IOExc
SerializableUtils.deserializeFromByteArray(
protoCoder.getSpec().getPayload().toByteArray(), "Custom Coder Bytes");
}

/**
* Explicitly validate that required coders are registered.
*
* <p>Called early to give avoid significantly more obscure error later if this precondition is
* not satisfied.
*/
public static void verifyModelCodersRegistered() {
for (String urn : new ModelCoderRegistrar().getCoderURNs().values())
if (!getKnownCoderUrns().inverse().containsKey(urn)) {
throw new IllegalStateException(
"Model coder not registered for "
+ urn
+ ". Perhaps this is a fat jar built with missing ServiceLoader entries?");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
Expand Down Expand Up @@ -288,6 +289,7 @@ public static void main(
LOG.info("Fn Harness started");
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
CoderTranslation.verifyModelCodersRegistered();
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<InstructionRequest, BeamFnApi.InstructionResponse.Builder>>
Expand Down

0 comments on commit 43509a1

Please sign in to comment.