我也是flink和流媒体的新手。我想将每个分区的某个功能应用于流的每个窗口(使用事件时间)。到目前为止,我所做的是:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

我想做的是对每个窗口批处理的每个分区应用通用功能,也许应用复杂的处理算法或类似的方法。我已经看到该方法适用于DataStream API,但我不了解它的工作原理。在Flink API中说它像在Scala中那样使用:
inputStream.apply { WindowFunction }

有人可以解释一下apply方法的作用或用法吗? Scala中的示例将是可取的。 apply方法可以满足我的要求吗?

最佳答案

因此,根据您要执行的计算类型,基本上可以遵循两个可能的方向。要么使用:fold / reduce / aggregate,要么更通用,您已经提到过-apply。它们都适用于Windows的密钥。

至于apply,这是一种非常通用的应用计算方式。最基本的版本(在Scala中)将是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]

函数具有4个参数:

窗口的
  • 键(记住您正在使用keyedStream)
  • 窗口(您可以从中提取例如窗口的开始或结束)
  • 分配给此特定窗口的元素和键
  • 一个收集器,您应该向其发出处理结果

  • 但必须记住,此版本必须保持每个元素处于状态,直到发出窗口为止。更好的内存性能解决方案是使用带有preAgreggator的版本,该版本在触发上述功能之前执行一些计算。

    在这里,您可以看到带有预汇总内容的简短摘要:
    val stream: DataStream[(String,Int)] =   ...
    
    stream.keyBy(_._1)
          .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
          .apply((e1, e2) => (e1._1, e1._2 + e2._2),
                 (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                    out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
          })
    

    计算会话窗口中密钥的出现次数。

    因此,基本上,如果您不需要窗口的元信息,如果足够的话,我会坚持使用fold \ reduce \ aggregate。不要考虑将其应用于某种预聚合中,如果这还不够,请查看最通用的apply

    有关更完整的示例,请查看here

    10-01 06:36
    查看更多