Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13015] Update the SDK harness grouping table to be memory bounded based upon the amount of assigned cache memory and to use an LRU eviction policy. #17327

Merged
merged 3 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
*/
public class NullSideInputReader implements SideInputReader {

/** The default empty instance. */
private static final NullSideInputReader EMPTY_INSTANCE = of(Collections.emptySet());

private Set<PCollectionView<?>> views;

public static NullSideInputReader empty() {
return new NullSideInputReader(Collections.emptySet());
return EMPTY_INSTANCE;
}

public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,26 @@ enum LogLevel {
void setSdkHarnessLogLevelOverrides(SdkHarnessLogLevelOverrides value);

/**
* Size (in MB) of each grouping table used to pre-combine elements. If unset, defaults to 100 MB.
* Size (in MB) of each grouping table used to pre-combine elements. Larger values may reduce the
* amount of data shuffled. If unset, defaults to 100 MB.
*
 * <p>CAUTION: If set too large, workers may run into OOM conditions more easily, as each worker
 * may have many grouping tables in-memory concurrently.
*
* <p>CAUTION: This option does not apply to portable runners such as Dataflow Prime. See {@link
* #setMaxCacheMemoryUsageMb}, {@link #setMaxCacheMemoryUsagePercent}, or {@link
* #setMaxCacheMemoryUsageMbClass} to configure memory thresholds that apply to the grouping table
* and other cached objects.
*/
@Description(
"The size (in MB) of the grouping tables used to pre-combine elements before "
+ "shuffling. Larger values may reduce the amount of data shuffled.")
"The size (in MB) of the grouping tables used to pre-combine elements before shuffling. If "
+ "unset, defaults to 100 MB. Larger values may reduce the amount of data shuffled. "
+ "CAUTION: If set too large, workers may run into OOM conditions more easily, each "
+ "worker may have many grouping tables in-memory concurrently. CAUTION: This option "
+ "does not apply to portable runners such as Dataflow Prime. See "
+ "--maxCacheMemoryUsageMb, --maxCacheMemoryUsagePercent, or "
+ "--maxCacheMemoryUsageMbClass to configure memory thresholds that apply to the "
+ "grouping table and other cached objects.")
@Default.Integer(100)
int getGroupingTableMaxSizeMb();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ public interface Cache<K, V> {
*
* <p>Types should consider implementing {@link org.apache.beam.sdk.util.Weighted} to not invoke
* the overhead of using the {@link Caches#weigh default weigher} multiple times.
*
 * <p>This interface may be invoked from any other thread that manipulates the cache, causing
 * this value to be shrunk. Implementers must ensure thread safety with respect to any side
 * effects caused.
*/
@ThreadSafe
@FunctionalInterface
interface Shrinkable<V> {
/**
* Returns a new object that is smaller than the object being evicted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ public final class Caches {
private static final MemoryMeter MEMORY_METER =
MemoryMeter.builder().withGuessing(Guess.BEST).build();

/** The size of a reference. */
public static final long REFERENCE_SIZE = 8;

public static long weigh(Object o) {
if (o == null) {
return 8;
return REFERENCE_SIZE;
}
try {
return MEMORY_METER.measureDeep(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
Expand Down Expand Up @@ -68,40 +69,46 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
}

private static class PrecombineRunner<KeyT, InputT, AccumT> {
private PipelineOptions options;
private CombineFn<InputT, AccumT, ?> combineFn;
private FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output;
private Coder<KeyT> keyCoder;
private GroupingTable<WindowedValue<KeyT>, InputT, AccumT> groupingTable;
private Coder<AccumT> accumCoder;
private final PipelineOptions options;
private final String ptransformId;
private final Supplier<Cache<?, ?>> bundleCache;
private final CombineFn<InputT, AccumT, ?> combineFn;
private final FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output;
private final Coder<KeyT> keyCoder;
private PrecombineGroupingTable<KeyT, InputT, AccumT> groupingTable;

PrecombineRunner(
PipelineOptions options,
String ptransformId,
Supplier<Cache<?, ?>> bundleCache,
CombineFn<InputT, AccumT, ?> combineFn,
FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output,
Coder<KeyT> keyCoder,
Coder<AccumT> accumCoder) {
Coder<KeyT> keyCoder) {
this.options = options;
this.ptransformId = ptransformId;
this.bundleCache = bundleCache;
this.combineFn = combineFn;
this.output = output;
this.keyCoder = keyCoder;
this.accumCoder = accumCoder;
}

void startBundle() {
groupingTable =
PrecombineGroupingTable.combiningAndSampling(
options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/);
options,
Caches.subCache(bundleCache.get(), ptransformId),
combineFn,
keyCoder,
0.001 /*sizeEstimatorSampleRate*/);
}

void processElement(WindowedValue<KV<KeyT, InputT>> elem) throws Exception {
groupingTable.put(
elem, (Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
groupingTable.put(elem, output::accept);
}

void finishBundle() throws Exception {
groupingTable.flush(
(Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
groupingTable.flush(output::accept);
groupingTable = null;
}
}

Expand Down Expand Up @@ -144,8 +151,6 @@ public PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(Context
(CombineFn)
SerializableUtils.deserializeFromByteArray(
combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
Coder<AccumT> accumCoder =
(Coder<AccumT>) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());

FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> consumer =
(FnDataReceiver)
Expand All @@ -154,7 +159,12 @@ public PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(Context

PrecombineRunner<KeyT, InputT, AccumT> runner =
new PrecombineRunner<>(
context.getPipelineOptions(), combineFn, consumer, keyCoder, accumCoder);
context.getPipelineOptions(),
context.getPTransformId(),
context.getBundleCacheSupplier(),
combineFn,
consumer,
keyCoder);

// Register the appropriate handlers.
context.addStartBundleFunction(runner::startBundle);
Expand Down

This file was deleted.

Loading