Skip to content

Commit

Permalink
Merge pull request #29 from markhamstra/1.1-byteswapPartitioner
Browse files Browse the repository at this point in the history
REL-505 Add new ByteswapPartitioner
  • Loading branch information
vnivargi committed Jan 12, 2015
2 parents 581792c + 8e01a62 commit cec04c0
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 3 deletions.
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,23 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  // Short aliases accepted for `spark.default.partitioner`; anything else is
  // treated as a fully-qualified class name of a Partitioner implementation.
  val shortPartitionerNames = Map(
    "hash" -> "org.apache.spark.HashPartitioner",
    "byteswap" -> "org.apache.spark.ByteswapPartitioner"
  )
  val defaultPartitionerName = rdd.conf.get("spark.default.partitioner", "hash")
  val className =
    shortPartitionerNames.getOrElse(defaultPartitionerName.toLowerCase, defaultPartitionerName)
  // The configured partitioner class must expose a single-Int-argument
  // constructor (the number of partitions), like HashPartitioner does.
  val ctor = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
    .getConstructor(classOf[Int])
  // If any input RDD already has a partitioner, reuse the one belonging to the
  // RDD with the most partitions; this avoids an unnecessary shuffle.
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined) {
    return r.partitioner.get
  }
  // Otherwise build a fresh partitioner, sized by spark.default.parallelism
  // when set, or by the largest input RDD's partition count.
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    ctor.newInstance(rdd.context.defaultParallelism: java.lang.Integer).asInstanceOf[Partitioner]
  } else {
    ctor.newInstance(bySize.head.partitions.size: java.lang.Integer).asInstanceOf[Partitioner]
  }
}
}
Expand Down Expand Up @@ -93,6 +102,18 @@ class HashPartitioner(partitions: Int) extends Partitioner {
override def hashCode: Int = numPartitions
}

/**
 * A [[org.apache.spark.Partitioner]] based on `Object.hashCode`, like
 * [[org.apache.spark.HashPartitioner]], but with `byteswap32` applied to each
 * hash code before the modulo. This spreads out keys whose hash codes are all
 * divisible by `numPartitions`, which would otherwise collapse onto a few
 * partitions.
 */
class ByteswapPartitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int =
    if (key == null) {
      // null keys always land in partition 0, matching HashPartitioner.
      0
    } else {
      Utils.nonNegativeMod(byteswap32(key.hashCode), numPartitions)
    }
}

/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ import scala.util.Try

class PipedRDDSuite extends FunSuite with SharedSparkContext {

// Pin the default partitioner to "hash" for this suite so the piped-RDD
// tests run with deterministic partitioning, then defer to the base setup.
override def beforeAll() {
  sys.props("spark.default.partitioner") = "hash"
  super.beforeAll()
}

// Remove the suite-local override so later suites see the JVM default again,
// then defer to the base teardown.
override def afterAll() {
  sys.props -= "spark.default.partitioner"
  super.afterAll()
}

test("basic pipe") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,17 @@ Apart from these, the following properties are also available, and may be useful
(<code>groupByKey</code>, <code>reduceByKey</code>, etc) when not set by user.
</td>
</tr>
<tr>
<td><code>spark.default.partitioner</code></td>
<td>hash</td>
<td>
Implementation to use for partitioning key-value data pairs. There are two implementations
available: <code>hash</code> and <code>byteswap</code>. Both are based on the <code>hashCode</code> of
the keys <code>mod</code> the number of partitions, but the <code>byteswap</code> partitioner also
applies <code>byteswap32</code> to the hash codes, which helps guarantee that all partitions are used
even when the hash codes are divisible by a factor of the number of partitions.
</td>
</tr>
<tr>
<td><code>spark.broadcast.factory</code></td>
<td>org.apache.spark.broadcast.<br />TorrentBroadcastFactory</td>
Expand Down
2 changes: 1 addition & 1 deletion mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.1--csd-1-SNAPSHOT</version>
<version>1.1.1-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down

0 comments on commit cec04c0

Please sign in to comment.