我正在创建一个Spark结构化流应用程序,它将每10秒计算一次从Kafka接收的数据。

为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息。

如果我们对数据库表进行了一些更改,我会全神贯注于如何在整个群集中保持Cassandra数据的可用性,以及不时更新数据的方式。

目前,我正在使用Datastax Spark-Cassandra-connector在本地启动Spark时立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications", "sensors")
  .load

从这里开始,我可以将此cassandraSensorDs与我的结构化流数据集结合使用。
.join(
   cassandraSensorDs ,
   sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)

在运行结构化流时,如何执行其他查询来更新此Cassandra数据?
以及如何使查询的数据在群集设置中可用?

最佳答案

使用广播变量,您可以编写包装器以定期从Cassandra获取数据并更新广播变量。使用广播变量在流上进行 map 侧联接。我还没有测试过这种方法,我认为根据您的用例(吞吐量),这可能也是一个过大的杀伤力。

How can I update a broadcast variable in spark streaming?

另一种方法是查询流中每个项目的Cassandra,以优化连接,您应确保使用连接池并仅为JVM/分区创建一个连接。这种方法更简单,您不必担心定期预热Cassandra数据。

spark-streaming and connection pool implementation

10-06 13:50
查看更多