In 2014, three of Kafka's core developers left LinkedIn to found a company called Confluent. Like other big-data companies, Confluent's product is called Confluent Platform. Kafka sits at its core, and it comes in three editions: Confluent Open Source, Confluent Enterprise, and Confluent Cloud.
I won't go into Confluent's background here; the details are on the official site, https://www.confluent.io. This article focuses on how to use the Confluent platform to capture MySQL data in real time.
Install the MySQL JDBC driver
# download and unpack the driver
wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
# append the driver jar to the system-wide CLASSPATH, then reload the profile
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile
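Setting CLASSPATH in /etc/profile covers command-line use, but Kafka Connect loads JDBC drivers from its own share directory. A minimal extra step, assuming the Confluent 3.2.2 tarball is unpacked under /usr/local as in the next section (the share/java path is an assumption based on the standard tarball layout):
# copy the driver next to the JDBC connector so Connect can load it
cp /root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar \
   /usr/local/confluent-3.2.2/share/java/kafka-connect-jdbc/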
Install Confluent
Download the Confluent tarball and unpack it:
cd /usr/local
# tar zxvf confluent.tar.gz
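After unpacking, the tree should look roughly like this (an assumption based on the 3.2.2 tarball; the later steps rely on the bin/ and etc/ paths):
# ls /usr/local/confluent-3.2.2
bin/  etc/  share/  ...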
Default ports of the Confluent platform components
Component | Default Port
ZooKeeper | 2181
Apache Kafka brokers (plain text) | 9092
Schema Registry REST API | 8081
REST Proxy | 8082
Kafka Connect REST API | 8083
Confluent Control Center | 9021
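With the services from the sections below running, here is a quick sketch for confirming that the ports this walkthrough actually uses are listening (assumes nc/netcat is installed; only ZooKeeper, Kafka, the Schema Registry, and Connect are started in this article):
for p in 2181 9092 8081 8083; do
  nc -z localhost $p && echo "port $p open" || echo "port $p closed"
done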
Configuring the MySQL data source
Create a configuration file, quickstart-mysql.properties, that tells Confluent how to load data from MySQL:
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
# table whitelist (optional; commented out here, so every table is pulled)
#table.whitelist=t1
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
# topic prefix; the Confluent platform creates one topic per table, named prefix + table name
topic.prefix=mysql-test-
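In timestamp+incrementing mode the connector tracks both columns together: modified catches updated rows and id catches newly inserted ones. Each poll then runs something like the following (a sketch only; the connector builds the exact SQL internally, binding the placeholders to its last committed offsets):
#   SELECT * FROM t1
#   WHERE modified < ? AND ((modified = ? AND id > ?) OR modified > ?)
#   ORDER BY modified, id ASC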
Custom query mode:
If the service is started with the configuration above, the Confluent platform will watch and pull data from every table. That is not always what you want, so the platform also provides a custom query mode. A reference configuration:
#User defined connector instance name
name=mysql-whitelist-timestamp-source
#The class implementing the connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
#Maximum number of tasks to run for this connector instance
tasks.max=10
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=mysql-joined-data
In query mode, a query that contains its own WHERE clause easily breaks the SQL that Kafka Connect assembles, so it is better to express the filtering as a JOIN, as sketched below.
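The reason, as far as I can tell from the connector's behavior in this version, is that in timestamp mode it appends its own filter directly onto the configured query string, roughly:
#   SELECT f.`name`,p.price,f.create_time FROM foods f
#     JOIN price p ON (f.id = p.food_id)
#   WHERE `timestamp` > ? AND `timestamp` < ? ORDER BY `timestamp` ASC
# A query that already ends in its own WHERE would thus get a second WHERE
# appended and fail to parse, which is why a JOIN is the safer way to filter.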
1. Start ZooKeeper
ZooKeeper is a long-running service and is best run in the background. This step and the ones after it also need write access to /var/lib; if you hit permission errors, check whether the user that installed Confluent can write to /var/lib.
# cd /usr/local/confluent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
# or start it as a daemon (the properties file lives inside the Confluent tree)
# sudo ./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
To stop ZooKeeper:
$ ./bin/zookeeper-server-stop
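An optional liveness check (assumes netcat is installed): ZooKeeper answers the four-letter ruok command with imok when healthy.
$ echo ruok | nc localhost 2181
imok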
2. Start Kafka
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &
To stop the Kafka service:
./bin/kafka-server-stop
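To verify the broker is up, list the topics it knows about (in this Kafka version the tool still talks to ZooKeeper; the list is empty on a fresh install):
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-topics --list --zookeeper localhost:2181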
3. Start the Schema Registry
# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
To stop the Schema Registry:
# ./bin/schema-registry-stop
4. Start the producer that listens for MySQL data
# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/quickstart-mysql.properties &
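Once the worker is up, the Kafka Connect REST API (port 8083 by default) can confirm that the connector and its tasks are RUNNING; the name below is the one set in quickstart-mysql.properties:
# curl http://localhost:8083/connectors/mysql-whitelist-timestamp-source/status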
5. Start a consumer to read the data
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic mysql-test-t1 --from-beginning
Test SQL
DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(200) DEFAULT NULL,
`createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年内', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年内', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共经济', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
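If the statements above are saved to a file, say t1.sql (a name chosen here for illustration), they can be loaded in one go into the foodsafe database from the connection URL. Note that the zero-date default on modified requires a non-strict sql_mode:
# mysql -uroot -proot foodsafe < t1.sql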
Statement to insert new data:
INSERT INTO `t1` (name,createtime,modified) VALUES ('公共经济2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
After new rows are inserted, the consumer prints them in real time (the timestamp columns are emitted as epoch milliseconds):
{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000}
{"id":8,"name":{"string":"公共经济"},"createtime":1501212790000,"modified":1500388483000}
{"id":9,"name":{"string":"公共经济1"},"createtime":1501212790000,"modified":1500388483000}
{"id":10,"name":{"string":"公共经济2"},"createtime":1501212790000,"modified":1500388483000}
Confluent still seems to see little use in China, and Chinese documentation on it is scarce. This article is a record of hands-on work I did last July, following the official docs while researching real-time data exchange technologies.