本文介绍了无法通过Spark Scala程序对Cassandra集群进行身份验证的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请建议我解决以下问题,或者建议我采用任何其他方法来实现我的问题陈述。
我每天从某处获取数据并将其插入cassandra中,然后需要整周从cassandra中检索数据并进行一些处理,然后将结果插入cassandra中。

Please suggest me to solve the below issue, or suggest me any different approach to achieve my problem statement.I am getting data from somewhere and inserting it into cassandra daily basis then I need to retrieve the data from cassandra for whole week and do some processing and insert result back onto cassandra.

i有很多记录,每个记录执行以下大多数操作。根据我之前的帖子建议,为避免重新准备Prepared语句,试图保留查询字符串与

i have lot of records, each record executing most of the below operations. According to my previous post Repreparing preparedstatement warning suggestion, to avoid repreparing the prepared statement,tried to keep a map of query string vs prepared statements.

我尝试按照spark scala程序编写,我从cqlsh验证了cassandra主机的详细信息,我能够连接到它。但是通过程序,当我尝试时,出现错误。

I tried writing following spark scala program ,i validated the cassandra host details from cqlsh, i am able to connect to it. But through program when i try, i am getting error.

class StatementCache {
  val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

  val session = acluster.connect("keyspacename");

      val statementCache: ConcurrentHashMap[String,PreparedStatement] = new ConcurrentHashMap


      def getStatement(cql : String): BoundStatement = {
    var ps : PreparedStatement = statementCache.get(cql);
     if (ps == null) {
                ps = session.prepare(cql);
                statementCache.put(cql, ps);
            }
            return ps.bind();
        }
    }


object CassandraUtils {
  println("##########entered cassandrutils")
   //val st=new STMT();
 private val psCache  : StatementCache = new StatementCache();
 val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  def insert(session: Session, data: TripHistoryData, batch: BatchStatement) {
   batch.add(psCache.getStatement(insertQuery));
  }

  def update(session: Session, data: TripHistoryData, batch: BatchStatement) {
    batch.add(psCache.getStatement(updateQuery));
    }

     def initialize(clusterNodes: String, uid: String, pwd: String, port: Int, racdc:String): Cluster = {

    val builder = Cluster.builder().addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(
          DCAwareRoundRobinPolicy.builder() //You can directly use the DCaware without TokenAware as well
            .withLocalDc(racdc) //This is case sensitive as defined in rac-dc properties file
            //.withUsedHostsPerRemoteDc(2) //Try at most 2 remote DC nodes in case all local nodes are dead in the current DC
            //.allowRemoteDCsForLocalConsistencyLevel()
            .build()))

    if (StringUtils.isNotEmpty(uid)) {
      builder.withCredentials(uid, pwd)
    }

    val cluster: Cluster = builder.build()
    cluster
  }
}

-----------------------------------------------------------------------------------------------------------------

我遇到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ExceptionInInitializerError
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:91)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host hostname1: Host hostname1 requires authentication, but no authenticator found in Cluster configuration
    at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:261)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:243)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:299)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1.run(Futures.java:632)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1288)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more


推荐答案

您的问题是您尝试执行连接的手动管理-这不适用于Spark-集群 / <$应该将c $ c> Session 实例发送给执行者,但由于这些实例是在驱动程序中创建的,因此无法正常执行。当然,您可以使用执行 foreachPartition 等的典型模式,如。

Your problem is that you're trying to perform "manual" management of the connection - this doesn't work with Spark - the Cluster/Session instances should be sent to the executors, but it won't perform correctly, as these instances were created in driver. You can of course use the "typical" pattern of doing foreachPartition, etc., like described in this question.

与Cassandra一起工作的最佳方法来自Spark的是使用-它会自动在节点之间分散负载,并且执行正确的插入&数据更新。在这种情况下,您将配置连接参数,包括通过Spark属性进行身份验证( spark.cassandra.auth.username & spark.cassandra.auth.password )。有关连接的更多信息,请参见文档中的

The best way to work with Cassandra from Spark is to use Cassandra Spark Connector - it will automatically spread load between nodes, and perform correct insert & update of the data. In this case you configure connection parameters, including the authentication via Spark properties (spark.cassandra.auth.username & spark.cassandra.auth.password). More information about connecting is in the documentation.

这篇关于无法通过Spark Scala程序对Cassandra集群进行身份验证的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-07 01:57
查看更多