我正在尝试使用以下方法对巨大的Cassandra表的一小部分进行过滤:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")

我想将cassandra表中的行映射到分区键的一部分“创建”列中。

我的表键(表的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)

结果是一个错误:



Documentation一样,它只对分区键中的一个对象起作用。

为什么多个分区键有问题?-回答。

编辑:我试图以正确的形式使用joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")

当我尝试在Spark上运行它时,没有任何错误,但是它永远卡在了[[stage 0:>(0 + 2)/2]]上。

怎么了?

最佳答案

该错误告诉您TableKey类需要3个组件来初始化,但仅传递了一个参数。这是Scala编译错误,与C *或Spark不相关。

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(TableKey(_2))  /// Table Key does not have a single element constructor so this will fail
   .joinWithCassandraTable("listener","snapshots_test_b")

通常,C *使用整个partition key来确定特定行的位置。因此,只有在知道整个partition key的情况下,您才可以有效地提取数据,因此仅传递其中的一部分就没有任何值(value)。

joinWithCassandraTable需要完整的partition key值,因此它可以有效地完成工作。如果您只有parition key的一部分,则将需要执行全表扫描并使用Spark进行过滤。

如果只想基于clustering column进行过滤,则可以通过将where子句下推到C *来实现,例如
sc.cassandraTable("ks","test").where("clustering_key > someValue")

09-27 21:38