Given a map like this:
K1 -> [V1, V2]
K2 -> [V2, V3]
K3 -> [V3]
K4 -> [V4]
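As a result, I want a list of groups of keys, where the keys in each group share at least one common element in their value lists. The solution should support transitive relations (see group G1):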
G1 = [K1, K2, K3]
G2 = [K4]
I am stuck on the error described here. How can this be implemented in Spark?
My code is as follows:
public class Grouping implements Serializable {

    public void group(JavaSparkContext sc) {
        List<Mapping> list = newArrayList();
        list.add(new Mapping("K1", newArrayList("V1", "V2")));
        list.add(new Mapping("K2", newArrayList("V2", "V3")));
        list.add(new Mapping("K3", newArrayList("V3")));
        list.add(new Mapping("K4", newArrayList("V4")));
        JavaRDD<Tuple2<Mapping, String>> pairs = sc.parallelize(list).map(Mapping::toPairs).flatMap(p -> p);
        JavaPairRDD<String, Iterable<Mapping>> valuesToMappings = pairs.groupBy(Tuple2::_2).
                mapToPair(t -> new Tuple2<>(t._1, stream(t._2).map(tt -> tt._1).collect(toList())));
        JavaRDD<Group> map = valuesToMappings.map(t -> new Group(traverse(newHashSet(t._2.iterator()), valuesToMappings)));
        System.out.println(map.collect());
    }

    private Set<Mapping> traverse(Set<Mapping> mappings, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<String> values = mappings.stream().map(Mapping::getValues).flatMap(Collection::stream).collect(toSet());
        Set<Mapping> mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        while (!mappings.equals(mappingsHavingValues)) {
            mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        }
        return mappingsHavingValues;
    }

    private Set<Mapping> mappingsHavingValues(Set<String> values, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<Mapping> result = newHashSet();
        for (String value : values) {
            List<Iterable<Mapping>> lookup = valuesToMappings.lookup(value);
            result.addAll(newArrayList(lookup.get(0))); //here I get an exception
        }
        return result;
    }

    public <T> Stream<T> stream(Iterable<T> in) {
        return StreamSupport.stream(in.spliterator(), false);
    }
}
public class Mapping implements Serializable {

    private String key;
    private List<String> values;

    public Mapping(String key, List<String> values) {
        this.key = key;
        this.values = values;
    }

    public String getKey() {
        return key;
    }

    public List<String> getValues() {
        return values;
    }

    public List<Tuple2<Mapping, String>> toPairs() {
        return getValues().stream().map(v -> new Tuple2<>(this, v)).collect(toList());
    }
}
public class Group {

    private Set<Mapping> mappings;

    public Group(Set<Mapping> mappings) {
        this.mappings = mappings;
    }

    public Set<Mapping> getMappings() {
        return mappings;
    }
}
Best answer
You are looking for the connected components of a graph. org.apache.spark.graphx.lib.ConnectedComponents
provides a distributed solution for this.
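
To make the connected-components idea concrete, here is a minimal local sketch in plain Java. It does not use Spark or the GraphX API. Instead it treats each key as a vertex, links two keys whenever they share a value, and merges them with a union-find structure, which is a swapped-in technique chosen for illustration. The names KeyGrouping and group are hypothetical, and the sketch assumes the whole map fits in memory on one machine; for data that does not, the distributed GraphX implementation named above is the relevant tool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: local (non-distributed) grouping of keys by shared values.
final class KeyGrouping {

    // Union-find parent pointers; every key starts as its own root.
    private final Map<String, String> parent = new HashMap<>();

    private String find(String key) {
        parent.putIfAbsent(key, key);
        String root = key;
        while (!root.equals(parent.get(root))) {
            root = parent.get(root);
        }
        // Path compression: point every node on the path directly at the root.
        String current = key;
        while (!current.equals(root)) {
            String next = parent.get(current);
            parent.put(current, root);
            current = next;
        }
        return root;
    }

    private void union(String a, String b) {
        String rootA = find(a);
        String rootB = find(b);
        if (rootA.equals(rootB)) {
            return;
        }
        // Keep the lexicographically smaller key as root so results are deterministic.
        if (rootA.compareTo(rootB) < 0) {
            parent.put(rootB, rootA);
        } else {
            parent.put(rootA, rootB);
        }
    }

    // Returns the groups of keys that are connected, directly or transitively, by shared values.
    static List<List<String>> group(Map<String, List<String>> mappings) {
        KeyGrouping uf = new KeyGrouping();
        Map<String, String> firstKeyForValue = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : mappings.entrySet()) {
            uf.find(entry.getKey()); // register the key even if it has no values
            for (String value : entry.getValue()) {
                String seen = firstKeyForValue.putIfAbsent(value, entry.getKey());
                if (seen != null) {
                    uf.union(seen, entry.getKey()); // same value seen before: same group
                }
            }
        }
        Map<String, List<String>> byRoot = new TreeMap<>();
        for (String key : new TreeSet<>(mappings.keySet())) {
            byRoot.computeIfAbsent(uf.find(key), r -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(byRoot.values());
    }

    public static void main(String[] args) {
        Map<String, List<String>> mappings = new LinkedHashMap<>();
        mappings.put("K1", Arrays.asList("V1", "V2"));
        mappings.put("K2", Arrays.asList("V2", "V3"));
        mappings.put("K3", Arrays.asList("V3"));
        mappings.put("K4", Arrays.asList("V4"));
        System.out.println(group(mappings)); // prints [[K1, K2, K3], [K4]]
    }
}
```

K1 and K3 share no value directly, yet they end up in the same group because K2 links them through V2 and V3. That is the transitive behaviour asked for in the question.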