本文介绍了如何组织一个复杂的Apache Flink应用程序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们使用Flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一对多比率(传感器启用的事件)。
传感器与存储在关系数据库中的启用事件之间的映射

为了丰富传感器数据,我们将连接传感器数据流和表API。正在添加具有已启用事件列表的元数据。

那么,如果某些特定的sensor-123只启用了TEMPPRESSURE两个事件,如何才能只向这两个定义的流程函数发送传感器数据呢?

脑海中浮现出如下内容:

val enriched: DataStream[EnrichedSensorData] = ...

val temp = enriched.filter(x => isTempEnabled(x)).process(....)
val humd = enriched.filter(x => isHumdEnabled(x)).process(....)
val press = enriched.filter(x => isPressEnabled(x)).process(....)
  1. 效果如何?就Flink最佳实践而言,如何做得最好?据我所知,在我的例子中,我将数据流乘以几倍,尽管我随后用过滤将结果发过滤

  2. 在我的案例中,执行数据丰富过程的最佳方式是什么?将传感器数据流与表流连接(通过Flink-CDC-Connector)+在丰富进程函数中使用状态缓存映射传感器ID->;list(EnabledEvents)?

推荐答案

  1. 使用丰富函数的辅助输出生成三个事件流。如果您有一个似乎与复制数据有关的性能问题,您可以尝试以管道方式处理它(使用内联的温度、湿度和压力函数,只需转发任何不适合处理的记录)。

  2. 如果您有数百万个传感器,每个传感器都有元数据,则使用JDBC源,并与传感器数据进行(有状态)联接。您必须处理在相应的元数据记录之前获取传感器数据记录的情况,在这种情况下,您需要将其存储在状态中,然后在元数据记录到达时生成结果(并清除状态)。

这篇关于如何组织一个复杂的Apache Flink应用程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-23 01:11