环境搭建:从零开始——Windows 环境下 Kafka 集群的 Docker 安装与配置全指南


本博客详细介绍了如何在 Windows 环境下使用 Docker 安装和配置 Kafka 集群。通过提供具体的操作步骤和命令示例,从零开始帮助用户配置 Docker、启动 Kafka 集群、并在 Spring Boot 应用程序中使用 Kafka 进行消息传递。该指南包括 Docker 安装、Kafka 集群搭建、启动 Kafka 服务、创建主题、配置 Spring Boot 应用以及测试 Kafka 功能。无论是新手还是有经验的开发者,都可以通过本指南轻松掌握 Kafka 在 Docker 环境下的配置和使用。

文章目录

1. Docker 安装

1.1 Windows 安装 Docker

(1) 环境及工具(点击下载)
  • Docker Desktop Installer.exe(Windows 环境下运行 Docker 的一款产品):点击下载
  • wsl_update_x64(Linux 内核包):点击下载
(2) 查看 Windows 相关配置
  • 打开任务管理器(CTRL+SHIFT+ESC)-> 选择性能 -> CPU -> 确认是否显示虚拟化已启用。
(3) 开启 Hyper-V

① 在控制面板打开程序,然后点击启动或关闭 Windows 功能:

  • 勾选 Hyper-V 选项并点击确定。

② 如果未找到 Hyper-V,可以按照以下步骤操作:

  • 桌面创建一个 Hyper-V.bat 文件,并将以下代码粘贴到文件中:
    pushd "%~dp0"
    dir /b %SystemRoot%\servicing\Packages\*Hyper-V*.mum >hyper-v.txt
    for /f %%i in ('findstr /i . hyper-v.txt 2^>nul') do dism /online /norestart /add-package:"%SystemRoot%\servicing\Packages\%%i"
    del hyper-v.txt
    Dism /online /enable-feature /featurename:Microsoft-Hyper-V-All /LimitAccess /ALL
    
  • 右键用管理员身份运行此文件。
(4) 开始安装

① 如果使用的是较旧版本的 Windows 10,可能需要手动安装 wsl_update_X64.msi

  • 下载并运行 wsl_update_X64.msi 安装包。

② 对于较新版本的 Windows 10及以上,可以直接使用 PowerShell 命令:

  • 打开 PowerShell,并运行以下命令安装 WSL:
    wsl --install
    

③ 安装完 WSL 后,开始安装 Docker Desktop:

  • 下载并运行 Docker Desktop 安装程序。
  • 按照安装向导完成 Docker Desktop 的安装。

④ 配置国内镜像加速器:

  • 打开 Docker Desktop 设置:
    • 单击系统托盘中的 Docker 图标,选择“Settings”或“Preferences”。
  • 选择“Docker Engine”或“Daemon”,在 registry-mirrors 字段中添加国内的镜像加速器 URL。例如:
       {
       "builder": {
           "gc": {
           "defaultKeepStorage": "20GB",
           "enabled": true
           }
       },
       "experimental": false,
       "features": {
           "buildkit": true
       },
       "registry-mirrors": [
           "http://hub-mirror.c.163.com",
           "https://mirror.baidubce.com",
           "https://registry.docker-cn.com",
           "https://hub.uuuadc.top",
           "https://docker.anyhub.us.kg",
           "https://dockerhub.jobcher.com",
           "https://dockerhub.icu",
           "https://docker.ckyl.me"
       ]
       }
    
  • 保存设置并重启 Docker。
(5) 查看 Docker 是否安装成功
(5) 查看 Docker 是否安装成功

① 打开终端或命令提示符,运行以下命令查看 Docker 版本:

docker --version

示例结果:

Docker version 20.10.8, build 3967b7d

说明:如果正确安装了 Docker,您将看到版本号和构建信息。

② 查看 Docker 服务状态:

docker ps

示例结果:

CONTAINER ID   IMAGE     COMMAND   CREATED   STATUS    PORTS     NAMES

说明:此命令显示正在运行的 Docker 容器列表。如果没有容器在运行,列表将为空。

(6) 常用 Docker 命令

① 查看 Docker 版本:

docker --version

示例结果:

Docker version 20.10.8, build 3967b7d

说明:这将返回当前安装的 Docker 版本信息。

② 查看运行中的容器:

docker ps

示例结果:

CONTAINER ID   IMAGE     COMMAND   CREATED   STATUS    PORTS     NAMES

说明:列出所有当前运行的容器。如果没有运行的容器,列表将为空。

③ 查看所有容器(包括已停止的容器):

docker ps -a

示例结果:

CONTAINER ID   IMAGE         COMMAND                  CREATED        STATUS                    PORTS     NAMES
e5bc1c1d6b78   mysql:latest  "docker-entrypoint.s…"   2 hours ago    Exited (0) 30 minutes ago            mysql

说明:显示所有容器,包括运行中的和已停止的容器。您可以看到每个容器的状态和名称。

④ 启动 Docker 服务:

systemctl start docker

示例结果:

(无输出表示成功启动)

说明:此命令在系统中启动 Docker 服务。如果成功启动,命令将没有任何输出。

⑤ 设置 Docker 服务开机自启动:

systemctl enable docker

示例结果:

Created symlink from /etc/systemd/system/multi-user.target.wants/docker.service to /usr/lib/systemd/system/docker.service.

说明:此命令配置 Docker 服务在系统启动时自动启动。如果成功,您将看到符号链接创建的确认消息。

⑥ 搜索 MySQL 镜像:

docker search mysql

示例结果:

NAME                                      DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
mysql                                     MySQL is a widely used, open-source relation…   10913     [OK]
mariadb                                   MariaDB is a community-developed fork of MyS…   4234      [OK]

说明:显示 Docker Hub 上与 MySQL 相关的镜像列表,包括描述和星级评分。

⑦ 运行 MySQL 容器:

docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -d mysql:latest

示例结果:

7d5d6f8e7c9b81d845b7bfa1e738e8f7a6871e123b4c6d217c905e7890a5e8e5

说明:启动一个新的 MySQL 容器,并设置 root 用户的密码。如果成功启动,您将看到新容器的 ID。

⑧ 进入容器:

docker exec -it <container_id> bash

示例结果:

root@7d5d6f8e7c9b:/#

说明:进入指定容器的终端会话,您可以在容器内执行命令。

2. Kafka 集群搭建

2.1 准备 docker-compose.yml 文件

  • 创建一个新的文件 docker-compose.yml,并将配置粘贴进去:
version: '3.8'  # 定义 Docker Compose 文件的版本
# version 这里可以删除
services:  # 定义一组服务
  zoo1:  # 定义第一个 Zookeeper 容器
    image: confluentinc/cp-zookeeper:7.3.2  # 使用 Confluent 提供的 Zookeeper 镜像,版本为 7.3.2
    hostname: zoo1  # 设置容器主机名为 zoo1
    container_name: zoo1  # 设置容器名称为 zoo1
    ports:
      - "2181:2181"  # 将主机的 2181 端口映射到容器的 2181 端口
    environment:  # 定义环境变量
      ZOOKEEPER_CLIENT_PORT: 2181  # 设置 Zookeeper 客户端连接端口为 2181
      ZOOKEEPER_SERVER_ID: 1  # 设置 Zookeeper 服务器 ID 为 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888  # 定义 Zookeeper 集群的服务器列表
    volumes:  # 定义数据卷,将主机目录映射到容器目录
      - ./data/zookeeper/zoo1/data:/data  # 将主机的 ./data/zookeeper/zoo1/data 映射到容器的 /data 目录
      - ./data/zookeeper/zoo1/datalog:/datalog  # 将主机的 ./data/zookeeper/zoo1/datalog 映射到容器的 /datalog 目录

  zoo2:  # 定义第二个 Zookeeper 容器
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo2
    container_name: zoo2
    ports:
      - "2182:2182"  # 将主机的 2182 端口映射到容器的 2182 端口
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo2/data:/data
      - ./data/zookeeper/zoo2/datalog:/datalog

  zoo3:  # 定义第三个 Zookeeper 容器
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo3
    container_name: zoo3
    ports:
      - "2183:2183"  # 将主机的 2183 端口映射到容器的 2183 端口
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo3/data:/data
      - ./data/zookeeper/zoo3/datalog:/datalog

  kafka1:  # 定义第一个 Kafka 容器
    image: confluentinc/cp-kafka:7.3.2  # 使用 Confluent 提供的 Kafka 镜像,版本为 7.3.2
    hostname: kafka1  # 设置容器主机名为 kafka1
    container_name: kafka1  # 设置容器名称为 kafka1
    ports:
      - "9092:9092"  # 将主机的 9092 端口映射到容器的 9092 端口
      - "29092:29092"  # 将主机的 29092 端口映射到容器的 29092 端口
    environment:  # 定义环境变量
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092  # 定义 Kafka 的监听地址
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT  # 定义 Kafka 的安全协议映射
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL  # 定义 Kafka 内部通信的监听器名称
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"  # 定义 Zookeeper 集群的连接地址
      KAFKA_BROKER_ID: 1  # 设置 Kafka Broker 的 ID 为 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"  # 定义 Kafka 的日志级别
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer  # 设置 Kafka 的授权器类名
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"  # 设置当没有找到 ACL 时是否允许所有人访问
    volumes:  # 定义数据卷,将主机目录映射到容器目录
      - ./data/kafka_data1:/kafka/data  # 将主机的 ./data/kafka_data1 映射到容器的 /kafka/data 目录
    depends_on:  # 定义服务依赖关系
      - zoo1
      - zoo2
      - zoo3  # 在启动 kafka1 容器前,确保 zoo1、zoo2 和 zoo3 容器已经启动

  kafka2:  # 定义第二个 Kafka 容器
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"  # 将主机的 9093 端口映射到容器的 9093 端口
      - "29093:29093"  # 将主机的 29093 端口映射到容器的 29093 端口
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2  # 设置 Kafka Broker 的 ID 为 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data2:/kafka/data
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:  # 定义第三个 Kafka 容器
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"  # 将主机的 9094 端口映射到容器的 9094 端口
      - "29094:29094"  # 将主机的 29094 端口映射到容器的 29094 端口
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3  # 设置 Kafka Broker 的 ID 为 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data

3:/kafka/data
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka-ui:  # 定义 Kafka UI 容器
    container_name: kafka-ui  # 设置容器名称为 kafka-ui
    image: provectuslabs/kafka-ui:latest  # 使用 Provectus 提供的 Kafka UI 镜像
    ports:
      - 9999:8080  # 将主机的 9999 端口映射到容器的 8080 端口
    depends_on:
      - kafka1
      - kafka2
      - kafka3  # 在启动 kafka-ui 容器前,确保 kafka1、kafka2 和 kafka3 容器已经启动
    environment:
      KAFKA_CLUSTERS_0_NAME: k1  # 定义第一个 Kafka 集群的名称
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092  # 定义第一个 Kafka 集群的引导服务器地址
      KAFKA_CLUSTERS_1_NAME: k2  # 定义第二个 Kafka 集群的名称
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093  # 定义第二个 Kafka 集群的引导服务器地址
      KAFKA_CLUSTERS_2_NAME: k3  # 定义第三个 Kafka 集群的名称
      KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094  # 定义第三个 Kafka 集群的引导服务器地址

注意:
docker-compose.yml 文件的 version 字段在 Docker Compose V2 中是多余的,虽然不会导致问题,但可以去掉或更新。你可以将 docker-compose.yml 文件中的版本声明删除或使用最新版本(例如,不指定版本)。

2.2 执行 Docker-Compose 命令

(1) 打开终端或命令提示符,导航到包含 docker-compose.yml 文件的目录。

  • 可以在文件资源管理器中打开包含 docker-compose.yml 文件的文件夹,然后在地址栏中输入 cmd 并按 Enter 键,这样会在该文件夹中打开命令提示符。
  • 或者,在终端中使用 cd 命令导航到该文件夹,例如:
    cd path/to/your/docker-compose-folder
    

(2) 运行以下命令启动所有服务:

docker-compose up -d

2.3 检查服务状态

运行以下命令查看所有服务的状态:

docker-compose ps

示例输出:

       Name                      Command               State                Ports
----------------------------------------------------------------------------------------------
kafka1              /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp, 29092/tcp
kafka2              /etc/confluent/docker/run        Up      0.0.0.0:9093->9093/tcp, 29093/tcp
kafka3              /etc/confluent/docker/run        Up      0.0.0.0:9094->9094/tcp, 29094/tcp
zoo1                /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp
zoo2                /etc/confluent/docker/run        Up      0.0.0.0:2182->2182/tcp
zoo3                /etc/confluent/docker/run        Up      0.0.0.0:2183->2183/tcp
kafka-ui            /bin/sh -c /usr/bin/dumb- ...    Up      0.0.0.0:9999->8080/tcp

解释:此命令显示当前运行的所有 Docker 容器及其状态。State 列表示容器是否正在运行,Ports 列显示容器端口映射情况。

2.4 自定义项目名称

  • 如果运行 docker-compose up -d 时出现project name must not be empty问题,则需要指定项目名称。执行以下命令:
     docker-compose -p my_project_name_docker up -d
    

示例解释:

  • -p my_project_name_docker:指定自定义项目名称 my_project_name_docker
  • up -d:启动所有服务,并在后台运行。

此命令可以避免项目名称冲突或其他与项目名称相关的问题。

2.5 错误解决方法

在执行yml文件时,出现

connecting to 127.0.0.1:10809: connectex: No connection could be made because the target machine actively refused it.

下面是解决方法:

(1) 检查并禁用代理设置

① 打开 Docker Desktop 设置:
- 单击系统托盘中的 Docker 图标,选择“Settings”或“Preferences”。
② 检查代理设置:
- 选择“Resources” -> “Proxies”。
- 确认 HTTP Proxy 和 HTTPS Proxy 字段为空。如果不为空,请清空它们。
③ 保存设置并重启 Docker。

(2) 配置 Docker 镜像加速器

① 打开 Docker Desktop 设置:
- 单击系统托盘中的 Docker 图标,选择“Settings”或“Preferences”。
② 配置镜像加速器:
- 选择“Docker Engine”或“Daemon”。
- 在 registry-mirrors 字段中添加国内的镜像加速器 URL。例如:

{
  "registry-mirrors": [
    "http://hub-mirror.c.163.com",
    "https://mirror.baidubce.com",
    "https://registry.docker-cn.com",
    "https://hub.uuuadc.top",
    "https://docker.anyhub.us.kg",
    "https://dockerhub.jobcher.com",
    "https://dockerhub.icu",
    "https://docker.ckyl.me"
  ]
}

③ 保存设置并重启 Docker。

(3) 手动拉取镜像

① 清除代理环境变量:

$env:HTTP_PROXY=""
$env:HTTPS_PROXY=""

② 手动拉取镜像:

docker pull confluentinc/cp-zookeeper:7.3.2
docker pull confluentinc/cp-kafka:7.3.2
docker pull provectuslabs/kafka-ui:latest
(4) 网络连接

这是最重要的一点,在我尝试了所有方法仍然有问题之后,我更换了更快的网络,此时运行成功。

2.6 最终成功

  • 浏览器打开 localhost:9999 可以访问 UI 后台,通过后台新建 topic 来验证集群是否工作。

3. 启动 Kafka

3.1 启动 Docker Desktop(不要忘记这一步!)

3.2 转到 docker-compose.yml 文件所在目录

可以在poweshell中输入:

cd path_to_the_file

3.3 执行命令

docker-compose -p my_project_name_docker up -d

4. PowerShell 命令字段解释

4.1 启动 Zookeeper 和 Kafka 服务的命令
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

(1)bin/zookeeper-server-start.sh:

  • bin/:表示脚本文件位于 bin 目录下。
  • zookeeper-server-start.sh:启动 Zookeeper 服务的脚本文件,用于启动 Zookeeper 服务。
  • config/zookeeper.properties:指定 Zookeeper 的配置文件路径,配置 Zookeeper 的启动参数。

(2)bin/kafka-server-start.sh:

  • bin/:表示脚本文件位于 bin 目录下。
  • kafka-server-start.sh:启动 Kafka 服务的脚本文件,用于启动 Kafka 服务。
  • config/server.properties:指定 Kafka 服务器的配置文件路径,配置 Kafka 的启动参数。
4.2 创建 Kafka 主题的命令
bin/kafka-topics.sh --create --topic test_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

(1)bin/kafka-topics.sh:

  • bin/:表示脚本文件位于 bin 目录下。
  • kafka-topics.sh:管理 Kafka 主题的脚本文件,用于创建、删除、列出 Kafka 主题。

(2)–create:

  • 创建一个新的 Kafka 主题。

(3)–topic:

  • test_topic1:指定要创建的主题名称为 test_topic1

(4)–bootstrap-server:

  • localhost:9092:指定 Kafka 集群的引导服务器地址,通常是主机名或 IP 地址加端口号(在这里是本地主机 localhost 和端口 9092)。

(5)–partitions:

  • 1:指定创建主题时的分区数量。分区是 Kafka 中的并行单位,分区越多,主题可以处理的并行任务就越多。

(6)–replication-factor:

  • 1:指定主题的副本因子,即每个分区的副本数量。副本因子决定了数据的容错性。值越大,Kafka 集群的容错能力越强。

5. 创建主题和配置步骤

目的:启动 Kafka 集群并创建 Kafka 主题,以便在 Java 应用程序中使用 Kafka 进行消息传递。

5.1 启动 Zookeeper 和 Kafka 服务

(1)打开 PowerShell 或终端。

(2)进入 Kafka 安装目录,依次运行以下命令来启动 Zookeeper 和 Kafka 服务:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
  • 这两个命令分别用于启动 Zookeeper 和 Kafka 服务。第一个命令启动 Zookeeper,第二个命令启动 Kafka。Zookeeper 用于管理 Kafka 集群的元数据,而 Kafka 是消息队列服务。
5.2 创建 Kafka 主题

(1)在同一个终端,运行以下命令来创建 Kafka 主题:

bin/kafka-topics.sh --create --topic test_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • 此命令用于创建一个名为 test_topic1 的 Kafka 主题,指定了引导服务器地址、分区数量和副本因子。Kafka 主题是消息的逻辑集合,分区是并行处理的单位,副本因子决定数据的容错性。

6. Spring boot配置文件示例和目录结构

确保在 src/main/resources/application.properties 文件中以及项目目录的其他相关配置文件中已配置 Kafka 相关属性,以便 Spring Boot 应用程序能够连接到 Kafka:

6.1 项目目录结构

src
 └── main
     ├── java
     │   └── com
     │       └── example
     │           └── kafkastudy
     │               ├── KafkaStudyApplication.java
     │               ├── KafkaConfig.java
     │               ├── TicketController.java
     │               ├── SmsNotificationService.java
     │               ├── OrderRecordService.java
     │               └── TicketBusinessService.java
     └── resources
         ├── static
         ├── templates
         └── application.properties

6.2 application.properties 文件配置

src/main/resources/application.properties 文件中配置 Kafka 相关属性:

spring.application.name=KafkaStudy1  # 设置 Spring 应用程序的名称

spring.kafka.bootstrap-servers=localhost:9092  # 设置 Kafka 服务器地址
spring.kafka.consumer.group-id=group_id  # 设置 Kafka 消费者组 ID
spring.kafka.consumer.auto-offset-reset=earliest  # 设置消费者偏移量重置策略为 earliest(从最早的消息开始消费)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置键反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置值反序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  # 设置键序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer  # 设置值序列化器

kafka.topic=test_topic1  # 指定要使用的 Kafka 主题名称
kafka.group_id=01  # 指定消费者组 ID

6.3 KafkaConfig.java 文件配置

src/main/java/com/example/kafkastudy/KafkaConfig.java 文件中配置 Kafka:

package com.example.kafkastudy;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public KafkaAdmin admin() {
        KafkaAdmin admin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }

    @Bean
    public NewTopic ticketBookingTopic() {
        return new NewTopic("ticket_booking", 1, (short) 1);
    }
}
  • KafkaConfig.java 文件中定义了 Kafka 的配置,包括创建 KafkaAdmin 和 NewTopic 的 Bean。

7. 运行 Spring Boot 应用程序

7.1 打开 IntelliJ IDEA

7.2 右键点击 KafkaStudyApplication 类,选择 Run 'KafkaStudyApplication' 来启动 Spring Boot 应用程序

  • KafkaStudyApplication 类是 Spring Boot 应用的主入口,运行它将启动整个应用程序,并使其能够与 Kafka 集群进行交互。

8. 运行测试

8.1 右键点击 KafkaProducerTest 类,选择 Run 'KafkaProducerTest' 来运行测试

  • KafkaProducerTest 类包含测试用例,用于验证 Kafka 生产者的功能是否正常运行。这些测试用例将消息发送到 Kafka 主题,并验证消息是否成功传递。
    通过这些步骤,您可以启动 Kafka 服务、创建主题并运行 Spring Boot 应用程序来发送和接收 Kafka 消息。
08-07 19:04