我正在尝试通过尽可能避免改组来优化我的火花作业。

我正在使用cassandraTable创建RDD。

列族的列名是动态的,因此定义如下:

CREATE TABLE "Profile" (
  key text,
  column1 text,
  value blob,
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='ALL' AND
  ...


此定义导致采用以下格式的CassandraRow RDD元素:

CassandraRow <key, column1, value>



键-RowKey
column1-column1的值是动态列的名称
value-动态列的值


因此,如果我的RK ='profile1',列名='George',age = '34',则最终的RDD为:

CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>


然后,我需要将共享同一密钥的元素组合在一起,以获得PairRdd:

PairRdd<String, Iterable<CassandraRow>>


重要的是,我需要分组的所有元素都在同一个Cassandra节点中(共享相同的行键),因此我希望连接器保持数据的局部性。

问题是使用groupBy或groupByKey会导致改组。我宁愿将它们本地分组,因为所有数据都在同一节点上:

JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .groupBy(new Function<ColumnFamilyModel, String>() {
            @Override
            public String call(ColumnFamilyModel arg0) throws Exception {
                return arg0.getKey();
            }
        })


我的问题是:


在RDD上使用keyBy是否会引起改组,还是将数据保留在本地?
有没有一种方法可以通过键对元素进行分组而不进行改组?我读了有关mapPartitions的文章,但不太了解它的用法。


谢谢,

hai

最佳答案

我认为您正在寻找spanByKey,这是一种Cassandra连接器特定的操作,该操作利用了cassandra提供的顺序来允许元素分组,而不会导致洗牌阶段。

在您的情况下,应如下所示:

sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => (row.getString("key")))
  .spanByKey


在文档中阅读更多内容:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

08-28 14:44