今天看到有小伙伴在问,就想着自己实现一下。

问题: Flink FileSink根据输入数据指定输出位置,比如讲对应日期的数据输出到对应目录

输入数据:
20190716 输出到路径 20190716
20190717 输出到路径 20190717
20190718 输出到路径 20190718

目前flink 对与输出到文件有两种实现(write 算子不算,只能指定目录):Rolling File Sink 和 Streaming File Sink

Rolling File Sink 的实现就是 BucketingSink,使用也很简单,直接指定路径就可以了,也可以设置如:目录名称格式(按时间格式滚动),输出文件格式,文件大小、滚动间隔、文件前缀、后缀一类的

// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[A, B]] = ...

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink)

当然,如果是这么简单,就不会有这篇博客了,下面进入主题

--------------------------------------

默认的 DateTimeBucketer 只能根据时间指定文件名的滚动是规则,没办法根据数据指定文件的输出位置,这需要实现 BasePathBucketer 自定义输出路径

实现如下:

import java.io.File
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer
import org.apache.hadoop.fs.Path

/**
  * 根据实际数据返回数据输出的路径
  */
class DayBasePathBucketer extends BasePathBucketer[String]{

  /**
    * 返回路径
    * @param clock
    * @param basePath
    * @param element
    * @return
    */
  override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = {
    // yyyyMMdd
    val day = element.substring(1, 9)
    new Path(basePath + File.separator + day)
  }
}

调用如下:

import java.io.File
import java.text.SimpleDateFormat
import com.venn.index.conf.Common
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._

/**
  * 使用BucketingSink 实现 根据‘数据’自定义输出目录
  */
object RollingFileSinkDemo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
    val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp)

    val sink = new BucketingSink[String]("D:\\idea_out\\rollfilesink")
    sink.setBucketer(new DayBasePathBucketer)
    sink.setWriter(new StringWriter[String])
    sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //    sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
//    sink.setInProgressPrefix("inProcessPre")
// sink.setPendingPrefix("pendingpre")
// sink.setPartPrefix("partPre")

env.addSource(source) .assignAscendingTimestamps(json => { sdf.parse(json.get("date").asText()).getTime }) .map(json => { json.get("date") + "-" + json.toString // 将日期拼接到前面,方便后面使用 }) .addSink(sink) env.execute("rollingFileSink") } }

测试数据如下:

{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"}
{"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"}
...
{"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"}
{"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"}
{"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}

测试结果如下:

Flink FileSink 自定义输出路径——BucketingSink-LMLPHP

Flink FileSink 自定义输出路径——BucketingSink-LMLPHP

可以看到,当天的数据都输出到当天对应的目录中。

遇到个问题:

这里有个问题,因为重写了BasePathBucketer,自定义了输出文件,所有会同时打开多个输出文件,带来文件刷新的问题,在当前文件写完后(这里的表现是:当天的数据以及全部流过,
下一天的文件以及开始写了),会发现当天的文件中的数据不全,因为数据还没有全部刷到文件,这个时候下一个文件又开始写了,会发现上一个文件还没刷完

猜想:

猜想:每个文件都有个输出缓冲,上一个文件最后一点数据还在缓冲区,下一个文件又使用新的缓冲区,没办法刷到上一个文件的数据,只有等缓冲区数据满、超时一类的操作触发刷写 ??

验证:

源码BucketingSink.closePartFilesByTime 默认每60秒或大于滚动时间间隔(batchRolloverInterval)(系统时间) 将当前park文件,将状态从 in-process 修改为 pending,随后关闭当前的part 文件,数据刷到磁盘

代码如下:

private void closePartFilesByTime(long currentProcessingTime) throws Exception {

        synchronized (state.bucketStates) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
                        || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
                    LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
                        getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
                    closeCurrentPartFile(entry.getValue());
                }
            }
        }
    }

 下篇: Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较

搞定

03-29 16:18