Problem description
Output after the Spark job is submitted to the Spark cluster running in a Kubernetes cluster created by minikube:
----------------- RUNNING ----------------------
[Stage 0:> (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:> (0 + 0) / 2]
Information from the Spark web UI:
foreachRDD at myfile.scala:49
org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My code:
println("----------------- RUNNING ----------------------");
eventsStream.foreachRDD { rdd =>
println("xxxxxxxxxxxxxxxxxxxxx")
//println(rdd.count());
if( !rdd.isEmpty )
{
println("yyyyyyyyyyyyyyyyyyyyyyy")
val df = sqlContext.read.json(rdd);
df.registerTempTable("data");
val rules = rulesSource.rules();
var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
rules.foreach { rule =>
...
}
sqlContext.dropTempTable("data")
}
else
{
println("-------");
println("NO DATA");
println("-------");
}
}
Any ideas? Thanks.
Update
My Spark job runs well in a Docker container with standalone Spark, but when it is submitted to the Spark cluster inside the Kubernetes cluster, it gets stuck on the Kafka stream. No idea why?
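The code that creates eventsStream isn't shown; judging from the VerifiableProperties / group.id / zookeeper.connect lines in the log above, it presumably uses the receiver-based Kafka 0.8 connector, roughly along the lines of this sketch (the host, group and topic names are placeholders, not values from the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-sketch"), Seconds(5))

    // Placeholders: the real zookeeper.connect / group.id values are redacted in the log above.
    val eventsStream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",          // zookeeper.connect
      "my-group",              // group.id
      Map("my-topic" -> 1)     // topic -> number of receiver threads
    ).map(_._2)                // keep only the message value (a JSON string)

    eventsStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}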
The YAML file for the Spark master is from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
        - name: spark-master
          image: spark-2.1.0-bin-hadoop2.6
          imagePullPolicy: "IfNotPresent"
          name: spark-master
          ports:
            - containerPort: 7077
              protocol: TCP
          command:
            - "/bin/bash"
            - "-c"
            - "--"
          args:
            - './start-master.sh ; sleep infinity'
Recommended answer
Logs will be helpful to diagnose the issue.
Essentially, you can't create or use another RDD inside an RDD operation, i.e. rdd1.map { rdd2.count() } is not valid.
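A minimal, self-contained sketch of that rule (the RDD names and values here are illustrative, not taken from the question):

import org.apache.spark.{SparkConf, SparkContext}

object NestedRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("nested-rdd-sketch").setMaster("local[2]"))
    val rdd1 = sc.parallelize(Seq("a", "b", "c"))
    val rdd2 = sc.parallelize(1 to 10)

    // Invalid: rdd2 (and the SparkContext behind it) only exist on the driver,
    // so referencing them inside a distributed operation fails at runtime:
    // rdd1.map { x => rdd2.count() }.collect()

    // Valid: run the action on the driver first, then capture the plain value:
    val total = rdd2.count()                   // driver-side action, returns a Long
    val annotated = rdd1.map(x => (x, total))  // only the Long is shipped in the closure
    annotated.collect().foreach(println)

    sc.stop()
  }
}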
See how the RDD is converted to a DataFrame after the implicit sqlContext import:
import sqlContext.implicits._

eventsStream.foreachRDD { rdd =>
  println("yyyyyyyyyyyyyyyyyyyyyyy")
  val df = rdd.toDF();
  df.registerTempTable("data");
  .... //Your logic here.
  sqlContext.dropTempTable("data")
}
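For completeness, a self-contained sketch of the toDF() approach; the Event case class and the aggregation query are illustrative assumptions, since toDF() needs an RDD of a case class or primitive type, whereas raw JSON strings are parsed with sqlContext.read.json(rdd) as in the question code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative event type: toDF() infers the schema from the case class fields.
case class Event(id: String, value: Long)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("todf-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._                       // brings rdd.toDF() into scope

    val rdd = sc.parallelize(Seq(Event("a", 1L), Event("a", 2L), Event("b", 3L)))
    if (!rdd.isEmpty) {
      val df = rdd.toDF()                               // schema: id (string), value (long)
      df.registerTempTable("data")
      sqlContext.sql("SELECT id, SUM(value) AS total FROM data GROUP BY id").show()
      sqlContext.dropTempTable("data")
    }

    sc.stop()
  }
}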