I have a map like this:

K1 -> [V1, V2]
K2 -> [V2, V3]
K3 -> [V3]
K4 -> [V4]


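As a result, I want a list of key groups, where two keys belong to the same group if their value lists share at least one element. The solution should support transitive relations (see group G1):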

G1 = [K1, K2, K3]
G2 = [K4]


I ran into the error described here. How can this be implemented in Spark?

My code is as follows:

public class Grouping implements Serializable {

    public void group(JavaSparkContext sc) {
        List<Mapping> list = newArrayList();
        list.add(new Mapping("K1", newArrayList("V1", "V2")));
        list.add(new Mapping("K2", newArrayList("V2", "V3")));
        list.add(new Mapping("K3", newArrayList("V3")));
        list.add(new Mapping("K4", newArrayList("V4")));

        JavaRDD<Tuple2<Mapping, String>> pairs = sc.parallelize(list).map(Mapping::toPairs).flatMap(p -> p);
        JavaPairRDD<String, Iterable<Mapping>> valuesToMappings = pairs.groupBy(Tuple2::_2).
            mapToPair(t -> new Tuple2<>(t._1, stream(t._2).map(tt -> tt._1).collect(toList())));

        JavaRDD<Group> map = valuesToMappings.map(t -> new Group(traverse(newHashSet(t._2.iterator()), valuesToMappings)));

        System.out.println(map.collect());
    }

    private Set<Mapping> traverse(Set<Mapping> mappings, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<String> values = mappings.stream().map(Mapping::getValues).flatMap(Collection::stream).collect(toSet());
        Set<Mapping> mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        while (!mappings.equals(mappingsHavingValues)) {
            mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        }

        return mappingsHavingValues;
    }

    private Set<Mapping> mappingsHavingValues(Set<String> values, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<Mapping> result = newHashSet();
        for (String value : values) {
            List<Iterable<Mapping>> lookup = valuesToMappings.lookup(value);
            result.addAll(newArrayList(lookup.get(0))); //here I get an exception
        }
        return result;
    }

    public <T> Stream<T> stream(Iterable<T> in) {
        return StreamSupport.stream(in.spliterator(), false);
    }
}

public class Mapping implements Serializable {
    private String key;
    private List<String> values;

    public Mapping(String key, List<String> values) {
        this.key = key;
        this.values = values;
    }

    public String getKey() {
        return key;
    }

    public List<String> getValues() {
        return values;
    }

    public List<Tuple2<Mapping, String>> toPairs() {
        return getValues().stream().map(v -> new Tuple2<>(this, v)).collect(toList());
    }
}

public class Group {

    private Set<Mapping> mappings;

    public Group(Set<Mapping> mappings) {
        this.mappings = mappings;
    }

    public Set<Mapping> getMappings() {
        return mappings;
    }
}

Best answer

You are looking for the connected components of a graph. org.apache.spark.graphx.lib.ConnectedComponents provides a distributed solution for this.
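To make the connected-components idea concrete, here is a minimal single-machine sketch in plain Java. It does not use GraphX and is not distributed; it only illustrates what the grouping computes on the sample data from the question. The class name ConnectedGroups and the helpers group, find and union are hypothetical names chosen for this illustration. Keys are treated as vertices, and two keys are joined whenever they share a value, using a union-find structure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class: a local (non-Spark) connected-components grouping.
public class ConnectedGroups {

    // Groups keys that share at least one value, directly or transitively.
    static List<List<String>> group(Map<String, List<String>> mapping) {
        Map<String, String> parent = new HashMap<>();          // union-find forest over keys
        Map<String, String> firstKeyForValue = new HashMap<>(); // first key seen for each value

        for (Map.Entry<String, List<String>> entry : mapping.entrySet()) {
            String key = entry.getKey();
            parent.putIfAbsent(key, key);
            for (String value : entry.getValue()) {
                // If another key already owns this value, the two keys are connected.
                String other = firstKeyForValue.putIfAbsent(value, key);
                if (other != null) {
                    union(parent, key, other);
                }
            }
        }

        // Collect keys by the root of their component, preserving input order.
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String key : mapping.keySet()) {
            groups.computeIfAbsent(find(parent, key), r -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(groups.values());
    }

    // Finds the root of x and compresses the path along the way.
    static String find(Map<String, String> parent, String x) {
        String root = x;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        while (!parent.get(x).equals(root)) {
            String next = parent.get(x);
            parent.put(x, root);
            x = next;
        }
        return root;
    }

    // Merges the components containing a and b.
    static void union(Map<String, String> parent, String a, String b) {
        String rootA = find(parent, a);
        String rootB = find(parent, b);
        if (!rootA.equals(rootB)) {
            parent.put(rootA, rootB);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> mapping = new LinkedHashMap<>();
        mapping.put("K1", Arrays.asList("V1", "V2"));
        mapping.put("K2", Arrays.asList("V2", "V3"));
        mapping.put("K3", Arrays.asList("V3"));
        mapping.put("K4", Arrays.asList("V4"));

        System.out.println(group(mapping)); // prints [[K1, K2, K3], [K4]]
    }
}
```

This sketch only works when the whole mapping fits in memory on one machine. For data that lives in an RDD, the same structure (keys as vertices, shared values as edges) would be handed to the GraphX ConnectedComponents implementation named above instead.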

09-11 18:58