Question
Output after a Spark job is submitted to the Spark cluster running inside a Kubernetes cluster created by minikube:
----------------- RUNNING ----------------------
[Stage 0:> (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to xxxxxxxxxxxxxxxxxxxxx
[Stage 0:> (0 + 0) / 2]
Information from the Spark web UI:
org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My code:
println("----------------- RUNNING ----------------------");
eventsStream.foreachRDD { rdd =>
  println("xxxxxxxxxxxxxxxxxxxxx")
  //println(rdd.count());
  if (!rdd.isEmpty) {
    println("yyyyyyyyyyyyyyyyyyyyyyy")
    val df = sqlContext.read.json(rdd);
    df.registerTempTable("data");
    val rules = rulesSource.rules();
    var resultsRDD: RDD[(String, String, Long, Long, Long, Long, Long, Long)] = sc.emptyRDD;
    rules.foreach { rule =>
      ...
    }
    sqlContext.dropTempTable("data")
  } else {
    println("-------");
    println("NO DATA");
    println("-------");
  }
}
Any ideas? Thanks.
Update
My Spark job runs fine in a Docker container with standalone Spark, but when it is submitted to the Spark cluster inside the Kubernetes cluster, it gets stuck on the Kafka stream. Any idea why?
The YAML file for the Spark master is from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name: spark-master
        image: spark-2.1.0-bin-hadoop2.6
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
        - "/bin/bash"
        - "-c"
        - "--"
        args:
        - './start-master.sh ; sleep infinity'
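A Deployment alone does not give the master a stable address, so workers and spark-submit would have nothing to connect to at spark://spark-master:7077. The linked repository pairs the Deployment with a Service; a minimal sketch of such a Service (the name, selector, and port below are assumed to match the Deployment above, not copied from the repo) might look like:

```yaml
# Hypothetical companion Service: gives the master pod a stable DNS name
# (spark-master) so workers and spark-submit can reach spark://spark-master:7077.
apiVersion: v1
kind: Service
metadata:
  name: spark-master
spec:
  selector:
    name: spark-master   # matches the pod label set by the Deployment
  ports:
  - port: 7077
    targetPort: 7077
    protocol: TCP
```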
Recommended answer
The logs would help diagnose the issue.
Essentially, you can't create or use another RDD inside an RDD operation; i.e. rdd1.map { rdd2.count() } is not valid.
See how the RDD is converted to a DataFrame after the implicit sqlContext import:
import sqlContext.implicits._

eventsStream.foreachRDD { rdd =>
  println("yyyyyyyyyyyyyyyyyyyyyyy")
  val df = rdd.toDF();
  df.registerTempTable("data");
  ....  // Your logic here.
  sqlContext.dropTempTable("data")
}
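The toDF() call above is not a method of RDD itself; the import brings an implicit conversion into scope that adds it. As a plain-Scala analogy of that mechanism (no Spark needed; the RichString/toWords names here are made up purely for illustration):

```scala
// Plain-Scala sketch of how `import sqlContext.implicits._` works:
// an import can bring an implicit conversion into scope that adds
// methods to an existing type, the same way toDF() appears on RDDs.
object ImplicitDemo {
  object Conversions {
    // Implicit class: wraps String and adds a toWords method.
    implicit class RichString(val s: String) {
      def toWords: Array[String] = s.trim.split("\\s+")
    }
  }

  def main(args: Array[String]): Unit = {
    // Without this import, "...".toWords would not compile.
    import Conversions._
    val words = "spark on kubernetes".toWords
    println(words.mkString(","))  // prints spark,on,kubernetes
  }
}
```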