ZooKeeper startup script:
#!/bin/sh
echo "start zookeeper server..."
hosts="hadoop0300 hadoop0301 hadoop0302"
for host in $hosts
do
    ssh $host "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh start"
done
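A matching stop script follows the same pattern; a minimal sketch, assuming the same hosts and install path (zkServer.sh also accepts status for a quick health check):
#!/bin/sh
echo "stop zookeeper server..."
hosts="hadoop0300 hadoop0301 hadoop0302"
for host in $hosts
do
    # zkServer.sh stop shuts down the ZooKeeper instance on each host
    ssh $host "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh stop"
done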
Kafka startup script:
#!/bin/bash
for host in hadoop0300 hadoop0301 hadoop0302
do
    echo $host
    ssh root@$host "source /etc/profile; /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties"
done
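The Kafka distribution ships a kafka-server-stop.sh next to the start script, so a cluster-wide stop script is the mirror image; a sketch assuming the same hosts and install path:
#!/bin/bash
for host in hadoop0300 hadoop0301 hadoop0302
do
    echo $host
    # kafka-server-stop.sh kills the broker process on each host
    ssh root@$host "source /etc/profile; /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh"
done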
// Sync time across the cluster
ntpdate -u ntp.api.bz
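Clock sync is worth repeating periodically rather than running once; a sketch of a crontab entry (an assumption: ntpdate lives at /usr/sbin/ntpdate; add it with crontab -e on each node):
*/30 * * * * /usr/sbin/ntpdate -u ntp.api.bz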
// Start Kafka
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties
// Create a topic named test
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
// List all topics in the cluster
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --list --zookeeper 192.168.88.130:2181
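The same tool can also show how a topic's partitions and replicas are laid out across the brokers:
// Show partition and replica details for a topic
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --describe --zookeeper 192.168.88.130:2181 --topic test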
// Send messages from the shell (simulated producer)
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test
// Consume messages from the shell (simulated consumer, in another terminal)
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper 192.168.88.130:2181 --from-beginning --topic test
// If you get the following error:
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries
Fix: set host.name in server.properties to the machine's own IP address
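For example, on the 192.168.88.130 broker the relevant server.properties lines would look like this (broker.id and port are shown for context and are assumptions based on the setup above):
broker.id=0
port=9092
host.name=192.168.88.130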
ProducerDemo, a simulated producer:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        // Producer configuration: where the messages should go
        Properties pro = new Properties();
        // Broker list of the Kafka cluster to send to
        pro.put("metadata.broker.list", "192.168.88.130:9092,192.168.88.131:9092,192.168.88.132:9092");
        // Message serializer
        pro.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wrap the configuration
        ProducerConfig config = new ProducerConfig(pro);
        // 1. Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send 101 messages to the "test" topic
        for (int i = 0; i <= 100; i++) {
            producer.send(new KeyedMessage<String, String>("test", "message" + i));
        }
        // Release the producer's network resources
        producer.close();
    }
}
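If sends are still flaky, the legacy producer can also be told to wait for the partition leader's acknowledgement before treating a send as successful; a hedged addition to the same Properties object (an assumption: without it the old producer fires and forgets):
// Require the partition leader to ack each send (0 = no ack, 1 = leader ack)
pro.put("request.required.acks", "1");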
ConsumerDemo, a simulated consumer:
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerDemo {
    // Topic to consume (which category of messages)
    private static final String topic = "test";
    // Number of consumer threads
    private static final Integer thread = 2;

    public static void main(String[] args) {
        // Consumer configuration
        Properties pro = new Properties();
        // ZooKeeper connection string
        pro.put("zookeeper.connect", "192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181");
        // Consumers consume as a group; specify the consumer group
        pro.put("group.id", "testGroup");
        // Where to start consuming: "smallest" starts from the first message in the topic,
        // "largest" starts from messages produced after this consumer starts
        pro.put("auto.offset.reset", "smallest");
        // Wrap the configuration
        ConsumerConfig config = new ConsumerConfig(pro);
        // Create the consumer
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        // A consumer can read several topics; map each topic to its thread count
        Map<String, Integer> topicMaps = new HashMap<String, Integer>();
        topicMaps.put(topic, thread);
        // Create the message streams
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicMaps);
        // Get the streams for our topic
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        // Keep pulling messages from Kafka, one thread per stream
        for (final KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Iterate over each message in this stream (blocks waiting for new ones)
                    for (MessageAndMetadata<byte[], byte[]> msg : kafkaStream) {
                        String message = new String(msg.message());
                        System.out.println(message);
                    }
                }
            }).start();
        }
    }
}
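To verify the whole pipeline: with ZooKeeper and the brokers running, start ConsumerDemo first and then run ProducerDemo. Because auto.offset.reset is smallest and testGroup is a fresh consumer group, the consumer should print message0 through message100 even if it starts after the producer has finished.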