Problem description

We are running Kafka Streams applications and frequently run into off-heap memory issues. Our applications are deployed as Kubernetes pods, and they keep restarting.

I have been investigating and found that we can limit off-heap memory by implementing RocksDBConfigSetter, as shown in the following example.

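import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  // Placeholder sizes (hypothetical values, not recommendations): tune them to your workload and pod memory limit
  private static final long TOTAL_OFF_HEAP_MEMORY = 4L * 1024 * 1024 * 1024; // total budget for block cache + memtables
  private static final long TOTAL_MEMTABLE_MEMORY = 1024L * 1024 * 1024;     // portion of the budget usable by memtables
  private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;                 // share of the cache for high-priority (index/filter) blocks
  private static final long BLOCK_SIZE = 16 * 1024L;
  private static final int N_MEMTABLES = 3;
  private static final long MEMTABLE_SIZE = 16 * 1024 * 1024L;

  // Static, so a single Cache and WriteBufferManager are shared by every store instance in the JVM.
  // LRUCache args: capacity, numShardBits (-1 = let RocksDB choose), strictCapacityLimit, highPriPoolRatio
  private static final Cache cache = new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
  // Memtable memory is charged against the same cache, so it counts toward TOTAL_OFF_HEAP_MEMORY
  private static final WriteBufferManager writeBufferManager = new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // These options are recommended to be set when bounding the total memory:
    // keep index/filter blocks in the high-priority pool and pin the top-level index and filter blocks
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    // Block size and memtable count/size (see the placeholder constants above)
    tableConfig.setBlockSize(BLOCK_SIZE);
    options.setMaxWriteBufferNumber(N_MEMTABLES);
    options.setWriteBufferSize(MEMTABLE_SIZE);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
  }
}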
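The config setter only takes effect once it is registered in the Streams configuration. A minimal sketch, using plain string keys so it compiles without Kafka on the classpath: `rocksdb.config.setter` is the value of `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`, while the application id, the bootstrap servers and the `com.example` package are hypothetical placeholders.

```java
import java.util.Properties;

public class StreamsMemoryConfig {

    // String value of StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
    static final String ROCKSDB_CONFIG_SETTER = "rocksdb.config.setter";

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical
        // Fully qualified name of the setter class; the package is hypothetical
        props.put(ROCKSDB_CONFIG_SETTER, "com.example.BoundedMemoryRocksDBConfig");
        return props;
    }

    public static void main(String[] args) {
        buildProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Because `cache` and `writeBufferManager` are static in the setter, every store in one JVM draws from the same cache, so TOTAL_OFF_HEAP_MEMORY approximates a budget per application instance (per pod here) regardless of how many RocksDB instances the topology creates. The pod's memory limit still has to cover the JVM heap and other native memory on top of it.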

In our application, we have input topics with 6 partitions, and there are about 40 topics from which we consume data. Our application has just 1 topology, which consumes from these topics and stores the data in state stores (for dedup, lookups and some verification). So, as per my understanding, the Kafka Streams application will create the following RocksDB instances and will need the following max off-heap memory. Please correct me if I am wrong.

Total number of RocksDB instances (assuming each task creates its own RocksDB instance):

6(partitions) * 40(topics) -> 240 rocksdb instances

Maximum off-heap memory consumed:

240 * (50 MB (block cache) + 16 MB * 3 (memtables) + filters (unknown))
= 240 * ~110 MB
= 26400 MB
≈ 25.8 GB
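The arithmetic above can be reproduced with a small sketch. It assumes, as the question does, 240 RocksDB instances and the per-store values used in the question (a 50 MB block cache and three 16 MB memtables, which match the defaults in Kafka Streams' RocksDBStore). Index and filter blocks are left out because their size is unknown, so the result is a lower bound on the question's worst-case figure, not an exact number.

```java
public class RocksDbMemoryEstimate {

    // Per-store values used in the question (Kafka Streams' unbounded defaults)
    static final long DEFAULT_BLOCK_CACHE_MB = 50;
    static final long DEFAULT_WRITE_BUFFER_MB = 16;
    static final int DEFAULT_MAX_WRITE_BUFFERS = 3;

    // Upper bound for one RocksDB instance, excluding index/filter blocks
    static long perInstanceMb(long blockCacheMb, long writeBufferMb, int maxWriteBuffers) {
        return blockCacheMb + writeBufferMb * maxWriteBuffers;
    }

    // Every instance has its own cache and memtables, so the bounds simply add up
    static long worstCaseMb(int instances, long blockCacheMb, long writeBufferMb, int maxWriteBuffers) {
        return instances * perInstanceMb(blockCacheMb, writeBufferMb, maxWriteBuffers);
    }

    public static void main(String[] args) {
        int instances = 6 * 40; // the question's assumption: 6 partitions x 40 topics
        long totalMb = worstCaseMb(instances, DEFAULT_BLOCK_CACHE_MB,
                DEFAULT_WRITE_BUFFER_MB, DEFAULT_MAX_WRITE_BUFFERS);
        System.out.printf("%d instances -> %d MB (~%.1f GB) before filters%n",
                instances, totalMb, totalMb / 1024.0);
    }
}
```

This gives 98 MB per instance and 23,520 MB in total; the question's ~110 MB per instance adds a guess for filters on top. With a shared, bounded cache as in the config setter above, the per-store numbers no longer add up this way.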

It seems like a large number. Is the calculation correct? I know that in practice we should not hit this maximum, but is the calculation correct?

Also, if we implement RocksDBConfigSetter and set the max off-heap memory to 4 GB, will the application complain (crash with OOM) if RocksDB asks for more memory (since it is expecting about 25 GB)?

Update: I reduced the LRU cache to 1 GB, and my Streams app started throwing an "LRU cache being full" exception:

2021-02-07 23:20:47,443 15448195 [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] task [29_4] Exception caught while trying to restore state from dp-Corrigo-InvTreeObject-Store-changelog-4
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:425)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(StoreChangelogReader.java:562)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:461)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error restoring batch to store InvTreeObject-Store
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:647)
    at org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(StateRestoreCallbackAdapter.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:422)
    ... 6 common frames omitted
Caused by: org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
    at org.rocksdb.RocksDB.write0(Native Method)
    at org.rocksdb.RocksDB.write(RocksDB.java:806)
    at org.apache.kafka.streams.state.internals.RocksDBStore.write(RocksDBStore.java:439)
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:645)
    ... 8 common frames omitted

Recommended answer

I'm not sure how many RocksDB instances you get; it depends on the structure of your program. You should check out the TopologyDescription (via Topology#describe()). Sub-topologies are instantiated as tasks (based on the number of partitions), and each task has its own RocksDB instance per store, holding one shard of that store's overall state.
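Once the sub-topologies are known from `Topology#describe()`, the count can be worked out by hand. A minimal sketch, assuming a hypothetical `SubTopology` summary filled in from that output: the number of tasks of a sub-topology equals the largest partition count among its input topics, and each task opens one RocksDB instance per persistent key-value store it owns.

```java
import java.util.List;

public class RocksDbInstanceCount {

    // Hypothetical summary of one sub-topology, copied by hand from Topology#describe():
    // its task count and the number of persistent key-value stores it owns
    static final class SubTopology {
        final String name;
        final int tasks;
        final int keyValueStores;

        SubTopology(String name, int tasks, int keyValueStores) {
            this.name = name;
            this.tasks = tasks;
            this.keyValueStores = keyValueStores;
        }
    }

    // Each task opens its own RocksDB instance for every store of its sub-topology.
    // Windowed and session stores are segmented (several instances per task and
    // store) and are not counted by this sketch.
    static int countInstances(List<SubTopology> subTopologies) {
        int total = 0;
        for (SubTopology s : subTopologies) {
            total += s.tasks * s.keyValueStores;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two hypothetical layouts for the question's 40 topics with 6 partitions each
        List<SubTopology> merged = List.of(new SubTopology("sub-0", 6, 40));
        List<SubTopology> separate = new java.util.ArrayList<>();
        for (int i = 0; i < 40; i++) {
            separate.add(new SubTopology("sub-" + i, 6, 1));
        }
        System.out.println("one sub-topology, 40 stores: " + countInstances(merged));
        System.out.println("40 sub-topologies, 1 store each: " + countInstances(separate));
    }
}
```

Both hypothetical layouts give the 240 from the question, but an extra store per sub-topology or a windowed store would change the total, which is why the actual TopologyDescription is the thing to check.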

I would recommend checking out the Kafka Summit talk "Performance Tuning RocksDB for Kafka Streams' State Store": https://videos.confluent.io/watch/Ud6dtEC3DMYEtmK3dMK5ci

It won't crash: RocksDB will spill to disk. Being able to spill to disk is the reason we use a persistent state store (rather than an in-memory state store) by default; it allows holding state that is larger than main memory. As you use Kubernetes, you should attach corresponding volumes to your containers and size them accordingly (cf. https://docs.confluent.io/platform/current/streams/sizing.html). You might also want to watch the Kafka Summit talk "Deploying Kafka Streams Applications with Docker and Kubernetes": https://www.confluent.io/kafka-summit-sf18/deploying-kafka-streams-applications/

If your state is larger than main memory and you run into performance issues, you might also want to monitor RocksDB metrics so you can tune the different "buffers" accordingly: https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics
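A minimal sketch of raising the metrics recording level, again with plain string keys: `metrics.recording.level` is the value of `StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG`, and `INFO` is its default. Many of the RocksDB state-store metrics are only recorded at `DEBUG`; check the linked page for the level of each metric you need.

```java
import java.util.Properties;

public class RocksDbMetricsConfig {

    // String value of StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG (default: INFO)
    static final String METRICS_RECORDING_LEVEL = "metrics.recording.level";

    // Returns a copy of the given Streams properties with DEBUG-level
    // metrics enabled, leaving the original Properties untouched
    static Properties withDebugMetrics(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(METRICS_RECORDING_LEVEL, "DEBUG");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "my-streams-app"); // hypothetical
        System.out.println(withDebugMetrics(base));
    }
}
```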

07-18 22:16