REL-505 Add new ByteswapPartitioner #29

Merged 3 commits on Jan 12, 2015. Changes from all commits.
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,23 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+    val shortPartitionerNames = Map(
+      "hash" -> "org.apache.spark.HashPartitioner",
+      "byteswap" -> "org.apache.spark.ByteswapPartitioner"
+    )
+    val defaultPartitionerName = rdd.conf.get("spark.default.partitioner", "hash")
+    val className =
+      shortPartitionerNames.getOrElse(defaultPartitionerName.toLowerCase, defaultPartitionerName)
+    val ctor = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
+      .getConstructor(classOf[Int])
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
     for (r <- bySize if r.partitioner.isDefined) {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new HashPartitioner(rdd.context.defaultParallelism)
+      ctor.newInstance(rdd.context.defaultParallelism: java.lang.Integer).asInstanceOf[Partitioner]
     } else {
-      new HashPartitioner(bySize.head.partitions.size)
+      ctor.newInstance(bySize.head.partitions.size: java.lang.Integer).asInstanceOf[Partitioner]
     }
   }
 }
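Two details in the hunk above are easy to misread: unrecognized values of spark.default.partitioner fall through getOrElse and are treated as fully qualified class names, and the `: java.lang.Integer` ascription boxes the Int so it fits Constructor.newInstance's Object varargs. A self-contained sketch of the same pattern, using a hypothetical Named class (defined in the default package) so nothing Spark-specific is assumed:

// Hypothetical stand-in for a Partitioner with an Int constructor.
class Named(val n: Int) {
  override def toString = s"Named($n)"
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    val shortNames = Map("named" -> "Named")
    val requested = "named" // stand-in for the configured value
    // Unknown aliases are assumed to be fully qualified class names.
    val className = shortNames.getOrElse(requested.toLowerCase, requested)
    // classOf[Int] erases to the primitive int, matching the compiled constructor.
    val ctor = Class.forName(className).getConstructor(classOf[Int])
    // The ascription boxes the Int; reflection unboxes it for the int parameter.
    println(ctor.newInstance(8: java.lang.Integer)) // prints Named(8)
  }
}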
@@ -93,6 +102,18 @@ class HashPartitioner(partitions: Int) extends Partitioner {
   override def hashCode: Int = numPartitions
 }

+/**
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`. In order to spread out hashCodes that are divisible by
+ * `numPartitions`, `byteswap32` is applied to the hashCodes before modding by `numPartitions`.
+ */
+class ByteswapPartitioner(partitions: Int) extends HashPartitioner(partitions) {
+  override def getPartition(key: Any): Int = key match {
+    case null => 0
+    case _ => Utils.nonNegativeMod(byteswap32(key.hashCode), numPartitions)
+  }
+}
+
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
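The failure mode motivating ByteswapPartitioner: when every key's hash code is divisible by numPartitions, plain modular placement sends all of them to partition 0. A self-contained sketch of the difference, inlining a nonNegativeMod equivalent to Spark's Utils.nonNegativeMod:

import scala.util.hashing.byteswap32

object ByteswapSketch {
  // Equivalent to Spark's Utils.nonNegativeMod.
  def nonNegativeMod(x: Int, mod: Int): Int = ((x % mod) + mod) % mod

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // Int keys that are all multiples of 8; an Int's hashCode is the value itself.
    val keys = Seq(8, 16, 24, 32, 40)

    // HashPartitioner-style placement: every key collides in partition 0.
    println(keys.map(k => nonNegativeMod(k.hashCode, numPartitions)))

    // ByteswapPartitioner-style placement: byteswap32 decorrelates the hash
    // codes from the divisor, so the keys generally spread across partitions.
    println(keys.map(k => nonNegativeMod(byteswap32(k.hashCode), numPartitions)))
  }
}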
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,6 +32,16 @@ import scala.util.Try

 class PipedRDDSuite extends FunSuite with SharedSparkContext {
 
+  override def beforeAll() {
+    System.setProperty("spark.default.partitioner", "hash")
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    System.clearProperty("spark.default.partitioner")
+    super.afterAll()
+  }
+
   test("basic pipe") {
     if (testCommandAvailable("cat")) {
       val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
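For context on the System.setProperty pattern above: a SparkConf built with loadDefaults = true (the no-arg constructor's behavior) copies every JVM system property whose name starts with spark. into the conf, which is presumably how the suite's shared context picks the value up; the property is set before super.beforeAll() creates the context and cleared in afterAll so it cannot leak into other suites. A minimal sketch of that pickup:

import org.apache.spark.SparkConf

object ConfPickupSketch {
  def main(args: Array[String]): Unit = {
    System.setProperty("spark.default.partitioner", "hash")
    // loadDefaults = true copies all spark.* system properties into the conf.
    val conf = new SparkConf(true)
    println(conf.get("spark.default.partitioner")) // prints "hash"
    System.clearProperty("spark.default.partitioner")
  }
}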
11 changes: 11 additions & 0 deletions docs/configuration.md
@@ -488,6 +488,17 @@ Apart from these, the following properties are also available, and may be useful
     (<code>groupByKey</code>, <code>reduceByKey</code>, etc) when not set by user.
   </td>
 </tr>
+<tr>
+  <td><code>spark.default.partitioner</code></td>
+  <td>hash</td>
+  <td>
+    Implementation to use for partitioning key-value data pairs. There are two implementations
+    available: <code>hash</code> and <code>byteswap</code>. Both are based on the <code>hashCode</code> of
+    the keys <code>mod</code> the number of partitions, but the <code>byteswap</code> partitioner also
+    applies <code>byteswap32</code> to the hash codes, which helps ensure that all partitions are used
+    even when the hash codes are divisible by a factor of the number of partitions.
+  </td>
+</tr>
 <tr>
   <td><code>spark.broadcast.factory</code></td>
   <td>org.apache.spark.broadcast.<br />TorrentBroadcastFactory</td>
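To opt an application in, the new key is set like any other Spark property, either with --conf spark.default.partitioner=byteswap on spark-submit or programmatically. A sketch (the master and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on 1.1.x

object ByteswapConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")       // placeholder
      .setAppName("byteswap-demo") // placeholder
      .set("spark.default.partitioner", "byteswap")
    val sc = new SparkContext(conf)

    // Shuffles with no explicit Partitioner now go through defaultPartitioner,
    // which reflectively instantiates ByteswapPartitioner.
    val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _)
    println(counts.collect().toSeq)
    sc.stop()
  }
}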
2 changes: 1 addition & 1 deletion mllib/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.1--csd-1-SNAPSHOT</version>
+    <version>1.1.1-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>