From 91d3036c0a7e3f400c8c5ac6c794020e69b8e311 Mon Sep 17 00:00:00 2001 From: bbaker Date: Fri, 24 May 2024 11:21:12 +1000 Subject: [PATCH 1/4] This moves the reactive code pout into its own package because DataLoaderHelper is way too big --- .../java/org/dataloader/DataLoaderHelper.java | 271 ++---------------- .../DataLoaderMapEntrySubscriber.java | 104 +++++++ .../reactive/DataLoaderSubscriber.java | 86 ++++++ .../reactive/DataLoaderSubscriberBase.java | 104 +++++++ .../reactive/HelperIntegration.java | 19 ++ 5 files changed, 337 insertions(+), 247 deletions(-) create mode 100644 src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java create mode 100644 src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java create mode 100644 src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java create mode 100644 src/main/java/org/dataloader/reactive/HelperIntegration.java diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index edbf348..88bc73b 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,7 +3,9 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; -import org.dataloader.impl.DataLoaderAssertionException; +import org.dataloader.reactive.HelperIntegration; +import org.dataloader.reactive.DataLoaderMapEntrySubscriber; +import org.dataloader.reactive.DataLoaderSubscriber; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -12,13 +14,11 @@ import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -510,7 +510,7 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade private CompletableFuture> invokeBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber subscriber = new DataLoaderSubscriber(loadResult, keys, keyContexts, queuedFutures); + Subscriber subscriber = new DataLoaderSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchPublisherWithContext) { @@ -535,9 +535,28 @@ private CompletableFuture> invokeBatchPublisher(List keys, List helperIntegration() { + return new HelperIntegration<>() { + @Override + public StatisticsCollector getStats() { + return stats; + } + + @Override + public void clearCacheView(K key) { + dataLoader.clear(key); + } + + @Override + public void clearCacheEntriesOnExceptions(List keys) { + possiblyClearCacheEntriesOnExceptions(keys); + } + }; + } + private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber> subscriber = new DataLoaderMapEntrySubscriber(loadResult, keys, keyContexts, queuedFutures); + Subscriber> subscriber = new DataLoaderMapEntrySubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { @@ -625,246 +644,4 @@ private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } - /********************************************************************************************** - * ******************************************************************************************** - *

- * The reactive support classes start here - * - * @param for two - ********************************************************************************************** - ********************************************************************************************** - */ - private abstract class DataLoaderSubscriberBase implements Subscriber { - - final CompletableFuture> valuesFuture; - final List keys; - final List callContexts; - final List> queuedFutures; - - List clearCacheKeys = new ArrayList<>(); - List completedValues = new ArrayList<>(); - boolean onErrorCalled = false; - boolean onCompleteCalled = false; - - DataLoaderSubscriberBase( - CompletableFuture> valuesFuture, - List keys, - List callContexts, - List> queuedFutures - ) { - this.valuesFuture = valuesFuture; - this.keys = keys; - this.callContexts = callContexts; - this.queuedFutures = queuedFutures; - } - - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(keys.size()); - } - - @Override - public void onNext(T v) { - assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); - assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); - } - - @Override - public void onComplete() { - assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); - onCompleteCalled = true; - } - - @Override - public void onError(Throwable throwable) { - assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); - onErrorCalled = true; - - stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); - } - - /* - * A value has arrived - how do we complete the future that's associated with it in a common way - */ - void onNextValue(K key, V value, Object callContext, List> futures) { - if (value instanceof Try) { - // we allow the batch loader to return a Try so we can better represent a computation - // that might have worked or not. - //noinspection unchecked - Try tryValue = (Try) value; - if (tryValue.isSuccess()) { - futures.forEach(f -> f.complete(tryValue.get())); - } else { - stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); - clearCacheKeys.add(key); - } - } else { - futures.forEach(f -> f.complete(value)); - } - } - - Throwable unwrapThrowable(Throwable ex) { - if (ex instanceof CompletionException) { - ex = ex.getCause(); - } - return ex; - } - } - - private class DataLoaderSubscriber extends DataLoaderSubscriberBase { - - private int idx = 0; - - private DataLoaderSubscriber( - CompletableFuture> valuesFuture, - List keys, - List callContexts, - List> queuedFutures - ) { - super(valuesFuture, keys, callContexts, queuedFutures); - } - - // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee - // correctness (at the cost of speed). - @Override - public synchronized void onNext(V value) { - super.onNext(value); - - if (idx >= keys.size()) { - // hang on they have given us more values than we asked for in keys - // we cant handle this - return; - } - K key = keys.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture future = queuedFutures.get(idx); - onNextValue(key, value, callContext, List.of(future)); - - completedValues.add(value); - idx++; - } - - - @Override - public synchronized void onComplete() { - super.onComplete(); - if (keys.size() != completedValues.size()) { - // we have more or less values than promised - // we will go through all the outstanding promises and mark those that - // have not finished as failed - for (CompletableFuture queuedFuture : queuedFutures) { - if (!queuedFuture.isDone()) { - queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); - } - } - } - possiblyClearCacheEntriesOnExceptions(clearCacheKeys); - valuesFuture.complete(completedValues); - } - - @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Set the remaining keys to the exception. - for (int i = idx; i < queuedFutures.size(); i++) { - K key = keys.get(i); - CompletableFuture future = queuedFutures.get(i); - if (! future.isDone()) { - future.completeExceptionally(ex); - // clear any cached view of this key because it failed - dataLoader.clear(key); - } - } - valuesFuture.completeExceptionally(ex); - } - - } - - private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase> { - - private final Map callContextByKey; - private final Map>> queuedFuturesByKey; - private final Map completedValuesByKey = new HashMap<>(); - - - private DataLoaderMapEntrySubscriber( - CompletableFuture> valuesFuture, - List keys, - List callContexts, - List> queuedFutures - ) { - super(valuesFuture, keys, callContexts, queuedFutures); - this.callContextByKey = new HashMap<>(); - this.queuedFuturesByKey = new HashMap<>(); - for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture queuedFuture = queuedFutures.get(idx); - callContextByKey.put(key, callContext); - queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); - } - } - - - @Override - public synchronized void onNext(Map.Entry entry) { - super.onNext(entry); - K key = entry.getKey(); - V value = entry.getValue(); - - Object callContext = callContextByKey.get(key); - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); - - onNextValue(key, value, callContext, futures); - - // did we have an actual key for this value - ignore it if they send us one outside the key set - if (!futures.isEmpty()) { - completedValuesByKey.put(key, value); - } - } - - @Override - public synchronized void onComplete() { - super.onComplete(); - - possiblyClearCacheEntriesOnExceptions(clearCacheKeys); - List values = new ArrayList<>(keys.size()); - for (K key : keys) { - V value = completedValuesByKey.get(key); - values.add(value); - - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); - for (CompletableFuture future : futures) { - if (! future.isDone()) { - // we have a future that never came back for that key - // but the publisher is done sending in data - it must be null - // e.g. for key X when found no value - future.complete(null); - } - } - } - valuesFuture.complete(values); - } - - @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Complete the futures for the remaining keys with the exception. - for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - List> futures = queuedFuturesByKey.get(key); - if (!completedValuesByKey.containsKey(key)) { - for (CompletableFuture future : futures) { - future.completeExceptionally(ex); - } - // clear any cached view of this key because they all failed - dataLoader.clear(key); - } - } - valuesFuture.completeExceptionally(ex); - } - } } diff --git a/src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java b/src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java new file mode 100644 index 0000000..839a8c6 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java @@ -0,0 +1,104 @@ +package org.dataloader.reactive; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys in a map lookup fashion. + *

+ * This is a reactive version of {@link org.dataloader.MappedBatchLoader} + * + * @param the type of keys + * @param the type of values + */ +public class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase> { + + private final Map callContextByKey; + private final Map>> queuedFuturesByKey; + private final Map completedValuesByKey = new HashMap<>(); + + + public DataLoaderMapEntrySubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + HelperIntegration helperIntegration + + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + this.callContextByKey = new HashMap<>(); + this.queuedFuturesByKey = new HashMap<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture queuedFuture = queuedFutures.get(idx); + callContextByKey.put(key, callContext); + queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); + } + } + + + @Override + public synchronized void onNext(Map.Entry entry) { + super.onNext(entry); + K key = entry.getKey(); + V value = entry.getValue(); + + Object callContext = callContextByKey.get(key); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + + onNextValue(key, value, callContext, futures); + + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } + } + + @Override + public synchronized void onComplete() { + super.onComplete(); + + possiblyClearCacheEntriesOnExceptions(); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); + + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (!future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } + } + } + valuesFuture.complete(values); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + List> futures = queuedFuturesByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + // clear any cached view of this key because they all failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java b/src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java new file mode 100644 index 0000000..a0d3ee6 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java @@ -0,0 +1,86 @@ +package org.dataloader.reactive; + +import org.dataloader.impl.DataLoaderAssertionException; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys. The keys and values must be + * in index order. + *

+ * This is a reactive version of {@link org.dataloader.BatchLoader} + * + * @param the type of keys + * @param the type of values + */ +public class DataLoaderSubscriber extends DataLoaderSubscriberBase { + + private int idx = 0; + + public DataLoaderSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + HelperIntegration helperIntegration + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // correctness (at the cost of speed). + @Override + public synchronized void onNext(V value) { + super.onNext(value); + + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, List.of(future)); + + completedValues.add(value); + idx++; + } + + + @Override + public synchronized void onComplete() { + super.onComplete(); + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } + } + } + possiblyClearCacheEntriesOnExceptions(); + valuesFuture.complete(completedValues); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + if (!future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java b/src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java new file mode 100644 index 0000000..e2cb01d --- /dev/null +++ b/src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java @@ -0,0 +1,104 @@ +package org.dataloader.reactive; + +import org.dataloader.Try; +import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; +import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.dataloader.impl.Assertions.assertState; + +/** + * The base class for our reactive subscriber support + * + * @param for two + */ +abstract class DataLoaderSubscriberBase implements Subscriber { + + final CompletableFuture> valuesFuture; + final List keys; + final List callContexts; + final List> queuedFutures; + final HelperIntegration helperIntegration; + + List clearCacheKeys = new ArrayList<>(); + List completedValues = new ArrayList<>(); + boolean onErrorCalled = false; + boolean onCompleteCalled = false; + + DataLoaderSubscriberBase( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + HelperIntegration helperIntegration + ) { + this.valuesFuture = valuesFuture; + this.keys = keys; + this.callContexts = callContexts; + this.queuedFutures = queuedFutures; + this.helperIntegration = helperIntegration; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(keys.size()); + } + + @Override + public void onNext(T v) { + assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); + assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); + } + + @Override + public void onComplete() { + assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); + onCompleteCalled = true; + } + + @Override + public void onError(Throwable throwable) { + assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); + onErrorCalled = true; + + helperIntegration.getStats().incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + } + + /* + * A value has arrived - how do we complete the future that's associated with it in a common way + */ + void onNextValue(K key, V value, Object callContext, List> futures) { + if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + //noinspection unchecked + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + futures.forEach(f -> f.complete(tryValue.get())); + } else { + helperIntegration.getStats().incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); + clearCacheKeys.add(key); + } + } else { + futures.forEach(f -> f.complete(value)); + } + } + + Throwable unwrapThrowable(Throwable ex) { + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + return ex; + } + + void possiblyClearCacheEntriesOnExceptions() { + helperIntegration.clearCacheEntriesOnExceptions(clearCacheKeys); + } +} diff --git a/src/main/java/org/dataloader/reactive/HelperIntegration.java b/src/main/java/org/dataloader/reactive/HelperIntegration.java new file mode 100644 index 0000000..49724cb --- /dev/null +++ b/src/main/java/org/dataloader/reactive/HelperIntegration.java @@ -0,0 +1,19 @@ +package org.dataloader.reactive; + +import org.dataloader.stats.StatisticsCollector; + +import java.util.List; + +/** + * Just some callbacks to the data loader code to do common tasks + * + * @param for keys + */ +public interface HelperIntegration { + + StatisticsCollector getStats(); + + void clearCacheView(K key); + + void clearCacheEntriesOnExceptions(List keys); +} From e98621bb24e25bbfc859b9d7a84b30c407d55794 Mon Sep 17 00:00:00 2001 From: bbaker Date: Fri, 24 May 2024 11:30:42 +1000 Subject: [PATCH 2/4] renamed classes inline with their counterparts --- src/main/java/org/dataloader/DataLoaderHelper.java | 8 ++++---- ...erSubscriberBase.java => AbstractBatchSubscriber.java} | 4 ++-- .../{DataLoaderSubscriber.java => BatchSubscriber.java} | 4 ++-- ...MapEntrySubscriber.java => MappedBatchSubscriber.java} | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) rename src/main/java/org/dataloader/reactive/{DataLoaderSubscriberBase.java => AbstractBatchSubscriber.java} (97%) rename src/main/java/org/dataloader/reactive/{DataLoaderSubscriber.java => BatchSubscriber.java} (96%) rename src/main/java/org/dataloader/reactive/{DataLoaderMapEntrySubscriber.java => MappedBatchSubscriber.java} (96%) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 88bc73b..33833bd 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -4,8 +4,8 @@ import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.reactive.HelperIntegration; -import org.dataloader.reactive.DataLoaderMapEntrySubscriber; -import org.dataloader.reactive.DataLoaderSubscriber; +import org.dataloader.reactive.MappedBatchSubscriber; +import org.dataloader.reactive.BatchSubscriber; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -510,7 +510,7 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade private CompletableFuture> invokeBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber subscriber = new DataLoaderSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Subscriber subscriber = new BatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchPublisherWithContext) { @@ -556,7 +556,7 @@ public void clearCacheEntriesOnExceptions(List keys) { private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber> subscriber = new DataLoaderMapEntrySubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Subscriber> subscriber = new MappedBatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { diff --git a/src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java similarity index 97% rename from src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java rename to src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java index e2cb01d..578a33a 100644 --- a/src/main/java/org/dataloader/reactive/DataLoaderSubscriberBase.java +++ b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -18,7 +18,7 @@ * * @param for two */ -abstract class DataLoaderSubscriberBase implements Subscriber { +abstract class AbstractBatchSubscriber implements Subscriber { final CompletableFuture> valuesFuture; final List keys; @@ -31,7 +31,7 @@ abstract class DataLoaderSubscriberBase implements Subscriber { boolean onErrorCalled = false; boolean onCompleteCalled = false; - DataLoaderSubscriberBase( + AbstractBatchSubscriber( CompletableFuture> valuesFuture, List keys, List callContexts, diff --git a/src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java b/src/main/java/org/dataloader/reactive/BatchSubscriber.java similarity index 96% rename from src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java rename to src/main/java/org/dataloader/reactive/BatchSubscriber.java index a0d3ee6..41b97dd 100644 --- a/src/main/java/org/dataloader/reactive/DataLoaderSubscriber.java +++ b/src/main/java/org/dataloader/reactive/BatchSubscriber.java @@ -15,11 +15,11 @@ * @param the type of keys * @param the type of values */ -public class DataLoaderSubscriber extends DataLoaderSubscriberBase { +public class BatchSubscriber extends AbstractBatchSubscriber { private int idx = 0; - public DataLoaderSubscriber( + public BatchSubscriber( CompletableFuture> valuesFuture, List keys, List callContexts, diff --git a/src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java similarity index 96% rename from src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java rename to src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java index 839a8c6..127061c 100644 --- a/src/main/java/org/dataloader/reactive/DataLoaderMapEntrySubscriber.java +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java @@ -15,14 +15,14 @@ * @param the type of keys * @param the type of values */ -public class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase> { +public class MappedBatchSubscriber extends AbstractBatchSubscriber> { private final Map callContextByKey; private final Map>> queuedFuturesByKey; private final Map completedValuesByKey = new HashMap<>(); - public DataLoaderMapEntrySubscriber( + public MappedBatchSubscriber( CompletableFuture> valuesFuture, List keys, List callContexts, From 6523015d12cd73a11e0622cb5e67d0e7e5e7c2e2 Mon Sep 17 00:00:00 2001 From: bbaker Date: Fri, 24 May 2024 11:40:52 +1000 Subject: [PATCH 3/4] made them non public and created a static factory support class --- .../java/org/dataloader/DataLoaderHelper.java | 12 +++-- .../reactive/AbstractBatchSubscriber.java | 4 +- ...bscriber.java => BatchSubscriberImpl.java} | 6 +-- .../reactive/HelperIntegration.java | 19 -------- ...er.java => MappedBatchSubscriberImpl.java} | 7 ++- .../dataloader/reactive/ReactiveSupport.java | 45 +++++++++++++++++++ 6 files changed, 58 insertions(+), 35 deletions(-) rename src/main/java/org/dataloader/reactive/{BatchSubscriber.java => BatchSubscriberImpl.java} (94%) delete mode 100644 src/main/java/org/dataloader/reactive/HelperIntegration.java rename src/main/java/org/dataloader/reactive/{MappedBatchSubscriber.java => MappedBatchSubscriberImpl.java} (95%) create mode 100644 src/main/java/org/dataloader/reactive/ReactiveSupport.java diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 33833bd..14ed9bf 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,9 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; -import org.dataloader.reactive.HelperIntegration; -import org.dataloader.reactive.MappedBatchSubscriber; -import org.dataloader.reactive.BatchSubscriber; +import org.dataloader.reactive.ReactiveSupport; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -510,7 +508,7 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade private CompletableFuture> invokeBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber subscriber = new BatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Subscriber subscriber = ReactiveSupport.batchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchPublisherWithContext) { @@ -535,8 +533,8 @@ private CompletableFuture> invokeBatchPublisher(List keys, List helperIntegration() { - return new HelperIntegration<>() { + private ReactiveSupport.HelperIntegration helperIntegration() { + return new ReactiveSupport.HelperIntegration<>() { @Override public StatisticsCollector getStats() { return stats; @@ -556,7 +554,7 @@ public void clearCacheEntriesOnExceptions(List keys) { private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); - Subscriber> subscriber = new MappedBatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Subscriber> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { diff --git a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java index 578a33a..c2f5438 100644 --- a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java +++ b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -24,7 +24,7 @@ abstract class AbstractBatchSubscriber implements Subscriber { final List keys; final List callContexts; final List> queuedFutures; - final HelperIntegration helperIntegration; + final ReactiveSupport.HelperIntegration helperIntegration; List clearCacheKeys = new ArrayList<>(); List completedValues = new ArrayList<>(); @@ -36,7 +36,7 @@ abstract class AbstractBatchSubscriber implements Subscriber { List keys, List callContexts, List> queuedFutures, - HelperIntegration helperIntegration + ReactiveSupport.HelperIntegration helperIntegration ) { this.valuesFuture = valuesFuture; this.keys = keys; diff --git a/src/main/java/org/dataloader/reactive/BatchSubscriber.java b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java similarity index 94% rename from src/main/java/org/dataloader/reactive/BatchSubscriber.java rename to src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java index 41b97dd..d0b8110 100644 --- a/src/main/java/org/dataloader/reactive/BatchSubscriber.java +++ b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java @@ -15,16 +15,16 @@ * @param the type of keys * @param the type of values */ -public class BatchSubscriber extends AbstractBatchSubscriber { +class BatchSubscriberImpl extends AbstractBatchSubscriber { private int idx = 0; - public BatchSubscriber( + BatchSubscriberImpl( CompletableFuture> valuesFuture, List keys, List callContexts, List> queuedFutures, - HelperIntegration helperIntegration + ReactiveSupport.HelperIntegration helperIntegration ) { super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); } diff --git a/src/main/java/org/dataloader/reactive/HelperIntegration.java b/src/main/java/org/dataloader/reactive/HelperIntegration.java deleted file mode 100644 index 49724cb..0000000 --- a/src/main/java/org/dataloader/reactive/HelperIntegration.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.dataloader.reactive; - -import org.dataloader.stats.StatisticsCollector; - -import java.util.List; - -/** - * Just some callbacks to the data loader code to do common tasks - * - * @param for keys - */ -public interface HelperIntegration { - - StatisticsCollector getStats(); - - void clearCacheView(K key); - - void clearCacheEntriesOnExceptions(List keys); -} diff --git a/src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java similarity index 95% rename from src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java rename to src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java index 127061c..d56efa0 100644 --- a/src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java @@ -15,20 +15,19 @@ * @param the type of keys * @param the type of values */ -public class MappedBatchSubscriber extends AbstractBatchSubscriber> { +class MappedBatchSubscriberImpl extends AbstractBatchSubscriber> { private final Map callContextByKey; private final Map>> queuedFuturesByKey; private final Map completedValuesByKey = new HashMap<>(); - public MappedBatchSubscriber( + MappedBatchSubscriberImpl( CompletableFuture> valuesFuture, List keys, List callContexts, List> queuedFutures, - HelperIntegration helperIntegration - + ReactiveSupport.HelperIntegration helperIntegration ) { super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); this.callContextByKey = new HashMap<>(); diff --git a/src/main/java/org/dataloader/reactive/ReactiveSupport.java b/src/main/java/org/dataloader/reactive/ReactiveSupport.java new file mode 100644 index 0000000..fc03bb0 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/ReactiveSupport.java @@ -0,0 +1,45 @@ +package org.dataloader.reactive; + +import org.dataloader.stats.StatisticsCollector; +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class ReactiveSupport { + + public static Subscriber batchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new BatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + public static Subscriber> mappedBatchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new MappedBatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + /** + * Just some callbacks to the data loader code to do common tasks + * + * @param for keys + */ + public interface HelperIntegration { + + StatisticsCollector getStats(); + + void clearCacheView(K key); + + void clearCacheEntriesOnExceptions(List keys); + } +} From 170ccf8308cb4e86ed96b7bcd8a77cd6342999b5 Mon Sep 17 00:00:00 2001 From: bbaker Date: Fri, 24 May 2024 11:43:11 +1000 Subject: [PATCH 4/4] reorged method placement --- .../java/org/dataloader/DataLoaderHelper.java | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 14ed9bf..62a7cb6 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -533,25 +533,6 @@ private CompletableFuture> invokeBatchPublisher(List keys, List helperIntegration() { - return new ReactiveSupport.HelperIntegration<>() { - @Override - public StatisticsCollector getStats() { - return stats; - } - - @Override - public void clearCacheView(K key) { - dataLoader.clear(key); - } - - @Override - public void clearCacheEntriesOnExceptions(List keys) { - possiblyClearCacheEntriesOnExceptions(keys); - } - }; - } - private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { CompletableFuture> loadResult = new CompletableFuture<>(); Subscriber> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); @@ -642,4 +623,22 @@ private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } + private ReactiveSupport.HelperIntegration helperIntegration() { + return new ReactiveSupport.HelperIntegration<>() { + @Override + public StatisticsCollector getStats() { + return stats; + } + + @Override + public void clearCacheView(K key) { + dataLoader.clear(key); + } + + @Override + public void clearCacheEntriesOnExceptions(List keys) { + possiblyClearCacheEntriesOnExceptions(keys); + } + }; + } }