Skip to content

Commit

Permalink
Merge pull request #187 from spotify/bigtable-upgrade
Browse files: browse the repository at this point in the history
upgrade bigtable dependency to 0.9.0
  • Loading branch information
ravwojdyla authored Jun 29, 2016
2 parents 1e24e1a + e65f508 commit 1fddaa5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 38 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.trueaccord.scalapb.{ScalaPbPlugin => PB}
val dataflowSdkVersion = "1.6.0"
val algebirdVersion = "0.12.1"
val avroVersion = "1.7.7"
val bigtableVersion = "0.3.0"
val bigtableVersion = "0.9.0"
val breezeVersion ="0.12"
val chillVersion = "0.8.0"
val commonsIoVersion = "2.5"
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -144,7 +144,6 @@ private static void checkNotNullOrEmpty(String value, String type) {

private static void validateConfig(CloudBigtableConfiguration configuration) {
checkNotNullOrEmpty(configuration.getProjectId(), "projectId");
checkNotNullOrEmpty(configuration.getZoneId(), "zoneId");
checkNotNullOrEmpty(configuration.getClusterId(), "clusterId");
checkNotNullOrEmpty(configuration.getInstanceId(), "instanceId");
}
}
Original file line number | Diff line number | Diff line change
Expand Up @@ -43,28 +43,29 @@ package object bigtable {

/** Get an SCollection for a Bigtable table. */
def bigTable(projectId: String,
clusterId: String,
zoneId: String,
instanceId: String,
tableId: String,
scan: Scan = null): SCollection[Result] = self.pipelineOp {
val _scan: Scan = if (scan != null) scan else new Scan()
val config = new bt.CloudBigtableScanConfiguration(
projectId, zoneId, clusterId, tableId, _scan)
val config = new bt.CloudBigtableScanConfiguration.Builder()
.withProjectId(projectId)
.withInstanceId(instanceId)
.withTableId(tableId)
.withScan(_scan)
.build
this.bigTable(config)
}

/** Get an SCollection for a Bigtable table. */
def bigTable(config: bt.CloudBigtableScanConfiguration): SCollection[Result] = self.pipelineOp {
if (self.isTest) {
val input = BigtableInput(
config.getProjectId, config.getClusterId, config.getZoneId, config.getTableId)
config.getProjectId, config.getInstanceId, config.getTableId)
self.getTestInput[Result](input)
} else {
self
.wrap(self.applyInternal(Read.from(bt.CloudBigtableIO.read(config))))
.setName(
s"${config.getProjectId} ${config.getClusterId} " +
s"${config.getZoneId} ${config.getTableId}")
.setName(s"${config.getProjectId} ${config.getInstanceId} ${config.getTableId}")
}
}

Expand All @@ -78,13 +79,12 @@ package object bigtable {

/** Save this SCollection as a Bigtable table. Note that elements must be of type Mutation. */
def saveAsBigtable(projectId: String,
clusterId: String,
zoneId: String,
instanceId: String,
tableId: String,
additionalConfiguration: Map[String, String] = Map.empty)
(implicit ev: T <:< Mutation): Future[Tap[Result]] = {
val config = new bt.CloudBigtableTableConfiguration(
projectId, zoneId, clusterId, tableId, additionalConfiguration.asJava)
projectId, instanceId, tableId, additionalConfiguration.asJava)
this.saveAsBigtable(config)
}

Expand All @@ -93,7 +93,7 @@ package object bigtable {
(implicit ev: T <:< Mutation): Future[Tap[Result]] = {
if (self.context.isTest) {
val output = BigtableOutput(
config.getProjectId, config.getClusterId, config.getZoneId, config.getTableId)
config.getProjectId, config.getInstanceId, config.getTableId)
self.context.testOut(output)(self)
} else {
bt.CloudBigtableIO.initializeForWrite(self.context.pipeline)
Expand All @@ -120,13 +120,12 @@ package object bigtable {
* Mutation.
*/
def saveAsMultipleBigtable(projectId: String,
clusterId: String,
zoneId: String,
instanceId: String,
additionalConfiguration: Map[String, String] = Map.empty)
(implicit ev: T <:< Mutation)
: Future[Tap[(String, Iterable[Result])]] = {
val config = new bt.CloudBigtableTableConfiguration(
projectId, zoneId, clusterId, null, additionalConfiguration.asJava)
projectId, instanceId, null, additionalConfiguration.asJava)
this.saveAsMultipleBigtable(config)
}

Expand All @@ -139,7 +138,7 @@ package object bigtable {
: Future[Tap[(String, Iterable[Result])]] = {
if (self.context.isTest) {
val output = MultipleBigtableOutput(
config.getProjectId, config.getClusterId, config.getZoneId)
config.getProjectId, config.getInstanceId)
self.context.testOut(output.asInstanceOf[TestIO[(String, Iterable[T])]])(self)
} else {
val transform = BigtableMultiTableWrite.writeToMultipleTables(config)
Expand All @@ -151,18 +150,16 @@ package object bigtable {
}
}

case class BigtableInput(projectId: String, clusterId: String, zoneId: String, tableId: String)
extends TestIO[Result](s"$projectId\t$clusterId\t$zoneId\t$tableId")
case class BigtableInput(projectId: String, instanceId: String, tableId: String)
extends TestIO[Result](s"$projectId\t$instanceId\t$tableId")

case class BigtableOutput[T <: Mutation](projectId: String,
clusterId: String,
zoneId: String,
instanceId: String,
tableId: String)
extends TestIO[T](s"$projectId\t$clusterId\t$zoneId\t$tableId")
extends TestIO[T](s"$projectId\t$instanceId\t$tableId")

case class MultipleBigtableOutput[T <: Mutation](projectId: String,
clusterId: String,
zoneId: String)
extends TestIO[(String, Iterable[T])](s"$projectId\t$clusterId\t$zoneId")
instanceId: String)
extends TestIO[(String, Iterable[T])](s"$projectId\t$instanceId")

}
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,8 +48,7 @@ runMain
--stagingLocation=gs://[BUCKET]/path/to/staging
--input=gs://dataflow-samples/shakespeare/kinglear.txt
--bigtableProjectId=[BIG_TABLE_PROJECT_ID]
--bigtableClusterId=[BIG_TABLE_CLUSTER_ID]
--bigtableZoneId=[BIG_TABLE_ZONE_ID]
--bigtableInstanceId=[BIG_TABLE_INSTANCE_ID]
--bigtableTableId=[BIG_TABLE_TABLE_ID]
*/

Expand Down Expand Up @@ -77,8 +76,7 @@ runMain
--project=[PROJECT] --runner=DataflowPipelineRunner --zone=[ZONE]
--stagingLocation=gs://[BUCKET]/path/to/staging
--bigtableProjectId=[BIG_TABLE_PROJECT_ID]
--bigtableClusterId=[BIG_TABLE_CLUSTER_ID]
--bigtableZoneId=[BIG_TABLE_ZONE_ID]
--bigtableInstanceId=[BIG_TABLE_INSTANCE_ID]
--bigtableTableId=[BIG_TABLE_TABLE_ID]
--output=gs://[BUCKET]/[PATH]/wordcount
*/
Expand Down Expand Up @@ -106,8 +104,7 @@ runMain
--stagingLocation=gs://[BUCKET]/path/to/staging
--input=gs://dataflow-samples/shakespeare/kinglear.txt
--bigtableProjectId=[BIG_TABLE_PROJECT_ID]
--bigtableClusterId=[BIG_TABLE_CLUSTER_ID]
--bigtableZoneId=[BIG_TABLE_ZONE_ID]
--bigtableInstanceId=[BIG_TABLE_INSTANCE_ID]
--bigtableTableId=[BIG_TABLE_TABLE_ID]
*/

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -50,8 +50,7 @@ class BigtableExampleTest extends PipelineSpec {

val bigtableOptions = Seq(
"--bigtableProjectId=my-project",
"--bigtableClusterId=my-cluster",
"--bigtableZoneId=us-east1-a",
"--bigtableInstanceId=my-instance",
"--bigtableTableId=my-table")

val textIn = Seq("a b c d e", "a b a b")
Expand All @@ -62,7 +61,7 @@ class BigtableExampleTest extends PipelineSpec {
JobTest[com.spotify.scio.examples.extra.BigtableWriteExample.type]
.args(bigtableOptions :+ "--input=in.txt": _*)
.input(TextIO("in.txt"), textIn)
.output(bt.BigtableOutput[Put]("my-project", "my-cluster", "us-east1-a", "my-table")) {
.output(bt.BigtableOutput[Put]("my-project", "my-instance", "my-table")) {
_.map(comparablePut) should containInAnyOrder (expectedPuts.map(comparablePut))
}
.run()
Expand All @@ -81,7 +80,7 @@ class BigtableExampleTest extends PipelineSpec {
"BigtableReadExample" should "work" in {
JobTest[com.spotify.scio.examples.extra.BigtableReadExample.type]
.args(bigtableOptions :+ "--output=out.txt": _*)
.input(bt.BigtableInput("my-project", "my-cluster", "us-east1-a", "my-table"), resultIn)
.input(bt.BigtableInput("my-project", "my-instance", "my-table"), resultIn)
.output(TextIO("out.txt"))(_ should containInAnyOrder (expectedText))
.run()
}
Expand All @@ -100,7 +99,7 @@ class BigtableExampleTest extends PipelineSpec {
.args(bigtableOptions ++ Seq("--kinglear=k.txt", "--othello=o.txt"): _*)
.input(TextIO("k.txt"), kingLear)
.input(TextIO("o.txt"), othello)
.output(bt.MultipleBigtableOutput[Put]("my-project", "my-cluster", "us-east1-a")) {
.output(bt.MultipleBigtableOutput[Put]("my-project", "my-instance")) {
_.mapValues(_.map(comparablePut).toSet) should containInAnyOrder (
expectedMultiple.map(kv => (kv._1, kv._2.map(comparablePut).toSet)))
}
Expand Down

0 comments on commit 1fddaa5

Please sign in to comment.