From d66ed905ae0213206d3dfde1662978c0dd32166f Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 12:58:26 -0400 Subject: [PATCH 1/7] Add BulkWriter buffer limit --- .../google/cloud/firestore/BulkWriter.java | 75 ++++++++---- .../cloud/firestore/BulkWriterOptions.java | 16 +++ .../cloud/firestore/FirestoreException.java | 10 +- .../cloud/firestore/BulkWriterTest.java | 108 +++++++++++++++++- 4 files changed, 182 insertions(+), 27 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 4c71a3f18..984a0de47 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -29,15 +29,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; @@ -170,15 +171,13 @@ enum OperationType { * The number of pending operations enqueued on this BulkWriter instance. An operation is * considered pending if BulkWriter has sent it via RPC and is awaiting the result. */ - @GuardedBy("lock") - private int pendingOpsCount = 0; + private AtomicInteger pendingOpsCount = new AtomicInteger(); /** * An array containing buffered BulkWriter operations after the maximum number of pending * operations has been enqueued. */ - @GuardedBy("lock") - private final List bufferedOperations = new ArrayList<>(); + private final BlockingQueue bufferedOperations; /** * The maximum number of pending operations that can be enqueued onto this BulkWriter instance. @@ -235,6 +234,11 @@ enum OperationType { this.successExecutor = MoreExecutors.directExecutor(); this.errorExecutor = MoreExecutors.directExecutor(); this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); + if (options.getMaxPending() == null) { + this.bufferedOperations = new LinkedBlockingQueue<>(); + } else { + this.bufferedOperations = new LinkedBlockingQueue<>(options.getMaxPending()); + } if (!options.getThrottlingEnabled()) { this.rateLimiter = @@ -577,6 +581,10 @@ public ApiFuture update( batch.update(documentReference, precondition, fieldPath, value, moreFieldsAndValues)); } + boolean isReady() { + return bufferedOperations.remainingCapacity() > 0; + } + /** * Schedules the provided write operation and runs the user success callback when the write result * is obtained. @@ -617,20 +625,26 @@ private ApiFuture executeWrite( lastOperation, aVoid -> silenceFuture(operation.getFuture()), MoreExecutors.directExecutor()); + } - // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed - // pending operations, or add the operation to the buffer. - if (pendingOpsCount < maxPendingOpCount) { - pendingOpsCount++; + // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed + // pending operations, or add the operation to the buffer. + if (incrementPendingOpsIfLessThanMax()) { + synchronized (lock) { sendOperationLocked(enqueueOperationOnBatchCallback, operation); - } else { - bufferedOperations.add( + } + } else { + try { + bufferedOperations.put( () -> { synchronized (lock) { - pendingOpsCount++; sendOperationLocked(enqueueOperationOnBatchCallback, operation); } }); + } catch (InterruptedException exception) { + operation.onException(new FirestoreException(exception)); + Thread.currentThread().interrupt(); + return ApiFutures.immediateFailedFuture(exception); } } @@ -638,18 +652,22 @@ private ApiFuture executeWrite( ApiFutures.transformAsync( operation.getFuture(), result -> { - pendingOpsCount--; - processBufferedOperations(); + // pendingOpsCount remains the same if a buffered operation is sent. + if (!processNextBufferedOperation()) { + pendingOpsCount.decrementAndGet(); + } return ApiFutures.immediateFuture(result); }, MoreExecutors.directExecutor()); - return ApiFutures.catchingAsync( + return ApiFutures.catching( processedOperationFuture, ApiException.class, e -> { - pendingOpsCount--; - processBufferedOperations(); + // pendingOpsCount remains the same if a buffered operation is sent. + if (!processNextBufferedOperation()) { + pendingOpsCount.decrementAndGet(); + } throw e; }, MoreExecutors.directExecutor()); @@ -659,11 +677,22 @@ private ApiFuture executeWrite( * Manages the pending operation counter and schedules the next BulkWriter operation if we're * under the maximum limit. */ - private void processBufferedOperations() { - if (pendingOpsCount < maxPendingOpCount && bufferedOperations.size() > 0) { - Runnable nextOp = bufferedOperations.remove(0); - nextOp.run(); - } + private boolean processNextBufferedOperation() { + Runnable nextOp = bufferedOperations.poll(); + if (nextOp == null) return false; + nextOp.run(); + return true; + } + + /** + * Atomically increments pendingOpCount if less than `maxPendingOpCount` + * + * @return boolean indicating whether increment occurred. + */ + private boolean incrementPendingOpsIfLessThanMax() { + int previousPendingOpsCount = + pendingOpsCount.getAndAccumulate(0, (v, x) -> v < maxPendingOpCount ? v + 1 : v); + return previousPendingOpsCount < maxPendingOpCount; } /** diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java index 8cfe0fdbf..35ff5f263 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java @@ -57,11 +57,20 @@ public abstract class BulkWriterOptions { @Nullable public abstract ScheduledExecutorService getExecutor(); + /** + * Limit on the total number of mutations in-memory. + * + * @return The maximum number of operations that will be queued. + */ + @Nullable + abstract Integer getMaxPending(); + public static Builder builder() { return new AutoValue_BulkWriterOptions.Builder() .setMaxOpsPerSecond(null) .setInitialOpsPerSecond(null) .setThrottlingEnabled(true) + .setMaxPending(null) .setExecutor(null); } @@ -121,6 +130,13 @@ public Builder setMaxOpsPerSecond(int maxOpsPerSecond) { */ public abstract Builder setExecutor(@Nullable ScheduledExecutorService executor); + /** + * Limit on the total number of mutations in-memory. + * + * @return The maximum number of operations that will be queued. + */ + abstract Builder setMaxPending(@Nullable Integer maxPending); + public abstract BulkWriterOptions autoBuild(); @Nonnull diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java index 0c1c8dab2..5ee667a39 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java @@ -25,7 +25,7 @@ /** A Firestore Service exception. */ public class FirestoreException extends BaseGrpcServiceException { - private Status status; + private final Status status; FirestoreException(String reason, Status status) { this(reason, status, null); @@ -33,20 +33,26 @@ public class FirestoreException extends BaseGrpcServiceException { private FirestoreException(String reason, Status status, @Nullable Throwable cause) { super(reason, cause, status.getCode().value(), false); - this.status = status; } + FirestoreException(InterruptedException cause) { + super("Executing thread was interrupted", cause, Status.ABORTED.getCode().value(), true); + this.status = Status.ABORTED; + } + private FirestoreException(String reason, ApiException exception) { super( reason, exception, exception.getStatusCode().getCode().getHttpStatusCode(), exception.isRetryable()); + this.status = null; } private FirestoreException(IOException exception, boolean retryable) { super(exception, retryable); + this.status = null; } /** diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 90e29d50f..7639317ff 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -24,6 +24,9 @@ import static com.google.cloud.firestore.LocalFirestoreHelper.update; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.*; @@ -45,6 +48,7 @@ import com.google.protobuf.GeneratedMessageV3; import com.google.rpc.Code; import io.grpc.Status; +import java.lang.Thread.State; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -56,6 +60,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import org.junit.After; import org.junit.Assert; @@ -94,8 +99,9 @@ public class BulkWriterTest { new IllegalStateException("Mock batchWrite failed in test"), GrpcStatusCode.of(Status.Code.RESOURCE_EXHAUSTED), true)); + public static final int VERIFY_TIMEOUT_MS = 200; - @Rule public Timeout timeout = new Timeout(2, TimeUnit.SECONDS); + @Rule public Timeout timeout = new Timeout(2, TimeUnit.HOURS); @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); @@ -121,14 +127,19 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) private BulkWriter bulkWriter; private DocumentReference doc1; private DocumentReference doc2; + private DocumentReference doc3; private ScheduledExecutorService timeoutExecutor; public static ApiFuture successResponse(int updateTimeSeconds) { + return ApiFutures.immediateFuture(success(updateTimeSeconds)); + } + + private static BatchWriteResponse success(int updateTimeSeconds) { BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder(); response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build(); response.addStatusBuilder().build(); - return ApiFutures.immediateFuture(response.build()); + return response.build(); } public static ApiFuture failedResponse(int code) { @@ -171,6 +182,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); doc1 = firestoreMock.document("coll/doc1"); doc2 = firestoreMock.document("coll/doc2"); + doc3 = firestoreMock.document("coll/doc2"); } @After @@ -1401,4 +1413,96 @@ public void optionsInitialAndMaxRatesAreProperlySet() throws Exception { assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), Integer.MAX_VALUE); assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE); } + + @Test + public void blocksWriteWhenBufferIsFull() throws Exception { + BulkWriter bulkWriter = + firestoreMock.bulkWriter( + BulkWriterOptions.builder().setMaxPending(1).setExecutor(timeoutExecutor).build()); + + SettableApiFuture response1 = SettableApiFuture.create(); + SettableApiFuture response2 = SettableApiFuture.create(); + SettableApiFuture response3 = SettableApiFuture.create(); + + Mockito.doReturn(response1, response2, response3).when(firestoreMock).sendRequest(any(), any()); + + bulkWriter.setMaxBatchSize(1); + bulkWriter.setMaxPendingOpCount(1); + + // First call sent as part of batch immediately. + ApiFuture result1 = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); + + // The buffer is not full, so bulk writer is ready to receive more writes. + assertTrue(bulkWriter.isReady()); + + // Second call is buffered. + ApiFuture result2 = bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); + + // The buffer is full, so bulk writer is not ready to write. + assertFalse(bulkWriter.isReady()); + + // Third call is blocked. + AtomicReference> threadResult3 = new AtomicReference<>(); + Thread thread = + new Thread( + () -> { + try { + threadResult3.set(bulkWriter.set(doc3, LocalFirestoreHelper.SINGLE_FIELD_MAP)); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + + // Only one batch should have been sent, since everything else is buffered or blocked. + verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(1)).sendRequest(any(), any()); + + // Expect thread to be waiting because blocked by adding write to buffer. + assertEquals(State.WAITING, threadStateAfterRunning(thread)); + + // Thread should not have received `ApiFuture` yet. + assertNull(threadResult3.get()); + + // First call should not have returned a result yet. + assertFalse(result1.isDone()); + assertFalse(result2.isDone()); + + // Complete first call, and verify calls 2 and 3 make progress. + response1.set(success(1)); + assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(1, 0)), result1.get()); + + // Thread should be unblocked, and therefore run to termination. + assertEquals(State.TERMINATED, threadStateAfterRunning(thread)); + ApiFuture result3 = threadResult3.get(); + assertNotNull(result3); + + // Now two batches should have been sent. + verify(firestoreMock, timeout(200).times(2)).sendRequest(any(), any()); + assertFalse(result2.isDone()); + assertFalse(result3.isDone()); + + // Buffer should still be full + assertFalse(bulkWriter.isReady()); + + // Complete second call, and verify third call makes progress. + response2.set(success(2)); + assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(2, 0)), result2.get()); + + // Now three batches should have been sent. + verify(firestoreMock, timeout(200).times(3)).sendRequest(any(), any()); + assertFalse(result3.isDone()); + + // Complete thirs call, and verify response. + response3.set(success(3)); + assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(3, 0)), result3.get()); + } + + private static State threadStateAfterRunning(Thread thread) throws InterruptedException { + State state = thread.getState(); + while (state == State.RUNNABLE) { + Thread.sleep(1); + state = thread.getState(); + } + return state; + } } From bea6fc3aa417a7e596595ec9417475ee396b5469 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 16:14:13 -0400 Subject: [PATCH 2/7] Change BulkWriterOptions according to team discussion --- .../google/cloud/firestore/BulkWriter.java | 44 ++++++++++--------- .../cloud/firestore/BulkWriterOptions.java | 30 +++++++++++-- .../cloud/firestore/BulkWriterTest.java | 41 +++++++++++++---- 3 files changed, 82 insertions(+), 33 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 984a0de47..de301bcac 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -117,11 +117,11 @@ enum OperationType { /** * The default maximum number of pending operations that can be enqueued onto a BulkWriter - * instance. An operation is considered pending if BulkWriter has sent it via RPC and is awaiting - * the result. BulkWriter buffers additional writes after this many pending operations in order to - * avoiding going OOM. + * instance. An operation is considered in-flight if BulkWriter has sent it via RPC and is awaiting + * the result. BulkWriter buffers additional writes after this many in-flight operations in order to + * avoid going OOM. */ - private static final int DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500; + static final int DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS = 500; /** * The default jitter to apply to the exponential backoff used in retries. For example, a factor @@ -168,10 +168,10 @@ enum OperationType { private final RateLimiter rateLimiter; /** - * The number of pending operations enqueued on this BulkWriter instance. An operation is + * The number of in-flight operations enqueued on this BulkWriter instance. An operation is * considered pending if BulkWriter has sent it via RPC and is awaiting the result. */ - private AtomicInteger pendingOpsCount = new AtomicInteger(); + private AtomicInteger inFlightCount = new AtomicInteger(); /** * An array containing buffered BulkWriter operations after the maximum number of pending @@ -180,10 +180,10 @@ enum OperationType { private final BlockingQueue bufferedOperations; /** - * The maximum number of pending operations that can be enqueued onto this BulkWriter instance. - * Once the this number of writes have been enqueued, subsequent writes are buffered. + * The maximum number of concurrent in-flight operations that can be sent by BulkWriter instance. + * Once this number of writes have been sent, subsequent writes are buffered. */ - private int maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT; + private int maxInFlight; /** * The batch that is currently used to schedule operations. Once this batch reaches maximum @@ -234,10 +234,12 @@ enum OperationType { this.successExecutor = MoreExecutors.directExecutor(); this.errorExecutor = MoreExecutors.directExecutor(); this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); - if (options.getMaxPending() == null) { + this.maxInFlight = options.getMaxInFlightOps(); + if (options.getMaxPendingOps() == null) { this.bufferedOperations = new LinkedBlockingQueue<>(); } else { - this.bufferedOperations = new LinkedBlockingQueue<>(options.getMaxPending()); + int maxBufferedOps = options.getMaxPendingOps() - options.getMaxInFlightOps(); + this.bufferedOperations = new LinkedBlockingQueue<>(maxBufferedOps); } if (!options.getThrottlingEnabled()) { @@ -629,7 +631,7 @@ private ApiFuture executeWrite( // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed // pending operations, or add the operation to the buffer. - if (incrementPendingOpsIfLessThanMax()) { + if (incrementInFlightCountIfLessThanMax()) { synchronized (lock) { sendOperationLocked(enqueueOperationOnBatchCallback, operation); } @@ -654,7 +656,7 @@ private ApiFuture executeWrite( result -> { // pendingOpsCount remains the same if a buffered operation is sent. if (!processNextBufferedOperation()) { - pendingOpsCount.decrementAndGet(); + inFlightCount.decrementAndGet(); } return ApiFutures.immediateFuture(result); }, @@ -666,7 +668,7 @@ private ApiFuture executeWrite( e -> { // pendingOpsCount remains the same if a buffered operation is sent. if (!processNextBufferedOperation()) { - pendingOpsCount.decrementAndGet(); + inFlightCount.decrementAndGet(); } throw e; }, @@ -685,14 +687,14 @@ private boolean processNextBufferedOperation() { } /** - * Atomically increments pendingOpCount if less than `maxPendingOpCount` + * Atomically increments inFlightCount if less than `maxInFlight` * * @return boolean indicating whether increment occurred. */ - private boolean incrementPendingOpsIfLessThanMax() { - int previousPendingOpsCount = - pendingOpsCount.getAndAccumulate(0, (v, x) -> v < maxPendingOpCount ? v + 1 : v); - return previousPendingOpsCount < maxPendingOpCount; + private boolean incrementInFlightCountIfLessThanMax() { + int previousInFlightCount = + inFlightCount.getAndAccumulate(0, (v, x) -> v < maxInFlight ? v + 1 : v); + return previousInFlightCount < maxInFlight; } /** @@ -974,8 +976,8 @@ int getBufferedOperationsCount() { } @VisibleForTesting - void setMaxPendingOpCount(int newMax) { - maxPendingOpCount = newMax; + void setMaxInFlight(int newMax) { + maxInFlight = newMax; } /** diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java index 35ff5f263..ce68aa0c5 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java @@ -16,6 +16,8 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.BulkWriter.DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS; + import com.google.auto.value.AutoValue; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nonnull; @@ -63,14 +65,17 @@ public abstract class BulkWriterOptions { * @return The maximum number of operations that will be queued. */ @Nullable - abstract Integer getMaxPending(); + public abstract Integer getMaxPendingOps(); + + abstract int getMaxInFlightOps(); public static Builder builder() { return new AutoValue_BulkWriterOptions.Builder() .setMaxOpsPerSecond(null) .setInitialOpsPerSecond(null) .setThrottlingEnabled(true) - .setMaxPending(null) + .setMaxInFlightOps(DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS) + .setMaxPendingOps(null) .setExecutor(null); } @@ -135,7 +140,13 @@ public Builder setMaxOpsPerSecond(int maxOpsPerSecond) { * * @return The maximum number of operations that will be queued. */ - abstract Builder setMaxPending(@Nullable Integer maxPending); + public Builder setMaxPendingOps(int maxPending) { + return setMaxPendingOps(Integer.valueOf(maxPending)); + } + + abstract Builder setMaxPendingOps(@Nullable Integer maxPending); + + abstract Builder setMaxInFlightOps(int maxInFlight); public abstract BulkWriterOptions autoBuild(); @@ -144,6 +155,8 @@ public BulkWriterOptions build() { BulkWriterOptions options = autoBuild(); Double initialRate = options.getInitialOpsPerSecond(); Double maxRate = options.getMaxOpsPerSecond(); + int maxInFlightOps = options.getMaxInFlightOps(); + Integer maxPendingOps = options.getMaxPendingOps(); if (initialRate != null && initialRate < 1) { throw FirestoreException.forInvalidArgument( @@ -166,6 +179,17 @@ public BulkWriterOptions build() { throw FirestoreException.forInvalidArgument( "Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false."); } + + if (maxInFlightOps < 1) { + throw FirestoreException.forInvalidArgument( + "Value for argument 'maxInFlightOps' must be greater than 1, but was :" + maxInFlightOps); + } + + if (maxPendingOps != null && maxInFlightOps > maxPendingOps) { + throw FirestoreException.forInvalidArgument( + "Value for argument 'maxPendingOps' must be greater than `maxInFlightOps`, but was :" + maxPendingOps); + } + return options; } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 7639317ff..b910b6529 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -128,6 +128,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) private DocumentReference doc1; private DocumentReference doc2; private DocumentReference doc3; + private DocumentReference doc4; private ScheduledExecutorService timeoutExecutor; @@ -182,7 +183,8 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); doc1 = firestoreMock.document("coll/doc1"); doc2 = firestoreMock.document("coll/doc2"); - doc3 = firestoreMock.document("coll/doc2"); + doc3 = firestoreMock.document("coll/doc3"); + doc4 = firestoreMock.document("coll/doc4"); } @After @@ -471,7 +473,7 @@ public void buffersSubsequentOpsAfterReachingMaxPendingOpCount() throws Exceptio }; responseStubber.initializeStub(batchWriteCapture, firestoreMock); - bulkWriter.setMaxPendingOpCount(3); + bulkWriter.setMaxInFlight(3); bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.set(firestoreMock.document("coll/doc3"), LocalFirestoreHelper.SINGLE_FIELD_MAP); @@ -1418,16 +1420,22 @@ public void optionsInitialAndMaxRatesAreProperlySet() throws Exception { public void blocksWriteWhenBufferIsFull() throws Exception { BulkWriter bulkWriter = firestoreMock.bulkWriter( - BulkWriterOptions.builder().setMaxPending(1).setExecutor(timeoutExecutor).build()); + BulkWriterOptions.builder() + .setMaxInFlightOps(1) + .setMaxPendingOps(2) + .setExecutor(timeoutExecutor) + .build()); SettableApiFuture response1 = SettableApiFuture.create(); SettableApiFuture response2 = SettableApiFuture.create(); SettableApiFuture response3 = SettableApiFuture.create(); + SettableApiFuture response4 = SettableApiFuture.create(); - Mockito.doReturn(response1, response2, response3).when(firestoreMock).sendRequest(any(), any()); + Mockito.doReturn(response1, response2, response3, response4) + .when(firestoreMock) + .sendRequest(any(), any()); bulkWriter.setMaxBatchSize(1); - bulkWriter.setMaxPendingOpCount(1); // First call sent as part of batch immediately. ApiFuture result1 = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); @@ -1441,13 +1449,15 @@ public void blocksWriteWhenBufferIsFull() throws Exception { // The buffer is full, so bulk writer is not ready to write. assertFalse(bulkWriter.isReady()); - // Third call is blocked. + // Third call is blocked, fourth hasn't been attempted. AtomicReference> threadResult3 = new AtomicReference<>(); + AtomicReference> threadResult4 = new AtomicReference<>(); Thread thread = new Thread( () -> { try { threadResult3.set(bulkWriter.set(doc3, LocalFirestoreHelper.SINGLE_FIELD_MAP)); + threadResult4.set(bulkWriter.set(doc4, LocalFirestoreHelper.SINGLE_FIELD_MAP)); } catch (Exception e) { e.printStackTrace(); } @@ -1462,6 +1472,7 @@ public void blocksWriteWhenBufferIsFull() throws Exception { // Thread should not have received `ApiFuture` yet. assertNull(threadResult3.get()); + assertNull(threadResult4.get()); // First call should not have returned a result yet. assertFalse(result1.isDone()); @@ -1471,10 +1482,11 @@ public void blocksWriteWhenBufferIsFull() throws Exception { response1.set(success(1)); assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(1, 0)), result1.get()); - // Thread should be unblocked, and therefore run to termination. - assertEquals(State.TERMINATED, threadStateAfterRunning(thread)); + // Thread should be unblocked, run set method for doc3, then blocked again. + assertEquals(State.WAITING, threadStateAfterRunning(thread)); ApiFuture result3 = threadResult3.get(); assertNotNull(result3); + assertNull(threadResult4.get()); // Now two batches should have been sent. verify(firestoreMock, timeout(200).times(2)).sendRequest(any(), any()); @@ -1488,13 +1500,24 @@ public void blocksWriteWhenBufferIsFull() throws Exception { response2.set(success(2)); assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(2, 0)), result2.get()); + // Thread should be unblocked, run set method for doc4, then terminate. + assertEquals(State.TERMINATED, threadStateAfterRunning(thread)); + ApiFuture result4 = threadResult4.get(); + assertNotNull(result4); + // Now three batches should have been sent. verify(firestoreMock, timeout(200).times(3)).sendRequest(any(), any()); assertFalse(result3.isDone()); + assertFalse(result4.isDone()); - // Complete thirs call, and verify response. + // Complete third call, and verify response. response3.set(success(3)); assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(3, 0)), result3.get()); + assertFalse(result4.isDone()); + + // Complete fourth call, and verify response. + response4.set(success(4)); + assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(4, 0)), result4.get()); } private static State threadStateAfterRunning(Thread thread) throws InterruptedException { From 4ee942ce0238be3a1000c52a4c81937c80686d20 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 16:15:03 -0400 Subject: [PATCH 3/7] Pretty --- .../main/java/com/google/cloud/firestore/BulkWriter.java | 6 +++--- .../java/com/google/cloud/firestore/BulkWriterOptions.java | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index de301bcac..14dda5f7c 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -117,9 +117,9 @@ enum OperationType { /** * The default maximum number of pending operations that can be enqueued onto a BulkWriter - * instance. An operation is considered in-flight if BulkWriter has sent it via RPC and is awaiting - * the result. BulkWriter buffers additional writes after this many in-flight operations in order to - * avoid going OOM. + * instance. An operation is considered in-flight if BulkWriter has sent it via RPC and is + * awaiting the result. BulkWriter buffers additional writes after this many in-flight operations + * in order to avoid going OOM. */ static final int DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS = 500; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java index ce68aa0c5..58e86d647 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java @@ -182,12 +182,14 @@ public BulkWriterOptions build() { if (maxInFlightOps < 1) { throw FirestoreException.forInvalidArgument( - "Value for argument 'maxInFlightOps' must be greater than 1, but was :" + maxInFlightOps); + "Value for argument 'maxInFlightOps' must be greater than 1, but was :" + + maxInFlightOps); } if (maxPendingOps != null && maxInFlightOps > maxPendingOps) { throw FirestoreException.forInvalidArgument( - "Value for argument 'maxPendingOps' must be greater than `maxInFlightOps`, but was :" + maxPendingOps); + "Value for argument 'maxPendingOps' must be greater than `maxInFlightOps`, but was :" + + maxPendingOps); } return options; From 5ee8f5a25a71b4a9abc2b5204284d0525d6fb6f6 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 16:20:21 -0400 Subject: [PATCH 4/7] API Change --- google-cloud-firestore/clirr-ignored-differences.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml index 7a4c83689..226ebd46e 100644 --- a/google-cloud-firestore/clirr-ignored-differences.xml +++ b/google-cloud-firestore/clirr-ignored-differences.xml @@ -209,6 +209,11 @@ com/google/cloud/firestore/Firestore com.google.cloud.firestore.BulkWriter bulkWriter(*) + + 7013 + com/google/cloud/firestore/BulkWriterOptions + java.lang.Integer getMaxPendingOps() + 7006 com/google/cloud/firestore/UpdateBuilder From 98c2b97813e5c35e605781579f3729d030ac2467 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 16:26:07 -0400 Subject: [PATCH 5/7] Revert --- .../test/java/com/google/cloud/firestore/BulkWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index b910b6529..c5f3b89de 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -101,7 +101,7 @@ public class BulkWriterTest { true)); public static final int VERIFY_TIMEOUT_MS = 200; - @Rule public Timeout timeout = new Timeout(2, TimeUnit.HOURS); + @Rule public Timeout timeout = new Timeout(2, TimeUnit.SECONDS); @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); From b089e4e07bac7128139ccd0bb9d09f457a4f31b0 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 12 Mar 2024 16:28:27 -0400 Subject: [PATCH 6/7] Fix --- .../test/java/com/google/cloud/firestore/BulkWriterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index c5f3b89de..6621138f8 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -1489,7 +1489,7 @@ public void blocksWriteWhenBufferIsFull() throws Exception { assertNull(threadResult4.get()); // Now two batches should have been sent. - verify(firestoreMock, timeout(200).times(2)).sendRequest(any(), any()); + verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(2)).sendRequest(any(), any()); assertFalse(result2.isDone()); assertFalse(result3.isDone()); @@ -1506,7 +1506,7 @@ public void blocksWriteWhenBufferIsFull() throws Exception { assertNotNull(result4); // Now three batches should have been sent. - verify(firestoreMock, timeout(200).times(3)).sendRequest(any(), any()); + verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(3)).sendRequest(any(), any()); assertFalse(result3.isDone()); assertFalse(result4.isDone()); From b8ca7697e993d8aaa2329d4294c8a4e594665eb2 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Wed, 13 Mar 2024 10:15:04 -0400 Subject: [PATCH 7/7] Fix --- .../main/java/com/google/cloud/firestore/BulkWriter.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 14dda5f7c..fd0feafa2 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -648,6 +648,15 @@ private ApiFuture executeWrite( Thread.currentThread().interrupt(); return ApiFutures.immediateFailedFuture(exception); } + + // If another operation completed during buffering, then we can process the next buffered + // operation now. This overcomes the small chance that an in-flight operation completes + // before another operation has been added to buffer. + if (incrementInFlightCountIfLessThanMax()) { + if (!processNextBufferedOperation()) { + inFlightCount.decrementAndGet(); + } + } } ApiFuture processedOperationFuture =