From 8c0e132695c5275d864c54125acac1fde58901fa Mon Sep 17 00:00:00 2001
From: Rafal Wojdyla
Date: Thu, 28 Apr 2016 13:49:45 -0400
Subject: [PATCH] Add hadoopDistCache - support all Hadoop filesystems

hadoopDistCache first uploads the artifact from the source Hadoop
filesystem to GCS, then reuses the standard GCS-based dist cache
mechanism.
---
 .../scala/com/spotify/scio/hdfs/package.scala | 83 ++++++++++++++++++-
 1 file changed, 79 insertions(+), 4 deletions(-)

diff --git a/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala b/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala
index af38630519..64b90892b3 100644
--- a/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala
+++ b/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/package.scala
@@ -17,18 +17,25 @@
 
 package com.spotify.scio
 
-import java.io.{InputStream, SequenceInputStream}
+import java.io.{File, InputStream, SequenceInputStream}
+import java.net.URI
+import java.nio.channels.Channels
+import java.security.PrivilegedAction
 import java.util.Collections
 
+import com.google.api.client.util.ByteStreams
 import com.google.cloud.dataflow.contrib.hadoop._
 import com.google.cloud.dataflow.contrib.hadoop.simpleauth._
 import com.google.cloud.dataflow.sdk.coders.AvroCoder
 import com.google.cloud.dataflow.sdk.io.{Read, Write}
+import com.google.cloud.dataflow.sdk.util.MimeTypes
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath
 import com.google.cloud.dataflow.sdk.values.KV
 import com.google.common.base.Charsets
+import com.google.common.hash.Hashing
 import com.spotify.scio.io.{Tap, Taps}
 import com.spotify.scio.util.ScioUtil
-import com.spotify.scio.values.SCollection
+import com.spotify.scio.values.{DistCache, SCollection}
 import org.apache.avro.Schema
 import org.apache.avro.file.DataFileStream
 import org.apache.avro.generic.GenericDatumReader
@@ -38,11 +45,13 @@ import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec, DeflateCodec}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, DefaultCodec}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
-import org.apache.hadoop.mapreduce.{Job, MRConfig, MRJobConfig}
+import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.security.UserGroupInformation
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
 import scala.concurrent.Future
@@ -64,6 +73,7 @@ package object hdfs {
   // TODO: scala 2.11
   // implicit class HdfsScioContext(private val sc: ScioContext) extends AnyVal {
   implicit class HdfsScioContext(val self: ScioContext) {
+    private val logger = LoggerFactory.getLogger(ScioContext.getClass)
 
     /** Get an SCollection for a text file on HDFS. */
     def hdfsTextFile(path: String, username: String = null): SCollection[String] = self.pipelineOp {
@@ -99,6 +109,71 @@ package object hdfs {
         .map(_.getKey.datum())
     }
 
+    /**
+     * Create a new [[com.spotify.scio.values.DistCache DistCache]] instance for a file on
+     * Hadoop/HDFS.
+     *
+     * @param path to the Hadoop/HDFS artifact
+     * @param conf optional custom Hadoop configuration
+     * @param username optional Hadoop Simple Authentication remote username
+     */
+    def hadoopDistCache[F](path: String,
+                           conf: Configuration = null,
+                           username: String = null)
+                          (initFn: File => F): DistCache[F] = self.pipelineOp {
+      if (self.isTest) {
+        // TODO: if test - can the path be remote HDFS/S3 etc?
+        self.distCache(path)(initFn)
+      } else {
+        // TODO: should upload be asynchronous, blocking on context close
+        require(self.options.getStagingLocation != null,
+          "Staging directory not set - use `--stagingLocation`!")
+        require(path != null, "Artifact path can't be null")
+
+        val hadoopConf = Option(conf).getOrElse(new Configuration())
+
+        // TODO: should we add checksums on both src and GCS to reuse uploaded artifacts?
+        val pathHash = Hashing.sha1().hashString(path, Charsets.UTF_8)
+        val targetHash = pathHash.toString.substring(0, 8)
+
+        logger.debug(s"Add '$path' (hash: '$pathHash') to dist cache")
+
+        val targetDistCache = new Path("distcache", s"$targetHash-${path.split("/").last}")
+
+        val target = new Path(self.options.getStagingLocation, targetDistCache)
+
+        if (username != null) {
+          UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedAction[Unit] {
+            override def run(): Unit = {
+              hadoopDistCacheCopy(new Path(path), target.toUri, hadoopConf)
+            }
+          })
+        } else {
+          hadoopDistCacheCopy(new Path(path), target.toUri, hadoopConf)
+        }
+
+        self.distCache(target.toString)(initFn)
+      }
+    }
+
+    private[scio] def hadoopDistCacheCopy(src: Path, target: URI, conf: Configuration): Unit = {
+      logger.debug(s"Will copy ${src.toUri} to $target")
+
+      val fs = src.getFileSystem(conf)
+      val inStream = fs.open(src)
+
+      // TODO: should we attempt to detect the MIME type rather than always using MimeTypes.BINARY?
+      val outChannel = Channels.newOutputStream(
+        self.options.getGcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY))
+
+      try {
+        ByteStreams.copy(inStream, outChannel)
+      } finally {
+        outChannel.close()
+        inStream.close()
+      }
+    }
+
   }
 
   /** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with HDFS methods. */
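
Example usage of the new API (a rough sketch, not part of the patch): it assumes a
hypothetical HDFS artifact path and input/output arguments, a job launched with
--stagingLocation pointing at a GCS bucket, and the usual ContextAndArgs entry point.
hadoopDistCache stages the artifact once at pipeline construction time; initFn then
runs lazily on each worker against the locally cached copy.

    import com.spotify.scio._
    import com.spotify.scio.hdfs._

    object DistCacheExample {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, args) = ContextAndArgs(cmdlineArgs)

        // Hypothetical artifact on HDFS; any Hadoop-supported filesystem should work,
        // since the file is first copied to the GCS staging location.
        val stopWords = sc.hadoopDistCache("hdfs://namenode/data/stopwords.txt") { file =>
          scala.io.Source.fromFile(file).getLines().toSet
        }

        sc.textFile(args("input"))
          .flatMap(_.split("\\s+"))
          .filter(w => !stopWords().contains(w)) // DistCache value is created lazily per worker
          .saveAsTextFile(args("output"))

        sc.close()
      }
    }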