Merge pull request #115 from spotify/hdfs_cache
Add hadoopDistCache - support all Hadoop filesystems
nevillelyh committed May 10, 2016
2 parents a3e00cb + a3a5b92 commit 0c90f00
Showing 4 changed files with 174 additions and 4 deletions.
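
At a glance, the new `hadoopDistCache` reads a file from any Hadoop filesystem, copies it to the job's staging location, and exposes it to all workers as a `DistCache[F]`. A minimal sketch of the intended call pattern, modeled on the HadoopDistCacheExample added below; the object name, path, parsing logic, and input/output handling are illustrative, not part of this commit:

import com.spotify.scio._
import com.spotify.scio.hdfs._

// Hypothetical driver, not part of this commit; it only illustrates the new API.
object HadoopDistCacheSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Declare the file once on the driver; initFn runs on each worker to load it into memory.
    val months = sc.hadoopDistCache("hdfs://namenode/samples/months.txt") { file =>
      scala.io.Source.fromFile(file).getLines().map { line =>
        val Array(num, name) = line.split(" ")  // e.g. "1 Jan"
        (num.toInt, name)
      }.toMap
    }

    // months() returns the loaded Map inside any transform on the workers.
    sc.textFile(args("input"))
      .map(line => months().getOrElse(line.trim.toInt, "unknown"))
      .saveAsTextFile(args("output"))

    sc.close()
  }
}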
1 change: 1 addition & 0 deletions build.sbt
@@ -307,6 +307,7 @@ lazy val scioExamples: Project = Project(
  scioCore,
  scioBigtable,
  scioSchemas,
  scioHdfs,
  scioTest % "test"
)

60 changes: 60 additions & 0 deletions scio-examples/src/main/scala/com/spotify/scio/examples/extra/HadoopDistCacheExample.scala
@@ -0,0 +1,60 @@
/*
* Copyright 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.hdfs._
import com.spotify.scio.examples.common.ExampleData
import org.joda.time.Instant

/*
SBT
runMain
  com.spotify.scio.examples.extra.HadoopDistCacheExample
  --project=[PROJECT] --runner=DataflowPipelineRunner --zone=[ZONE]
  --stagingLocation=gs://[BUCKET]/path/to/staging
  --input=gs://dataflow-samples/wikipedia_edits/wiki_data-*.json
  --output=gs://[BUCKET]/[PATH]/dist_cache_example
*/

// Use a distributed cache backed by Hadoop/HDFS inside a job
object HadoopDistCacheExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // declare an HDFS file to be distributed to all workers and the logic to load the file
    val dc = sc.hadoopDistCache(args.getOrElse("months",
      "hdfs://dataflow-samples/samples/misc/months.txt")) { f =>
      scala.io.Source.fromFile(f).getLines().map { s =>
        val t = s.split(" ")
        (t(0).toInt, t(1))
      }.toMap
    }

    sc
      .tableRowJsonFile(args.getOrElse("input", ExampleData.EXPORTED_WIKI_TABLE))
      .map(row => new Instant(row.getLong("timestamp") * 1000L).toDateTime.getMonthOfYear)
      .countByValue
      // distributed cache available inside a transform
      .map(kv => dc().getOrElse(kv._1, "unknown") + " " + kv._2)
      .saveAsTextFile(args("output"))

    sc.close()
  }
}
33 changes: 33 additions & 0 deletions scio-examples/src/test/scala/com/spotify/scio/examples/extra/HadoopDistCacheTest.scala
@@ -0,0 +1,33 @@
package com.spotify.scio.examples.extra

import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.testing._
import org.joda.time.format.DateTimeFormat


class HadoopDistCacheTest extends PipelineSpec {
  val fmt = DateTimeFormat.forPattern("yyyyMMdd")
  def d2t(date: String): Long = fmt.parseDateTime(date).getMillis / 1000

  val in = Seq(
    TableRow("timestamp" -> d2t("20150101")),
    TableRow("timestamp" -> d2t("20150102")),
    TableRow("timestamp" -> d2t("20150103")),
    TableRow("timestamp" -> d2t("20150201")),
    TableRow("timestamp" -> d2t("20150202")),
    TableRow("timestamp" -> d2t("20150301")))

  val distCache = Map(1 -> "Jan", 2 -> "Feb", 3 -> "Mar")

  val expected = Seq("Jan 3", "Feb 2", "Mar 1")

  "HadoopDistCacheExample" should "work" in {
    JobTest[com.spotify.scio.examples.extra.HadoopDistCacheExample.type]
      .args("--output=out.txt")
      .input(TableRowJsonIO(ExampleData.EXPORTED_WIKI_TABLE), in)
      .distCache(DistCacheIO("hdfs://dataflow-samples/samples/misc/months.txt"), distCache)
      .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
      .run()
  }
}
84 changes: 80 additions & 4 deletions scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala
@@ -17,18 +17,25 @@

package com.spotify.scio

import java.io.{InputStream, SequenceInputStream}
import java.io.{File, InputStream, SequenceInputStream}
import java.net.URI
import java.nio.channels.Channels
import java.security.PrivilegedAction
import java.util.Collections

import com.google.api.client.util.ByteStreams
import com.google.cloud.dataflow.contrib.hadoop._
import com.google.cloud.dataflow.contrib.hadoop.simpleauth._
import com.google.cloud.dataflow.sdk.coders.AvroCoder
import com.google.cloud.dataflow.sdk.io.{Read, Write}
import com.google.cloud.dataflow.sdk.util.MimeTypes
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath
import com.google.cloud.dataflow.sdk.values.KV
import com.google.common.base.Charsets
import com.google.common.hash.Hashing
import com.spotify.scio.io.{Tap, Taps}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
import com.spotify.scio.values.{DistCache, SCollection}
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader
@@ -38,11 +45,13 @@ import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec, DeflateCodec}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, MRConfig, MRJobConfig}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.concurrent.Future
@@ -64,6 +73,7 @@ package object hdfs {
  // TODO: scala 2.11
  // implicit class HdfsScioContext(private val sc: ScioContext) extends AnyVal {
  implicit class HdfsScioContext(val self: ScioContext) {
    private val logger = LoggerFactory.getLogger(ScioContext.getClass)

    /** Get an SCollection for a text file on HDFS. */
    def hdfsTextFile(path: String, username: String = null): SCollection[String] = self.pipelineOp {
@@ -99,6 +109,72 @@ package object hdfs {
        .map(_.getKey.datum())
    }

    /**
     * Create a new [[com.spotify.scio.values.DistCache DistCache]] instance for a file on
     * Hadoop/HDFS.
     *
     * @param path path to the Hadoop/HDFS artifact
     * @param conf optional custom Hadoop configuration
     * @param username optional Hadoop Simple Authentication remote username
     */
    def hadoopDistCache[F](path: String,
                           conf: Configuration = null,
                           username: String = null)
                          (initFn: File => F): DistCache[F] = self.pipelineOp {
      if (self.isTest) {
        self.distCache(path)(initFn)
      } else {
        // TODO: should upload be asynchronous, blocking on context close
        require(self.options.getStagingLocation != null,
          "Staging directory not set - use `--stagingLocation`!")
        require(path != null, "Artifact path can't be null")

        val _conf = Option(conf).getOrElse(new Configuration())

        // TODO: should we add checksums on both src and GCS to reuse uploaded artifacts?
        // To keep it simple, for now we upload each artifact, thus artifacts should be
        // relatively small.
        val pathHash = Hashing.sha1().hashString(path, Charsets.UTF_8)
        val targetHash = pathHash.toString.substring(0, 8)

        logger.debug(s"Add '$path' (hash: '$pathHash') to dist cache")

        val targetDistCache = new Path("distcache", s"$targetHash-${path.split("/").last}")

        val target = new Path(self.options.getStagingLocation, targetDistCache)

        if (username != null) {
          UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedAction[Unit] {
            override def run(): Unit = {
              hadoopDistCacheCopy(new Path(path), target.toUri, _conf)
            }
          })
        } else {
          hadoopDistCacheCopy(new Path(path), target.toUri, _conf)
        }

        self.distCache(target.toString)(initFn)
      }
    }

    private[scio] def hadoopDistCacheCopy(src: Path, target: URI, conf: Configuration): Unit = {
      logger.debug(s"Will copy ${src.toUri} to $target")

      val fs = src.getFileSystem(conf)
      val inStream = fs.open(src)

      // TODO: should we attempt to detect the MIME type rather than always using MimeTypes.BINARY?
      val outChannel = Channels.newOutputStream(
        self.options.getGcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY))

      try {
        ByteStreams.copy(inStream, outChannel)
      } finally {
        outChannel.close()
        inStream.close()
      }
    }

  }

/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with HDFS methods. */
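For the Hadoop Simple Authentication variant described in the new method's scaladoc, a call would presumably look like the following; the host, path, and remote username are illustrative placeholders, not taken from this diff:

// The username is forwarded to UserGroupInformation.createRemoteUser while the
// artifact is copied from HDFS to the staging location.
val lookup = sc.hadoopDistCache(
  "hdfs://namenode:8020/data/lookup.txt",  // illustrative path
  username = "svc-dataflow") { file =>     // illustrative remote user
  scala.io.Source.fromFile(file).getLines().toSet
}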
