Skip to content

Commit

Permalink
feat(spannerio): terminate DetectNewPartitions properly (#16)
Browse files Browse the repository at this point in the history
* feat(spannerio): terminate DetectNewPartitionsDoFn properly

* Log if tryClaim fails.

* Remove the unnecessary change.

* Fix reading of resultSet.

* Use try resource for resultSet.
  • Loading branch information
hengfengli authored Jun 21, 2021
1 parent 3075581 commit 993b695
Showing 1 changed file with 52 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,51 +124,62 @@ public ProcessContinuation processElement(
// TODO(hengfeng): move this to DAO.
String query =
String.format("SELECT * FROM `%s` WHERE State = 'CREATED'", this.metadataTableName);
ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query));

long currentIndex = tracker.currentRestriction().getFrom();
try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) {
long currentIndex = tracker.currentRestriction().getFrom();

// Output the records.
while (resultSet.next()) {
// TODO(hengfeng): change the log level in this file.
LOG.debug("Reading record currentIndex:" + currentIndex);
if (!tracker.tryClaim(currentIndex)) {
return ProcessContinuation.stop();
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
LOG.debug(
String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));

currentIndex++;

Instant now = Instant.now();
LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
this.databaseClient
.readWriteTransaction()
.run(
transaction -> {
// Update the record to SCHEDULED.
// TODO(hengfeng): use mutations instead.
Statement updateStatement =
Statement.newBuilder(
String.format(
"UPDATE `%s` "
+ "SET State = 'SCHEDULED' "
+ "WHERE PartitionToken = @PartitionToken",
this.metadataTableName))
.bind("PartitionToken")
.to(metadata.getPartitionToken())
.build();
transaction.executeUpdate(updateStatement);
LOG.debug("Updated the record:" + metadata.getPartitionToken());
return null;
});
}
}

// Output the records.
while (resultSet.next()) {
// TODO(hengfeng): change the log level in this file.
LOG.debug("Reading record currentIndex:" + currentIndex);
if (!tracker.tryClaim(currentIndex)) {
// If there are no partitions in the table, we should stop this SDF
// function.
// TODO(hengfeng): move this query to DAO.
query = String.format("SELECT COUNT(*) FROM `%s`", this.metadataTableName);
try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) {
if (resultSet.next() && resultSet.getLong(0) == 0) {
if (!tracker.tryClaim(Long.MAX_VALUE)) {
LOG.warning("Failed to claim the end of range in DetectNewPartitionsDoFn.");
}
return ProcessContinuation.stop();
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
LOG.debug(
String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));

currentIndex++;

Instant now = Instant.now();
LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
this.databaseClient
.readWriteTransaction()
.run(
transaction -> {
// Update the record to SCHEDULED.
// TODO(hengfeng): use mutations instead.
Statement updateStatement =
Statement.newBuilder(
String.format(
"UPDATE `%s` "
+ "SET State = 'SCHEDULED' "
+ "WHERE PartitionToken = @PartitionToken",
this.metadataTableName))
.bind("PartitionToken")
.to(metadata.getPartitionToken())
.build();
transaction.executeUpdate(updateStatement);
LOG.debug("Updated the record:" + metadata.getPartitionToken());
return null;
});
}

// TODO(hengfeng): investigate how we can terminate this function.
return ProcessContinuation.resume().withResumeDelay(resumeDuration);
}

Expand Down

0 comments on commit 993b695

Please sign in to comment.