Skip to content

Commit

Permalink
Add example and test for hadoopDistCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Wojdyla committed Apr 28, 2016
1 parent 05ac1d7 commit 7237102
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ lazy val scioExamples: Project = Project(
scioCore,
scioBigtable,
scioSchemas,
scioHdfs,
scioTest % "test"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.hdfs._
import com.spotify.scio.examples.common.ExampleData
import org.joda.time.Instant

/*
SBT
runMain
com.spotify.scio.examples.extra.HadoopDistCacheExample
--project=[PROJECT] --runner=DataflowPipelineRunner --zone=[ZONE]
--stagingLocation=gs://[BUCKET]/path/to/staging
--input=gs://dataflow-samples/wikipedia_edits/wiki_data-*.json
--output=gs://[BUCKET]/[PATH]/dist_cache_example
*/

// Use distributed cache inside a job
object HadoopDistCacheExample {
def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

// declare a HDFS file to be distributed to all workers and logic to load the file
val dc = sc.hadoopDistCache(args.getOrElse("months", ExampleData.MONTHS)) { f =>
scala.io.Source.fromFile(f).getLines().map { s =>
val t = s.split(" ")
(t(0).toInt, t(1))
}.toMap
}

sc
.tableRowJsonFile(args.getOrElse("input", ExampleData.EXPORTED_WIKI_TABLE))
.map(row => new Instant(row.getLong("timestamp") * 1000L).toDateTime.getMonthOfYear)
.countByValue
// distributed cache available inside a transform
.map(kv => dc().getOrElse(kv._1, "unknown") + " " + kv._2)
.saveAsTextFile(args("output"))

sc.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.spotify.scio.examples.extra

import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.testing._
import org.joda.time.format.DateTimeFormat


class HadoopDistCacheTest extends PipelineSpec {
  // Parses yyyyMMdd date strings used to build the test timestamps.
  val fmt = DateTimeFormat.forPattern("yyyyMMdd")

  // Converts a yyyyMMdd date string to a Unix timestamp in seconds.
  def d2t(date: String): Long = fmt.parseDateTime(date).getMillis / 1000

  // Input rows: three edits in January, two in February, one in March 2015.
  val in = Seq(
    "20150101", "20150102", "20150103",
    "20150201", "20150202",
    "20150301"
  ).map(date => TableRow("timestamp" -> d2t(date)))

  // Month-number -> month-name mapping, standing in for the HDFS months file.
  val distCache = Map(1 -> "Jan", 2 -> "Feb", 3 -> "Mar")

  // Expected "<month name> <count>" output lines.
  val expected = Seq("Jan 3", "Feb 2", "Mar 1")

  "HadoopDistCacheExample" should "work" in {
    JobTest[com.spotify.scio.examples.extra.HadoopDistCacheExample.type]
      .args("--output=out.txt")
      .input(TableRowJsonIO(ExampleData.EXPORTED_WIKI_TABLE), in)
      .distCache(DistCacheIO("hdfs://dataflow-samples/samples/misc/months.txt"), distCache)
      .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
      .run()
  }
}

0 comments on commit 7237102

Please sign in to comment.