diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 59d09ae999664..8eae5be177a5d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.state.StateInitializationContext; import org.joda.time.Duration; @@ -126,7 +127,11 @@ public void initializeState(StateInitializationContext context) throws Exception // this will implicitly be keyed like the StateInternalsFactory TimerInternalsFactory timerInternalsFactory = key -> timerInternals; - executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()); + if (this.executorService == null) { + this.executorService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("flink-sdf-executor-%d").build()); + } ((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory); ((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory); @@ -191,10 +196,12 @@ public void close() throws Exception { "The scheduled executor service did not properly terminate. Shutting " + "it down now."); executorService.shutdownNow(); + executorService = null; } } catch (InterruptedException e) { LOG.debug("Could not properly await the termination of the scheduled executor service.", e); executorService.shutdownNow(); + executorService = null; } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index f31d71374983a..4bd130769150e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; @@ -56,6 +58,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; @@ -115,7 +118,10 @@ private static class ProcessFnExtractor implements UserParDoFnFactory.DoFnExtrac private static class SplittableDoFnRunnerFactory< InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> implements DoFnRunnerFactory>, OutputT> { + private final AtomicReference ses = new AtomicReference<>(); + @Override + @SuppressWarnings("nullness") // nullable atomic reference guaranteed nonnull when get public DoFnRunner>, OutputT> createRunner( DoFn>, OutputT> fn, PipelineOptions options, @@ -131,6 +137,13 @@ public DoFnRunner>, OutputT> crea OutputManager outputManager, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { + if (this.ses.get() == null) { + this.ses.compareAndSet( + null, + Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build())); + } ProcessFn processFn = (ProcessFn) fn; processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals()); @@ -162,7 +175,7 @@ public void outputWindowedValue( } }, sideInputReader, - Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + ses.get(), // Commit at least once every 10 seconds or 10k records. This keeps the watermark // advancing smoothly, and ensures that not too much work will have to be reprocessed // in the event of a crash. diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index d532b7b6fd91a..2a6ba3e62a78c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.KeyedWorkItem; @@ -49,9 +50,11 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.samza.config.Config; import org.apache.samza.context.Context; import org.apache.samza.operators.Scheduler; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -81,6 +84,7 @@ public class SplittableParDoProcessKeyedElementsOp< private transient SamzaTimerInternalsFactory timerInternalsFactory; private transient DoFnRunner>, OutputT> fnRunner; private transient SamzaPipelineOptions pipelineOptions; + private transient @MonotonicNonNull ScheduledExecutorService ses = null; public SplittableParDoProcessKeyedElementsOp( TupleTag mainOutputTag, @@ -137,6 +141,12 @@ public void open( isBounded, pipelineOptions); + if (this.ses == null) { + this.ses = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build()); + } + final KeyedInternals keyedInternals = new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory); @@ -172,7 +182,7 @@ public void outputWindowedValue( } }, NullSideInputReader.empty(), - Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + ses, 10000, Duration.standardSeconds(10), () -> {