feat(spannerio): add DetectNewPartitionsDoFn (#5)
* Add DetectNewPartitionsDoFn.

* Fix issues.

* Fix PartitionRecordCoder.

* Fix lint issues.

* Debug code.

* Allow passing the Spanner instance in build.gradle.

* Set pipeline's BlockOnRun to false.

* Update to use PartitionMetadata.

* Fix the DAO initialization issue.

* Support passing a metadata table name to the pipeline for testing.

* Fix issues and clean up the debug print-out.

* Delete the test db after finishing.

* Fix all comments.

* Fix comments.

* Fix comments.

* Fix comments.
hengfengli authored Jun 18, 2021
1 parent 0c9d0e2 commit 11e3c9e
Showing 8 changed files with 510 additions and 17 deletions.
2 changes: 2 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -144,10 +144,12 @@ dependencies {
task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+def gcpSpannerInstance = project.findProperty('gcpSpannerInstance') ?: 'beam-test-instance'
def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--instanceId=${gcpSpannerInstance}",
"--tempRoot=${gcpTempRoot}",
])
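
The new gcpSpannerInstance property can be overridden when invoking the integration test task; a hedged example invocation (task path inferred from the file's location, property values illustrative):

./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
    -PgcpProject=my-project \
    -PgcpSpannerInstance=my-spanner-instance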

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -54,11 +54,11 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.gcp.spanner.cdc.DetectNewPartitions;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.ChangeStreamSourceDescriptor;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.PipelineInitializer;
-import org.apache.beam.sdk.io.gcp.spanner.cdc.ReadPartitionChangeStream;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
-import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangesRecord;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
@@ -413,6 +413,9 @@ public static Write write() {
public static ReadChangeStream readChangeStream() {
return new AutoValue_SpannerIO_ReadChangeStream.Builder()
.setSpannerConfig(SpannerConfig.create())
+// TODO(hengfeng): revisit this to find a better way to create an uninitialized instance.
+.setChangeStreamName("")
+.setInclusiveStartAt(Timestamp.MIN_VALUE)
.build();
}

@@ -1269,17 +1272,20 @@ static byte[] encode(MutationGroup g) {

@AutoValue
public abstract static class ReadChangeStream
-extends PTransform<PBegin, PCollection<DataChangesRecord>> {
+extends PTransform<PBegin, PCollection<PartitionMetadata>> {

abstract SpannerConfig getSpannerConfig();

abstract String getChangeStreamName();

abstract Timestamp getInclusiveStartAt();

-abstract Timestamp getExclusiveEndAt();
+abstract @Nullable Timestamp getExclusiveEndAt();

-abstract Deserializer getDeserializer();
+abstract @Nullable Deserializer getDeserializer();

+// TODO(hengfeng): Remove this when we can write the real IT test for the connector.
+abstract @Nullable String getTestMetadataTable();

abstract Builder toBuilder();

@@ -1296,6 +1302,8 @@ abstract static class Builder {

abstract Builder setDeserializer(Deserializer deserializer);

+abstract Builder setTestMetadataTable(String metadataTable);

abstract ReadChangeStream build();
}

@@ -1352,6 +1360,11 @@ public ReadChangeStream withExclusiveEndAt(Timestamp timestamp) {
return toBuilder().setExclusiveEndAt(timestamp).build();
}

+/** Specifies the change stream metadata table name, for testing purposes. */
+public ReadChangeStream withTestMetadataTable(String metadataTable) {
+return toBuilder().setTestMetadataTable(metadataTable).build();
+}

/**
* Specifies the class to be used to transform the data records read from the change stream into
* Java objects or other serialized formats.
@@ -1361,7 +1374,7 @@ public ReadChangeStream withDeserializer(Deserializer deserializer) {
}

@Override
-public PCollection<DataChangesRecord> expand(PBegin input) {
+public PCollection<PartitionMetadata> expand(PBegin input) {
checkArgument(
getSpannerConfig() != null,
"SpannerIO.readChangeStream() requires the spanner config to be set.");
@@ -1400,6 +1413,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getExclusiveEndAt().toSqlTimesta
getSpannerConfig().getInstanceId().get(),
getSpannerConfig().getDatabaseId().get());
String partitionMetadataTableName = generateMetadataTableName(databaseId.getDatabase());
+if (!getTestMetadataTable().isEmpty()) {
+partitionMetadataTableName = getTestMetadataTable();
+}

PartitionMetadataDao partitionMetadataDao =
new PartitionMetadataDao(databaseClient, partitionMetadataTableName);
@@ -1410,10 +1426,21 @@ && getInclusiveStartAt().toSqlTimestamp().after(getExclusiveEndAt().toSqlTimesta
getInclusiveStartAt(),
getExclusiveEndAt());

+List<ChangeStreamSourceDescriptor> sources = new ArrayList<>();
+sources.add(
+ChangeStreamSourceDescriptor.of(
+getChangeStreamName(),
+partitionMetadataTableName,
+getInclusiveStartAt(),
+getExclusiveEndAt()));

return input
.apply("Execute query", Create.of(1))
.apply(ParDo.of(new DetectNewPartitions()))
.apply(new ReadPartitionChangeStream());
.apply("Generate change stream sources", Create.of(sources))
.apply(
"Detect new partitions",
ParDo.of(
new DetectNewPartitionsDoFn(getSpannerConfig(), partitionMetadataTableName)));
// .apply(new ReadPartitionChangeStreamDoFn());
}
}
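
For orientation, a minimal sketch of how a pipeline might apply the transform after this change. It assumes the builder exposes SpannerIO's usual public setters (withSpannerConfig, withChangeStreamName, withInclusiveStartAt, matching the AutoValue setters above); the project, instance, database, and stream names are placeholders:

import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.values.PCollection;

Pipeline pipeline = Pipeline.create();
// Emits one PartitionMetadata element per CREATED partition row.
PCollection<PartitionMetadata> partitions =
    pipeline.apply(
        "Read change stream",
        SpannerIO.readChangeStream()
            .withSpannerConfig(
                SpannerConfig.create()
                    .withProjectId("my-project")
                    .withInstanceId("my-instance")
                    .withDatabaseId("my-database"))
            .withChangeStreamName("my-change-stream")
            .withInclusiveStartAt(Timestamp.now()));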

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ChangeStreamSourceDescriptor.java
@@ -17,6 +17,28 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

-import org.apache.beam.sdk.transforms.DoFn;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import javax.annotation.Nullable;

-public class DetectNewPartitions extends DoFn<Integer, String> {}
+/** Represents a Cloud Spanner Change Stream source description. */
+@AutoValue
+@SuppressWarnings({
+"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class ChangeStreamSourceDescriptor implements Serializable {
+abstract String getChangeStreamName();

+abstract String getMetadataTableName();

+abstract @Nullable Timestamp getStartAt();

+abstract @Nullable Timestamp getEndAt();

+public static ChangeStreamSourceDescriptor of(
+String changeStreamName, String metadataTableName, Timestamp startAt, Timestamp endAt) {
+return new AutoValue_ChangeStreamSourceDescriptor(
+changeStreamName, metadataTableName, startAt, endAt);
+}
+}
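
As a quick illustration, expand() in SpannerIO builds exactly one of these descriptors for the initial source; the literal values below are placeholders, and the end timestamp may be null because getEndAt() is @Nullable:

ChangeStreamSourceDescriptor source =
    ChangeStreamSourceDescriptor.of(
        "my-change-stream",     // change stream to read
        "my-metadata-table",    // generated or test-supplied metadata table name
        Timestamp.now(),        // inclusive start timestamp
        null);                  // exclusive end; null leaves the stream unbounded
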
189 changes: 189 additions & 0 deletions sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A SplittableDoFn which reads from {@link ChangeStreamSourceDescriptor} and outputs {@link
* PartitionMetadata}.
*
* <p>{@link DetectNewPartitionsDoFn} implements the logic of querying the partition metadata table
* from Cloud Spanner. The element is a {@link ChangeStreamSourceDescriptor}, and the restriction is
* an {@link OffsetRange} representing the record offset. A {@link GrowableOffsetRangeTracker} is
* used to track an {@link OffsetRange} that ends at {@code Long.MAX_VALUE}; for a finite range, an
* {@link OffsetRangeTracker} is created.
*/
@UnboundedPerElement
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class DetectNewPartitionsDoFn extends DoFn<ChangeStreamSourceDescriptor, PartitionMetadata> {
private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsDoFn.class);
private SpannerAccessor spannerAccessor;
private DatabaseClient databaseClient;
private SpannerConfig spannerConfig;
// TODO(hengfeng): Make this field configurable via constructor or spanner config.
private Duration resumeDuration = Duration.millis(100L);
private String metadataTableName;
private PartitionMetadataDao partitionMetadataDao;

public DetectNewPartitionsDoFn(SpannerConfig config, String metadataTableName) {
this.spannerConfig = config;
this.metadataTableName = metadataTableName;
}

public DetectNewPartitionsDoFn(
SpannerConfig config, String metadataTableName, Duration resumeDuration) {
this(config, metadataTableName);
this.resumeDuration = resumeDuration;
}

@GetInitialRestriction
public OffsetRange initialRestriction(@Element ChangeStreamSourceDescriptor inputElement) {
return new OffsetRange(0, Long.MAX_VALUE);
}

@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element ChangeStreamSourceDescriptor inputElement, @Restriction OffsetRange restriction) {
return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), Long.MAX_VALUE));
}

@ProcessElement
public ProcessContinuation processElement(
@Element ChangeStreamSourceDescriptor inputElement,
RestrictionTracker<OffsetRange, Long> tracker,
WatermarkEstimator watermarkEstimator,
OutputReceiver<PartitionMetadata> receiver) {

Instant start = Instant.now();

LOG.info("Calling process element:" + start);

// Find all records where their states are CREATED.
// TODO(hengfeng): move this to DAO.
String query =
String.format("SELECT * FROM `%s` WHERE State = 'CREATED'", this.metadataTableName);
ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query));

long currentIndex = tracker.currentRestriction().getFrom();

// Output the records.
while (resultSet.next()) {
// TODO(hengfeng): change the log level in this file.
LOG.info("Reading record currentIndex:" + currentIndex);
if (!tracker.tryClaim(currentIndex)) {
return ProcessContinuation.stop();
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
LOG.info(
String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));

currentIndex++;

Instant now = Instant.now();
LOG.info("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
this.databaseClient
.readWriteTransaction()
.run(
transaction -> {
// Update the record to SCHEDULED.
// TODO(hengfeng): use mutations instead.
Statement updateStatement =
Statement.newBuilder(
String.format(
"UPDATE `%s` "
+ "SET State = 'SCHEDULED' "
+ "WHERE PartitionToken = @PartitionToken",
this.metadataTableName))
.bind("PartitionToken")
.to(metadata.getPartitionToken())
.build();
transaction.executeUpdate(updateStatement);
LOG.info("Updated the record:" + metadata.getPartitionToken());
return null;
});
}

// TODO(hengfeng): investigate how we can terminate this function.
return ProcessContinuation.resume().withResumeDelay(resumeDuration);
}

@Setup
public void setup() throws Exception {
this.spannerAccessor = SpannerAccessor.getOrCreate(this.spannerConfig);
this.databaseClient = spannerAccessor.getDatabaseClient();
this.partitionMetadataDao =
new PartitionMetadataDao(this.databaseClient, this.metadataTableName);
}

@Teardown
public void teardown() throws Exception {
this.spannerAccessor.close();
}

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimator<Instant> newWatermarkEstimator(
@WatermarkEstimatorState Instant watermarkEstimatorState) {
return new MonotonicallyIncreasing(watermarkEstimatorState);
}

private PartitionMetadata buildPartitionMetadata(ResultSet resultSet) {
return new PartitionMetadata(
resultSet.getString(PartitionMetadataDao.COLUMN_PARTITION_TOKEN),
resultSet.getStringList(PartitionMetadataDao.COLUMN_PARENT_TOKEN),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_START_TIMESTAMP),
resultSet.getBoolean(PartitionMetadataDao.COLUMN_INCLUSIVE_START),
!resultSet.isNull(PartitionMetadataDao.COLUMN_END_TIMESTAMP)
? resultSet.getTimestamp(PartitionMetadataDao.COLUMN_END_TIMESTAMP)
: null,
resultSet.getBoolean(PartitionMetadataDao.COLUMN_INCLUSIVE_END),
resultSet.getLong(PartitionMetadataDao.COLUMN_HEARTBEAT_SECONDS),
PartitionMetadata.State.valueOf(resultSet.getString(PartitionMetadataDao.COLUMN_STATE)),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_CREATED_AT),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_UPDATED_AT));
}
}
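
The TODO above the state update suggests swapping the UPDATE DML for a mutation. A minimal sketch of that alternative, assuming PartitionToken is the metadata table's primary key (the helper name is hypothetical):

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Mutation;
import java.util.Collections;

// Hypothetical helper: marks one partition row as SCHEDULED via a mutation
// instead of a read-write transaction running DML.
private static void markScheduled(
    DatabaseClient databaseClient, String metadataTableName, String partitionToken) {
  Mutation updateState =
      Mutation.newUpdateBuilder(metadataTableName)
          .set("PartitionToken")
          .to(partitionToken) // key column identifying the row to update
          .set("State")
          .to("SCHEDULED")
          .build();
  // DatabaseClient.write applies the mutations atomically at a single commit timestamp.
  databaseClient.write(Collections.singletonList(updateState));
}
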
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadPartitionChangeStreamDoFn.java
@@ -18,14 +18,15 @@
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangesRecord;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

-public class ReadPartitionChangeStream
-extends PTransform<PCollection<String>, PCollection<DataChangesRecord>> {
+public class ReadPartitionChangeStreamDoFn
+extends PTransform<PCollection<PartitionMetadata>, PCollection<DataChangesRecord>> {

@Override
-public PCollection<DataChangesRecord> expand(PCollection<String> input) {
+public PCollection<DataChangesRecord> expand(PCollection<PartitionMetadata> input) {
return null;
}
}
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/dao/package-info.java
@@ -16,5 +16,5 @@
* limitations under the License.
*/

-/** User model for the Spanner change stream API. */
+/** Data access layer for the Spanner change stream API. */
package org.apache.beam.sdk.io.gcp.spanner.cdc.dao;
(The remaining 2 changed files are not shown.)
