一、环境说明
centos7(vm) + JDK1.8 + zookeeper3.5.5 + kafka2.11-2.3.1
下载JDK 8解压并安装,假设安装之后的目录为/usr/local/java/jdk1.8.0_231,以下的操作以此为前提
二、安装zookeeper
下载zookeeper并解压
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz tar -zxf apache-zookeeper-3.5.5-bin.tar.gz mv apache-zookeeper-3.5.5-bin zookeeper
配置
创建数据存放目录:mkdir -p /var/lib/zookeeper
使用基本的配置参数创建zoo.cfgcat > /usr/local/zookeeper/conf/zoo.cfg << EOF tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 EOF
启动zookeeper
export JAVA_HOME=/usr/local/java/jdk1.8.0_231 /usr/local/zookeeper/bin/zkServer.sh start
三、安装kafka
可以在官网的下载页面 http://kafka.apache.org/downloads.html 下载需要安装的版本;这里使用的是kafka2.11-2.3.1
- 解压
tar -zxf kafka_2.11-2.3.1.tgz mv kafka_2.11-0.9.0.1 /usr/local/kafka mkdir /tmp/kafka-logs
- 启动
如果想查看启动情况,可以把-daemon去掉,但去掉该参数之后,命令终端如果关闭,kafka也会随之关闭。export JAVA_HOME=/usr/java/jdk1.8.0_231 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 关闭
kafka启动后如果需要关闭,可以通过以下命令关闭/usr/local/kafka/bin/kafka-server-stop.sh
四、生产者示例
新建一个maven工程,引入如下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
创建生产者:
public class MessageProducer {
private static Properties kafkaProps;
private static Producer<String, String> kafkaProducer;
static{
kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.254.131:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(kafkaProps);
}
/**
* 一、发送并忘记(不关心消息是否正常到达)
* @param producerRecord
*/
public void sendMsgAndForget(ProducerRecord<String, String> producerRecord){
kafkaProducer.send(producerRecord);
}
/**
* 二、同步发送(等待返回Future对象)
* @param producerRecord
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public RecordMetadata sendSynMsg(ProducerRecord<String, String> producerRecord) throws ExecutionException, InterruptedException {
RecordMetadata metaData = kafkaProducer.send(producerRecord).get();
return metaData;
}
/**
* 三、异步发送(指定回调函数,服务器在返回响应时调用该函数)
* @param producerRecord
*/
public void sendAsynMsg(ProducerRecord<String, String> producerRecord){
kafkaProducer.send(producerRecord, new ProducerCallback());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MessageProducer messageProducer = new MessageProducer();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for(int i=0; i < 10; i++){
executorService.submit(new Runnable() {
@Override
public void run() {
while(true){
Random random = new Random();
int randNum = random.nextInt(3)%3 + 1;
ProducerRecord<String, String> record = null;
switch (randNum){
case 1 :
record = new ProducerRecord<String, String>("test.topic", "smaf", "send and forget");
messageProducer.sendMsgAndForget(record);
break;
case 2 :
record = new ProducerRecord<String, String>("test.topic", "send", "send");
try {
messageProducer.sendSynMsg(record);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case 3:
record = new ProducerRecord<String, String>("test.topic", "sendAsyn", "send asyn");
messageProducer.sendAsynMsg(record);
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
}
/**
* 回调处理类
*/
class ProducerCallback implements Callback {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//回调处理逻辑
if(null != e){
e.printStackTrace();
}
}
}
五、消费者示例
创建消费者
public class MessageConsumer {
private static Properties kafkaProps;
private static Consumer<String, String> kafkaConsumer;
static{
kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.254.131:9092");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("group.id", "testGroup");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);
}
public void consumeMessage(String topic) throws InterruptedException {
kafkaConsumer.subscribe(Collections.singletonList(topic));
Duration duration = Duration.ofSeconds(10l);
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
System.out.println("new messages:");
if(records.count()==0) System.out.println("empty");
for(ConsumerRecord<String, String> record : records){
System.out.printf("topic=%s,partition=%s,key=%s,value=%s\n",record.topic(), record.partition(), record.key(), record.value());
}
}
}
public static void main(String[] args) throws InterruptedException {
MessageConsumer messageConsumer = new MessageConsumer();
messageConsumer.consumeMessage("test.topic");
}
}
六、注意
如果遇到连接超时的问题:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test.topic not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.topic not present in metadata after 60000 ms.
可以通过以下方式处理:
查看防火墙是否开启
firewall-cmd --state
查看9092端口是否对外开放
firewall-cmd --list-ports
如果没有对外开放,使用命令开放端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload