Problem description
I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s with the same contents are considered as different values because their reference values are different.
I didn't see any way to pass in a custom comparer. I could convert the byte[] into a String with an explicit charset, but I'm wondering if there's a more efficient way.
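For reference, that String-key workaround might look like the sketch below; the pair RDD, its Int values, and the variable names are illustrative assumptions, not from the original post:

import java.nio.charset.StandardCharsets
import org.apache.spark.rdd.RDD

// hypothetical pair RDD with byte-array keys; names and value type are illustrative
val pairs: RDD[(Array[Byte], Int)] = sparkContext.parallelize(
  Seq(("a".getBytes(StandardCharsets.UTF_8), 1), ("a".getBytes(StandardCharsets.UTF_8), 2)))
// re-key by decoding the bytes with an explicit charset, so keys compare by content
val byString = pairs.map { case (k, v) => (new String(k, StandardCharsets.UTF_8), v) }
byString.reduceByKey(_ + _).collect()
// both entries collapse under the key "a"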
Recommended answer
Custom comparers are insufficient because Spark uses the hashCode of the objects to organize keys in partitions. (At least the HashPartitioner will do that; you could provide a custom partitioner that can deal with arrays.)
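As a rough sketch of that last point (the class name and bucket logic below are assumptions, not part of the original answer), a custom partitioner could hash array keys by content via java.util.Arrays.hashCode. Note that this only fixes partitioning; keys are still compared with equals within a partition, which is why the wrapper below is the actual fix.

import org.apache.spark.Partitioner

// sketch: partition byte-array keys by their contents instead of their reference identity
class ByteArrayPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case bytes: Array[Byte] =>
      // non-negative bucket index derived from the array's contents
      (java.util.Arrays.hashCode(bytes) & Integer.MAX_VALUE) % numPartitions
    case other =>
      (other.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}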
Wrapping the array to provide proper equals and hashCode should address the issue. A lightweight wrapper should do the trick:
// serializable wrapper so byte arrays can serve as RDD keys
class SerByteArr(val bytes: Array[Byte]) extends Serializable {
  // hash and equality are based on the array's contents (bytes.deep), not its reference
  override val hashCode = bytes.deep.hashCode
  override def equals(obj: Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}
A quick test:
import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.
// let's use the wrapper instead
val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
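Since the question is about reduceByKey, the same wrapper can be used there as well. A minimal sketch building on the data and SerByteArr defined above (the count of 1 per element and the variable names are illustrative):

// pair the raw byte arrays with a count of 1 (values are illustrative)
val pairs = sparkContext.parallelize(data.map(bytes => (bytes, 1)))
// re-key through the wrapper so equal contents collapse to the same key
val reduced = pairs.map { case (k, v) => (new SerByteArr(k), v) }.reduceByKey(_ + _)
// unwrap the key again if the raw byte array is needed downstream
val counts = reduced.map { case (k, v) => (k.bytes, v) }
counts.count
// expect 100 distinct keys, matching the groupByKey test above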