This article covers how to limit the off-heap memory used by Kafka Streams (RocksDB); hopefully it is a useful reference for anyone hitting the same problem.

Problem Description

We are running Kafka Streams applications and frequently running into off-heap memory issues. Our applications are deployed in Kubernetes pods and they keep on restarting.

I did some investigation and found that we can limit the off-heap memory by implementing RocksDBConfigSetter, as shown in the following example.

// Nested inside your application class; the enclosing file needs imports for
// java.util.Map, org.apache.kafka.streams.state.RocksDBConfigSetter,
// org.rocksdb.BlockBasedTableConfig and org.rocksdb.Options.
public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  // Illustrative sizes (not from the original question); tune them for your workload.
  private static final long TOTAL_OFF_HEAP_MEMORY = 4L * 1024 * 1024 * 1024;  // e.g. 4 GB shared by all stores
  private static final long TOTAL_MEMTABLE_MEMORY = 1L * 1024 * 1024 * 1024;  // e.g. 1 GB of it reserved for memtables
  private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;                 // fraction of the cache for index/filter blocks
  private static final long BLOCK_SIZE = 4L * 1024;                           // e.g. 4 KB data blocks
  private static final int N_MEMTABLES = 3;
  private static final long MEMTABLE_SIZE = 16L * 1024 * 1024;                // e.g. 16 MB per memtable

  // One Cache and one WriteBufferManager shared by every store instance, so the
  // limits apply to the whole application rather than to each store separately.
  private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
  private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination limit the memory used by RocksDB to the
    // size passed to the block cache (TOTAL_OFF_HEAP_MEMORY).
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // Recommended when bounding the total memory: account for index and filter
    // blocks in the high-priority region of the cache and pin the top-level ones.
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    // Bound the data block size and the number and size of memtables.
    tableConfig.setBlockSize(BLOCK_SIZE);
    options.setMaxWriteBufferNumber(N_MEMTABLES);
    options.setWriteBufferSize(MEMTABLE_SIZE);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
  }
}
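
The setter only takes effect once it is registered with the application. A minimal registration sketch, assuming props is the Properties object you pass to KafkaStreams (the application id and bootstrap servers are placeholders; ROCKSDB_CONFIG_SETTER_CLASS_CONFIG is the actual Kafka Streams property key):

// needs: java.util.Properties, org.apache.kafka.streams.StreamsConfig
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// Kafka Streams instantiates this class once per store and calls setConfig().
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);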

In our application, we have input topics with 6 partitions, and there are about 40 topics from which we are consuming data. Our application has just 1 topology, which consumes from these topics and stores the data in state stores (for dedup, lookup, and some verification). So, as per my understanding, the Kafka Streams application will create the following RocksDB instances and will need the following maximum off-heap memory. Please correct me if I am wrong.

Total number of RocksDB instances (assuming each task creates its own RocksDB instance):

6 (partitions) * 40 (topics) -> 240 RocksDB instances

Maximum off-heap memory consumed:

240 * (50 MB (block cache) + 16 MB * 3 (memtables) + filters (unknown))
≈ 240 * ~110 MB
= 26400 MB
≈ 25 GB

It seems to be a large number. Is the calculation correct? I know that in practice we should not hit this maximum, but is the calculation correct?

Also, if we implement RocksDBConfigSetter and set the max off-heap memory to 4 GB, will the application complain (crash with an OOM) if RocksDB asks for more memory (since it expects about 25 GB)?

Update: I reduced the LRU cache to 1 GB and my streams app started throwing the LRU-full exception:

2021-02-07 23:20:47,443 15448195 [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] task [29_4] Exception caught while trying to restore state from dp-Corrigo-InvTreeObject-Store-changelog-4
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:425)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(StoreChangelogReader.java:562)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:461)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error restoring batch to store InvTreeObject-Store
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:647)
    at org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(StateRestoreCallbackAdapter.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:422)
    ... 6 common frames omitted
Caused by: org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
    at org.rocksdb.RocksDB.write0(Native Method)
    at org.rocksdb.RocksDB.write(RocksDB.java:806)
    at org.apache.kafka.streams.state.internals.RocksDBStore.write(RocksDBStore.java:439)
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:645)
    ... 8 common frames omitted

Recommended Answer

Not sure how many RocksDB instances you get. It depends on the structure of your program. You should check out TopologyDescription (via Topology#describe()). Sub-topologies are instantiated as tasks (based on the number of partitions), and each task will have its own RocksDB instance to maintain a shard of the overall state per store.
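
A quick way to check is to print the topology description at startup; a minimal sketch, assuming builder is the StreamsBuilder your application already constructs:

// needs: org.apache.kafka.streams.Topology
Topology topology = builder.build();
// Prints the sub-topologies and their state stores; each sub-topology becomes one
// task per input partition, and each task keeps its own RocksDB instance per store.
System.out.println(topology.describe());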

I would recommend checking out the Kafka Summit talk "Performance Tuning RocksDB for Kafka Streams' State Store": https://videos.confluent.io/watch/Ud6dtEC3DMYEtmK3dMK5ci

"Also, if we implement RocksDBConfigSetter and set the max off-heap memory to 4 GB, will the application complain (crash with an OOM) if RocksDB asks for more memory (since it expects about 25 GB)?"

It won't crash. RocksDB will spill to disk. Being able to spill to disk is the reason why we use a persistent state store (and not an in-memory state store) by default. It allows holding state that is larger than main memory. As you use Kubernetes, you should attach corresponding volumes to your containers and size them accordingly (cf. https://docs.confluent.io/platform/current/streams/sizing.html). You might also want to watch the Kafka Summit talk "Deploying Kafka Streams Applications with Docker and Kubernetes": https://www.confluent.io/kafka-summit-sf18/deploying-kafka-streams-applications/
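
If you attach a volume, also point the Kafka Streams state directory at its mount path so RocksDB spills to the persistent disk rather than the pod's ephemeral filesystem. A sketch, where /var/lib/kafka-streams is a placeholder mount path and props is the Properties passed to KafkaStreams:

// needs: org.apache.kafka.streams.StreamsConfig
// state.dir defaults to a temp directory; point it at the mounted volume instead.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // placeholder mount path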

If state is larger than main memory, you might also want to monitor the RocksDB metrics if you run into performance issues, so you can tune the different "buffers" accordingly: https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics
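
Note that the RocksDB metrics are only recorded when the metrics recording level is set to DEBUG. A sketch of enabling and dumping them (stream-state-metrics is the documented metric group name; the rest is plumbing you would adapt):

// needs: org.apache.kafka.streams.KafkaStreams, org.apache.kafka.streams.StreamsConfig
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); // RocksDB metrics need DEBUG level

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Dump the state-store (RocksDB) metrics, e.g. block-cache and memtable stats.
streams.metrics().forEach((name, metric) -> {
    if (name.group().equals("stream-state-metrics")) {
        System.out.println(name.name() + " = " + metric.metricValue());
    }
});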
