From 7237102ef180b9e35b65b53a0e61166257b2a516 Mon Sep 17 00:00:00 2001 From: Rafal Wojdyla Date: Thu, 28 Apr 2016 17:43:51 -0400 Subject: [PATCH] Add example and test for hadoopDistCache --- build.sbt | 1 + .../extra/HadoopDistCacheExample.scala | 59 +++++++++++++++++++ .../examples/extra/HadoopDistCacheTest.scala | 33 +++++++++++ 3 files changed, 93 insertions(+) create mode 100644 scio-examples/src/main/scala/com/spotify/scio/examples/extra/HadoopDistCacheExample.scala create mode 100644 scio-examples/src/test/scala/com/spotify/scio/examples/extra/HadoopDistCacheTest.scala diff --git a/build.sbt b/build.sbt index dcb89f0ef4..f019488d16 100644 --- a/build.sbt +++ b/build.sbt @@ -307,6 +307,7 @@ lazy val scioExamples: Project = Project( scioCore, scioBigtable, scioSchemas, + scioHdfs, scioTest % "test" ) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/HadoopDistCacheExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/HadoopDistCacheExample.scala new file mode 100644 index 0000000000..8df7770ecc --- /dev/null +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/HadoopDistCacheExample.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2016 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio._ +import com.spotify.scio.bigquery._ +import com.spotify.scio.hdfs._ +import com.spotify.scio.examples.common.ExampleData +import org.joda.time.Instant + +/* +SBT +runMain + com.spotify.scio.examples.extra.DistCacheExample + --project=[PROJECT] --runner=DataflowPipelineRunner --zone=[ZONE] + --stagingLocation=gs://[BUCKET]/path/to/staging + --input=gs://dataflow-samples/wikipedia_edits/wiki_data-*.json + --output=gs://[BUCKET]/[PATH]/dist_cache_example +*/ + +// Use distributed cache inside a job +object HadoopDistCacheExample { + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + + // declare a HDFS file to be distributed to all workers and logic to load the file + val dc = sc.hadoopDistCache(args.getOrElse("months", ExampleData.MONTHS)) { f => + scala.io.Source.fromFile(f).getLines().map { s => + val t = s.split(" ") + (t(0).toInt, t(1)) + }.toMap + } + + sc + .tableRowJsonFile(args.getOrElse("input", ExampleData.EXPORTED_WIKI_TABLE)) + .map(row => new Instant(row.getLong("timestamp") * 1000L).toDateTime.getMonthOfYear) + .countByValue + // distributed cache available inside a transform + .map(kv => dc().getOrElse(kv._1, "unknown") + " " + kv._2) + .saveAsTextFile(args("output")) + + sc.close() + } +} diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/HadoopDistCacheTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/HadoopDistCacheTest.scala new file mode 100644 index 0000000000..947926a5ab --- /dev/null +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/HadoopDistCacheTest.scala @@ -0,0 +1,33 @@ +package com.spotify.scio.examples.extra + +import com.spotify.scio.bigquery.TableRow +import com.spotify.scio.examples.common.ExampleData +import com.spotify.scio.testing._ +import org.joda.time.format.DateTimeFormat + + +class HadoopDistCacheTest extends PipelineSpec { + val fmt = DateTimeFormat.forPattern("yyyyMMdd") + def d2t(date: String): Long = fmt.parseDateTime(date).getMillis / 1000 + + val in = Seq( + TableRow("timestamp" -> d2t("20150101")), + TableRow("timestamp" -> d2t("20150102")), + TableRow("timestamp" -> d2t("20150103")), + TableRow("timestamp" -> d2t("20150201")), + TableRow("timestamp" -> d2t("20150202")), + TableRow("timestamp" -> d2t("20150301"))) + + val distCache = Map(1 -> "Jan", 2 -> "Feb", 3 -> "Mar") + + val expected = Seq("Jan 3", "Feb 2", "Mar 1") + + "HadoopDistCacheExample" should "work" in { + JobTest[com.spotify.scio.examples.extra.HadoopDistCacheExample.type] + .args("--output=out.txt") + .input(TableRowJsonIO(ExampleData.EXPORTED_WIKI_TABLE), in) + .distCache(DistCacheIO("hdfs://dataflow-samples/samples/misc/months.txt"), distCache) + .output(TextIO("out.txt"))(_ should containInAnyOrder(expected)) + .run() + } +}