This article covers a Cloudant database that fails to connect from Spark with Python. It should be a useful reference for anyone troubleshooting the same problem; read on for the details.

Problem Description

I am using Spark version 2.0.1 and trying to connect to a Cloudant database from Python code, but I am getting an error.

The error is thrown at "load(cloudant_credentials['db_name'])", so is there a library I am missing to import?

I am sure that I am using the correct Cloudant credentials.

I tried using Java code as well, but I get the same error.

Here is my Python code,

import timeit

import pandas
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Needs to be created once.
sc = SparkContext("local[4]", "demo")
sqlContext = SQLContext(sc)
print(sc.version)  # prints 2.0.1

# cloudant_credentials is a dict holding the url, username, password and db_name.
tic = timeit.default_timer()
candidate_data = sqlContext.read.format("com.cloudant.spark").\
    option("cloudant.host", cloudant_credentials['url']).\
    option("cloudant.username", cloudant_credentials['username']).\
    option("cloudant.password", cloudant_credentials['password']).\
    load(cloudant_credentials['db_name'])
toc = timeit.default_timer()

The dependencies I am using,

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.0.0</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.0.0</version>
</dependency>
-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>sample</groupId>
    <artifactId>com.sample</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/resource/spark-cloudant-2.0.0-s_2.11.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.cloudant</groupId>
    <artifactId>cloudant-client</artifactId>
    <version>2.0.0</version>
</dependency>
<!--
<dependency>
    <groupId>com.cloudant</groupId>
    <artifactId>cloudant-client</artifactId>
    <version>2.6.2</version>
</dependency>
-->
<!--
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.2.1</version>
</dependency>
-->
<dependency>
    <groupId>com.typesafe.play</groupId>
    <artifactId>play_2.11</artifactId>
    <version>2.5.10</version>
</dependency>
<dependency>
    <groupId>org.scalaj</groupId>
    <artifactId>scalaj-http_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

Below is the error I am getting,

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-44e0613fa6f4> in <module>()
      6 print(cloudant_credentials['db_name'])
      7
----> 8 candidate_data = sqlContext.read.format("com.cloudant.spark").option("cloudant.host",cloudant_credentials['url']).option("cloudant.username",cloudant_credentials['username']).option("cloudant.password",cloudant_credentials['password']).load(cloudant_credentials['db_name'])
      9
     10 toc = timeit.default_timer()

/home/spark/spark/python/pyspark/sql/readwriter.pyc in load(self, path, format, schema, **options)
    145         self.options(**options)
    146         if isinstance(path, basestring):
--> 147             return self._df(self._jreader.load(path))
    148         elif path is not None:
    149             if type(path) != list:

/home/spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/spark/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home/spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o111.load.
: java.lang.NoSuchMethodError: org.apache.spark.SparkEnv.actorSystem()Lakka/actor/ActorSystem;
    at com.cloudant.spark.DefaultSource.<init>(DefaultSource.scala:104)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


Recommended Answer

Try using the cloudant package in spark-submit: https://spark-packages.org/package/cloudant-labs/spark-cloudant

Include this package in your Spark application by launching spark-shell, pyspark, or spark-submit with:

$SPARK_HOME/bin/spark-shell --packages cloudant-labs:spark-cloudant:2.0.0-s_2.11
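
The same flag works for pyspark and spark-submit; for example (my_script.py is a placeholder for your own application):

$SPARK_HOME/bin/pyspark --packages cloudant-labs:spark-cloudant:2.0.0-s_2.11
$SPARK_HOME/bin/spark-submit --packages cloudant-labs:spark-cloudant:2.0.0-s_2.11 my_script.py

This is most likely the actual fix: the NoSuchMethodError on SparkEnv.actorSystem() suggests connector code compiled against a pre-2.0 Spark (whose SparkEnv still exposed an Akka ActorSystem) is ending up on the classpath, and letting --packages resolve the connector keeps it consistent with the running Spark, instead of hand-bundling a Scala 2.11 connector jar next to the Scala 2.10 Spark artifacts in the POM above.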

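As a runnable end-to-end version, here is a minimal sketch; the credential values are placeholders, and everything else follows the question's code. Submit it with the --packages flag shown above:

import timeit
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Placeholder credentials -- replace with your own Cloudant account details.
cloudant_credentials = {
    'url': 'ACCOUNT.cloudant.com',
    'username': 'USERNAME',
    'password': 'PASSWORD',
    'db_name': 'DB_NAME',
}

sc = SparkContext("local[4]", "demo")
sqlContext = SQLContext(sc)

# Time the Cloudant read, as in the question.
tic = timeit.default_timer()
candidate_data = sqlContext.read.format("com.cloudant.spark") \
    .option("cloudant.host", cloudant_credentials['url']) \
    .option("cloudant.username", cloudant_credentials['username']) \
    .option("cloudant.password", cloudant_credentials['password']) \
    .load(cloudant_credentials['db_name'])
toc = timeit.default_timer()
print("Loaded %d documents in %.1fs" % (candidate_data.count(), toc - tic))

If you are working from a notebook instead of spark-submit, one common workaround (an assumption on my part, not something shown in the original answer) is to set PYSPARK_SUBMIT_ARGS before the SparkContext is created, so pyspark's gateway passes --packages through to spark-submit:

import os
# Must end with 'pyspark-shell'; set this before creating the SparkContext.
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--packages cloudant-labs:spark-cloudant:2.0.0-s_2.11 pyspark-shell'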

That concludes this article on a Cloudant database failing to connect from Spark with Python. We hope the recommended answer helps, and thank you for your support!
