This article looks at why my Spark job gets stuck in Kafka streaming; it should be a useful reference for anyone who runs into the same problem.

Problem description

Output after the Spark job is submitted to the Spark cluster running inside a Kubernetes cluster created by Minikube:

----------------- RUNNING ----------------------
[Stage 0:>                                                          (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:>                                                          (0 + 0) / 2]

Information from the Spark web UI:

org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code:

  println("----------------- RUNNING ----------------------");
    eventsStream.foreachRDD { rdd =>
        println("xxxxxxxxxxxxxxxxxxxxx")
        //println(rdd.count());
    if( !rdd.isEmpty )
    {
      println("yyyyyyyyyyyyyyyyyyyyyyy")
        val df = sqlContext.read.json(rdd);
        df.registerTempTable("data");

        val rules = rulesSource.rules();
        var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
        rules.foreach { rule =>
        ...
        }

        sqlContext.dropTempTable("data")
    }
    else
    {
        println("-------");
        println("NO DATA");
        println("-------");
    }
}
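
The snippet does not show how eventsStream is created, but the VerifiableProperties / zookeeper.connect lines in the log are consistent with the classic receiver-based Kafka consumer. Purely as an assumption, a minimal sketch of that setup might look like this (the batch interval, ZooKeeper address and topic name are placeholders, not values from the question):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Placeholder driver setup; the real job presumably configures this elsewhere.
    val conf = new SparkConf().setAppName("Myjob")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Receiver-based stream from spark-streaming-kafka-0-8. Per the Spark Streaming
    // docs, the application must be allocated more cores than it has receivers,
    // otherwise no cores are left for processing the received data.
    val eventsStream = KafkaUtils.createStream(
      ssc,
      "zookeeper:2181",      // zookeeper.connect (placeholder)
      "xxx",                 // group.id, as overridden in the log
      Map("events" -> 1)     // topic -> number of receiver threads (placeholder)
    ).map(_._2)              // keep only the message payload (a JSON string)

    // ... register the foreachRDD logic shown above, then:
    // ssc.start(); ssc.awaitTermination()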

Any ideas? Thanks.

Update

My Spark job runs fine in a Docker container against standalone Spark, but when it is submitted to the Spark cluster inside the Kubernetes cluster it gets stuck in Kafka streaming. No idea why.

The YAML file for the Spark master is taken from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name: spark-master
        image: spark-2.1.0-bin-hadoop2.6
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
        - "/bin/bash"
        - "-c"
        - "--"
        args:
        - './start-master.sh ; sleep infinity'
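
For context on how a job reaches that Deployment: the standalone master listens on containerPort 7077, and the driver connects to it through a spark:// URL, usually supplied via spark-submit's --master flag. A minimal sketch, assuming the Deployment is fronted by a Service named spark-master (that Service name is an assumption, not shown above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // "spark-master" is an assumed in-cluster Service name resolving to the
    // Deployment above; port 7077 matches its containerPort.
    val conf = new SparkConf()
      .setAppName("Myjob")
      .setMaster("spark://spark-master:7077")

    val ssc = new StreamingContext(conf, Seconds(10))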

Recommended answer

Logs will be helpful to diagnose the issue.

Essentially, you can't create or use another RDD inside an RDD operation; i.e. rdd1.map { rdd2.count() } is not valid.
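
To make that concrete, here is a small self-contained sketch (the rdd1 / rdd2 names are illustrative, not from the question): functions passed to RDD operations run on the executors, so they cannot reference another RDD or the SparkContext; a small dataset can instead be collected on the driver and broadcast.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("nested-rdd-demo").getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4))
    val rdd2 = sc.parallelize(Seq(2, 4))

    // Invalid: rdd2 is not usable inside a closure shipped to the executors.
    // rdd1.map { x => rdd2.count() }   // fails at runtime

    // Valid alternative: materialize the small RDD on the driver and broadcast it.
    val lookup  = sc.broadcast(rdd2.collect().toSet)
    val flagged = rdd1.map(x => (x, lookup.value.contains(x)))
    flagged.collect().foreach(println)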

See how the RDD is converted to a DataFrame after the implicit sqlContext import:

        import sqlContext.implicits._
        eventsStream.foreachRDD { rdd =>

            println("yyyyyyyyyyyyyyyyyyyyyyy")

            val df = rdd.toDF();
            df.registerTempTable("data");
            .... //Your logic here.
            sqlContext.dropTempTable("data")
        }
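
A hedged variant of the same pattern, in case the stream really carries raw JSON strings as the original sqlContext.read.json(rdd) call suggests: Spark 2.x offers non-deprecated equivalents of the temp-table calls (createOrReplaceTempView and spark.catalog.dropTempView), and the JSON can still be parsed from the RDD of strings. Everything below is a sketch built on that assumption, not the answerer's exact code:

    eventsStream.foreachRDD { rdd =>          // rdd assumed to be RDD[String] of JSON documents
      if (!rdd.isEmpty) {
        // Reuse (or lazily create) the SparkSession on the driver for each batch.
        val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

        val df = spark.read.json(rdd)         // parse the JSON strings into a DataFrame
        df.createOrReplaceTempView("data")    // non-deprecated replacement for registerTempTable

        // ... rule evaluation via spark.sql("SELECT ... FROM data") ...

        spark.catalog.dropTempView("data")    // replacement for sqlContext.dropTempTable
      }
    }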

That wraps up why my Spark job gets stuck in Kafka streaming; hopefully the recommended answer above is helpful.
