I have a Spark Streaming app which looks like this:
    val message = KafkaUtils.createStream(...).map(_._2)
    message.foreachRDD( rdd => {
      if (!rdd.isEmpty){
        val kafkaDF = sqlContext.read.json(rdd)
        ...
          i =>{
            ...
              row =>{
                ...
I run it on a YARN cluster using:
    spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5 ...
When I try to log kafkaDF.rdd.partitions.size, the result turns out to be '1' or '5' mostly. I am confused: is it possible to control the number of partitions of my DataFrame? KafkaUtils.createStream doesn't seem to accept any parameters related to the number of partitions I want for the RDD. I tried kafkaDF.rdd.repartition( int ), but it doesn't seem to work either.
How can I achieve more parallelism in my code? If my approach is wrong, what is the correct way to achieve it?
In Spark Streaming, parallelism can be achieved in two areas: (a) the consumers/receivers (in your case the Kafka consumers), and (b) the processing (done by Spark).
By default, Spark Streaming assigns one core (i.e. one thread) to each consumer, so if you need to ingest more data you need to create more consumers. Each consumer creates its own DStream; you can then union the DStreams into one large stream.
    // A basic example with two consumers (receivers), each occupying one core
    val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
    val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
    val combineStream = messageStream1.union(messageStream2)
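Alternatively, instead of adding receivers, you can repartition the input stream. This does not create more consumers; it redistributes the data received in each batch across the given number of partitions (at the cost of a shuffle), so that more cores can process it in parallel:
    inputStream.repartition(<number of partitions>)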
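To illustrate what repartitioning buys you, here is a toy model in plain Scala (no Spark dependency) of records being dealt round-robin into n output partitions. RepartitionModel and roundRobin are hypothetical names for illustration, not Spark APIs. Spark's real repartition shuffles the data and starts each input partition at a pseudo-random offset; this sketch replaces that offset with a deterministic one so the result is predictable.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model (not Spark code): deal the records of each input partition
// round-robin across n output partitions, roughly as repartition(n) does.
object RepartitionModel {
  def roundRobin[A](input: Seq[Seq[A]], n: Int): Vector[Vector[A]] = {
    require(n > 0, "need at least one output partition")
    val out = Vector.fill(n)(ArrayBuffer.empty[A])
    for ((partition, index) <- input.zipWithIndex) {
      // Spark picks a pseudo-random start per input partition; we use index % n.
      var position = index % n
      for (record <- partition) {
        out(position) += record
        position = (position + 1) % n
      }
    }
    out.map(_.toVector)
  }
}

object RepartitionExample extends App {
  // One receiver block holding 10 records, spread over 4 partitions.
  val spread = RepartitionModel.roundRobin(Seq((1 to 10).toVector), 4)
  println(spread.map(_.size)) // Vector(3, 3, 2, 2)
}
```

A batch whose 10 records arrived as a single partition ends up split 3/3/2/2, so four tasks can work on it instead of one. Like the real RDD.repartition, the function returns new partitions and leaves its input untouched; in Spark, calling kafkaDF.rdd.repartition(n) without using the returned RDD therefore has no effect.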
All the remaining cores available to the streaming app are assigned to Spark for processing.
So if you have N cores (defined via spark.cores.max in standalone or Mesos mode; on YARN, --num-executors times --executor-cores) and C consumers, you are left with N-C cores available for Spark processing.
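A small plain-Scala sketch of the N-C arithmetic, using the question's spark-submit flags. CoreBudget and its methods are hypothetical helpers, not Spark APIs, and the model ignores the driver (in yarn-cluster mode it runs in the YARN application master, outside the executors). The require mirrors the Spark Streaming guide's warning that if receivers occupy all the cores, data is received but never processed.

```scala
// Toy model of the core budget: every receiver pins one core for its whole
// lifetime, and whatever is left over runs the batch-processing tasks.
object CoreBudget {
  // On YARN, executor cores = --num-executors x --executor-cores.
  def yarnExecutorCores(numExecutors: Int, executorCores: Int): Int =
    numExecutors * executorCores

  // Cores left for processing (N - C); fails if receivers would use them all.
  def processingCores(totalCores: Int, receivers: Int): Int = {
    require(receivers >= 0, "receiver count must be non-negative")
    require(totalCores > receivers,
      s"$totalCores cores leave nothing for processing with $receivers receivers")
    totalCores - receivers
  }
}

object CoreBudgetExample extends App {
  // The question's flags: --num-executors 3 --executor-cores 5
  val total = CoreBudget.yarnExecutorCores(3, 5)
  println(total)                                // 15
  println(CoreBudget.processingCores(total, 1)) // 14
  println(CoreBudget.processingCores(total, 3)) // 12
}
```

With a single createStream receiver, 14 of the 15 executor cores are available for processing, but they only help if each batch has enough partitions to occupy them.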
#Partitions per batch ≈ #Consumers x (batch duration / block interval)
block interval = how long a consumer accumulates incoming data before pushing it as a Spark block (set via the configuration spark.streaming.blockInterval, 200ms by default)
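The partition formula can be checked with a few lines of plain Scala. The names below are hypothetical, and the 1-second batch duration is an assumption because the question does not state it. Under that assumption, one receiver with the default 200 ms block interval gives 1000 / 200 = 5 partitions, which is consistent with the '5' observed in the question; Spark only creates a block when data has actually arrived, so a quiet batch can yield fewer partitions, plausibly the '1'.

```scala
// Toy model of: #Partitions ~= #Consumers x (batch duration / block interval).
// Assumes every receiver fills one block per block interval; with little
// incoming data, Spark creates fewer blocks, so real counts can be lower.
object PartitionEstimate {
  // Spark's default for spark.streaming.blockInterval.
  val DefaultBlockIntervalMs: Long = 200L

  def partitionsPerBatch(receivers: Int, batchMs: Long,
                         blockIntervalMs: Long = DefaultBlockIntervalMs): Long =
    receivers * (batchMs / blockIntervalMs)

  // Block interval (ms, rounded down) that yields roughly targetPartitions.
  def blockIntervalFor(targetPartitions: Int, receivers: Int, batchMs: Long): Long =
    receivers * batchMs / targetPartitions
}

object PartitionEstimateExample extends App {
  // Hypothetical 1 s batch, one receiver, default 200 ms block interval.
  println(PartitionEstimate.partitionsPerBatch(1, 1000)) // 5
  // Interval giving ~14 partitions (15 executor cores minus one receiver).
  val interval = PartitionEstimate.blockIntervalFor(14, 1, 1000)
  println(interval)                                                // 71
  println(PartitionEstimate.partitionsPerBatch(1, 1000, interval)) // 14
}
```

Lowering spark.streaming.blockInterval raises the partition count without adding receivers; the Spark tuning guide recommends not going below roughly 50 ms, since task-launch overhead then starts to dominate.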
Always keep in mind that Spark Streaming has two activities running concurrently: a set of threads that read the current micro-batch (the consumers), and a set of threads that process the previous micro-batch (Spark).
