我正在尝试使用以下方法对巨大的Cassandra表的一小部分进行过滤:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
我想将cassandra表中的行映射到分区键的一部分“创建”列中。
我的表键(表的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
结果是一个错误:
像Documentation一样,它只对分区键中的一个对象起作用。
为什么多个分区键有问题?-回答。
编辑:我试图以正确的形式使用joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
当我尝试在Spark上运行它时,没有任何错误,但是它永远卡在了[[stage 0:>(0 + 2)/2]]上。
怎么了?
最佳答案
该错误告诉您TableKey
类需要3个组件来初始化,但仅传递了一个参数。这是Scala编译错误,与C *或Spark不相关。
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail
.joinWithCassandraTable("listener","snapshots_test_b")
通常,C *使用整个
partition key
来确定特定行的位置。因此,只有在知道整个partition key
的情况下,您才可以有效地提取数据,因此仅传递其中的一部分就没有任何值(value)。joinWithCassandraTable需要完整的
partition key
值,因此它可以有效地完成工作。如果您只有parition key
的一部分,则将需要执行全表扫描并使用Spark进行过滤。如果只想基于
clustering column
进行过滤,则可以通过将where
子句下推到C *来实现,例如sc.cassandraTable("ks","test").where("clustering_key > someValue")