本文介绍了如何正确读取 Ignite Cache的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有以下应用程序(我对这个框架很陌生),我希望看到缓存大小(增加),因为它从队列中读取消息,但它始终保持为 0.
I have the following application (I'm quite new to this framework) and I'd like to see the cache size (increasing) as it reads messages from the queue but it stays 0 all the time.
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
Properties settings = new Properties();
// Set a few key parameters
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "test");
settings.put("zookeeper.connect", "localhost:2181");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create an instance of StreamsConfig from the Properties instance
kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);
IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic("test");
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(1);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(config);
// set decoders
StringDecoder keyDecoder = new StringDecoder(null);
StringDecoder valueDecoder = new StringDecoder(null);
kafkaStreamer.setKeyDecoder(keyDecoder);
kafkaStreamer.setValueDecoder(valueDecoder);
kafkaStreamer.start();
while (true) {
System.out.println(cache.metrics().getSize());
Thread.sleep(200);
}
谁能说出遗漏/错误的地方?
Can anyone tell what is missing / wrong?
谢谢!
推荐答案
可能您没有使用足够的条目来填充 IgniteDataStreamer
缓冲区.尝试设置刷新超时:
Probably you don't consume enough entries to fill up IgniteDataStreamer
buffers. Try to set flush timeout:
stmr.autoFlushFrequency(1000);
这篇关于如何正确读取 Ignite Cache的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!