flink版本1.14
flinksql 来源于kafka json格式数据
变化的表
业务中sql可能不完全满足使用,需要转换成DataStream 更灵活一些,所以需要互相转换,发挥各自的优势。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//两个来自kafka的表
tEnv.executeSql(CreateTableSQL.kafkaTableInfo);
tEnv.executeSql(CreateTableSQL.kafkaTablePerson);
//join两个表
Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);
tEnv.toDataStream(tableResult, Row.class);
dataStream.flatMap(new FlatMapFunction<Row, Object>() {
@Override
public void flatMap(Row value, Collector<Object> out) throws Exception {
String s = value.getField(0) + ":" + value.getField(1);
out.collect(s);
}
}).print();
env.execute();
会发现报错
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
那是因为在table-to-stream中,数据在发生变化,因此需要用toChangelogStream来转换
修改成如下内容:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//两个来自kafka的表
tEnv.executeSql(CreateTableSQL.kafkaTableInfo);
tEnv.executeSql(CreateTableSQL.kafkaTablePerson);
//join两个表
Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);
tEnv.toChangelogStream(tableResult).flatMap(new FlatMapFunction<Row, Object>() {
@Override
public void flatMap(Row value, Collector<Object> out) throws Exception {
String s = value.getField(0) + ":" + value.getField(1);
out.collect(s);
}
}).print();
env.execute();
在网上找了一圈,没有类似的文章,自己做个记录。
这里虽然也可以用createTemporaryView来把表转换成DataStream,但是这种方式的表是固定的,在实际应用的使用场景中,还是toChangelogStream的应用更广一些。
executeSql() 与 sqlQuery()
你可能会发现 有的地方用的是executeSql() 有的地方用的是 sqlQuery() ,这两者是什么不同呢,对此我特意去看了一下源码里的注释。
sqlQuery(String query)
针对sqlQuery()他是这么说的
评估对已注册表的 SQL 查询并返回一个 Table 对象,该对象描述了进一步转换的管道。
查询引用的所有表和其他对象都必须在 TableEnvironment 中注册。例如,使用 createTemporaryView(String, Table)) 来引用 Table 对象或使用 createTemporarySystemFunction(String, Class) 来引用函数。
或者,当调用 Table.toString() 方法时,会自动注册 Table 对象,例如当它被嵌入到字符串中时。因此,SQL 查询可以直接内联(即匿名)引用 Table 对象,如下所示:
Table table = ...;
String tableName = table.toString();
// 表没有注册到表环境
tEnv.sqlQuery("SELECT * FROM " + tableName + " WHERE a > 12");
请注意,返回的 Table 是一个 API 对象,仅包含管道描述。它实际上对应于 SQL 术语中的视图。调用 Table.execute() 触发执行或直接使用 executeSql(String)。
executeSql(String statement)
执行给定的单个语句并返回执行结果。
语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。对于 DML 和 DQL,此方法在提交作业后返回 TableResult。对于 DDL 和 DCL 语句,操作完成后将返回 TableResult。
如果多个管道应将数据作为单个执行的一部分插入到一个或多个接收器表中,请使用 StatementSet(请参阅 createStatementSet())。
默认情况下,所有 DML 操作都是异步执行的。使用 TableResult.await() 或 TableResult.getJobClient() 来监控执行。设置 TableConfigOptions.TABLE_DML_SYNC 以始终同步执行。
需要强调的是,这两个是不能强行互相转换的,出现以下报错
java.lang.ClassCastException:org.apache.flink.table.api.internal.TableResultImpl cannot be cast to org.apache.flink.table.api.Table
还有一点, sqlQuery(String query)不能执行复杂的语句,会出现以下报错
Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.
固定的表
如果需求只是一个固定的表可以通过这种下面的案例
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// connector doris的一个表 PerCountName
tEnv.executeSql(CreateTableSQL.PerCountNDoris);
Table perCountName = tEnv.from("PerCountName");
// PerCountName表对应的实体类 PerCountName.class
tEnv.toDataStream(perCountName, PerCountName.class).flatMap(new FlatMapFunction<PerCountName, Object>() {
@Override
public void flatMap(PerCountName value, Collector<Object> out) throws Exception {
String s = value.getName()+"->"+value.getNum();
out.collect(s);
}
}).print();
env.execute();