I have the following data stored in a JavaPairRDD.
Starting data:
key(nodeName) PointsTo 1/n n
node1 [node2,node3,node4] 0.33 3
node2 [node1,node5] 0.50 2
node3 [node1,node2,node4,node5] 0.25 4
node4 [node1,node2] 0.50 2
node5 [node2,node3,node4] 0.33 3
key(nodeName) PointsTo 1/n n
node2 node1 0.33 3
node3 node1 0.33 3
node4 node1 0.33 3
node1 node2 0.50 2
node5 node2 0.50 2
node1 node3 0.25 4
node2 node3 0.25 4
node4 node3 0.25 4
node5 node3 0.25 4
node1 node4 0.50 2
node2 node4 0.50 2
node2 node5 0.33 3
node3 node5 0.33 3
node4 node5 0.33 3
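The second listing above appears to be the first table flattened to one row per edge: the pointed-to node, then the node that points to it, then that source node's 1/n and n. A minimal plain-Java sketch of that flattening follows, assuming this reading is right. The class and method names (EdgeListSketch, Node, flatten, sampleData) are hypothetical and not part of the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

final class EdgeListSketch {

    // Hypothetical holder for one row of the starting data.
    static final class Node {
        final List<String> pointsTo;
        final double oneOverN;
        final int n;

        Node(double oneOverN, int n, String... pointsTo) {
            this.pointsTo = Arrays.asList(pointsTo);
            this.oneOverN = oneOverN;
            this.n = n;
        }
    }

    // The starting data from the question, in its original key order.
    static Map<String, Node> sampleData() {
        Map<String, Node> data = new LinkedHashMap<>();
        data.put("node1", new Node(0.33, 3, "node2", "node3", "node4"));
        data.put("node2", new Node(0.50, 2, "node1", "node5"));
        data.put("node3", new Node(0.25, 4, "node1", "node2", "node4", "node5"));
        data.put("node4", new Node(0.50, 2, "node1", "node2"));
        data.put("node5", new Node(0.33, 3, "node2", "node3", "node4"));
        return data;
    }

    // Emit one "target source 1/n n" row per entry in each PointsTo list.
    static List<String> flatten(Map<String, Node> data) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Node> entry : data.entrySet()) {
            Node source = entry.getValue();
            for (String target : source.pointsTo) {
                rows.add(target + " " + entry.getKey() + " "
                        + String.format(Locale.ROOT, "%.2f", source.oneOverN) + " " + source.n);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        flatten(sampleData()).forEach(System.out::println);
    }
}
```

In Spark the same expansion would normally be a flatMapToPair over the RDD. The sketch uses plain collections only so that it runs without a cluster.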
The JavaPairRDD looks like this:
JavaPairRDD<String, Tuple3<Iterable<String>,Double,Double>>
where String is the key (nodeName) and the Tuple3 holds the PointsTo, 1/n, and n values, in that order.
Now I want to perform two steps.
Step 1:
key(nodeName) PointsTo 1/n n
node1 [node2,node3,node4] 0.33 3
node2 [node1,node3,node4,node5] 0.50 2
node3 [node1,node5] 0.25 4
node4 [node1,node3,node5] 0.50 2
node5 [node2,node3] 0.33 3
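Step 1 amounts to inverting the PointsTo relation: each key ends up with the list of nodes whose PointsTo contains it. Here is a minimal sketch with plain Java collections rather than Spark, built from the starting data. The names InvertPointsToSketch, invert, and sampleData are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InvertPointsToSketch {

    // PointsTo column of the starting data, in its original key order.
    static Map<String, List<String>> sampleData() {
        Map<String, List<String>> pointsTo = new LinkedHashMap<>();
        pointsTo.put("node1", Arrays.asList("node2", "node3", "node4"));
        pointsTo.put("node2", Arrays.asList("node1", "node5"));
        pointsTo.put("node3", Arrays.asList("node1", "node2", "node4", "node5"));
        pointsTo.put("node4", Arrays.asList("node1", "node2"));
        pointsTo.put("node5", Arrays.asList("node2", "node3", "node4"));
        return pointsTo;
    }

    // For every key, collect the nodes whose PointsTo list contains that key.
    static Map<String, List<String>> invert(Map<String, List<String>> pointsTo) {
        Map<String, List<String>> pointedBy = new LinkedHashMap<>();
        // Seed every key so that nodes nobody points to still get an empty list.
        for (String key : pointsTo.keySet()) {
            pointedBy.put(key, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : pointsTo.entrySet()) {
            for (String target : entry.getValue()) {
                pointedBy.computeIfAbsent(target, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        return pointedBy;
    }

    public static void main(String[] args) {
        invert(sampleData()).forEach((node, sources) -> System.out.println(node + " " + sources));
    }
}
```

The inversion visits each edge once, so it stays linear in the number of edges. The 1/n and n columns are untouched by this step and can be carried over per key.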
Step 2:
key(nodeName) PointsTo 1/n n
node1 [0.11,0.14,0.32] 0.33 3
node2 [0.92,0.14,0.32,0.67] 0.50 2
node3 [0.92,0.67] 0.25 4
node4 [0.92,0.14,0.67] 0.50 2
node5 [0.11,0.14] 0.33 3
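It is much easier to explain with a single node, so let's use node4. node4 appears in the PointsTo of the keys node1, node3, and node5. So we simply update the PointsTo of key=node4 to node1, node3, node5. Note that the values in the 1/n and n columns stay the same for each key. In step 2, we simply replace the node names with their respective 1/n values. The final JavaPairRDD will therefore look like this: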
JavaPairRDD<String, Tuple3<Iterable<Double>,Double,Double>>
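The step 2 substitution can be sketched with plain Java collections: look up each node name in a map from node to 1/n and collect the results. The lookup values below are taken from the 1/n column of the starting data, which is an assumption about which numbers are meant. The names OneOverNLookupSketch, oneOverN, and replaceWithOneOverN are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OneOverNLookupSketch {

    // node -> 1/n, copied from the starting data (assumed to be the lookup table).
    static Map<String, Double> oneOverN() {
        Map<String, Double> lookup = new HashMap<>();
        lookup.put("node1", 0.33);
        lookup.put("node2", 0.50);
        lookup.put("node3", 0.25);
        lookup.put("node4", 0.50);
        lookup.put("node5", 0.33);
        return lookup;
    }

    // Replace each node name with its 1/n value; names missing from the lookup are skipped.
    static List<Double> replaceWithOneOverN(List<String> nodes, Map<String, Double> lookup) {
        List<Double> values = new ArrayList<>();
        for (String node : nodes) {
            Double value = lookup.get(node);
            if (value != null) {
                values.add(value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // node4 after step 1 is pointed to by node1, node3, and node5.
        List<String> node4Sources = Arrays.asList("node1", "node3", "node5");
        System.out.println(replaceWithOneOverN(node4Sources, oneOverN()));
    }
}
```

Skipping unknown names silently is a design choice. Throwing an exception instead would surface inconsistent input earlier.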
Best answer
My attempt at this:
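1. Get the reference data from the JavaPairRDD (data1, data2: see the output for details) and store it in Maps.
2. Create a function that scans the input data and uses the reference data 2 Map to build the new Iterable<String> PointsTo values: see getStep1Data().
3. Create a function that scans the Iterable<String> PointsTo from the step 1 RDD and uses the reference data 1 Map to replace it with an Iterable<Double> of the corresponding 1/n values: see getPointsToN().
Source: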
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

public class SparkTest {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        SparkContext sc = new SparkContext(conf);

        // Parse one "name|pointsTo,pointsTo,...|1/n|n" line into (name, (pointsTo, 1/n, n)).
        PairFunction<String, String, Tuple3<Iterable<String>, Double, Double>> keyData =
            new PairFunction<String, String, Tuple3<Iterable<String>, Double, Double>>() {
                private static final long serialVersionUID = 1L;

                public Tuple2<String, Tuple3<Iterable<String>, Double, Double>> call(String line) throws Exception {
                    String[] fields = line.split("\\|");
                    return new Tuple2(fields[0],
                        new Tuple3(Arrays.asList(fields[1].split(",")),
                            Double.parseDouble(fields[2]), Double.parseDouble(fields[3])));
                }
            };

        System.out.println(" --- Input Data ---"); // data to start with
        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> masterData =
            sc.textFile("Data\\node.txt", 1).toJavaRDD().mapToPair(keyData);
        masterData.foreach(line -> System.out.println(line));

        // Collect the reference data to the driver; this assumes the data set fits in memory.
        Map<String, Double> refData1 = new HashMap<String, Double>();
        Map<String, Iterable<String>> refData2 = new LinkedHashMap<String, Iterable<String>>();
        masterData.collect().forEach(line -> {
            refData1.put(line._1, line._2._2());
            refData2.put(line._1, line._2._1());
        });

        System.out.println(" --- Reference Data 1 --- "); // node and its 1/n value
        for (String k : refData1.keySet()) {
            System.out.println(k + " --> " + refData1.get(k));
        }
        System.out.println(" --- Reference Data 2 --- "); // node and its PointsTo list
        for (String k : refData2.keySet()) {
            System.out.println(k + " --> " + refData2.get(k));
        }

        System.out.println(" --- Data in Step 1 --- ");
        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> step1RDD =
            masterData.mapToPair(line -> new Tuple2(line._1,
                new Tuple3(getStep1Data(line._1, refData2), line._2._2(), line._2._3())));
        step1RDD.foreach(line -> System.out.println(line));

        System.out.println(" --- Data in Step 2 --- ");
        JavaPairRDD<String, Tuple3<Iterable<Double>, Double, Double>> step2RDD =
            step1RDD.mapToPair(line -> new Tuple2(line._1,
                new Tuple3(getPointsToN(refData1, line._2._1()), line._2._2(), line._2._3())));
        step2RDD.foreach(line -> System.out.println(line));

        sc.stop();
    }

    // Look up the 1/n value of every node in pointsTo; nodes missing from refData are skipped.
    public static Iterable<Double> getPointsToN(Map<String, Double> refData, Iterable<String> pointsTo) {
        ArrayList<Double> n1 = new ArrayList<Double>();
        for (String node : pointsTo) {
            if (refData.containsKey(node)) {
                n1.add(refData.get(node));
            }
        }
        return n1;
    }

    // Collect every key in refData2 whose PointsTo list contains the given node.
    public static Iterable<String> getStep1Data(String node, Map<String, Iterable<String>> refData2) {
        ArrayList<String> n1 = new ArrayList<String>();
        for (String n : refData2.keySet()) {
            refData2.get(n).forEach(element -> {
                if (element.equals(node)) {
                    n1.add(n);
                }
            });
        }
        return n1;
    }
}
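The contents of Data\node.txt are not shown. Since the code splits each line on "|" and the second field on ",", a line presumably looks like node4|node1,node2|0.50|2. Under that assumption, the parsing step can be tried on its own with a small sketch that needs no Spark. The names NodeLineSketch, NodeRecord, and parse are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class NodeLineSketch {

    // Hypothetical plain-Java stand-in for the (name, Tuple3) pair built in keyData.
    static final class NodeRecord {
        final String name;
        final List<String> pointsTo;
        final double oneOverN;
        final double n;

        NodeRecord(String name, List<String> pointsTo, double oneOverN, double n) {
            this.name = name;
            this.pointsTo = pointsTo;
            this.oneOverN = oneOverN;
            this.n = n;
        }
    }

    // Parse "name|a,b,c|1/n|n"; the field layout is inferred from the split calls in the answer.
    static NodeRecord parse(String line) {
        String[] fields = line.split("\\|");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 '|'-separated fields: " + line);
        }
        return new NodeRecord(
                fields[0],
                Arrays.asList(fields[1].split(",")),
                Double.parseDouble(fields[2]),
                Double.parseDouble(fields[3]));
    }

    public static void main(String[] args) {
        NodeRecord record = parse("node4|node1,node2|0.50|2");
        System.out.println(record.name + " " + record.pointsTo + " " + record.oneOverN + " " + record.n);
    }
}
```

The explicit field-count check is an addition. The code in the answer would instead fail with an ArrayIndexOutOfBoundsException on a malformed line.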