我有一个包含键值对的JavaPairDStream。我需要将其转换为HashMap。我尝试通过在其上调用“ collectAsMap()”函数及其正常工作来对普通JavaPairRDD进行相同操作,但是当我尝试在DStream上进行相同操作时,它失败了。

我试图通过使用“ foreachRDD”函数将“ JavaPairDStream”转换为“ JavaPairRDD”来实现相同的目的,然后在JavaPairRDD上使用“ collectAsMap()”函数。

Map<String,String> value= new HashMap<String,String>();
            value=line.collectAsMap();

//Here "line" is a "JavaPairRDD<String,String>".


它没有给出任何编译错误,但是当我运行程序时,它失败并抛出如下错误。

java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tuple2;
    at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:447)
    at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:464)
    at attempt1.CSV_Spark$3.call(CSV_Spark.java:109)
    at attempt1.CSV_Spark$3.call(CSV_Spark.java:1)


我不确定我的方法是否正确。普通的“ JavaPairRDD”与“ foreachRDD”函数创建的JavaPairRDD之间有什么区别吗?为什么相同的方法适用于普通的“ JavaPairRDD”,但是当我将其应用于通过在JavaPairDStream上应用“ foreachRDD”功能创建的“ JavaPairRDD”时却失败了。如果我在任何地方出问题了,请告诉我。另外,如果还有其他方法,请在此处发布。谢谢。

最佳答案

在编译时,向下转换被接受,因为Map和HashMap处于相同的继承中。尽管我们没有得到任何编译时错误,但我们将在运行时获取ClassCastException。为避免此问题,您可以尝试以下操作:

码:

JavaPairRDD<K, V> javaRDDPair  = rddInstance.mapToPair(new PairFunction<T, K, V>() {
   @Override
    public Tuple2<K, V> call(final T value) {
    // statements
    // operations on value
    return new Tuple2<K, V>(KTypeValue, VTypeValue);
    }
    });

    Map<K,V> map =  javaRDDPair.collectAsMap();
    HashMap<K,V> hmap = new HashMap<K,V>(map);


注意:rddInstance是JavaRDD类型的对象。

假设我们有一个JavaRDD,其中包含T类型值。
对其进行转换后,我们将创建JavaPairRDD,其中包含对。
现在的要求是将JavaPairRDD转换为HashMap对象,以便在应用程序中进行进一步的计算。使用collectAsMap方法并将其结果分配给Map对象本身。之后,您可以通过传递Map instance创建HashMap。

07-26 09:40