到目前为止,我有一个JavaDStream,它首先看起来像这样:

Value
---------------------
a,apple,spain
b,orange,italy
c,apple,italy
a,apple,italy
a,orange,greece


首先,我将行拆分并映射到JavaPairDStream中的键值对:

JavaPairDStream<String, String> pairDStream = inputStream.mapToPair(row -> {
    String[] cols = row.split(",");
    String key = cols[0];
    String value = cols[1] + "," + cols[2];

    return new Tuple2<String, String>(key, value);
});


这样我得到了:

Key  | Value
---------------------
a    | apple,spain
b    | orange,italy
c    | apple,italy
a    | apple,italy
a    | orange,greece


最后,输出应如下所示

Key  | Fruit | Country
-------------------------------
a    | 2     | 3
b    | 1     | 1
c    | 1     | 1


它计算每个密钥的独特水果和国家/地区的数量。

现在的最佳做法是什么?首先,groupByKey / reduceByKey然后再次拆分?还是像这样的键值对中的每个键都有两个值?

Key  | Value1 | Value2
----------------------
a    | apple  | spain
b    | orange | italy
c    | apple  | italy
a    | apple  | italy
a    | orange | greece

最佳答案

无法使用JavaPairDStream获得不同的值,因此您需要使用其.transformToPair(...)方法,以便首先将其转换为JavaPairRDD,然后获取不同的行,然后按键进行缩减,最后将其转换回JavaPairDStream

-使用地图为水果制作JavaPairDStream:<key, fruit>,然后在.distinct( ).reduceByKey( )内应用.transformToPair(...)以获取具有<key, distinct fruit count>的JavaPairDStream(我们将其称为<prds1>

-使用地图为以下国家/地区制作JavaPairDStream:<key, country>,然后在.distinct( ).reduceByKey( )内应用.transformToPair(...)以获取具有<key, distinct country count>的JavaPairDStream(将其称为<prds2>

-通过密钥同时加入:<key, distinct fruit count, distinct country count>:(应用prds1.join(prds2)

供以后参考,以防您希望使用Spark的Dataframe类进行相同的操作:

-从给定的输入数据中制作一个数据框(假设它有3列称为<key, fruit, country>(称为
df

-选择键和水果,应用不同键,然后按键分组:df.select("key", "fruit").distinct( ).groupBy("key").sum("fruit")(调用结果数据框df1

-选择键和国家/地区,应用不同的键,然后按键分组:df.select("key", "country").distinct( ).groupBy("key").sum("country")(调用结果数据框df2

通过键-df1.join(df2, col("key").equalTo(col("key")), "inner")-连接df1和df2

07-24 09:38
查看更多