
Commit

feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments.
ehsannas committed Feb 27, 2024
1 parent 77107e2 commit ee18489
Showing 6 changed files with 506 additions and 72 deletions.
24 changes: 24 additions & 0 deletions google-cloud-firestore/pom.xml
@@ -182,12 +182,36 @@
<scope>test</scope>
</dependency>
<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>1.32.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.29.0-alpha</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-common</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<!-- END OpenTelemetry -->
</dependencies>

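These OpenTelemetry artifacts are all test-scoped, which suggests unit tests that capture spans in memory and assert on them. A minimal, illustrative sketch of how opentelemetry-sdk-testing is commonly wired up for that purpose; the tracer name and span name below are hypothetical and not taken from this commit:

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.List;

public class InMemoryTracingExample {
  public static void main(String[] args) {
    // Collect finished spans in memory instead of exporting them over the network.
    InMemorySpanExporter exporter = InMemorySpanExporter.create();
    SdkTracerProvider tracerProvider =
        SdkTracerProvider.builder()
            .addSpanProcessor(SimpleSpanProcessor.create(exporter))
            .build();
    OpenTelemetrySdk openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();

    // Produce a span the same way instrumented code would.
    Tracer tracer = openTelemetry.getTracer("example-tracer"); // hypothetical instrumentation name
    Span span = tracer.spanBuilder("AggregationQuery.Get").startSpan(); // hypothetical span name
    span.end();

    // A test can now assert on the captured spans.
    List<SpanData> finished = exporter.getFinishedSpanItems();
    System.out.println(finished.get(0).getName()); // prints "AggregationQuery.Get"

    tracerProvider.close();
  }
}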
@@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
@@ -24,7 +26,10 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
@@ -34,6 +39,7 @@
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,6 +65,11 @@ public class AggregateQuery {
this.aliasMap = new HashMap<>();
}

@Nonnull
private TraceUtil getTraceUtil() {
return query.getFirestore().getOptions().getTraceUtil();
}

/** Returns the query whose aggregations will be calculated by this object. */
@Nonnull
public Query getQuery() {
@@ -77,17 +88,30 @@ public ApiFuture<AggregateQuerySnapshot> get() {

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
TraceUtil.Span span =
getTraceUtil()
.startSpan(
transactionId == null
? TraceUtil.SPAN_NAME_AGGREGATION_QUERY_GET
: TraceUtil.SPAN_NAME_TRANSACTION_GET_AGGREGATION_QUERY);
try (Scope ignored = span.makeCurrent()) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer, /* attempt= */ 0);
ApiFuture<AggregateQuerySnapshot> result = responseDeliverer.getFuture();
span.endAtFuture(result);
return result;
} catch (Exception error) {
span.end(error);
throw error;
}
}
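The rewritten get() method above shows the span lifecycle convention this commit applies: start a span, make it current for the duration of the call, end it when the returned future settles, and end it with the error if setup throws. A rough sketch of the same pattern expressed directly against the public OpenTelemetry and ApiFutures APIs; this is not the TraceUtil implementation, and the helper name and the exact behavior of endAtFuture are assumptions:

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.common.util.concurrent.MoreExecutors;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.function.Supplier;

final class FutureTracing {
  // Hypothetical helper: wraps an asynchronous operation in a span that ends when the
  // returned future completes, successfully or not.
  static <T> ApiFuture<T> traceFuture(Tracer tracer, String spanName, Supplier<ApiFuture<T>> operation) {
    Span span = tracer.spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      ApiFuture<T> result = operation.get();
      ApiFutures.addCallback(
          result,
          new ApiFutureCallback<T>() {
            @Override
            public void onSuccess(T value) {
              span.end(); // analogous to span.endAtFuture(result) above
            }

            @Override
            public void onFailure(Throwable t) {
              span.recordException(t);
              span.setStatus(StatusCode.ERROR);
              span.end();
            }
          },
          MoreExecutors.directExecutor());
      return result;
    } catch (Exception error) {
      span.recordException(error);
      span.setStatus(StatusCode.ERROR);
      span.end();
      throw error;
    }
  }

  private FutureTracing() {}
}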

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private void runQuery(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
new AggregateQueryResponseObserver(responseDeliverer, attempt);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
query.rpcContext.getClient().runAggregationQueryCallable();
query.rpcContext.streamRequest(request, responseObserver, callable);
@@ -138,18 +162,36 @@ private final class AggregateQueryResponseObserver

private final AggregateQueryResponseDeliverer responseDeliverer;
private StreamController streamController;
private int attempt;

AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer) {
AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
this.responseDeliverer = responseDeliverer;
this.attempt = attempt;
}

Map<String, Object> getAttemptAttributes() {
ImmutableMap.Builder<String, Object> builder =
new ImmutableMap.Builder<String, Object>().put("isRetryAttempt", attempt > 0);
if (attempt > 0) {
builder.put("attemptNumber", attempt);
}
return builder.build();
}

@Override
public void onStart(StreamController streamController) {
getTraceUtil()
.currentSpan()
.addEvent(SPAN_NAME_RUN_AGGREGATION_QUERY + " Stream started.", getAttemptAttributes());
this.streamController = streamController;
}

@Override
public void onResponse(RunAggregationQueryResponse response) {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + " Response Received.", getAttemptAttributes());
// Close the stream to avoid it dangling, since we're not expecting any more responses.
streamController.cancel();

@@ -165,8 +207,19 @@ public void onResponse(RunAggregationQueryResponse response) {
@Override
public void onError(Throwable throwable) {
if (shouldRetry(throwable)) {
runQuery(responseDeliverer);
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Retryable Error",
Collections.singletonMap("error.message", throwable.getMessage()));

runQuery(responseDeliverer, attempt + 1);
} else {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Error",
Collections.singletonMap("error.message", throwable.getMessage()));
responseDeliverer.deliverError(throwable);
}
}
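getAttemptAttributes() returns a plain Map<String, Object>, which TraceUtil presumably translates into OpenTelemetry attributes before attaching the event to the current span. A hedged sketch of what such a conversion could look like; the helper name and the type mapping are assumptions, not this library's code:

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import java.util.Map;

final class SpanEvents {
  // Hypothetical helper: records an event on the current span, translating the
  // Map<String, Object> attribute values into typed OpenTelemetry attributes.
  static void addEvent(String name, Map<String, Object> attributes) {
    AttributesBuilder builder = Attributes.builder();
    attributes.forEach(
        (key, value) -> {
          if (value instanceof Boolean) {
            builder.put(key, (Boolean) value);
          } else if (value instanceof Number) {
            builder.put(key, ((Number) value).longValue());
          } else {
            builder.put(key, String.valueOf(value));
          }
        });
    Span.current().addEvent(name, builder.build());
  }

  private SpanEvents() {}
}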
@@ -26,6 +26,9 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Context;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -221,6 +224,8 @@ enum OperationType {
@GuardedBy("lock")
private Executor errorExecutor;

Context traceContext;

/**
* Used to track when writes are enqueued. The user handler executors cannot be changed after a
* write has been enqueued.
@@ -237,6 +242,7 @@ enum OperationType {
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
this.traceContext = firestore.getOptions().getTraceUtil().currentContext();

if (!options.getThrottlingEnabled()) {
this.rateLimiter =
@@ -899,21 +905,32 @@ private void scheduleCurrentBatchLocked(final boolean flush) {

/** Sends the provided batch once the rate limiter does not require any delay. */
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
// Send the batch if it is does not require any delay, or schedule another attempt after the
// Send the batch if it does not require any delay, or schedule another attempt after the
// appropriate timeout.
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
if (underRateLimit) {
batch
.bulkCommit()
.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
TraceUtil.Span span =
firestore
.getOptions()
.getTraceUtil()
.startSpan(TraceUtil.SPAN_NAME_BULK_WRITER_COMMIT, traceContext)
.setAttribute("numDocuments", batch.getWrites().size());
try (Scope ignored = span.makeCurrent()) {
ApiFuture<Void> result = batch.bulkCommit();
result.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
},
bulkWriterExecutor);
}
},
bulkWriterExecutor);
span.endAtFuture(result);
} catch (Exception error) {
span.end(error);
throw error;
}
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
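BulkWriter captures a trace context when it is constructed and later parents each commit span to it, so batches committed from the bulkWriterExecutor still attach to the trace that created the writer. A rough illustration of that parenting with the raw OpenTelemetry API; the span and attribute names mirror the code above, but the exact string behind SPAN_NAME_BULK_WRITER_COMMIT and the helper shape are assumptions:

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;

final class BulkCommitTracing {
  // Captured once at construction time, while the caller's span is still current.
  private final Context creationContext = Context.current();
  private final Tracer tracer;

  BulkCommitTracing(Tracer tracer) {
    this.tracer = tracer;
  }

  // Hypothetical helper: starts a commit span parented to the creation-time context
  // rather than to whatever context happens to be current on the executor thread.
  Span startCommitSpan(long numDocuments) {
    return tracer
        .spanBuilder("BulkWriter.Commit") // assumed value of SPAN_NAME_BULK_WRITER_COMMIT
        .setParent(creationContext)
        .setAttribute("numDocuments", numDocuments)
        .startSpan();
  }
}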
@@ -21,6 +21,8 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -102,21 +104,32 @@ public ApiFuture<List<QueryPartition>> getPartitions(long desiredPartitionCount)
} else {
PartitionQueryRequest request = buildRequest(desiredPartitionCount);

try {
return ApiFutures.transform(
rpcContext.sendRequest(request, rpcContext.getClient().partitionQueryPagedCallable()),
response -> {
final ImmutableList.Builder<QueryPartition> partitions = ImmutableList.builder();
consumePartitions(
response,
queryPartition -> {
partitions.add(queryPartition);
return null;
});
return partitions.build();
},
MoreExecutors.directExecutor());
TraceUtil.Span span =
rpcContext
.getFirestore()
.getOptions()
.getTraceUtil()
.startSpan(TraceUtil.SPAN_NAME_PARTITION_QUERY);
try (Scope ignored = span.makeCurrent()) {
ApiFuture<List<QueryPartition>> result =
ApiFutures.transform(
rpcContext.sendRequest(
request, rpcContext.getClient().partitionQueryPagedCallable()),
response -> {
final ImmutableList.Builder<QueryPartition> partitions = ImmutableList.builder();
consumePartitions(
response,
queryPartition -> {
partitions.add(queryPartition);
return null;
});
return partitions.build();
},
MoreExecutors.directExecutor());
span.endAtFuture(result);
return result;
} catch (ApiException exception) {
span.end(exception);
throw FirestoreException.forApiException(exception);
}
}

