我正在通过Scala API使用apache flink,并在某个时候获得了DataSet[(Int, Int, Int)]
。使用方法writeAsCSV()
和writeAsText()
的结果是意外的。它创建一个目录。该目录具有位置并命名方法调用的第一个参数(例如filePath
。)在该目录中,出现了两个文件,名称分别为“ 1”和“ 2”。在这些文件中,我可以看到DataSets数据。他们似乎将DataSets内容划分为这两个文件。
尝试重新创建此行为以显示我无法显示的更简洁的代码片段。也就是说,我目睹了一个文件的创建,该文件在期望的位置具有期望的名称,而没有创建目录。
val mas = ma_ groupBy(0,1)sum(2)
mas.writeAsCsv(“ c:\ flink \ mas.csv”)
结果将创建一个名为“ mas.csv”的目录,并在其中包含两个文件“ 1”和“ 2”。什么时候会发生这种情况?
使用的flink 9.1本地模式,Windows 7,scala 2.10,eclipse3.0.3
最佳答案
这是预期的行为。如果要获取单个输出文件,则需要将接收器的并行度设置为1。
dataset = dataset.writeAsCsv("filename").setParallelism(1);
对于DataStream API,您需要插入其他
rebalane()
来中断运算符链。否则,将使用dop = 1来执行整个链,否则可能会忽略setParallelism()
。datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);
关于scala - writeAsCSV()和writeAsText()是意外的,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32580970/