我正在尝试一个简单的Flink程序,该程序只接收一个文件,反转文件中的字符串并将其写出。
该程序可以正常工作,只有个别行顺序混乱。
例如。
文件输入
Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido
输出文件
Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF
我期待:
Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF
以下是我为实现此目的而编写的程序:
package testflink;
import java.util.Iterator;
import java.util.StringJoiner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
System.err.println(env.getParallelism());
DataSource<String> file = env.readTextFile("./data.csv");
file.mapPartition((Iterable<String> values, Collector<String> out) -> {
System.err.println("************* " + out.hashCode() + " Begin");
Iterator<String> iterator = values.iterator();
while (iterator.hasNext()) {
String tuple = iterator.next();
System.err.println("************* " + out.hashCode() + tuple);
String[] split = tuple.split(",");
String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
}
System.err.println("************* " + out.hashCode() + " End");
}).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);
env.execute("Flink Batch Java API Skeleton");
System.out.println("Done");
}
}
是否可以维持输入顺序?有什么好的解决方法吗?
我知道有
readAsCsv()
方法可用时,我正在读取csv和拆分字符串。问题是csv每行/元组可以有动态的行数。我无法弄清楚如何将其转换为每个元组具有动态列数的DataSource。 MapPartition需要定义的类型-如何在运行时替换Tuple0
-Tuple25
?还有,最后一个问题-我可以限制分区在
Iterable<String> values
参数中使用的值永远不能超过n个吗?提前致谢! :)
最佳答案
Flink的mapPartition
维护每个并行分区内记录的顺序。但是,用例中的问题是如何将数据分配给MapPartition运算符的并行任务。
您正在使用TextInputFormat
将输入文件分为几个输入拆分,这些拆分由数据源运算符的并行实例独立处理。每个数据源实例在本地将其所有记录转发到后续的MapPartition运算符,这会将其结果记录转发到接收器。管道如下所示:
source_1 -> mapPartition_1 -> sink_1
source_2 -> mapPartition_2 -> sink_2
source_3 -> mapPartition_3 -> sink_3
...
因此,从源头开始,所有记录都是按顺序处理的。但是,由于将输入拆分随机分配给源任务,并且接收器独立运行(无协调),因此输出仅部分排序(从同一拆分读取的记录被排序)。
将源的并行度设置为1不会有帮助,因为它将以循环方式将其结果记录发送到后续任务,以利用后续运算符的并行度。将整个作业的并行度设置为1也无济于事,因为拆分仍然可以由单个源任务以随机顺序处理。我知道的唯一解决方案是在写入结果之前给每个输入记录编号并sorting on that number (with range partitioning for parallel processing)。
关于java - Apache Flink:使用mapPartition顺序处理数据,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44840029/