diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java index 51127c2dc2fa4..a1c93cdc57821 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.slf4j.Logger; @@ -70,6 +71,8 @@ public static void main(String[] args) throws Exception { "%s cannot be main() class with beam_fn_api enabled", DataflowBatchWorkerHarness.class.getSimpleName()); + CoderTranslation.verifyModelCodersRegistered(); + JvmInitializers.runBeforeProcessing(pipelineOptions); batchHarness.run(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9f822f80a3bdc..82a9ff15b6b89 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -131,6 +131,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; @@ -601,6 +602,8 @@ public static void main(String[] args) throws Exception { "%s cannot be main() class with beam_fn_api enabled", StreamingDataflowWorker.class.getSimpleName()); + CoderTranslation.verifyModelCodersRegistered(); + LOG.debug("Creating StreamingDataflowWorker from options: {}", options); StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 9bace292d6ad5..d3b58dd26bd24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; @@ -524,6 +525,7 @@ public static OutputT applyTran private final List> errorHandlers = new ArrayList<>(); private Pipeline(TransformHierarchy transforms, PipelineOptions options) { + CoderTranslation.verifyModelCodersRegistered(); this.transforms = transforms; this.defaultOptions = options; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java index 5a0d8c3cae259..f7250a0d17972 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java @@ -201,4 +201,20 @@ private static Coder fromCustomCoder(RunnerApi.Coder protoCoder) throws IOExc SerializableUtils.deserializeFromByteArray( protoCoder.getSpec().getPayload().toByteArray(), "Custom Coder Bytes"); } + + /** + * Explicitly validate that required coders are registered. + * + *

Called early to give avoid significantly more obscure error later if this precondition is + * not satisfied. + */ + public static void verifyModelCodersRegistered() { + for (String urn : new ModelCoderRegistrar().getCoderURNs().values()) + if (!getKnownCoderUrns().inverse().containsKey(urn)) { + throw new IllegalStateException( + "Model coder not registered for " + + urn + + ". Perhaps this is a fat jar built with missing ServiceLoader entries?"); + } + } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 3301fd17e1bcf..9df9f12bc52b6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -62,6 +62,7 @@ import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; @@ -288,6 +289,7 @@ public static void main( LOG.info("Fn Harness started"); // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); + CoderTranslation.verifyModelCodersRegistered(); EnumMap< BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction>