From e0f89f0608b46ad84a290edf67e59b3439f7e88d Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Fri, 2 Jan 2015 08:59:09 -0800
Subject: [PATCH 1/3] Added ByteswapPartitioner and made it the defaultPartitioner

---
 .../scala/org/apache/spark/Partitioner.scala | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e53a78ead2c0e..8e7e3dd459d37 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -60,9 +60,9 @@ object Partitioner {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new HashPartitioner(rdd.context.defaultParallelism)
+      new ByteswapPartitioner(rdd.context.defaultParallelism)
     } else {
-      new HashPartitioner(bySize.head.partitions.size)
+      new ByteswapPartitioner(bySize.head.partitions.size)
     }
   }
 }
@@ -93,6 +93,18 @@ class HashPartitioner(partitions: Int) extends Partitioner {
   override def hashCode: Int = numPartitions
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`. In order to spread out hashCodes that are divisible by
+ * `numPartitions`, `byteswap32` is applied to the hashCodes before modding by `numPartitions`.
+ */
+class ByteswapPartitioner(partitions: Int) extends HashPartitioner(partitions) {
+  override def getPartition(key: Any): Int = key match {
+    case null => 0
+    case _ => Utils.nonNegativeMod(byteswap32(key.hashCode), numPartitions)
+  }
+}
+
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.

From 466c8cf7cee2eb2c8c70cf37bb2349b22219e3c6 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Mon, 5 Jan 2015 15:18:34 -0800
Subject: [PATCH 2/3] Added spark.default.partitioner to conf

---
 .../main/scala/org/apache/spark/Partitioner.scala  | 13 +++++++++++--
 .../scala/org/apache/spark/rdd/PipedRDDSuite.scala | 10 ++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 8e7e3dd459d37..214b9be8c7d81 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,23 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+    val shortPartitionerNames = Map(
+      "hash" -> "org.apache.spark.HashPartitioner",
+      "byteswap" -> "org.apache.spark.ByteswapPartitioner"
+    )
+    val defaultPartitionerName = rdd.conf.get("spark.default.partitioner", "hash")
+    val className =
+      shortPartitionerNames.getOrElse(defaultPartitionerName.toLowerCase, defaultPartitionerName)
+    val ctor = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
+      .getConstructor(classOf[Int])
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
     for (r <- bySize if r.partitioner.isDefined) {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new ByteswapPartitioner(rdd.context.defaultParallelism)
+      ctor.newInstance(rdd.context.defaultParallelism: java.lang.Integer).asInstanceOf[Partitioner]
     } else {
-      new ByteswapPartitioner(bySize.head.partitions.size)
+      ctor.newInstance(bySize.head.partitions.size: java.lang.Integer).asInstanceOf[Partitioner]
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index be972c5e97a7e..9f11db88f92b2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,6 +32,16 @@ import scala.util.Try
 
 class PipedRDDSuite extends FunSuite with SharedSparkContext {
 
+  override def beforeAll() {
+    System.setProperty("spark.default.partitioner", "hash")
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    System.clearProperty("spark.default.partitioner")
+    super.afterAll()
+  }
+
   test("basic pipe") {
     if (testCommandAvailable("cat")) {
       val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

From 8e01a6287d3858c8cc43cd0d73948631bd2fcbec Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Mon, 5 Jan 2015 16:56:19 -0800
Subject: [PATCH 3/3] spark.default.partitioner docs

---
 docs/configuration.md | 11 +++++++++++
 mllib/pom.xml         |  2 +-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 13fc251c1733f..029213558fcd4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -488,6 +488,17 @@ Apart from these, the following properties are also available, and may be useful
     (<code>groupByKey</code>, <code>reduceByKey</code>, etc) when not set by user.
   </td>
 </tr>
+<tr>
+  <td><code>spark.default.partitioner</code></td>
+  <td>hash</td>
+  <td>
+    Implementation to use for partitioning key-value data pairs. There are two implementations
+    available: <code>hash</code> and <code>byteswap</code>. Both are based on the hashCode of
+    the keys mod the number of partitions, but the byteswap partitioner also
+    applies <code>byteswap32</code> to the hash codes, which helps guarantee that all partitions
+    are used even when the hash codes are divisible by a factor of the number of partitions.
+  </td>
+</tr>
 <tr>
   <td><code>spark.broadcast.factory</code></td>
   <td>org.apache.spark.broadcast.<br />TorrentBroadcastFactory</td>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 5a29020b124ed..288bbecf97851 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.1--csd-1-SNAPSHOT</version>
+    <version>1.1.1-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
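Not part of the patches above: a minimal, self-contained Scala sketch of the effect that the ByteswapPartitioner comment and the configuration.md entry describe. When every key's hashCode is a multiple of the partition count, plain hashCode mod numPartitions sends all keys to one partition, while byteswap32 mixes the hash codes first and spreads the same keys across partitions. The local nonNegativeMod helper is an assumption of this sketch; it only mirrors Spark's Utils.nonNegativeMod so the example runs standalone.

import scala.util.hashing.byteswap32

object ByteswapSpreadDemo {
  // Mirrors org.apache.spark.util.Utils.nonNegativeMod: a mod that never returns a negative value.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // Int keys whose hashCode (the value itself) is always divisible by numPartitions.
    val keys = (0 until 64).map(_ * numPartitions)

    // Plain hash partitioning: every one of these keys lands in partition 0.
    val hashBuckets = keys.groupBy(k => nonNegativeMod(k.hashCode, numPartitions))
    // Byteswap partitioning: byteswap32 scrambles the bits, so the keys spread out.
    val byteswapBuckets = keys.groupBy(k => nonNegativeMod(byteswap32(k.hashCode), numPartitions))

    println(s"hash:     ${hashBuckets.size} of $numPartitions partitions used")
    println(s"byteswap: ${byteswapBuckets.size} of $numPartitions partitions used")
  }
}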
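A hedged usage sketch, assuming the three patches are applied to a 1.1.x/1.2.x era build: the new spark.default.partitioner setting selects the byteswap implementation, and a reduceByKey call with no explicit partitioner then goes through Partitioner.defaultPartitioner, which instantiates ByteswapPartitioner instead of HashPartitioner. The app name and local master are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD functions, needed on pre-1.3 Spark

object DefaultPartitionerExample {
  def main(args: Array[String]): Unit = {
    // "byteswap" is one of the two short names the patch registers; a fully qualified
    // class name would also be accepted (see the note after this example).
    val conf = new SparkConf()
      .setAppName("default-partitioner-example") // placeholder
      .setMaster("local[2]")                     // placeholder
      .set("spark.default.partitioner", "byteswap")
    val sc = new SparkContext(conf)

    // No partitioner argument, so reduceByKey falls back to Partitioner.defaultPartitioner.
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    println(counts.partitioner) // expected: Some(org.apache.spark.ByteswapPartitioner@...)
    sc.stop()
  }
}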
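Because defaultPartitioner resolves the setting with Class.forName and getConstructor(classOf[Int]), any value other than the hash and byteswap short names is treated as the fully qualified name of a Partitioner subclass with a single Int constructor parameter. Below is a sketch of what such a plug-in class could look like; the ModPartitioner name and its package are hypothetical, not something the patches define.

package com.example.partitioning // hypothetical package

import org.apache.spark.Partitioner

// Usable via spark.default.partitioner=com.example.partitioning.ModPartitioner
// because it exposes the Int-argument constructor that defaultPartitioner reflects on.
class ModPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % partitions
      if (raw < 0) raw + partitions else raw // keep the result non-negative
  }

  override def equals(other: Any): Boolean = other match {
    case m: ModPartitioner => m.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}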