diff --git a/google-cloud-firestore/pom.xml b/google-cloud-firestore/pom.xml index fdc8368ab..c8397d959 100644 --- a/google-cloud-firestore/pom.xml +++ b/google-cloud-firestore/pom.xml @@ -182,12 +182,36 @@ test + + io.opentelemetry + opentelemetry-sdk-testing + 1.32.0 + test + + + io.opentelemetry + opentelemetry-semconv + 1.29.0-alpha + test + + + io.opentelemetry + opentelemetry-sdk-trace + 1.29.0 + test + io.opentelemetry opentelemetry-sdk 1.29.0 test + + io.opentelemetry + opentelemetry-sdk-common + 1.29.0 + test + diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java index 1613b74dd..c14d5d4c5 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java @@ -16,6 +16,8 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; + import com.google.api.core.ApiFuture; import com.google.api.core.InternalExtensionOnly; import com.google.api.core.SettableApiFuture; @@ -24,7 +26,10 @@ import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StreamController; import com.google.cloud.Timestamp; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.cloud.firestore.v1.FirestoreSettings; +import com.google.common.collect.ImmutableMap; import com.google.firestore.v1.RunAggregationQueryRequest; import com.google.firestore.v1.RunAggregationQueryResponse; import com.google.firestore.v1.RunQueryRequest; @@ -34,6 +39,7 @@ import com.google.firestore.v1.Value; import com.google.protobuf.ByteString; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -59,6 +65,11 @@ public class AggregateQuery { this.aliasMap = new HashMap<>(); } + @Nonnull + private TraceUtil getTraceUtil() { + return query.getFirestore().getOptions().getTraceUtil(); + } + /** Returns the query whose aggregations will be calculated by this object. */ @Nonnull public Query getQuery() { @@ -77,17 +88,30 @@ public ApiFuture get() { @Nonnull ApiFuture get(@Nullable final ByteString transactionId) { - AggregateQueryResponseDeliverer responseDeliverer = - new AggregateQueryResponseDeliverer( - transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime()); - runQuery(responseDeliverer); - return responseDeliverer.getFuture(); + TraceUtil.Span span = + getTraceUtil() + .startSpan( + transactionId == null + ? TraceUtil.SPAN_NAME_AGGREGATION_QUERY_GET + : TraceUtil.SPAN_NAME_TRANSACTION_GET_AGGREGATION_QUERY); + try (Scope ignored = span.makeCurrent()) { + AggregateQueryResponseDeliverer responseDeliverer = + new AggregateQueryResponseDeliverer( + transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime()); + runQuery(responseDeliverer, /* attempt= */ 0); + ApiFuture result = responseDeliverer.getFuture(); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } - private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) { + private void runQuery(AggregateQueryResponseDeliverer responseDeliverer, int attempt) { RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId()); AggregateQueryResponseObserver responseObserver = - new AggregateQueryResponseObserver(responseDeliverer); + new AggregateQueryResponseObserver(responseDeliverer, attempt); ServerStreamingCallable callable = query.rpcContext.getClient().runAggregationQueryCallable(); query.rpcContext.streamRequest(request, responseObserver, callable); @@ -138,18 +162,36 @@ private final class AggregateQueryResponseObserver private final AggregateQueryResponseDeliverer responseDeliverer; private StreamController streamController; + private int attempt; - AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer) { + AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer, int attempt) { this.responseDeliverer = responseDeliverer; + this.attempt = attempt; + } + + Map getAttemptAttributes() { + ImmutableMap.Builder builder = + new ImmutableMap.Builder().put("isRetryAttempt", attempt > 0); + if (attempt > 0) { + builder.put("attemptNumber", attempt); + } + return builder.build(); } @Override public void onStart(StreamController streamController) { + getTraceUtil() + .currentSpan() + .addEvent(SPAN_NAME_RUN_AGGREGATION_QUERY + " Stream started.", getAttemptAttributes()); this.streamController = streamController; } @Override public void onResponse(RunAggregationQueryResponse response) { + getTraceUtil() + .currentSpan() + .addEvent( + SPAN_NAME_RUN_AGGREGATION_QUERY + " Response Received.", getAttemptAttributes()); // Close the stream to avoid it dangling, since we're not expecting any more responses. streamController.cancel(); @@ -165,8 +207,19 @@ public void onResponse(RunAggregationQueryResponse response) { @Override public void onError(Throwable throwable) { if (shouldRetry(throwable)) { - runQuery(responseDeliverer); + getTraceUtil() + .currentSpan() + .addEvent( + SPAN_NAME_RUN_AGGREGATION_QUERY + ": Retryable Error", + Collections.singletonMap("error.message", throwable.getMessage())); + + runQuery(responseDeliverer, attempt + 1); } else { + getTraceUtil() + .currentSpan() + .addEvent( + SPAN_NAME_RUN_AGGREGATION_QUERY + ": Error", + Collections.singletonMap("error.message", throwable.getMessage())); responseDeliverer.deliverError(throwable); } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 560a2d63a..7e6f5bac8 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -26,6 +26,9 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Context; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.cloud.firestore.v1.FirestoreSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -221,6 +224,8 @@ enum OperationType { @GuardedBy("lock") private Executor errorExecutor; + Context traceContext; + /** * Used to track when writes are enqueued. The user handler executors cannot be changed after a * write has been enqueued. @@ -237,6 +242,7 @@ enum OperationType { this.successExecutor = MoreExecutors.directExecutor(); this.errorExecutor = MoreExecutors.directExecutor(); this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); + this.traceContext = firestore.getOptions().getTraceUtil().currentContext(); if (!options.getThrottlingEnabled()) { this.rateLimiter = @@ -899,21 +905,32 @@ private void scheduleCurrentBatchLocked(final boolean flush) { /** Sends the provided batch once the rate limiter does not require any delay. */ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) { - // Send the batch if it is does not require any delay, or schedule another attempt after the + // Send the batch if it does not require any delay, or schedule another attempt after the // appropriate timeout. boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize()); if (underRateLimit) { - batch - .bulkCommit() - .addListener( - () -> { - if (flush) { - synchronized (lock) { - scheduleCurrentBatchLocked(/* flush= */ true); - } + TraceUtil.Span span = + firestore + .getOptions() + .getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_BULK_WRITER_COMMIT, traceContext) + .setAttribute("numDocuments", batch.getWrites().size()); + try (Scope ignored = span.makeCurrent()) { + ApiFuture result = batch.bulkCommit(); + result.addListener( + () -> { + if (flush) { + synchronized (lock) { + scheduleCurrentBatchLocked(/* flush= */ true); } - }, - bulkWriterExecutor); + } + }, + bulkWriterExecutor); + span.endAtFuture(result); + } catch (Exception error) { + span.end(error); + throw error; + } } else { long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize()); logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000)); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java index 1c196363c..f5f58e289 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -21,6 +21,8 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -102,21 +104,32 @@ public ApiFuture> getPartitions(long desiredPartitionCount) } else { PartitionQueryRequest request = buildRequest(desiredPartitionCount); - try { - return ApiFutures.transform( - rpcContext.sendRequest(request, rpcContext.getClient().partitionQueryPagedCallable()), - response -> { - final ImmutableList.Builder partitions = ImmutableList.builder(); - consumePartitions( - response, - queryPartition -> { - partitions.add(queryPartition); - return null; - }); - return partitions.build(); - }, - MoreExecutors.directExecutor()); + TraceUtil.Span span = + rpcContext + .getFirestore() + .getOptions() + .getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_PARTITION_QUERY); + try (Scope ignored = span.makeCurrent()) { + ApiFuture> result = + ApiFutures.transform( + rpcContext.sendRequest( + request, rpcContext.getClient().partitionQueryPagedCallable()), + response -> { + final ImmutableList.Builder partitions = ImmutableList.builder(); + consumePartitions( + response, + queryPartition -> { + partitions.add(queryPartition); + return null; + }); + return partitions.build(); + }, + MoreExecutors.directExecutor()); + span.endAtFuture(result); + return result; } catch (ApiException exception) { + span.end(exception); throw FirestoreException.forApiException(exception); } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionReference.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionReference.java index d262a642a..e81a99a1b 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionReference.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionReference.java @@ -23,6 +23,8 @@ import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.firestore.spi.v1.FirestoreRpc; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; @@ -127,48 +129,56 @@ public DocumentReference document(@Nonnull String childPath) { */ @Nonnull public Iterable listDocuments() { - ListDocumentsRequest.Builder request = ListDocumentsRequest.newBuilder(); - request.setParent(options.getParentPath().toString()); - request.setCollectionId(options.getCollectionId()); - request.setMask(DocumentMask.getDefaultInstance()); - request.setShowMissing(true); - - final ListDocumentsPagedResponse response; - try { + TraceUtil.Span span = + rpcContext + .getFirestore() + .getOptions() + .getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_COL_REF_LIST_DOCUMENTS); + try (Scope ignored = span.makeCurrent()) { + ListDocumentsRequest.Builder request = ListDocumentsRequest.newBuilder(); + request.setParent(options.getParentPath().toString()); + request.setCollectionId(options.getCollectionId()); + request.setMask(DocumentMask.getDefaultInstance()); + request.setShowMissing(true); + final ListDocumentsPagedResponse response; FirestoreRpc client = rpcContext.getClient(); UnaryCallable callable = client.listDocumentsPagedCallable(); ListDocumentsRequest build = request.build(); ApiFuture future = rpcContext.sendRequest(build, callable); response = ApiExceptions.callAndTranslateApiException(future); + Iterable result = + new Iterable() { + @Override + @Nonnull + public Iterator iterator() { + final Iterator iterator = response.iterateAll().iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public DocumentReference next() { + ResourcePath path = ResourcePath.create(iterator.next().getName()); + return document(path.getId()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + }; + } + }; + span.end(); + return result; } catch (ApiException exception) { + span.end(exception); throw FirestoreException.forApiException(exception); } - - return new Iterable() { - @Override - @Nonnull - public Iterator iterator() { - final Iterator iterator = response.iterateAll().iterator(); - return new Iterator() { - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public DocumentReference next() { - ResourcePath path = ResourcePath.create(iterator.next().getName()); - return document(path.getId()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - }; - } - }; } /** @@ -182,11 +192,24 @@ public void remove() { */ @Nonnull public ApiFuture add(@Nonnull final Map fields) { - final DocumentReference documentReference = document(); - ApiFuture createFuture = documentReference.create(fields); - - return ApiFutures.transform( - createFuture, writeResult -> documentReference, MoreExecutors.directExecutor()); + TraceUtil.Span span = + rpcContext + .getFirestore() + .getOptions() + .getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_COL_REF_ADD); + try (Scope ignored = span.makeCurrent()) { + final DocumentReference documentReference = document(); + ApiFuture createFuture = documentReference.create(fields); + ApiFuture result = + ApiFutures.transform( + createFuture, writeResult -> documentReference, MoreExecutors.directExecutor()); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java new file mode 100644 index 000000000..104b6c374 --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java @@ -0,0 +1,304 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore.it; + +import static com.google.cloud.firestore.telemetry.TraceUtil.*; +import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; +import static org.junit.Assert.*; + +import com.google.cloud.firestore.*; +import com.google.common.base.Preconditions; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ITTracingTest { + private static final Logger logger = + Logger.getLogger(com.google.cloud.firestore.it.ITBaseTest.class.getName()); + + private static final String SERVICE = "google.firestore.v1.Firestore/"; + private static final String LIST_DOCUMENTS_RPC_NAME = "ListDocuments"; + private static final String BATCH_WRITE_RPC_NAME = "BatchWrite"; + + // We use an InMemorySpanExporter for testing which keeps all generated trace spans + // in memory so that we can check their correctness. + protected static InMemorySpanExporter inMemorySpanExporter = InMemorySpanExporter.create(); + + protected Firestore firestore; + + HashMap spanNameToSpanId = new HashMap<>();; + HashMap spanIdToParentSpanId = new HashMap<>();; + HashMap spanNameToSpanData = new HashMap<>();; + + @Rule public TestName testName = new TestName(); + + @BeforeClass + public static void beforeAll() { + // Create Firestore with an in-memory span exporter. + GlobalOpenTelemetry.resetForTest(); + Resource resource = + Resource.getDefault().merge(Resource.builder().put(SERVICE_NAME, "Sparky").build()); + SpanProcessor inMemorySpanProcessor = SimpleSpanProcessor.create(inMemorySpanExporter); + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(inMemorySpanProcessor) + .setSampler(Sampler.alwaysOn()) + .build()) + .buildAndRegisterGlobal(); + } + + @Before + public void before() { + FirestoreOptions.Builder optionsBuilder = + FirestoreOptions.newBuilder() + .setOpenTelemetryOptions( + FirestoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()); + String namedDb = System.getProperty("FIRESTORE_NAMED_DATABASE"); + if (namedDb != null) { + logger.log(Level.INFO, "Integration test using named database " + namedDb); + optionsBuilder = optionsBuilder.setDatabaseId(namedDb); + } else { + logger.log(Level.INFO, "Integration test using default database."); + } + firestore = optionsBuilder.build().getService(); + + // Clean up existing maps. + spanNameToSpanId.clear(); + spanIdToParentSpanId.clear(); + spanNameToSpanData.clear(); + } + + @After + public void after() throws Exception { + Preconditions.checkNotNull( + firestore, + "Error instantiating Firestore. Check that the service account credentials were properly set."); + firestore.shutdown(); + inMemorySpanExporter.reset(); + } + + void waitForTracesToComplete() throws Exception { + // We need to call `firestore.close()` because that will also close the + // gRPC channel and hence force the gRPC instrumentation library to flush + // its spans. + firestore.close(); + } + + // Prepares all the spans in memory for inspection. + List prepareSpans() throws Exception { + waitForTracesToComplete(); + List spans = inMemorySpanExporter.getFinishedSpanItems(); + buildSpanMaps(spans); + printSpans(); + return spans; + } + + void buildSpanMaps(List spans) { + for (SpanData spanData : spans) { + spanNameToSpanData.put(spanData.getName(), spanData); + spanNameToSpanId.put(spanData.getName(), spanData.getSpanId()); + spanIdToParentSpanId.put(spanData.getSpanId(), spanData.getParentSpanId()); + } + } + + // Returns the SpanData object for the span with the given name. + // Returns null if no span with the given name exists. + @Nullable + SpanData getSpanByName(String spanName) { + return spanNameToSpanData.get(spanName); + } + + // Returns the SpanData object for the gRPC span with the given RPC name. + // Returns null if no such span exists. + @Nullable + SpanData getGrpcSpanByName(String rpcName) { + return getSpanByName(SERVICE + rpcName); + } + + String grpcSpanName(String rpcName) { + return SERVICE + rpcName; + } + + void assertSameTrace(SpanData... spans) { + if (spans.length > 1) { + String traceId = spans[0].getTraceId(); + for (SpanData spanData : spans) { + assertEquals(traceId, spanData.getTraceId()); + } + } + } + + // Helper to see the spans in standard output while developing tests + void printSpans() { + for (SpanData spanData : spanNameToSpanData.values()) { + System.out.printf( + "SPAN ID:%s, ParentID:%s, KIND:%s, TRACE ID:%s, NAME:%s, ATTRIBUTES:%s, EVENTS:%s\n", + spanData.getSpanId(), + spanData.getParentSpanId(), + spanData.getKind(), + spanData.getTraceId(), + spanData.getName(), + spanData.getAttributes().toString(), + spanData.getEvents().toString()); + } + } + + // Asserts that the span hierarchy exists for the given span names. The hierarchy starts with the + // root span, followed + // by the child span, grandchild span, and so on. It also asserts that all the given spans belong + // to the same trace, + // and that Firestore-generated spans contain the expected Firestore attributes. + void assertSpanHierarchy(String... spanNamesHierarchy) { + List spanNames = Arrays.asList(spanNamesHierarchy); + + for (int i = 0; i + 1 < spanNames.size(); ++i) { + String parentSpanName = spanNames.get(i); + String childSpanName = spanNames.get(i + 1); + SpanData parentSpan = getSpanByName(parentSpanName); + SpanData childSpan = getSpanByName(childSpanName); + assertNotNull(parentSpan); + assertNotNull(childSpan); + assertEquals(childSpan.getParentSpanId(), parentSpan.getSpanId()); + assertSameTrace(childSpan, parentSpan); + // gRPC spans do not have Firestore attributes. + if (!parentSpanName.startsWith(SERVICE)) { + assertHasExpectedAttributes(parentSpan); + } + if (!childSpanName.startsWith(SERVICE)) { + assertHasExpectedAttributes(childSpan); + } + } + } + + void assertHasExpectedAttributes(SpanData spanData, String... additionalExpectedAttributes) { + // All Firestore-generated spans have the settings attributes. + List expectedAttributes = + Arrays.asList( + "gcp.firestore.memoryUtilization", + "gcp.firestore.settings.host", + "gcp.firestore.settings.databaseId", + "gcp.firestore.settings.channel.needsCredentials", + "gcp.firestore.settings.channel.needsEndpoint", + "gcp.firestore.settings.channel.needsHeaders", + "gcp.firestore.settings.channel.shouldAutoClose", + "gcp.firestore.settings.channel.transportName", + "gcp.firestore.settings.retrySettings.maxRpcTimeout", + "gcp.firestore.settings.retrySettings.retryDelayMultiplier", + "gcp.firestore.settings.retrySettings.initialRetryDelay", + "gcp.firestore.settings.credentials.authenticationType", + "gcp.firestore.settings.retrySettings.maxAttempts", + "gcp.firestore.settings.retrySettings.maxRetryDelay", + "gcp.firestore.settings.retrySettings.rpcTimeoutMultiplier", + "gcp.firestore.settings.retrySettings.totalTimeout", + "gcp.firestore.settings.retrySettings.initialRpcTimeout"); + + expectedAttributes.addAll(Arrays.asList(additionalExpectedAttributes)); + + Attributes spanAttributes = spanData.getAttributes(); + for (String expectedAttribute : expectedAttributes) { + assertNotNull(spanAttributes.get(AttributeKey.stringKey(expectedAttribute))); + } + } + + @Test + public void aggregateQueryGet() throws Exception { + firestore.collection("col").count().get().get(); + waitForTracesToComplete(); + List spans = inMemorySpanExporter.getFinishedSpanItems(); + buildSpanMaps(spans); + printSpans(); + assertEquals(2, spans.size()); + SpanData getSpan = getSpanByName(SPAN_NAME_AGGREGATION_QUERY_GET); + SpanData grpcSpan = getGrpcSpanByName(SPAN_NAME_RUN_AGGREGATION_QUERY); + assertNotNull(getSpan); + assertNotNull(grpcSpan); + assertEquals(grpcSpan.getParentSpanId(), getSpan.getSpanId()); + assertSameTrace(getSpan, grpcSpan); + assertHasExpectedAttributes(getSpan); + List events = getSpan.getEvents(); + assertTrue(events.size() > 0); + assertTrue(events.get(0).getAttributes().size() > 0); + assertEquals(events.get(0).getName(), "RunAggregationQuery Stream started."); + assertEquals( + events.get(0).getAttributes().get(AttributeKey.booleanKey("isRetryAttempt")), false); + } + + @Test + public void bulkWriterCommit() throws Exception { + ScheduledExecutorService bulkWriterExecutor = Executors.newSingleThreadScheduledExecutor(); + BulkWriter bulkWriter = + firestore.bulkWriter(BulkWriterOptions.builder().setExecutor(bulkWriterExecutor).build()); + bulkWriter.set( + firestore.collection("col").document("foo"), + Collections.singletonMap("bulk-foo", "bulk-bar")); + bulkWriter.close(); + bulkWriterExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + + List spans = prepareSpans(); + assertEquals(2, spans.size()); + assertSpanHierarchy(SPAN_NAME_BULK_WRITER_COMMIT, grpcSpanName(BATCH_WRITE_RPC_NAME)); + } + + @Test + public void partitionQuery() throws Exception { + CollectionGroup collectionGroup = firestore.collectionGroup("col"); + collectionGroup.getPartitions(3).get(); + + List spans = prepareSpans(); + assertEquals(2, spans.size()); + assertSpanHierarchy(SPAN_NAME_PARTITION_QUERY, grpcSpanName(SPAN_NAME_PARTITION_QUERY)); + } + + @Test + public void collectionListDocuments() throws Exception { + firestore.collection("col").listDocuments(); + + List spans = prepareSpans(); + assertEquals(2, spans.size()); + assertSpanHierarchy(SPAN_NAME_COL_REF_LIST_DOCUMENTS, grpcSpanName(LIST_DOCUMENTS_RPC_NAME)); + } +}