我正在使用Apache Spark 1.1.0。在实施过程中,我正在JavaPairRDD<String, SomeClassObjects>上调用cogroup。

JavaPairRDD返回的结果为<String, wrappers$iterable>形式。

有谁知道如何在Java中迭代wrappers$iterable?我尝试将其强制转换为Iterable或JavaIterableWrapperSerializer,但它们都抛出ClassCastException。

代码是:

JavaPairRDD mtPairRDD的结果类型为Tuple2<String, ClassMTObj>

JavaPairRDD mcPairRDD的结果类型为Tuple2<String,ClassMCObj>

JavaPairRDD mCmTPairRDD = mcPairRDD.cogroup(mtPairRDD, 100);
List        lst         = mCmTPairRDD.collect()

        for(int i=0;i <= lst2.size(); i++) {
            Tuple2    obj1    = (Tuple2) lst2.get(i);
            String    mCompId = (String) obj1._1();

            Tuple2     obj2   = (Tuple2) obj1._2();
            ClassMCObj mcBean = (ClassMCObj) obj2._1(); //ClassCastException:due to wrappers$iterable
            ClassMTObj mcT    = (ClassMTObj) obj2._2(); //ClassCastException:due to wrappers$iterable
        }


我已经尝试过铸造

Iterable mcBean = (Iterable) obj2._1()


要么

JavaIterableWrapperSerializer mcBean = (JavaIterableWrapperSerializer) obj2._1();


但是所有类型转换选项都将引发Exception。

谢谢,
尼丁

最佳答案

如果您不使用原始类型JavaPairRDDList,则编译器可以帮助您找到真正的问题。您正在将对象转换为错误的类型。这是我对您代码的修正:

    JavaPairRDD<String, Tuple2<String, ClassMTObj>> mcPairRDD = null;
    JavaPairRDD<String, Tuple2<String, ClassMCObj>> mtPairRDD = null;

    JavaPairRDD<String, Tuple2<Iterable<Tuple2<String, ClassMTObj>>, Iterable<Tuple2<String, ClassMCObj>>>> mCmTPairRDD
            = mcPairRDD.cogroup(mtPairRDD, 100);
    List<Tuple2<String, Tuple2<Iterable<Tuple2<String, ClassMTObj>>, Iterable<Tuple2<String, ClassMCObj>>>>> lst = mCmTPairRDD.collect();
    for (Tuple2<String, Tuple2<Iterable<Tuple2<String, ClassMTObj>>, Iterable<Tuple2<String, ClassMCObj>>>> kv : lst) {
        String key = kv._1();
        Iterable<Tuple2<String, ClassMTObj>> v1 = kv._2()._1();
        Iterable<Tuple2<String, ClassMCObj>> v2 = kv._2()._2();
        for (Tuple2<String, ClassMTObj> kv1 : v1) {
            //...
        }
        for (Tuple2<String, ClassMCObj> kv2 : v2) {
            //...
        }
    }

09-26 05:53