Cassandra Spark Connector and Filtering Data

This article discusses filtering data with the Cassandra Spark connector; the question and answer below may be a useful reference for anyone hitting the same problem.

Problem Description

I am using Spark 1.3.1 and I have written a small program to filter data on Cassandra:

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.joda.time.DateTime

val sc = new SparkContext(conf) // conf: a SparkConf with spark.cassandra.connection.host set
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()

This program runs for a very long time, printing messages like:

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)

If I terminate the program and change my code to

val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))

it still runs for a very long time, with messages like:

16/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)

So it seems like the program will always try to load the entire Cassandra table into memory (or scan it completely) and only then apply the filter, which seems extremely inefficient to me.

How can I write this code in a better way, so that Spark doesn't load the entire Cassandra table into an RDD (or scan it completely) and only then apply the filter?

Recommended Answer

Your first piece of code counts the filtered RDD:

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD

So be careful: RDDs are immutable, so when you apply a filter you need to use the returned RDD, not the one you applied the function to. The following, for example, filters and then ignores the result:

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD
println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything
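
A corrected version of that snippet, as a sketch against the same table, keeps a reference to the RDD returned by filter and counts that instead:

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
val filtered = rdd.filter(r => r.getDate("date").after(date.toDate)) // keep the returned RDD
println(filtered.count()) // counts only the rows that pass the filter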

For more efficiency when reading from Cassandra:

If your date column is a clustering key, you can use the .where function to push the predicate down to Cassandra. Other than that, there isn't much you can do to prune data server side.
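
A minimal sketch of that pushdown, assuming date is a clustering column of foo.bar (the cutoff variable and the CQL comparison here are illustrative):

val cutoff = DateTime.now().minusHours(1)
val recent = sc.cassandraTable("foo", "bar")
  .where("date > ?", cutoff.toDate) // predicate runs inside Cassandra, so only matching rows reach Spark
println(recent.count())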
