由于我使用的是 kafka_2.10-0.10.0.1，需要下载与之对应版本的 kafka-clients-0.10.0.1.jar 包。
生产数据KafkaProducerEx:
package test.KafkaTest

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization

/** Minimal Kafka producer example.
  *
  * Sends the messages `producer message::::::1` through `producer message::::::999`
  * to the topic `test`, all under the key `kafka_key`, echoing each one to stdout.
  */
object KafkaProducerEx {

  /** Program entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val topic = "test"
    // Kafka broker bootstrap address (not ZooKeeper); separate several host:port pairs with commas.
    val brokers = "localhost:9092"

    // Producer-only settings. `group.id` is a consumer setting and is intentionally not set here:
    // the producer ignores it and logs an "unknown config" warning.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.RETRIES_CONFIG, "0")
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
    props.put(ProducerConfig.LINGER_MS_CONFIG, "1")
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
    props.put(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val msg = "producer message::::::"
    try {
      // 1 until 1000 yields 1..999, the same range as the original Range(1, 1000).
      for (i <- 1 until 1000) {
        val payload = s"$msg$i"
        println(payload)
        producer.send(new ProducerRecord[String, String](topic, "kafka_key", payload))
      }
    } finally {
      // close() blocks until buffered records are sent; the finally guarantees the
      // producer is released even when send() throws.
      producer.close()
    }
  }
}
消费数据KafkaConsumerEx:
package test.KafkaTest

import java.util.{Collections, Properties}

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization

/** Minimal Kafka consumer example.
  *
  * Subscribes to the topic `test` as a member of consumer group `test` and prints the
  * value of every record it receives until the JVM is asked to shut down.
  */
object KafkaConsumerEx {

  /** Program entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val topic = "test"
    val brokers = "localhost:9092"

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test")
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)

    // The poll loop never ends by itself, so a statement placed after it is unreachable.
    // On JVM shutdown (e.g. Ctrl-C) the hook interrupts the blocking poll() via wakeup()
    // and then waits for the main thread to finish closing the consumer.
    val mainThread = Thread.currentThread()
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        consumer.wakeup()
        mainThread.join()
      }
    })

    try {
      consumer.subscribe(Collections.singleton(topic))
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(100)
        // Explicit Java -> Scala conversion instead of the deprecated implicit JavaConversions.
        records.asScala.foreach(record => println(record.value()))
      }
    } catch {
      // Raised by poll() after wakeup(); this is the expected shutdown signal, not an error.
      case _: WakeupException => ()
    } finally {
      consumer.close()
    }
  }
}