From b1fcf2b760b86e23d593f65d34e2b11d798c713d Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 26 Apr 2024 16:56:19 +0200 Subject: [PATCH] Prefer waitUndilDone over waitUntilFinish in test and examples (#5349) --- .../spotify/scio/bigtable/BigtableIT.scala | 23 ++++++++++--------- .../scio/examples/DebuggingWordCount.scala | 2 +- .../com/spotify/scio/examples/WordCount.scala | 2 +- .../extra/ElasticsearchMinimalExample.scala | 2 +- .../scio/examples/extra/MetricsExample.scala | 2 +- .../scio/examples/extra/Neo4JExample.scala | 2 +- .../examples/extra/TapOutputExample.scala | 2 +- .../scio/examples/extra/WordCountScioIO.scala | 2 +- 8 files changed, 19 insertions(+), 18 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala index 0a9a6ac500..29e0407d3c 100644 --- a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala @@ -100,7 +100,7 @@ class BigtableIT extends PipelineSpec { sc .parallelize(data.map(kv => toWriteMutation(kv._1, kv._2))) .saveAsBigtable(projectId, instanceId, tableId) - }.waitUntilFinish() + }.waitUntilDone() // Read rows back // Filter rows in case there are other keys in the table @@ -112,16 +112,17 @@ class BigtableIT extends PipelineSpec { sc .bigtable(projectId, instanceId, tableId, rowFilter = rowFilter) .map(fromRow) should containInAnyOrder(data) - }.waitUntilFinish() + }.waitUntilDone() } catch { case e: Throwable => throw e - } finally { - // Delete rows afterwards - runWithRealContext() { sc => - sc.parallelize(data.map(kv => toDeleteMutation(kv._1))) - .saveAsBigtable(projectId, instanceId, tableId) - } - } + } finally + { + // Delete rows afterwards + runWithRealContext() { sc => + sc.parallelize(data.map(kv => toDeleteMutation(kv._1))) + .saveAsBigtable(projectId, instanceId, tableId) + } + }.waitUntilFinish() } it should "work in bulk mode" in { @@ -140,7 +141,7 @@ class BigtableIT extends PipelineSpec { sc .parallelize(data.map(kv => toWriteMutation(kv._1, kv._2))) .saveAsBigtable(options, tableId, 1) - }.waitUntilFinish() + }.waitUntilDone() // Read rows back // Filter rows in case there are other keys in the table @@ -152,7 +153,7 @@ class BigtableIT extends PipelineSpec { sc .bigtable(projectId, instanceId, tableId, rowFilter = rowFilter) .map(fromRow) should containInAnyOrder(data) - }.waitUntilFinish() + }.waitUntilDone() } catch { case e: Throwable => throw e } finally diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/DebuggingWordCount.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/DebuggingWordCount.scala index d13246ad69..233bdb33d4 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/DebuggingWordCount.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/DebuggingWordCount.scala @@ -74,7 +74,7 @@ object DebuggingWordCount { .containsInAnyOrder(List(("Flourish", 3L), ("stomach", 1L)).asJava) // Execute the pipeline and block until it finishes - val result = sc.run().waitUntilFinish() + val result = sc.run().waitUntilDone() // Retrieve metric values require(result.counter(matchedWords).committed.get == 2) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/WordCount.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/WordCount.scala index 130ab91627..cc34ecb324 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/WordCount.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/WordCount.scala @@ -74,7 +74,7 @@ object WordCount { .saveAsTextFile(output) // Execute the pipeline and block until it finishes - val result = sc.run().waitUntilFinish() + val result = sc.run().waitUntilDone() // Retrieve metric values logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax)) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ElasticsearchMinimalExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ElasticsearchMinimalExample.scala index e960d81ce4..2a38fde607 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ElasticsearchMinimalExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ElasticsearchMinimalExample.scala @@ -79,7 +79,7 @@ object ElasticsearchMinimalExample { .saveAsElasticsearch(clusterOpts)(indexRequestBuilder) // Run pipeline - sc.run().waitUntilFinish() + sc.run().waitUntilDone() } private def indexer(index: String): ((String, Long)) => Iterable[BulkOperation] = { diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MetricsExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MetricsExample.scala index ac95813579..b09ac39b89 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MetricsExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MetricsExample.scala @@ -70,7 +70,7 @@ object MetricsExample { sum2.inc(i) } - val result = sc.run().waitUntilFinish() + val result = sc.run().waitUntilDone() // # Retrieving metrics diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/Neo4JExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/Neo4JExample.scala index 26472f54ae..edf999f923 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/Neo4JExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/Neo4JExample.scala @@ -53,6 +53,6 @@ object Neo4JExample { |""".stripMargin ) // Run pipeline - sc.run().waitUntilFinish() + sc.run().waitUntilDone() } } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TapOutputExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TapOutputExample.scala index aeff96a848..b01fe5eb7a 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TapOutputExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TapOutputExample.scala @@ -60,7 +60,7 @@ object TapOutputExample { // Re-open taps in new `ScioContext` (t1.open(sc2) ++ t2.open(sc2).map(_.toInt)).sum // Execute the pipeline and block until it completes - val result = sc2.run().waitUntilFinish() + val result = sc2.run().waitUntilDone() println(result.state) } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/WordCountScioIO.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/WordCountScioIO.scala index 1c84135b72..4abe930833 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/WordCountScioIO.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/WordCountScioIO.scala @@ -76,7 +76,7 @@ object WordCountScioIO { .write(outputTextIO)(TextIO.DefaultWriteParam) // Execute the pipeline and block until it finishes - val result = sc.run().waitUntilFinish() + val result = sc.run().waitUntilDone() // Retrieve metric values logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))