Skip to content

Commit

Permalink
Merge pull request #17335: [BEAM-14121] Fix SpannerIO service call me…
Browse files Browse the repository at this point in the history
…trics and improve tests.
  • Loading branch information
chamikaramj authored Jun 7, 2022
2 parents 77b813a + ef68037 commit 380110e
Show file tree
Hide file tree
Showing 8 changed files with 1,073 additions and 851 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ public static String datastoreResource(String projectId, String namespace) {
"//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
}

public static String spannerTable(String projectId, String databaseId, String tableId) {
public static String spannerTable(
String projectId, String instanceId, String databaseId, String tableId) {
return String.format(
"//spanner.googleapis.com/projects/%s/topics/%s/tables/%s", projectId, databaseId, tableId);
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
projectId, instanceId, databaseId, tableId);
}

public static String spannerQuery(String projectId, String queryName) {
return String.format("//spanner.googleapis.com/projects/%s/queries/%s", projectId, queryName);
public static String spannerQuery(
String projectId, String instanceId, String databaseId, String queryName) {
return String.format(
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/queries/%s",
projectId, instanceId, databaseId, queryName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.util.HashMap;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,21 @@ public static BatchSpannerRead create(

abstract TimestampBound getTimestampBound();

/**
* Serializable container that pairs a {@link ReadOperation} with one {@link Partition} generated
* for it, so that Metrics are implemented properly.
*
* <p>{@code GeneratePartitionsFn} emits one instance per generated partition, and {@code
* ReadFromPartitionFn} uses the carried read operation to look up the {@code ServiceCallMetric}
* for the partition it executes.
*/
@AutoValue
protected abstract static class PartitionedReadOperation implements Serializable {
/** Returns the read operation that the partition was generated for. */
abstract ReadOperation getReadOperation();

/** Returns the partition to be executed. */
abstract Partition getPartition();

/** Creates a pair from the given read operation and partition via the AutoValue-generated class. */
static PartitionedReadOperation create(ReadOperation readOperation, Partition partition) {
return new AutoValue_BatchSpannerRead_PartitionedReadOperation(readOperation, partition);
}
}

@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {
PCollectionView<Transaction> txView = getTxView();
Expand All @@ -84,14 +100,14 @@ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
.apply(
"Generate Partitions",
ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView))
.apply("Shuffle partitions", Reshuffle.<Partition>viaRandomKey())
.apply("Shuffle partitions", Reshuffle.viaRandomKey())
.apply(
"Read from Partitions",
ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView));
}

@VisibleForTesting
static class GeneratePartitionsFn extends DoFn<ReadOperation, Partition> {
static class GeneratePartitionsFn extends DoFn<ReadOperation, PartitionedReadOperation> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;
Expand All @@ -102,6 +118,8 @@ public GeneratePartitionsFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
this.config = config;
this.txView = txView;
Preconditions.checkNotNull(config.getRpcPriority());
Preconditions.checkNotNull(config.getRpcPriority().get());
}

@Setup
Expand All @@ -117,75 +135,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
BatchReadOnlyTransaction context =
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
for (Partition p : execute(c.element(), context)) {
c.output(p);
}
}

private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, tx, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, tx);
}
}

private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns());
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
}

private List<Partition> executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(
op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
ReadOperation op = c.element();

// While this creates a ServiceCallMetric for every input element, in reality, the number
// of input elements will either be very few (normally 1!), or they will differ and
// need different metrics.
ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);

List<Partition> partitions;
try {
if (op.getQuery() != null) {
// Query was selected.
partitions =
batchTx.partitionQuery(
op.getPartitionOptions(),
op.getQuery(),
Options.priority(config.getRpcPriority().get()));
} else if (op.getIndex() != null) {
// Read with index was selected.
partitions =
batchTx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
} else {
// Read from table was selected.
partitions =
batchTx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
}
metric.call("ok");
} catch (SpannerException e) {
metric.call(e.getErrorCode().getGrpcStatusCode().toString());
throw e;
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
for (Partition p : partitions) {
c.output(PartitionedReadOperation.create(op, p));
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
}
}

private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
private static class ReadFromPartitionFn extends DoFn<PartitionedReadOperation, Struct> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;

private transient SpannerAccessor spannerAccessor;
private transient String projectId;
private transient ServiceCallMetric serviceCallMetric;
private transient LoadingCache<ReadOperation, ServiceCallMetric> metricsForReadOperation;

public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
Expand All @@ -196,24 +201,28 @@ public ReadFromPartitionFn(
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
projectId =
this.config.getProjectId() == null
|| this.config.getProjectId().get() == null
|| this.config.getProjectId().get().isEmpty()
? SpannerOptions.getDefaultProjectId()
: this.config.getProjectId().get();

// Use a LoadingCache for metrics as there can be different read operations which result in
// different service call metrics labels. ServiceCallMetric items are created on-demand and
// added to the cache.
metricsForReadOperation =
CacheBuilder.newBuilder()
.maximumSize(SpannerIO.METRICS_CACHE_SIZE)
// Limit how many ServiceCallMetric entries this DoFn instance keeps cached on the worker.
.build(
new CacheLoader<ReadOperation, ServiceCallMetric>() {
@Override
public ServiceCallMetric load(ReadOperation op) {
return ReadAll.buildServiceCallMetricForReadOp(config, op);
}
});
}

/** Closes the {@code SpannerAccessor} that was obtained in {@code setup()}. */
@Teardown
public void teardown() throws Exception {
spannerAccessor.close();
}

@StartBundle
public void startBundle() throws Exception {
serviceCallMetric =
createServiceCallMetric(
projectId, this.config.getDatabaseId().get(), this.config.getInstanceId().get());
metricsForReadOperation.invalidateAll();
metricsForReadOperation.cleanUp();
}

@ProcessElement
Expand All @@ -223,8 +232,9 @@ public void processElement(ProcessContext c) throws Exception {
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());

Partition p = c.element();
try (ResultSet resultSet = batchTx.execute(p)) {
PartitionedReadOperation op = c.element();
ServiceCallMetric serviceCallMetric = metricsForReadOperation.get(op.getReadOperation());
try (ResultSet resultSet = batchTx.execute(op.getPartition())) {
while (resultSet.next()) {
Struct s = resultSet.getCurrentRowAsStruct();
c.output(s);
Expand All @@ -236,22 +246,5 @@ public void processElement(ProcessContext c) throws Exception {
}
serviceCallMetric.call("ok");
}

/**
* Builds the API request count {@link ServiceCallMetric} for Spanner "Read" calls, labelled with
* the given project, database and third identifier.
*
* <p>NOTE(review): the third parameter is named {@code tableId}, but it is stored under the
* {@code SPANNER_INSTANCE_ID} label below, and {@code startBundle()} passes the configured
* instance id for it. The same value is also forwarded to {@code
* GcpResourceIdentifiers.spannerTable} as the table. Confirm which identifier is intended.
*
* @param projectId value for the {@code SPANNER_PROJECT_ID} label and the resource identifier
* @param databaseId value for the {@code SPANNER_DATABASE_ID} label and the resource identifier
* @param tableId value for the {@code SPANNER_INSTANCE_ID} label and the resource identifier
* @return a new metric using the {@code API_REQUEST_COUNT} URN and the labels built here
*/
private ServiceCallMetric createServiceCallMetric(
String projectId, String databaseId, String tableId) {
HashMap<String, String> baseLabels = new HashMap<>();
// The PTRANSFORM label is set to an empty string here.
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId));
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
// NOTE(review): the instance-id label is populated from the parameter named tableId.
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId);
ServiceCallMetric serviceCallMetric =
new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
return serviceCallMetric;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -97,37 +99,26 @@ public void teardown() throws Exception {
public void processElement(ProcessContext c) throws Exception {
  final Transaction sharedTx = c.sideInput(txView);
  final ReadOperation readOp = c.element();

  // One metric per element, labelled for this particular read operation.
  final ServiceCallMetric callMetric =
      SpannerIO.ReadAll.buildServiceCallMetricForReadOp(config, readOp);

  // Attach to the batch transaction identified by the side input.
  final BatchReadOnlyTransaction batchTx =
      spannerAccessor.getBatchClient().batchReadOnlyTransaction(sharedTx.transactionId());

  try (ResultSet rows = execute(readOp, batchTx)) {
    // Emit every row returned by the read.
    while (rows.next()) {
      Struct row = rows.getCurrentRowAsStruct();
      c.output(row);
    }
  } catch (SpannerException failure) {
    // Record the gRPC status of the failed call, then propagate the failure unchanged.
    String grpcStatus = failure.getErrorCode().getGrpcStatusCode().toString();
    callMetric.call(grpcStatus);
    throw failure;
  }

  // Reached only when the read finished and the result set closed without a SpannerException.
  callMetric.call("ok");
}

private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
RpcPriority rpcPriority = SpannerConfig.DEFAULT_RPC_PRIORITY;
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, readOnlyTransaction);
rpcPriority = config.getRpcPriority().get();
}
}

private ResultSet executeWithoutPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery());
}
if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
}
return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
}

private ResultSet executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority));
}
Expand Down
Loading

0 comments on commit 380110e

Please sign in to comment.