I have the following data stored in a JavaPairRDD.

Starting data

key(nodeName)    PointsTo                         1/n     n

node1           [node2,node3,node4]               0.33    3
node2           [node1,node5]                     0.50    2
node3           [node1,node2,node4,node5]         0.25    4
node4           [node1,node2]                     0.50    2
node5           [node2,node3,node4]               0.33    3


key(nodeName)   PointsTo        1/n        n

node2           node1           0.33       3
node3           node1           0.33       3
node4           node1           0.33       3

node1           node2           0.50       2
node5           node2           0.50       2

node1           node3           0.25       4
node2           node3           0.25       4
node4           node3           0.25       4
node5           node3           0.25       4

node1           node4           0.50       2
node2           node4           0.50       2

node2           node5           0.33       3
node3           node5           0.33       3
node4           node5           0.33       3


The JavaPairRDD looks like this.

JavaPairRDD<String, Tuple3<Iterable<String>,Double,Double>>


where the String is the key(nodeName) and the Tuple3 holds the PointsTo, 1/n and n values respectively.
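For reference, here is a minimal self-contained sketch that builds this structure in memory (the class name PairRddSketch and the local master setting are just placeholders for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
import scala.Tuple3;

public class PairRddSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PairRddSketch").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Each element is (nodeName, (PointsTo, 1/n, n)); the first two rows above as an example.
        List<Tuple2<String, Tuple3<Iterable<String>, Double, Double>>> rows = new ArrayList<>();
        rows.add(new Tuple2<String, Tuple3<Iterable<String>, Double, Double>>("node1",
                new Tuple3<Iterable<String>, Double, Double>(Arrays.asList("node2", "node3", "node4"), 0.33, 3.0)));
        rows.add(new Tuple2<String, Tuple3<Iterable<String>, Double, Double>>("node2",
                new Tuple3<Iterable<String>, Double, Double>(Arrays.asList("node1", "node5"), 0.50, 2.0)));

        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> data = jsc.parallelizePairs(rows);
        data.foreach(entry -> System.out.println(entry));

        jsc.stop();
    }
}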

Now I want to perform 2 steps.

Step 1:

key(nodeName)    PointsTo                         1/n     n

node1           [node2,node3,node4]               0.33    3
node2           [node1,node3,node4,node5]         0.50    2
node3           [node1,node5]                     0.25    4
node4           [node1,node3,node5]               0.50    2
node5           [node2,node3]                     0.33    3


Step 2:

key(nodeName)    PointsTo                         1/n     n

node1           [0.11,0.14,0.32]                  0.33    3
node2           [0.92,0.14,0.32,0.67]             0.50    2
node3           [0.92,0.67]                       0.25    4
node4           [0.92,0.14,0.67]                  0.50    2
node5           [0.11,0.14]                       0.33    3


It will be much easier if I explain this for just one node, so let's take node4. node4 appears in the PointsTo lists of the keys node1, node3 and node5. Therefore we simply update the PointsTo for key=node4 so that it now becomes [node1, node3, node5].

Note that the values in the 1/n and n columns remain the same for each key.

In step 2, we simply replace the nodeNames with their respective 1/n values.

So the final JavaPairRDD will look like this.

JavaPairRDD<String, Tuple3<Iterable<Double>,Double,Double>>

Best Answer

My attempt at this:


1. Get the reference data from the JavaPairRDD (reference data 1 and 2: see the output for details) and store it in Maps.
2. Create a function that scans the input data and, using the reference data 2 Map, replaces the PointsTo with a new Iterable<String> value: see getStep1Data().
3. Create a function that scans the Iterable<String> PointsTo of the step 1 RDD and, using the reference Map of 1/n values, replaces it with a new Iterable<Double>: see getPointsToN().


Source:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

public class SparkTest {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        SparkContext sc = new SparkContext(conf);

        PairFunction<String, String,Tuple3<Iterable<String>,Double,Double>> keyData =
                new PairFunction<String,String,Tuple3<Iterable<String>,Double,Double>>() {

            private static final long serialVersionUID = 1L;
            public Tuple2<String, Tuple3<Iterable<String>, Double, Double>> call(String line) throws Exception {
                String[] fields = line.split("\\|"); // nodeName | pointsTo (csv) | 1/n | n
                return new Tuple2(fields[0],
                        new Tuple3(Arrays.asList(fields[1].split(",")),
                                Double.parseDouble(fields[2]), Double.parseDouble(fields[3])));
            }
        };
        };

        System.out.println(" --- Input Data ---"); //Data to start with
        JavaPairRDD<String, Tuple3<Iterable<String>,Double,Double>> masterData =
                sc.textFile("Data\\node.txt", 1).toJavaRDD().mapToPair(keyData);
        masterData.foreach(line -> System.out.println(line));

        Map<String,Double> refData1 = new HashMap<String,Double>();
        Map<String,Iterable<String>> refData2 = new LinkedHashMap<String,Iterable<String>>();
        masterData.collect().forEach(line -> {refData1.put(line._1, line._2._2()); refData2.put(line._1, line._2._1());});
        System.out.println(" --- Referacne Data 1-- "); //stored referance Data node and its 1/n value
        for(String k: refData1.keySet())System.out.println(k + " --> " + refData1.get(k));

        System.out.println(" --- Referacne Data 2-- "); //stored referance Data node and its 1/n value
        for(String k: refData2.keySet()) System.out.println(k + " --> " + refData2.get(k));

        System.out.println(" ---  Data in Step 1 --- ");
        JavaPairRDD<String, Tuple3<Iterable<String>,Double,Double>> step1RDD =
            masterData.mapToPair(line ->new Tuple2(line._1, new Tuple3(getStep1Data(line._1,refData2), line._2._2(), line._2._3())));

        step1RDD.foreach(line -> System.out.println(line));

        System.out.println(" --- Data in Step 2 --- ");
        JavaPairRDD<String, Tuple3<Iterable<Double>,Double,Double>> step2RDD =
                step1RDD.mapToPair(line ->new Tuple2(line._1, new Tuple3(getPointsToN(refData1, line._2._1()), line._2._2(), line._2._3())));

        step2RDD.foreach(line -> System.out.println(line));
    }
    // get the 1/n values from the reference data for each node in pointsTo, as a new list
    public static Iterable<Double> getPointsToN(Map<String, Double> refData, Iterable<String> pointsTo){
        ArrayList<Double> n1 = new ArrayList<Double>();
        for(String node: pointsTo)
            if(refData.containsKey(node))
                n1.add(refData.get(node));
        return n1;
    }
    // get the keys whose PointsTo list contains the given node, from reference data 2, as a new list
    public static Iterable<String> getStep1Data(String node, Map<String, Iterable<String>> refData2){
        ArrayList<String> n1 = new ArrayList<String>();
        for(String n: refData2.keySet())
            refData2.get(n).forEach(element -> {if(element.equals(node)) n1.add(n);});
        return n1;
    }
}
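As a side note, the reversal in step 1 does not have to go through driver-side Maps. The method below is only a rough alternative sketch that could be dropped into the SparkTest class above (it reuses that class's imports); it assumes Spark 2.x, where flatMapToPair returns an Iterator, and nodes that nothing points to would be dropped by the inner join.

    // Alternative sketch: keep the reversal of the PointsTo lists distributed and only
    // collect the small (node -> 1/n) lookup map instead of the whole data set.
    public static JavaPairRDD<String, Tuple3<Iterable<Double>, Double, Double>> transformDistributed(
            JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> masterData) {

        // node -> its 1/n value; copied into a HashMap so the captured closure stays serializable
        Map<String, Double> invN = new HashMap<String, Double>(masterData.mapValues(t -> t._2()).collectAsMap());

        // Step 1a: emit one (target, source) pair per edge, then group the sources by target
        JavaPairRDD<String, String> reversedEdges = masterData.flatMapToPair(e -> {
            ArrayList<Tuple2<String, String>> out = new ArrayList<Tuple2<String, String>>();
            for (String target : e._2()._1()) {
                out.add(new Tuple2<String, String>(target, e._1()));
            }
            return out.iterator();
        });
        JavaPairRDD<String, Iterable<String>> grouped = reversedEdges.groupByKey();

        // Step 1b: re-attach each node's own 1/n and n values by joining with the original data
        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> step1 = masterData
                .join(grouped)
                .mapValues(v -> new Tuple3<Iterable<String>, Double, Double>(v._2(), v._1()._2(), v._1()._3()));

        // Step 2: replace every node name in the list with that node's 1/n value
        return step1.mapValues(t -> {
            ArrayList<Double> vals = new ArrayList<Double>();
            for (String node : t._1()) {
                vals.add(invN.get(node));
            }
            return new Tuple3<Iterable<Double>, Double, Double>(vals, t._2(), t._3());
        });
    }

Called as transformDistributed(masterData), it should yield the same JavaPairRDD<String, Tuple3<Iterable<Double>, Double, Double>> shape as step2RDD above.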

