Add hadoopDistCache - support all Hadoop filesystems
hadoopDistCache first uploads the artifact from its source filesystem to GCS, then
uses it via the standard GCS-based dist cache mechanism.
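
For context, a minimal usage sketch. The job name, file paths, args keys and lookup logic are hypothetical; `hadoopDistCache(path)(initFn)` and `DistCache` come from the diff below, and the cached value is assumed to be read via `apply()` as with `sc.distCache`:

import java.io.File

import com.spotify.scio._
import com.spotify.scio.hdfs._

// Hypothetical job: ship a lookup file that lives on HDFS to all workers.
object HadoopDistCacheExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // The artifact is copied to --stagingLocation on GCS once at submission time;
    // each worker then downloads the staged copy and runs initFn on the local File.
    val lookup = sc.hadoopDistCache("hdfs://namenode/data/lookup.txt") { (f: File) =>
      scala.io.Source.fromFile(f).getLines().toSet
    }

    sc.textFile(args("input"))
      .filter(id => lookup().contains(id))
      .saveAsTextFile(args("output"))

    sc.close()
  }
}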
Rafal Wojdyla committed Apr 28, 2016
1 parent 4221346 commit 8c0e132
Showing 1 changed file with 79 additions and 4 deletions.
83 changes: 79 additions & 4 deletions scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala
@@ -17,18 +17,25 @@

package com.spotify.scio

import java.io.{InputStream, SequenceInputStream}
import java.io.{File, InputStream, SequenceInputStream}
import java.net.URI
import java.nio.channels.Channels
import java.security.PrivilegedAction
import java.util.Collections

import com.google.api.client.util.ByteStreams
import com.google.cloud.dataflow.contrib.hadoop._
import com.google.cloud.dataflow.contrib.hadoop.simpleauth._
import com.google.cloud.dataflow.sdk.coders.AvroCoder
import com.google.cloud.dataflow.sdk.io.{Read, Write}
import com.google.cloud.dataflow.sdk.util.MimeTypes
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath
import com.google.cloud.dataflow.sdk.values.KV
import com.google.common.base.Charsets
import com.google.common.hash.Hashing
import com.spotify.scio.io.{Tap, Taps}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
import com.spotify.scio.values.{DistCache, SCollection}
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader
@@ -38,11 +45,13 @@ import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec, DeflateCodec}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, MRConfig, MRJobConfig}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.concurrent.Future
@@ -64,6 +73,7 @@ package object hdfs {
// TODO: scala 2.11
// implicit class HdfsScioContext(private val sc: ScioContext) extends AnyVal {
implicit class HdfsScioContext(val self: ScioContext) {
private val logger = LoggerFactory.getLogger(ScioContext.getClass)

/** Get an SCollection for a text file on HDFS. */
def hdfsTextFile(path: String, username: String = null): SCollection[String] = self.pipelineOp {
@@ -99,6 +109,71 @@ package object hdfs {
.map(_.getKey.datum())
}

    /**
     * Create a new [[com.spotify.scio.values.DistCache DistCache]] instance for a file on
     * Hadoop/HDFS.
     *
     * @param path to the Hadoop/HDFS artifact
     * @param conf optional custom Hadoop configuration
     * @param username optional Hadoop Simple Authentication remote username
     */
    def hadoopDistCache[F](path: String,
                           conf: Configuration = null,
                           username: String = null)
                          (initFn: File => F): DistCache[F] = self.pipelineOp {
      if (self.isTest) {
        // TODO: if test - can the path be remote HDFS/S3 etc?
        self.distCache(path)(initFn)
      } else {
        // TODO: should upload be asynchronous, blocking on context close
        require(self.options.getStagingLocation != null,
          "Staging directory not set - use `--stagingLocation`!")
        require(path != null, "Artifact path can't be null")

        val _conf = Option(conf).getOrElse(new Configuration())

        // TODO: should we add checksums on both src and GCS to reuse uploaded artifacts?
        val path_hash = Hashing.sha1().hashString(path, Charsets.UTF_8)
        val target_hash = path_hash.toString.substring(0, 8)

        logger.debug(s"Add '$path' (hash: '$path_hash') to dist cache")

        val target_distcache = new Path("distcache", s"$target_hash-${path.split("/").last}")

        val target = new Path(self.options.getStagingLocation, target_distcache)
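        // i.e. the artifact is staged at <stagingLocation>/distcache/<first 8 hex chars of SHA-1 of path>-<file name>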

        if (username != null) {
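          // Run the copy as the supplied remote user (Hadoop Simple Authentication) via UserGroupInformation.doAs.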
          UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedAction[Unit] {
            override def run(): Unit = {
              hadoopDistCacheCopy(new Path(path), target.toUri, _conf)
            }
          })
        } else {
          hadoopDistCacheCopy(new Path(path), target.toUri, _conf)
        }

        self.distCache(target.toString)(initFn)
      }
    }

    private[scio] def hadoopDistCacheCopy(src: Path, target: URI, conf: Configuration): Unit = {
      logger.debug(s"Will copy ${src.toUri} to $target")

      val fs = src.getFileSystem(conf)
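      // src.getFileSystem resolves whichever Hadoop FileSystem implementation backs the path
      // (HDFS, S3, local, ...), which is what lets the copy work for any Hadoop filesystem.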
      val inStream = fs.open(src)

      // TODO: Should we attempt to detect the Mime type rather than always using MimeTypes.BINARY?
      val outChannel = Channels.newOutputStream(
        self.options.getGcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY))

      try {
        ByteStreams.copy(inStream, outChannel)
      } finally {
        outChannel.close()
        inStream.close()
      }
    }

  }

/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with HDFS methods. */