Official introduction
APIs in Flink
DataStream/DataSet API
Predefined Sources and Sinks
Official documentation
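DataStream/DataSet API development
Starting with this post, the series adds DataStream/DataSet API demonstrations. The existing project is extended with a connectors module, which shows basic usage of the following components: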
- elasticsearch
- file(text, csv)
- kafka
- jdbc (mysql)
- rabbitmq
- redis
Adding the connectors module
In the current project, create a Maven module named connectors.
pom.xml
<artifactId>connectors</artifactId>
<dependencies>
    <!-- Flink JDBC dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- Kafka dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Redis dependency -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- RabbitMQ dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Elasticsearch 6 dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Refresh the Maven project so that the dependencies above are downloaded.
Creating the user table (for demonstration)
-- Create the user table in the flink database
CREATE TABLE `t_user` (
    `id` int(8) NOT NULL AUTO_INCREMENT,
    `name` varchar(40) DEFAULT NULL,
    `age` int(3) DEFAULT NULL,
    `sex` int(2) DEFAULT NULL,
    `address` varchar(40) DEFAULT NULL,
    `createTime` timestamp NULL DEFAULT NULL,
    `createTimeSeries` bigint(20) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
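To have some rows to work with, the table can be populated over plain JDBC. The following is a minimal sketch rather than part of the original project: the class name `TUserInsertSketch`, the connection URL, the credentials, and the sample values are all hypothetical, and it assumes that `createTimeSeries` holds the creation time as epoch milliseconds, which the article does not state. The MySQL driver from the pom above must be on the classpath for the connection to succeed.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class TUserInsertSketch {

    // id is AUTO_INCREMENT, so it is left out of the column list.
    static final String INSERT_SQL =
            "INSERT INTO t_user (name, age, sex, address, createTime, createTimeSeries) "
                    + "VALUES (?, ?, ?, ?, ?, ?)";

    /** Inserts one row and returns the number of affected rows. */
    static int insertUser(Connection conn, String name, int age, int sex,
                          String address, long createdAtMillis) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setString(1, name);
            ps.setInt(2, age);
            ps.setInt(3, sex);
            ps.setString(4, address);
            ps.setTimestamp(5, new Timestamp(createdAtMillis));
            // Assumption: createTimeSeries stores the same instant as epoch milliseconds.
            ps.setLong(6, createdAtMillis);
            return ps.executeUpdate();
        }
    }

    public static void main(String[] args) throws SQLException {
        // Expected arguments (all hypothetical): JDBC URL, user, password,
        // e.g. jdbc:mysql://localhost:3306/flink root secret
        if (args.length < 3) {
            System.out.println("SQL that would be executed: " + INSERT_SQL);
            return;
        }
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            int rows = insertUser(conn, "Tom", 20, 1, "Beijing", System.currentTimeMillis());
            System.out.println("inserted rows: " + rows);
        }
    }
}
```

Run without arguments, the sketch only prints the statement, so it can be tried before a database is available.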
Creating the entity beans (for demonstration)
TUser.java
package com.flink.examples;

/**
 * @Description Data wrapper class for the t_user table
 */
public class TUser {
    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public TUser(){}

    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public Integer getSex() {
        return sex;
    }
    public void setSex(Integer sex) {
        this.sex = sex;
    }
    public String getAddress() {
        return address;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public Long getCreateTimeSeries() {
        return createTimeSeries;
    }
    public void setCreateTimeSeries(Long createTimeSeries) {
        this.createTimeSeries = createTimeSeries;
    }

    @Override
    public String toString() {
        return "TUser{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", sex=" + sex +
                ", address='" + address + '\'' +
                ", createTimeSeries=" + createTimeSeries +
                '}';
    }
}
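Because the connectors module also covers file sources (text, csv), a bean like this is typically filled from one delimited line. Here is a minimal plain-Java sketch of that mapping, with no Flink dependency. The column order `id,name,age,sex,address,createTimeSeries` is an assumption, and the nested `User` class is only a trimmed, hypothetical stand-in for TUser so that the sketch compiles on its own.

```java
public class TUserCsvSketch {

    /** Trimmed stand-in for TUser (hypothetical; fields only, no accessors). */
    static class User {
        Integer id;
        String name;
        Integer age;
        Integer sex;
        String address;
        Long createTimeSeries;
    }

    /** Parses one comma-separated line; the column order is an assumption. */
    static User parse(String line) {
        // limit -1 keeps trailing empty columns so the length check stays meaningful
        String[] fields = line.split(",", -1);
        if (fields.length != 6) {
            throw new IllegalArgumentException(
                    "expected 6 columns but got " + fields.length);
        }
        User user = new User();
        user.id = Integer.valueOf(fields[0].trim());
        user.name = fields[1].trim();
        user.age = Integer.valueOf(fields[2].trim());
        user.sex = Integer.valueOf(fields[3].trim());
        user.address = fields[4].trim();
        user.createTimeSeries = Long.valueOf(fields[5].trim());
        return user;
    }

    public static void main(String[] args) {
        // Hypothetical sample line
        User user = parse("1,Tom,20,1,Beijing,1600000000000");
        System.out.println(user.name + " is " + user.age + " years old");
    }
}
```

With the real TUser class, the field assignments would become the corresponding setter calls. The sketch does not handle quoted fields or commas inside values; a CSV library would be the safer choice for real data.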
TCount.java
package com.flink.examples;

/**
 * @Description Statistics table
 */
public class TCount {
    /**
     * Sex
     */
    private Integer sex;
    /**
     * Count
     */
    private Integer num;

    public TCount(){}

    public TCount(Integer sex, Integer num){
        this.sex = sex;
        this.num = num;
    }

    public Integer getSex() {
        return sex;
    }
    public void setSex(Integer sex) {
        this.sex = sex;
    }
    public Integer getNum() {
        return num;
    }
    public void setNum(Integer num) {
        this.num = num;
    }
}
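TCount pairs a sex code with a count, which suggests a per-sex tally over the users. The following plain-Java sketch shows that aggregation without Flink, purely to illustrate what one (sex, num) pair represents. It is an assumption about how the bean will be used, not the author's job code; the class name `SexCountSketch` and the sample codes are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SexCountSketch {

    /** Counts how often each sex code occurs; TreeMap keeps the keys sorted. */
    static Map<Integer, Integer> countBySex(List<Integer> sexCodes) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Integer sex : sexCodes) {
            // merge() starts at 1 for a new key and adds 1 for an existing one
            counts.merge(sex, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample data; the article does not define what each code means.
        List<Integer> sexCodes = Arrays.asList(1, 0, 1, 1, 0);
        countBySex(sexCodes).forEach((sex, num) ->
                System.out.println("sex=" + sex + ", num=" + num));
    }
}
```

Each map entry corresponds to one `new TCount(sex, num)`. In a Flink job the same result would come from grouping the user records by sex and counting them.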
Project module
All later DataStream/DataSet API examples in this series are developed in this connectors module.
Source code download