From e65f5087f8a93f4ffa324953dfafe6694162207a Mon Sep 17 00:00:00 2001 From: Robert Gruener Date: Wed, 29 Jun 2016 17:37:56 -0400 Subject: [PATCH] upgrade bigtable dependency to 0.9.0 --- build.sbt | 2 +- .../bigtable/BigtableMultiTableWrite.java | 3 +- .../com/spotify/scio/bigtable/package.scala | 45 +++++++++---------- .../scio/examples/extra/BigtableExample.scala | 9 ++-- .../examples/extra/BigtableExampleTest.scala | 9 ++-- 5 files changed, 30 insertions(+), 38 deletions(-) diff --git a/build.sbt b/build.sbt index 9133975f45..d51d4707d2 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ import sbtunidoc.Plugin.UnidocKeys._ val dataflowSdkVersion = "1.6.0" val algebirdVersion = "0.12.1" val avroVersion = "1.7.7" -val bigtableVersion = "0.3.0" +val bigtableVersion = "0.9.0" val breezeVersion ="0.12" val chillVersion = "0.8.0" val commonsIoVersion = "2.5" diff --git a/scio-bigtable/src/main/java/com/spotify/scio/bigtable/BigtableMultiTableWrite.java b/scio-bigtable/src/main/java/com/spotify/scio/bigtable/BigtableMultiTableWrite.java index 1f2a1c7828..be66cf285f 100644 --- a/scio-bigtable/src/main/java/com/spotify/scio/bigtable/BigtableMultiTableWrite.java +++ b/scio-bigtable/src/main/java/com/spotify/scio/bigtable/BigtableMultiTableWrite.java @@ -144,7 +144,6 @@ private static void checkNotNullOrEmpty(String value, String type) { private static void validateConfig(CloudBigtableConfiguration configuration) { checkNotNullOrEmpty(configuration.getProjectId(), "projectId"); - checkNotNullOrEmpty(configuration.getZoneId(), "zoneId"); - checkNotNullOrEmpty(configuration.getClusterId(), "clusterId"); + checkNotNullOrEmpty(configuration.getInstanceId(), "instanceId"); } } diff --git a/scio-bigtable/src/main/scala/com/spotify/scio/bigtable/package.scala b/scio-bigtable/src/main/scala/com/spotify/scio/bigtable/package.scala index 7646cf9d68..3fd83124fe 100644 --- a/scio-bigtable/src/main/scala/com/spotify/scio/bigtable/package.scala +++ b/scio-bigtable/src/main/scala/com/spotify/scio/bigtable/package.scala @@ -43,13 +43,16 @@ package object bigtable { /** Get an SCollection for a Bigtable table. */ def bigTable(projectId: String, - clusterId: String, - zoneId: String, + instanceId: String, tableId: String, scan: Scan = null): SCollection[Result] = self.pipelineOp { val _scan: Scan = if (scan != null) scan else new Scan() - val config = new bt.CloudBigtableScanConfiguration( - projectId, zoneId, clusterId, tableId, _scan) + val config = new bt.CloudBigtableScanConfiguration.Builder() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withTableId(tableId) + .withScan(_scan) + .build this.bigTable(config) } @@ -57,14 +60,12 @@ package object bigtable { def bigTable(config: bt.CloudBigtableScanConfiguration): SCollection[Result] = self.pipelineOp { if (self.isTest) { val input = BigtableInput( - config.getProjectId, config.getClusterId, config.getZoneId, config.getTableId) + config.getProjectId, config.getInstanceId, config.getTableId) self.getTestInput[Result](input) } else { self .wrap(self.applyInternal(Read.from(bt.CloudBigtableIO.read(config)))) - .setName( - s"${config.getProjectId} ${config.getClusterId} " + - s"${config.getZoneId} ${config.getTableId}") + .setName(s"${config.getProjectId} ${config.getInstanceId} ${config.getTableId}") } } @@ -78,13 +79,12 @@ package object bigtable { /** Save this SCollection as a Bigtable table. Note that elements must be of type Mutation. */ def saveAsBigtable(projectId: String, - clusterId: String, - zoneId: String, + instanceId: String, tableId: String, additionalConfiguration: Map[String, String] = Map.empty) (implicit ev: T <:< Mutation): Future[Tap[Result]] = { val config = new bt.CloudBigtableTableConfiguration( - projectId, zoneId, clusterId, tableId, additionalConfiguration.asJava) + projectId, instanceId, tableId, additionalConfiguration.asJava) this.saveAsBigtable(config) } @@ -93,7 +93,7 @@ package object bigtable { (implicit ev: T <:< Mutation): Future[Tap[Result]] = { if (self.context.isTest) { val output = BigtableOutput( - config.getProjectId, config.getClusterId, config.getZoneId, config.getTableId) + config.getProjectId, config.getInstanceId, config.getTableId) self.context.testOut(output)(self) } else { bt.CloudBigtableIO.initializeForWrite(self.context.pipeline) @@ -120,13 +120,12 @@ package object bigtable { * Mutation. */ def saveAsMultipleBigtable(projectId: String, - clusterId: String, - zoneId: String, + instanceId: String, additionalConfiguration: Map[String, String] = Map.empty) (implicit ev: T <:< Mutation) : Future[Tap[(String, Iterable[Result])]] = { val config = new bt.CloudBigtableTableConfiguration( - projectId, zoneId, clusterId, null, additionalConfiguration.asJava) + projectId, instanceId, null, additionalConfiguration.asJava) this.saveAsMultipleBigtable(config) } @@ -139,7 +138,7 @@ package object bigtable { : Future[Tap[(String, Iterable[Result])]] = { if (self.context.isTest) { val output = MultipleBigtableOutput( - config.getProjectId, config.getClusterId, config.getZoneId) + config.getProjectId, config.getInstanceId) self.context.testOut(output.asInstanceOf[TestIO[(String, Iterable[T])]])(self) } else { val transform = BigtableMultiTableWrite.writeToMultipleTables(config) @@ -151,18 +150,16 @@ package object bigtable { } } - case class BigtableInput(projectId: String, clusterId: String, zoneId: String, tableId: String) - extends TestIO[Result](s"$projectId\t$clusterId\t$zoneId\t$tableId") + case class BigtableInput(projectId: String, instanceId: String, tableId: String) + extends TestIO[Result](s"$projectId\t$instanceId\t$tableId") case class BigtableOutput[T <: Mutation](projectId: String, - clusterId: String, - zoneId: String, + instanceId: String, tableId: String) - extends TestIO[T](s"$projectId\t$clusterId\t$zoneId\t$tableId") + extends TestIO[T](s"$projectId\t$instanceId\t$tableId") case class MultipleBigtableOutput[T <: Mutation](projectId: String, - clusterId: String, - zoneId: String) - extends TestIO[(String, Iterable[T])](s"$projectId\t$clusterId\t$zoneId") + instanceId: String) + extends TestIO[(String, Iterable[T])](s"$projectId\t$instanceId") } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala index 560c67e4f2..8ad0043945 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala @@ -48,8 +48,7 @@ runMain --stagingLocation=gs://[BUCKET]/path/to/staging --input=gs://dataflow-samples/shakespeare/kinglear.txt --bigtableProjectId=[BIG_TABLE_PROJECT_ID] - --bigtableClusterId=[BIG_TABLE_CLUSTER_ID] - --bigtableZoneId=[BIG_TABLE_ZONE_ID] + --bigtableInstanceId=[BIG_TABLE_INSTANCE_ID] --bigtableTableId=[BIG_TABLE_TABLE_ID] */ @@ -77,8 +76,7 @@ runMain --project=[PROJECT] --runner=DataflowPipelineRunner --zone=[ZONE] --stagingLocation=gs://[BUCKET]/path/to/staging --bigtableProjectId=[BIG_TABLE_PROJECT_ID] - --bigtableClusterId=[BIG_TABLE_CLUSTER_ID] - --bigtableZoneId=[BIG_TABLE_ZONE_ID] + --bigtableInstanceId=[BIG_TABLE_INSTANCE_ID] --bigtableTableId=[BIG_TABLE_TABLE_ID] --output=gs://[BUCKET]/[PATH]/wordcount */ @@ -106,8 +104,7 @@ runMain --stagingLocation=gs://[BUCKET]/path/to/staging --input=gs://dataflow-samples/shakespeare/kinglear.txt --bigtableProjectId=[BIG_TABLE_PROJECT_ID] - --bigtableClusterId=[BIG_TABLE_CLUSTER_ID] - --bigtableZoneId=[BIG_TABLE_ZONE_ID] + --bigtableInstanceId=[BIG_TABLE_INSTANCE_ID] --bigtableTableId=[BIG_TABLE_TABLE_ID] */ diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/BigtableExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/BigtableExampleTest.scala index 06bba0f270..82f5483d39 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/BigtableExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/BigtableExampleTest.scala @@ -50,8 +50,7 @@ class BigtableExampleTest extends PipelineSpec { val bigtableOptions = Seq( "--bigtableProjectId=my-project", - "--bigtableClusterId=my-cluster", - "--bigtableZoneId=us-east1-a", + "--bigtableInstanceId=my-instance", "--bigtableTableId=my-table") val textIn = Seq("a b c d e", "a b a b") @@ -62,7 +61,7 @@ class BigtableExampleTest extends PipelineSpec { JobTest[com.spotify.scio.examples.extra.BigtableWriteExample.type] .args(bigtableOptions :+ "--input=in.txt": _*) .input(TextIO("in.txt"), textIn) - .output(bt.BigtableOutput[Put]("my-project", "my-cluster", "us-east1-a", "my-table")) { + .output(bt.BigtableOutput[Put]("my-project", "my-instance", "my-table")) { _.map(comparablePut) should containInAnyOrder (expectedPuts.map(comparablePut)) } .run() @@ -81,7 +80,7 @@ class BigtableExampleTest extends PipelineSpec { "BigtableReadExample" should "work" in { JobTest[com.spotify.scio.examples.extra.BigtableReadExample.type] .args(bigtableOptions :+ "--output=out.txt": _*) - .input(bt.BigtableInput("my-project", "my-cluster", "us-east1-a", "my-table"), resultIn) + .input(bt.BigtableInput("my-project", "my-instance", "my-table"), resultIn) .output(TextIO("out.txt"))(_ should containInAnyOrder (expectedText)) .run() } @@ -100,7 +99,7 @@ class BigtableExampleTest extends PipelineSpec { .args(bigtableOptions ++ Seq("--kinglear=k.txt", "--othello=o.txt"): _*) .input(TextIO("k.txt"), kingLear) .input(TextIO("o.txt"), othello) - .output(bt.MultipleBigtableOutput[Put]("my-project", "my-cluster", "us-east1-a")) { + .output(bt.MultipleBigtableOutput[Put]("my-project", "my-instance")) { _.mapValues(_.map(comparablePut).toSet) should containInAnyOrder ( expectedMultiple.map(kv => (kv._1, kv._2.map(comparablePut).toSet))) }