Problem description
Problem
Each time the system receives a message from Pub/Sub through a SlidingWindows window, the message is duplicated.
Code
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
Output
If I send only one message from Pub/Sub and print what I have after the sliding window finishes, using this code:
class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
        # Print the element, its window start and end, and its timestamp.
        print(row, timestamp2str(float(window.start)), timestamp2str(float(window.end)), timestamp2str(float(timestamp)))
Result
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
If I print the message before the 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)) step, I get it only once.
The process, shown graphically:
time: ----t+00---t+15---t+30----t+45----t+60------>
          :      :      :       :       :
w1:       |=X===========|       :       :
w2:              |==============|       :
...
Message X was sent only once, at the beginning of the sliding window. It should be received only once, but it is being received twice.
I have tried both AccumulationMode values, and also trigger=AfterWatermark, but I cannot fix the problem.
What could be wrong?
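For reference, one result per overlapping window is what sliding-window assignment produces by design: with a size of 30 s and a period of 15 s, every timestamp lies inside two windows, and a per-key combine emits one result for each of them. The sketch below is plain Python, not the Beam API. The function name `sliding_windows_for` and the relative timestamp 35 are assumptions made for illustration.

```python
def sliding_windows_for(timestamp, size, period, offset=0):
    """Return every [start, end) sliding window that contains `timestamp`."""
    # Start of the latest window that begins at or before the timestamp.
    last_start = timestamp - (timestamp - offset) % period
    windows = []
    start = last_start
    # Step back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# Hypothetical element 35 s after a reference time, size=30, period=15.
print(sliding_windows_for(35, size=30, period=15))  # [(15, 45), (30, 60)]
```

The two windows have the same shape as the two printed rows above (08:21:15 to 08:21:45 and 08:21:30 to 08:22:00). Setting the period equal to the size gives the FixedWindows case, where each timestamp falls into exactly one window.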
Extra
With FixedWindows, this is the correct code for my purpose:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
or
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
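As a rough illustration of what both FixedWindows variants compute, the plain-Python sketch below assigns each element to exactly one 30 s window and keeps a running (sum, count) per key and window, which is the usual way a mean is combined incrementally. It is an emulation under stated assumptions, not Beam code: the function name `mean_per_key_and_fixed_window`, the tuple layout, and every sample value other than the key '77777' and the value 120 are hypothetical.

```python
from collections import defaultdict


def mean_per_key_and_fixed_window(events, size=30):
    """events: iterable of (timestamp_seconds, key, value) tuples."""
    # (key, window_start) -> [running sum, running count]
    acc = defaultdict(lambda: [0.0, 0])
    for ts, key, value in events:
        # A fixed window is identified by the multiple of `size` at or below ts.
        window_start = ts - ts % size
        slot = acc[(key, window_start)]
        slot[0] += value
        slot[1] += 1
    # One mean per key and window, so no element is counted twice.
    return {k: total / count for k, (total, count) in acc.items()}


# Hypothetical readings for one serial number.
events = [(2, '77777', 120), (20, '77777', 100), (35, '77777', 90)]
print(mean_per_key_and_fixed_window(events))
# {('77777', 0): 110.0, ('77777', 30): 90.0}
```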
Recommended answer
I have exactly the same issue, but in Java. I have a window with a duration of 10 seconds and a step of 3 seconds. When an event is emitted from the MQTT topic that I subscribe to, the ParDo function that I have appears to run and emit the first and only event to all three of the "constructed" windows.
X is the event that I send at a random timestamp: 2020-09-15T21:17:57.292Z
time: ----t+00---t+15---t+30----t+45----t+60------>
          :      :      :       :       :
w1:       |X============|       :       :
w2:              |X=============|       :
w3:                     |X==============|
...
Even the same timestamp is assigned to them! I must be doing something completely wrong.
I use Scala 2.12 and Beam 2.23 with the Direct Runner.
[Hint]: I use state in the processElement function, where the state is held per key + window. Maybe there is a bug there? I will try to test it without state.
UPDATE: I removed the state fields, and the single event is now assigned to one window.