Question

I have a topic named push-processing-KSTREAM-PEEK-0000000014-repartition, which is an internal Kafka topic. I did not create this topic myself. I call .peek() after a repartition, and I use peek 3-4 times in total.

My problem is that I can read from the topic with topic read push-processing-KSTREAM-PEEK-0000000014-repartition, but I cannot read from it when I add --from-beginning, i.e. topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning.

Is this internal topic created because of the peek method?

Or is it related to some other repartitioning code in my streams application, and it is merely named KSTREAM-PEEK?

It has 50 partitions. Since peek is a stateless operation, it should not create internal topics, right? So why is the topic name related to peek, and why can't I read it from the beginning?

Any ideas, please?

This is the first topology:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000005
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
      <-- KSTREAM-FILTER-0000000007

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
      --> KSTREAM-JOIN-0000000009
    Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
      --> KSTREAM-JOIN-0000000029
    Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-MAP-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-PEEK-0000000030
      <-- KSTREAM-SOURCE-0000000028
    Processor: KSTREAM-MAP-0000000010 (stores: [])
      --> KSTREAM-PEEK-0000000011
      <-- KSTREAM-JOIN-0000000009
    Processor: KSTREAM-PEEK-0000000030 (stores: [])
      --> KSTREAM-MAP-0000000031
      <-- KSTREAM-JOIN-0000000029
    Processor: KSTREAM-MAP-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000032
      <-- KSTREAM-PEEK-0000000030
    Processor: KSTREAM-PEEK-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MAP-0000000010
    Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
      --> KTABLE-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-PEEK-0000000011
    Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-MAP-0000000031
    Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> none
      <-- KSTREAM-SOURCE-0000000002

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
      --> KSTREAM-PEEK-0000000018
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-PEEK-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000020
      <-- KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FILTER-0000000020 (stores: [])
      --> KSTREAM-SINK-0000000019
      <-- KSTREAM-PEEK-0000000018
    Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
      <-- KSTREAM-FILTER-0000000020

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
      --> KSTREAM-JOIN-0000000022
    Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> KSTREAM-PEEK-0000000023
      <-- KSTREAM-SOURCE-0000000021
    Processor: KSTREAM-PEEK-0000000023 (stores: [])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-JOIN-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-PEEK-0000000025
      <-- KSTREAM-PEEK-0000000023
    Processor: KSTREAM-PEEK-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-PEEK-0000000025
    Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000016
    Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
      <-- KSTREAM-FILTER-0000000027
    Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> none
      <-- KSTREAM-SOURCE-0000000015

This is the second one:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
      --> KSTREAM-JOIN-0000000018
    Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> KSTREAM-FILTER-0000000019
      <-- KSTREAM-SOURCE-0000000017
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-JOIN-0000000018
    Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
      <-- KSTREAM-FILTER-0000000019
    Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
      --> KSTREAM-MAP-0000000007
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000010
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-PEEK-0000000008
    Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
      <-- KSTREAM-FILTER-0000000010

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
      --> KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> KSTREAM-KEY-SELECT-0000000013
      <-- KSTREAM-SOURCE-0000000011
    Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
      --> KSTREAM-PEEK-0000000014
      <-- KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-PEEK-0000000014 (stores: [])
      --> KSTREAM-FILTER-0000000016
      <-- KSTREAM-KEY-SELECT-0000000013
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-SINK-0000000015
      <-- KSTREAM-PEEK-0000000014
    Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000006
    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
      <-- KSTREAM-FILTER-0000000016
    Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> none
      <-- KSTREAM-SOURCE-0000000005

All of these operations use the same key. I have 5 brokers and 50 partitions for every topic. I run with a concurrency of 10, and I scaled my app to 5. But, as I said, I repartition and transfer the data 3-4 times on the same key. That means all the values produced by my flatMap and map operations go to the same partition. Only once or twice do I use a different key, so messages are distributed to different partitions in just those 1-2 steps. Does this affect my performance? Or should I definitely distribute the data across different partitions to improve performance?

So, basically: does Kafka perform better when the 3-4 join or repartition operations use only a single partition across the topics, because Kafka then reads from exactly one partition, knows where to read, and can read all the data at once since it is laid out together physically on the disk (SSD or HDD)? Or, as my second scenario, should I definitely use more partitions so that the reads happen in parallel across partitions?

I also suspect that using peek slows down my processing.

Recommended answer

The peek() operation is unrelated. Judging from the topology description you posted, your program is (in part) as follows:

KStream inputUser = builder.stream().flatMap().peek().filter();
KStream inputDevice = builder.stream().flatMap().peek().filter();
inputUser.join(inputDevice,...)

(It would be easier if you posted your code in the question, too.)

Because you call flatMap(), Kafka Streams assumes that you changed the key, and hence calling join() triggers a repartitioning of the data. The repartition topic name is generated from an upstream operator (to be fair, I am not 100% sure why PEEK is picked instead of FILTER).

"And all of my these operations use same KEY."

For this case, you might want to use flatMapValues() instead of flatMap(). Kafka Streams then knows that the key did not change, and thus it would not create a repartition topic.

Similarly, if the key does not change, you might want to use mapValues() instead of map() to avoid unnecessary repartitioning.
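
The reasoning above can be sketched with a small toy model. This is not Kafka Streams code: StreamModel and all of its methods are hypothetical names invented for illustration, and the class only mimics the bookkeeping described in the answer, under the assumption that key-changing operations (flatMap, map) mark the stream as "key may have changed", value-only operations (flatMapValues, mapValues, peek, filter) leave that mark untouched, and a join inserts a repartition step only when the mark is set.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, NOT a Kafka Streams class: it only tracks whether an upstream
 * operation may have changed the key, and inserts a "repartition" step before
 * a join when it has. All names here are hypothetical.
 */
final class StreamModel {
    private final List<String> steps;
    private final boolean keyMayHaveChanged;

    private StreamModel(List<String> steps, boolean keyMayHaveChanged) {
        this.steps = steps;
        this.keyMayHaveChanged = keyMayHaveChanged;
    }

    static StreamModel source(String topic) {
        List<String> steps = new ArrayList<>();
        steps.add("source(" + topic + ")");
        return new StreamModel(steps, false);
    }

    // Appends one step; the flag stays set until a repartition happens.
    private StreamModel then(String step, boolean changesKey) {
        List<String> next = new ArrayList<>(steps);
        next.add(step);
        return new StreamModel(next, keyMayHaveChanged || changesKey);
    }

    // Operations that may emit a new key.
    StreamModel flatMap() { return then("flatMap", true); }
    StreamModel map() { return then("map", true); }

    // Operations that cannot touch the key.
    StreamModel flatMapValues() { return then("flatMapValues", false); }
    StreamModel mapValues() { return then("mapValues", false); }
    StreamModel peek() { return then("peek", false); }
    StreamModel filter() { return then("filter", false); }

    // A join needs data partitioned by key, so repartition first if in doubt.
    StreamModel join(String table) {
        List<String> next = new ArrayList<>(steps);
        if (keyMayHaveChanged) {
            next.add("repartition");
        }
        next.add("join(" + table + ")");
        return new StreamModel(next, false);
    }

    List<String> steps() { return steps; }

    public static void main(String[] args) {
        // prints [source(ids), flatMap, peek, filter, repartition, join(devices)]
        System.out.println(source("ids").flatMap().peek().filter().join("devices").steps());
        // prints [source(ids), flatMapValues, peek, filter, join(devices)]
        System.out.println(source("ids").flatMapValues().peek().filter().join("devices").steps());
    }
}
```

In this model peek() and filter() never cause the repartition themselves; they merely sit between the key-changing flatMap() and the join(), which matches the answer's point that peek() is unrelated even though its name ends up in the topic name.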

"My question is I can read from topic topic read push-processing-KSTREAM-PEEK-0000000014-repartition, but I can not read when I say topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning."

I am not sure what you mean by this. What does "when I say topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning" mean? Do you refer to the command-line tool bin/kafka-console-consumer.sh? In general, yes, you can read from a repartition topic, but I am not sure why this would be useful.

10-13 20:39