本文介绍了用于来自 Kafka 主题的 PySpark 结构化流的 Cassandra Sink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 PySpark Structured Streaming API 将结构流数据写入 Cassandra.

I want to write Structure Streaming Data into Cassandra using PySpark Structured Streaming API.

我的数据流如下:

REST API -> Kafka -> Spark 结构化流 (PySpark) -> Cassandra

REST API -> Kafka -> Spark Structured Streaming (PySpark) -> Cassandra

来源和版本如下:星火版本:2.4.3DataStax DSE:6.7.6-1

Source and Version in below:Spark version: 2.4.3DataStax DSE: 6.7.6-1

初始化火花:

spark = SparkSession.builder\
.master("local[*]")\
.appName("Analytics")\
.config("kafka.bootstrap.servers", "localhost:9092")\
.config("spark.cassandra.connection.host","localhost:9042")\
.getOrCreate()

从 Kafka 订阅主题:

subscribe topic from Kafka:

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "topic") \
    .load()

写入卡桑德拉:

    w_df_3 = df...

    write_db = w_df_3.writeStream \
    .option("checkpointLocation", '/tmp/check_point/') \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "analytics") \
    .option("table", "table") \
    .outputMode(outputMode="update")\
    .start()

使用以下命令执行:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,datastax:spark-cassandra-connector:2.4.0-s_2.11 Analytics.py localhost:9092 topic

我在写入流到 Cassandra 时遇到以下问题/异常:

I am facing below issue/exception while writestream into Cassandra:

py4j.protocol.Py4JJavaError: An error occurred while calling o81.start.
: java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

有人可以帮助我解决并进一步处理吗?任何帮助将不胜感激.

Could anyone help me out on how to resolve and proceed further? Any help will be appreciated.

提前致谢.

推荐答案

正如我在评论中提到的,如果您使用的是 DSE,您可以使用 OSS Apache Spark 和所谓的 BYOS(自带火花) - 包含 DataStax 版本的特殊 jar包含对结构化流媒体的直接支持的 Spark Cassandra 连接器 (SCC).

As i mentioned in the comment, if you're using DSE, you can use OSS Apache Spark with so-called BYOS (bring your own spark) - special jar that contains the DataStax's version of Spark Cassandra Connector (SCC) that contains direct support for structured streaming.

由于 SCC 2.5.0 对结构化流的支持也在开源版本中可用,因此您可以简单地使用 writeStream 和 Cassandra 的格式.2.5.0还包含了很多以前开源没有的好东西,比如额外的优化等等. 还有一个博文 详细描述了它们.

Since SCC 2.5.0 support for structured streaming is also available in open source version, so you can simply use writeStream with format for Cassandra. 2.5.0 also contains a lot of good things previously not available in the open source, such as additional optimizations, etc. There is a blog post that describes them in great details.

这篇关于用于来自 Kafka 主题的 PySpark 结构化流的 Cassandra Sink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 11:28