Kafka消息队列
Kafka消息队列
一、概念
Kafka是一个基于发布订阅者模式的消息队列,实现数据缓存、流量削峰等等功能。
在大数据生态下,Kafka主要是用来和实时计算框架对接去处理海量的实时数据
1.1 消息队列一般有两种架构
1.1.1 点对点模式
1.1.2 发布/订阅者模式
二、架构
2.1 生产者producer
生产消息、数据
Kafka的生产者,生产者是为了给topic的partition生产数据的,生产者可以是Flume,也可以是我们自定义的操作,还可以是Kafka自带的控制台生产者。 生产者生产的数据放到Kafka主题的哪个分区? 生产者生产的数据都是key和value格式类型的数据,只不过key值可以不存在。
-
如果生产的消息只有value,没有key,那么消息会采用轮询机制选址一个主题分区放入数据\
-
如果生产的消息key和value都有,但是没有指定分区机制,会按照key的hashcode值和topic的分区数取一个余数,放到对应的分区
-
如果生产的消息key和value都有。那么为了避免数据倾斜,我们可以自定义分区机制
2.2 Kafka集群cluster
2.2.1 Broker
Kafka集群的一个节点,每一个broker节点都会有一个唯一的编号
2.2.2 Topic
主题,就是消息队列中的消息队,一个Broker中可以存在多个主题,一个主题也可以存在于多个Broker上
Kafka中消息主题,就是消息队列,是Kafka用来存储消息的组件,topic中存放的数据是有序的
2.2.3 Partition
分区,每一个Topic主题都可以指定存储的分区数,一般情况下,一个Broker会存储一个主题的一个分区数据,而且每一个分区还可以设置副本数保证存储数据的安全性,分区和分区副本之间有一个主从架构关系。分区副本数不能随便设置,必须小于等于broker的数量。
Topic存放的消息最底层是以分区的形式存在的,Topic所谓的数据有序,不是整体有序,而是每一个分区内部是有序的。分区设置副本机制的,副本数量必须小于等于broker的节点数量 Kafka主题分区的数据不是永久存在的,而是有一个数据清理机制(基于时间的清理机制、基于分区数据大小的清理机制)
2.2.4 zookeeper
Kafka中主题、分区、消费者等等元数据信息都是交给zookeeper统一管理的
2.3 消费者Consumer
订阅主题,消费数据
消费者:消费数据的最小单位,但是一个消费者可以订阅多个topic的数据
2.4 消费者组Consumer Group
将多个消费者组合起来,同时消费同一个主题的数据
消费者组:一个消费者组可以有多个消费者,其中topic一个分区的数据只能被消费者组的一个消费者消费,如果我们想要让一个消费者消费topic所有分区的数据,那么我们需要保证消费者组中只有一个消费者。
三、Kafka的安装
3.1 解压、重命名
3.2 修改Broker配置文件
server.properties
3.2.1 启用删除主题的功能
3.2.2、logs日志文件目录的配置
3.2.3、分区日志文件的滚动和删除规则
3.2.4、broker的编号
3.2.5、配置zookeeper的地址
四、Kafka的启动和关闭
4.1 启动
4.1.1 启动zookeeper
4.1.2 启动kafka
kafka-server-start.sh /opt/xxxx/server.properties &
4.2 关闭
kafka-server-stop.sh
五、kafka的基本使用
5.1 Kafka的命令行操作方式
5.1.1 主题的操作
5.1.1.1 创建主题
kafka-topic.sh --create --topic topicName --partitions num --replication-factor num<=borkerCount --zookeeper zkserverxxx
5.1.1.2删除主题
kafka-topic.sh --delete --topic topicName --zookeeper xxx 必须开启主题的删除功能
5.1.1.3修改主题
kafka-topic.sh --alter --topic topicName --partitions num --zookeeper xxx 主题分区数一般只能增加
5.1.1.4查询某个主题的详细信息
kafka-topic.sh --describe --topic topicName --zookeeper xxx
5.1.1.5查询所有的主题
kafka-topic.sh --list --zookeeper xxx
5.1.2 生产者的操作
kafka-console-producer.sh --bootstrap-server ip:9092,ip:9092 --topic topicName
5.1.3 消费者的操作
kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic topicName --group groupName
5.2 Kafa的Java API操作方法
5.2.1 主题的操作
主要通过AdminClient
类来实现kafka的各种操作
创建AdminClient类需要写一个配置项 bootstrap.servers
5.2.2 生产者的操作
生产者:KafkaProducer ProducerRecord
生产者创建需要赋予一些参数:参数的key都是在ProducerConfig类中封装的
5.2.3 消费者的操作
消费者:KafkaConsumer ConsumerRecords ConsumerRecord
消费者创建需要赋予一些参数:参数的key都是ConsumerConfig类中封装的
5.2.4 earliest、latest区别
六、kafka的可视化监控工具
offset explorer
kafka eagle
七、kafka和Flume的整合
后期我们在做实时计算的时候,我们经常会做如下操作,会通过Flume采集相关数据到Kafka中缓存,然后再使用实时计算框架对接kafka进行计算。
Flume采集的数据给kafka,那么此时也就意味着Flume就相当于是Kafka的生产者,kafka相当于是Flume的sink下沉地
kafka除了当作flume的sink,也可以充当flume的source和channel
八、Kafka和Spark Streaming的整合
通过Spark Streaming消费Kafka中的数据,然后对数据进行实时计算处理
8.1 Spark Streaming整合Kafka有两种方式
8.1.1 Receiver模式
采用一个Reciver接受器去接受Kafka的数据,然后数据缓存到Spark的executor内存中,这种方式很容易出现数据丢失问题,如果想要实现数据的安全性,需要开启Spark的 WAL预写日志机制保证数据的安全性 receiver模式连接的zookooper实现
8.1.2 Direct模式
不需要接收器,直接连接Kafka节点获取数据,同时由Spark自动维护offset偏移量,此时我们不需要担心数据丢失。
8.2 整合步骤
8.2.1 引入一个编程依赖
spark-streaming-kafka-0.10/0.8
在Spark3版本之后,在KafkaUtils中把receiver模式移除了
九、整合案例的流程
想使用Flume采集端口的数据(以空格分割的单词)到kafka的某个主题中,然后借助Spark Streaming统计端口数据中每一个单词出现的总次数。
9.1 分析
9.1.1 编写Flume脚本
编写Flume脚本采集端口数据到Kafka中 sink指定到kafka即可,flume充当kafka的生产者
9.1.2 整合Spark Streaming
整合Spark Streaming代码读取kafka中的数据,此时Spark相当于kafka的消费者
十、相关代码
#1、起别名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
#2、配置source数据源
demo.sources.s1.type=netcat
demo.sources.s1.bind=single
demo.sources.s1.port=44444
#3、配置channel管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=20000
demo.channels.c1.transactionCapacity=10000
#4、配置sink下沉地
demo.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
demo.sinks.k1.kafka.bootstrap.servers=single:9092
demo.sinks.k1.kafka.topic=flume-topic
# 5、关联
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
a.sources=s1
a.channels=c1
a.sinks=k1
a.sources.s1.type=org.apache.flume.source.kafka.KafkaSource
a.sources.s1.kafka.bootstrap.servers=single:9092
a.sources.s1.kafka.consumer.group.id=flume
a.sources.s1.kafka.topics=flume-topic
a.channels.c1.type=memory
a.channels.c1.capacity=20000
a.channels.c1.transactionCapacity=10000
a.sinks.k1.type=logger
a.sources.s1.channels=c1
a.sinks.k1.channel=c1