本文介绍了如何在Spark中按分区对键/值进行分组?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Spark Streaming应用程序,该应用程序每秒接收几条JSON消息,每条消息都有一个标识其来源的ID.

I have a Spark Streaming application which receives several JSON messages per second, each of these having an ID which identifies their source.

使用此ID作为键,我能够执行MapPartitionsToPair,从而创建一个JavaPairDStream,具有键/值对的RDD,每个分区一个键值对(因此,例如,如果我收到5条JSON消息, ,我得到了一个具有5个分区的RDD,每个分区以消息的ID作为键,而JSON消息本身作为值).

Using this ID as a key, I am able to perform a MapPartitionsToPair, thus creating a JavaPairDStream, with an RDD of key/value pairs, one key value pair per partition (so if I received 5 JSON messages for example, I get an RDD with 5 partitions, each with the ID of the message as a key, and the JSON message itself as the value).

我现在想做的是将所有具有相同键的值分组到同一分区中.因此,例如,如果我有3个具有键"a"的分区和2个具有键"b"的分区,那么我想创建一个具有2个而不是5个分区的新RDD,每个分区都包含一个键具有的所有值,一个用于'a'和'b'.

What I want to do now, is I want to group all values that have the same key into the same partition. So for example, if I have 3 partitions with key 'a' and 2 partitions with key 'b', I want to create a new RDD with 2 partitions instead of 5, each partition containing all the values that one key has, one for'a' and one for 'b'.

我该怎么做?到目前为止,这是我的代码:

How can I accomplish this?This is my code so far:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

            ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();

            while (stringIterator.hasNext()){
                String c=stringIterator.next();
                if(c==null){
                    return null;

                }

                JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
                String key= retMap.getSid();
                Tuple2<String,String> b= new Tuple2<String,String>(key,c);
                a.add(b);

                System.out.print(b._1+"_"+b._2);
                // }
                //break;
            }


            return a;
        }
    });

///我创建一个JavaPairDStream,其中每个分区都包含一个键/值对.

//I create a JavaPairDStream in which each partition contains one key/value pair.

我尝试使用grouByKey(),但是无论消息数量是多少,我总是得到2的分区号.

I tried to use grouByKey(), but no matter what the number of messages were, I always got a partition number of 2.

我应该怎么做?非常感谢.

How should I do this?Thank you so much.

推荐答案

您可以使用

groupByKey(Integer numPartitions)

并将numPartitions设置为等于您拥有的不同键的数量.

and set the numPartitions equal to the number of distinct keys you have.

但是..您需要先了解 您有多少个不同的键 .你有那个信息吗?可能不是.因此,..您将需要做一些额外的(/多余的)工作.例如.使用

But .. you will need to know how many distinct keys do you have up front. Do you have that information? Probably not. So then .. you would need to do some extra (/redundant) work. E.g. use

countByKey

作为第一步.这比groupByKey快-因此,至少您没有将总处理时间翻倍.

as the first step. That is faster than groupByKey - so at least you were not doubling the total processing time.

更新.OP询问了为什么默认情况下它们会获得2个分区.

Update The OP asked about why they are getting 2 partitions by default.

默认groupByKey使用defaultPartitioner()方法

groupByKey(defaultPartitioner(self))

  • 从基数最大的父分区中选择Partitioner.
    • which selects the Partitioner from the parent partition with the largest cardinality.
    • -否则它将使用spark.default.parallelism

      这篇关于如何在Spark中按分区对键/值进行分组?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-30 02:57