我正在使用Flink 1.4.0

假设我有一个POJO,如下所示:

public class Rating {
    public String name;
    public String labelA;
    public String labelB;
    public String labelC;
    ...
}


JOIN函数:

public class SetLabelA implements JoinFunction<Tuple2<String, Rating>, Tuple2<String, String>, Tuple2<String, Rating>> {

    @Override
        public Tuple2<String, Rating> join(Tuple2<String, Rating> rating, Tuple2<String, String> labelA) {
        rating.f1.setLabelA(labelA)
        return rating;
    }
}


并假设我想应用JOIN操作来设置DataSet<Tuple2<String, Rating>>中每个字段的值,可以按照以下步骤进行操作:

DataSet<Tuple2<String, Rating>> ratings = // [...]
DataSet<Tuple2<String, Double>> aLabels = // [...]
DataSet<Tuple2<String, Double>> bLabels = // [...]
DataSet<Tuple2<String, Double>> cLabels = // [...]
...
DataSet<Tuple2<String, Rating>>
            newRatings =
            ratings.leftOuterJoin(aLabels, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)

                   // key of the first input
                   .where("f0")

                   // key of the second input
                   .equalTo("f0")

                   // applying the JoinFunction on joining pairs
                   .with(new SetLabelA());


不幸的是,这是必需的,因为等级和所有xLabels都很大DataSets,我被迫调查每个xlabels以查找所需的字段值,但同时并非如此每个xlabels中都存在所有评级密钥。

这实际上意味着我必须对每个leftOuterJoin执行一个xlabel,为此,我还需要创建相应的JoinFunction实现,该实现利用来自Rating POJO的正确设置器。

有没有一种更有效的方法可以解决任何人都可以想到的问题?

就分区策略而言,我确保对DataSet<Tuple2<String, Rating>> ratings进行排序:

DataSet<Tuple2<String, Rating>> sorted_ratings = ratings.sortPartition(0, Order.ASCENDING).setParallelism(1);


通过将并行度设置为1,我可以确保整个数据集将被排序。然后,我使用.partitionByRange

DataSet<Tuple2<String, Rating>> partitioned_ratings = sorted_ratings.partitionByRange(0).setParallelism(N);


其中N是我的VM上具有的内核数。我在这里遇到的另一个问题是,将第一个.setParallelism设置为1是否对其余管道的执行方式有限制,即后续.setParallelism(N)是否可以更改DataSet的处理方式?

最后,我做了所有这些操作,以便当partitioned_ratingsxlabels DataSet连接时,JOIN操作将由JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE完成。根据Flinkv.1.4.0文档:


  REPARTITION_SORT_MERGE:系统对每个输入进行分区(混洗)(除非已对输入进行分区),并对每个输入进行排序(除非已对其进行排序)。输入通过已排序输入的流合并合并在一起。如果已经对一个或两个输入进行了排序,则此策略很好。


因此,在我的情况下,我认为ratings已排序,而每个xlabels DataSets都不排序,因此这是最有效的策略,这是有道理的。这有什么问题吗?还有其他方法吗?

最佳答案

到目前为止,我还无法实施该策略。似乎依靠JOINs太麻烦了,因为它们是昂贵的操作,除非真正必要,否则应避免使用它们。

例如,如果两个JOINs的大小都很大,则应使用Datasets。如果不是,则使用BroadCastVariables是一种方便的替代方法,通过该方法,无论使用什么目的,都在工作者之间广播两个Datasets中的一个(最小的)。下面显示一个示例(为方便起见,从此link复制)

DataSet<Point> points = env.readCsv(...);

DataSet<Centroid> centroids = ... ; // some computation

points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p);
    }

}).withBroadcastSet("centroids", centroids);


同样,由于填充POJO的字段意味着将重复利用非常相似的代码,因此,一定要使用jlens以避免代码重复,并编写更简洁易懂的解决方案。

09-26 17:17