本文介绍了在Jupyter Notebook中运行Pyspark和Kafka的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我可以运行示例在终端中.我的终端命令是:

I could run this example in the terminal. My terminal command is:

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 examples/src/main/python/sql/streaming/structured_kafka_wordcount.py localhost:9092 subscribe test

现在,我想在Juypter python笔记本中运行它.我尝试遵循(我可以在链接).但就我而言,它失败了.以下是我的代码:

Now I wants to run it in Juypter python notebook. I tried to follow this (I could run the code in the link). But in my case, it failed. The following is my code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

bootstrapServers = "localhost:9092"
subscribeType = "subscribe"
topics = "test"

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()

错误消息是:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-0344129c7d54> in <module>()
     14
     15 # Create DataSet representing the stream of input lines from kafka
---> 16 lines = spark    .readStream    .format("kafka")    .option("kafka.bootstrap.servers", bootstrapServers)    .option(subscribeType, topics)    .load()    .selectExpr("CAST(value AS STRING)")
     ...

Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
    at java.base/java.lang.ClassLoader.defineClass1(Native Method)
    ...

有什么想法吗?谢谢!

我试图按照答案进行操作,但仍然出现错误.以下是我的过程.我搜索了两个kernel.json,它们是

I tried to follow the answer but still got error. The following is my procedure. I searched that there are two kernel.json, they are

~/anaconda3/pkgs/ipykernel-4.6.1-py36h3208c25_0/share/jupyter/kernels/python3/kernel.json
~/anaconda3/share/jupyter/kernels/python3/kernel.json

然后我用以下内容更新了它们:

Then I updated them all with the following content:

{
    "display_name": "PySpark",
    "language": "python",
    "argv": [ "</usr>/anaconda3/bin/python", "-m", "ipykernel", "-f", "  {connection_file}" ],
    "env": {
        "SPARK_HOME": "</usr>/projects/spark-2.3.0",
        "PYSPARK_PYTHON": "</usr>/anaconda3/bin/python",
        "PYTHONPATH": "</usr>/projects/spark-2.3.0/spark/python/:</usr>/projects/spark-2.3.0/spark/python/lib/py4j-0.10.6-src.zip",
        "PYTHONSTARTUP": "</usr>/projects/spark-2.3.0/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS":  "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"
    }
}

然后我得到如下错误:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/<usr>/projects/spark-2.3.0/assembly/target/scala-2.11/jars/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil

推荐答案

正如@ user6910411所说,PYSPARK_SUBMIT_ARGS只能在实例化sparkContext之前起作用.

As @user6910411 said PYSPARK_SUBMIT_ARGS can only work before the instantiation of your sparkContext.

在您遵循的示例中,他们可能将python内核用于其jupyter笔记本,并使用pyspark库实例化spark上下文.

In the example you followed, they probably use a python Kernel for their jupyter notebook and they instantiate a spark context using the pyspark library.

我猜您正在使用pyspark内核,因此:

I'm guessing you're using a pyspark kernel, hence:

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

不会启动sparkSession,而只会获取已经存在的sparkSession.

won't start a sparkSession but only fetch the already existing one.

您可以在kernel.json文件中将参数传递给jupyter运行的spark-submit,以便每次运行新笔记本时都加载库:

You can pass arguments to the spark-submit ran by jupyter in your kernel.json file so the libraries get loaded every time you run a new notebook:

{
    "display_name": "PySpark",
    "language": "python",
    "argv": [ "/opt/anaconda3/bin/python", "-m", "ipykernel", "-f", "  {connection_file}" ],
    "env": {
        "SPARK_HOME": "/usr/iop/current/spark-client",
        "PYSPARK_PYTHON": "/opt/anaconda3/bin/python3",
        "PYTHONPATH": "/usr/iop/current/spark-client/python/:/usr/iop/current/spark-client/python/lib/py4j-0.9-src.zip",
        "PYTHONSTARTUP": "/usr/iop/current/spark-client/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS":  "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"
  }
}

这篇关于在Jupyter Notebook中运行Pyspark和Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 22:50